| Field | Value |
| --- | --- |
| Name | spark-expectations |
| Version | 2.6.0 |
| Summary | This project helps us to run Data Quality Rules in flight while spark job is being run |
| home_page | None |
| upload_time | 2025-09-04 16:38:09 |
| maintainer | None |
| docs_url | None |
| author | None |
| requires_python | <=3.13,>=3.9 |
| license | None |
| keywords | |
| VCS | |
| bugtrack_url | |
| requirements | No requirements were recorded. |
| Travis-CI | No Travis. |
| coveralls test coverage | No coveralls. |
# Spark-Expectations
[](https://github.com/Nike-Inc/spark-expectations/actions/workflows/codeql-analysis.yml)
[](https://github.com/Nike-Inc/spark-expectations/actions/workflows/onpush.yml)
[](https://codecov.io/gh/Nike-Inc/spark-expectations)
[](https://github.com/psf/black)
[](http://mypy-lang.org/)
[](https://opensource.org/licenses/Apache-2.0)



<p align="center">
Spark Expectations is a specialized tool designed with the primary goal of maintaining data integrity within your processing pipeline.
By identifying and preventing malformed or incorrect data from reaching the target destination, it ensures that only quality data is
passed through. Any erroneous records are not simply ignored but are filtered into a separate error table, allowing for
detailed analysis and reporting. Additionally, Spark Expectations provides valuable statistical data on the filtered content,
empowering you with insights into your data quality.
</p>
<p align="center">
<img src=https://github.com/Nike-Inc/spark-expectations/blob/main/docs/se_diagrams/logo.png?raw=true width="400" height="400"></p>
---
The documentation for spark-expectations can be found [here](https://engineering.nike.com/spark-expectations/).
### Contributors
Thanks to all the [contributors](https://github.com/Nike-Inc/spark-expectations/blob/main/CONTRIBUTORS.md) who have helped ideate, develop, and bring the project to its current state.
### Contributing
We're delighted that you're interested in contributing to our project! To get started,
please carefully read and follow the guidelines provided in our [contributing](https://github.com/Nike-Inc/spark-expectations/blob/main/CONTRIBUTING.md) document.
### Change Log
The most recent updates are listed in the [Change log](CHANGELOG.md).
# What is Spark Expectations?
#### Spark Expectations is a data quality framework built in PySpark as a solution for the following problem statements:
1. Existing data quality tools validate the data in a table at rest and provide success and error metrics. Users need to manually check the metrics to identify the error records.
2. The error data is not quarantined to an error table, and no corrective action is taken to send only the valid data downstream.
3. Downstream users either consume the incorrect data as-is or must perform additional computations to eliminate records that don't comply with the data quality rules.
4. A separate process is required as a corrective action to rectify the errors in the data, and a lot of planning is usually needed for this activity.
#### Spark Expectations solves these issues using the following principles:
1. All records that fail one or more data quality rules are quarantined by default in an `_error` table, along with metadata on the rules that failed, job information, etc. This makes it easier for analysts or product teams to view the incorrect data and collaborate with the teams responsible for correcting and reprocessing it.
2. Aggregated metrics are provided for the raw data and the cleansed data for each run, along with the required metadata, to avoid recalculation or recomputation.
3. Data that doesn't meet the data quality contract or standards is not moved to the next level or iteration unless otherwise specified.
---
# Features Of Spark Expectations
Please find the spark-expectations flow and feature diagrams below
<p align="center">
<img src=https://github.com/Nike-Inc/spark-expectations/blob/main/docs/se_diagrams/flow.png?raw=true width=1000></p>
<p align="center">
<img src=https://github.com/Nike-Inc/spark-expectations/blob/main/docs/se_diagrams/features.png?raw=true width=1000></p>
## Data Quality Rule Types
Spark-Expectations supports three distinct types of data quality rules:
- **Row-Level Data Quality (`row_dq`)**: Checks conditions on individual rows, such as `col1 > 10` or `col2 is not null`.
- **Aggregate Data Quality (`agg_dq`)**: Checks conditions on aggregated values, such as `sum(col3) > 20` or `avg(col1) < 25`.
- **Query-Based Data Quality (`query_dq`)**: Checks conditions using full SQL queries, such as `(select sum(col1) from test_table) > 10`.
Each rule type has its own expectation format and validation logic.
👉 **For detailed documentation and examples, see the [Data Quality Rule Types section](docs/index.md#spark-expectations-data-quality-rule-types).**
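As a rough illustration of how one rule of each type can be expressed, below is a minimal sketch of a rules DataFrame containing a `row_dq`, an `agg_dq`, and a `query_dq` rule. The column names and values used here (`product_id`, `table_name`, `rule_type`, `rule`, `column_name`, `expectation`, `action_if_failed`, `tag`, `description`, `is_active`) are illustrative assumptions; consult the rule-types documentation linked above for the authoritative rules-table schema.

```python
# A minimal, illustrative rules DataFrame with one rule of each type.
# The schema below is an assumption for illustration; check the linked documentation for the real one.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rules_df = spark.createDataFrame(
    [
        ("your_product", "dq_spark_local.customer_order", "row_dq", "col1_gt_10",
         "col1", "col1 > 10", "drop", "validity", "col1 must be greater than 10", True),
        ("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_col3_gt_20",
         "col3", "sum(col3) > 20", "ignore", "accuracy", "sum of col3 must exceed 20", True),
        ("your_product", "dq_spark_local.customer_order", "query_dq", "sum_col1_check",
         "col1", "(select sum(col1) from test_table) > 10", "ignore", "completeness",
         "query-based sanity check on test_table", True),
    ],
    schema=(
        "product_id string, table_name string, rule_type string, rule string, "
        "column_name string, expectation string, action_if_failed string, tag string, "
        "description string, is_active boolean"
    ),
)
rules_df.show(truncate=False)
```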
## Spark Expectation Observability Feature
This feature enhances data observability by leveraging the *stats detailed table* and *custom query table* to generate a *report table* with key metrics.
### Workflow:
1. **Automatic Trigger**: The observability feature is initiated upon the completion of **Spark Expectations**, based on configurable user-defined settings.
2. Extracts relevant data from the *stats detailed table* and *custom query table*, processes it, and creates a *report table* with key insights.
3. Compiles and summarizes essential observability metrics.
4. Delivers an *alert notification* via email using a *Jinja template*.
### Template Customization:
- Users can specify a *custom Jinja template* for formatting the report.
- If no custom template is provided, a *default Jinja template* is used to generate and send the alert.
This feature enables proactive monitoring, providing timely insights to enhance data quality and system reliability.
For more details see the [observability documentation](docs/Observability_examples.md) and the [email alerts documentation](docs/email_notifications.md).
## Sample Alert Notification
Below is an example of the alert email generated by the observability feature:

# Spark Expectations Setup
### Prerequisites
You can find the developer setup instructions in the [Setup](https://engineering.nike.com/spark-expectations) section of the documentation. This guide will help you set up your development environment and get you started with Spark Expectations.
### Configurations
To establish the global configuration for Spark Expectations data quality (DQ), define a variable and populate all of the
required fields, as shown below. This `se_user_conf` dictionary is later passed to the `@se.with_expectations` decorator
through its `user_conf` argument.
```python
from spark_expectations.config.user_config import Constants as user_config
se_user_conf = {
    user_config.se_notifications_enable_email: False,
    user_config.se_notifications_email_smtp_host: "mailhost.nike.com",
    user_config.se_notifications_email_smtp_port: 25,
    user_config.se_enable_obs_dq_report_result: True,
    user_config.se_dq_obs_alert_flag: True,
    user_config.se_dq_obs_default_email_template: "",
    user_config.se_notifications_email_from: "<sender_email_id>",
    user_config.se_notifications_email_to_other_nike_mail_id: "<receiver_email_id's>",
    user_config.se_notifications_email_subject: "spark expectations - data quality - notifications",
    user_config.se_notifications_enable_slack: True,
    user_config.se_notifications_slack_webhook_url: "<slack-webhook-url>",
    user_config.se_notifications_on_start: True,
    user_config.se_notifications_on_completion: True,
    user_config.se_notifications_on_fail: True,
    user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
    user_config.se_notifications_on_rules_action_if_failed_set_ignore: True,
    user_config.se_notifications_on_error_drop_threshold: 15,
    # Optional
    # The two params below are optional; enable them to capture detailed stats in <stats_table_name>_detailed.
    # user_config.enable_query_dq_detailed_result: True,
    # user_config.enable_agg_dq_detailed_result: True,
    # The two params below are optional; enable them to pass a custom email body.
    # user_config.se_notifications_enable_custom_email_body: True,
    # user_config.se_notifications_email_custom_body: "'product_id': {}",
    # The two params below are optional and enable HTML templates for the custom email body.
    # user_config.se_notifications_enable_templated_custom_email: True,
    # user_config.se_notifications_email_custom_template: "",
    # The param below is optional; enable it if authentication is required to access the SMTP server.
    # user_config.se_notifications_email_smtp_auth: True,
    # The param below is optional and is used to specify the environment value.
    # user_config.se_dq_rules_params: {"env": "prod/dev/local"},
    # The two params below are optional; they enable and specify the default template for basic email notifications.
    # user_config.se_notifications_enable_templated_basic_email_body: True,
    # user_config.se_notifications_default_basic_email_template: "",
}
```
### Spark Expectations Initialization
For all the examples below, the following import and `SparkExpectations` class instantiation are mandatory.
1. Instantiate the `SparkExpectations` class, which has all the required functions for running data quality rules:
```python
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.getOrCreate()
writer = WrappedDataFrameWriter().mode("append").format("delta")
# writer = WrappedDataFrameWriter().mode("append").format("iceberg")
# product_id should match with the "product_id" in the rules table
se: SparkExpectations = SparkExpectations(
    product_id="your_product",
    rules_df=spark.table("dq_spark_local.dq_rules"),
    stats_table="dq_spark_local.dq_stats",
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
    debugger=False,
    # stats_streaming_options={user_config.se_enable_streaming: False},
)
```
2. Decorate the function with the `@se.with_expectations` decorator:
```python
from spark_expectations.config.user_config import *
from pyspark.sql import DataFrame
import os
@se.with_expectations(
    target_table="dq_spark_local.customer_order",
    write_to_table=True,
    user_conf=se_user_conf,
    target_table_view="order",
)
def build_new() -> DataFrame:
    # Return the DataFrame on which Spark-Expectations needs to be run
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    _df_order.createOrReplaceTempView("order")

    return _df_order
```
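Calling the decorated function runs the configured rules and writes the validated data to the target table. The sketch below shows one way to inspect the outcome; it assumes the default `<target_table>_error` naming described earlier and the stats table configured at initialization, so the exact table names may differ in your setup.

```python
# Run the decorated function: the data quality rules are applied and the validated
# data is written to "dq_spark_local.customer_order".
df_valid = build_new()
df_valid.show(5, truncate=False)

# Inspect quarantined records and run-level statistics.
# Assumes the default "<target_table>_error" naming and the stats table configured above.
spark.table("dq_spark_local.customer_order_error").show(5, truncate=False)
spark.table("dq_spark_local.dq_stats").show(5, truncate=False)
```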
3. Spark Expectations observability enablement
```python
# Add these entries to se_user_conf to enable the observability report and email alert:
user_config.se_enable_obs_dq_report_result: True,
user_config.se_dq_obs_alert_flag: True,
user_config.se_dq_obs_default_email_template: "",
# For alerts, provide all the SMTP details required for sending mail; otherwise an error will be thrown.
user_config.se_notifications_email_smtp_host: "mailhost.nike.com",
user_config.se_notifications_email_smtp_port: 25,
user_config.se_notifications_smtp_password: "************",
```
### Adding Certificates
To enable trusted SSL/TLS communication during Spark-Expectations testing, you may need to provide custom Certificate Authority (CA) certificates. Place any required `.crt` files in the `spark_expectations/examples/docker_scripts/certs` directory. During test container startup, all certificates in this folder will be automatically imported into the container’s trusted certificate store, ensuring that your Spark jobs and dependencies can establish secure connections as needed.