Feature remove mandatory args (#40)
* making changes and refactoring the code. 

* Adding WrappedDataFrameWriter and unit tests for the same

* ignoring agg_dq and query_dq

* Rearranging a lot of code, removed delta. Enabled custom writers in different formats

* fixing tests for the sinks and updating README

* fixing kafka configurations

* Feature add reader

* changing the rules table along with a few other changes

* reader and writer modifications

---------

Co-authored-by: phanikumarvemuri <[email protected]>

* Updating formatting, removing delta as a dependency

* Adding examples for expectations

* adding documentation

Co-authored-by: phanikumarvemuri <[email protected]>

* Updating tests

* Updating documentation

* Updating WrappedDataFrameWriter

---------

Co-authored-by: phanikumarvemuri <[email protected]>
asingamaneni and phanikumarvemuri authored Sep 28, 2023
1 parent 1bcce5a commit ca1ceb5
Showing 51 changed files with 2,984 additions and 3,338 deletions.
3 changes: 2 additions & 1 deletion CONTRIBUTORS.md
@@ -6,6 +6,8 @@
Thanks to the contributors who helped on this project apart from the authors
* [Teja Dogiparthi](https://github.com/Tejadogiparthi)
* [Phani Kumar Vemuri](https://www.linkedin.com/in/vemuriphani/)
* [Sarath Chandra Bandaru](https://www.linkedin.com/in/sarath-chandra-bandaru/)
* [Holden Karau](https://www.linkedin.com/in/holdenkarau/)

# Honorary Mentions
Thanks to the team below for invaluable insights and support throughout the initial release of this project
@@ -15,4 +17,3 @@ Thanks to the team below for invaluable insights and support throughout the init
* [Aditya Chaturvedi](https://www.linkedin.com/in/chaturvediaditya/)
* [Scott Haines](https://www.linkedin.com/in/scotthaines/)
* [Arijit Banerjee](https://www.linkedin.com/in/massborn/)
* [Sarath Chandra Bandaru](https://www.linkedin.com/in/sarath-chandra-bandaru/)
78 changes: 31 additions & 47 deletions README.md
@@ -7,7 +7,8 @@
[![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)](http://mypy-lang.org/)
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
![PYPI version](https://img.shields.io/pypi/v/spark-expectations.svg)

![PYPI - Downloads](https://static.pepy.tech/badge/spark-expectations)
![PYPI - Python Version](https://img.shields.io/pypi/pyversions/spark-expectations.svg)

<p align="center">
Spark Expectations is a specialized tool designed with the primary goal of maintaining data integrity within your processing pipeline.
@@ -70,7 +71,7 @@ is provided in the appropriate fields.
```python
from spark_expectations.config.user_config import *

se_user_conf = {
    se_notifications_enable_email: False,
    se_notifications_email_smtp_host: "mailhost.nike.com",
    se_notifications_email_smtp_port: 25,
@@ -91,66 +92,49 @@ se_global_spark_Conf = {

For all the examples below, the following import and `SparkExpectations` class instantiation are mandatory

1. Instantiate the `SparkExpectations` class, which has all the required functions for running data quality rules

```python
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter
from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.getOrCreate()
writer = WrappedDataFrameWriter().mode("append").format("delta")
# writer = WrappedDataFrameWriter().mode("append").format("iceberg")
# product_id should match with the "product_id" in the rules table
se: SparkExpectations = SparkExpectations(
    product_id="your_product",
    rules_df=spark.table("dq_spark_local.dq_rules"),
    stats_table="dq_spark_local.dq_stats",
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
    debugger=False,
    # stats_streaming_options={user_config.se_enable_streaming: False},
)
```


2. Decorate the function with the `@se.with_expectations` decorator

```python
from spark_expectations.config.user_config import *
from pyspark.sql import DataFrame
import os


@se.with_expectations(
    target_table="dq_spark_local.customer_order",
    write_to_table=True,
    user_conf=se_user_conf,
    target_table_view="order",
)
def build_new() -> DataFrame:
    # Return the dataframe on which Spark-Expectations needs to be run
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    _df_order.createOrReplaceTempView("order")

    _df_product: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/product.csv"))
    )
    _df_product.createOrReplaceTempView("product")

    _df_customer: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
    )
    _df_customer.createOrReplaceTempView("customer")

    return _df_order
```
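Once decorated, calling the function is what triggers the data quality run. A minimal usage sketch, assuming the instantiation above and that the decorator passes the DataFrame through (the variable name is illustrative):

```python
# Sketch: calling the decorated function runs the configured expectations on the
# returned DataFrame; with write_to_table=True the result is written to
# dq_spark_local.customer_order using the writer configured above.
validated_orders = build_new()
validated_orders.show(5)
```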
11 changes: 0 additions & 11 deletions docs/api/delta_sink_plugin.md

This file was deleted.

6 changes: 1 addition & 5 deletions docs/api/sample_dq.md → docs/api/sample_dq_bigquery.md
@@ -1,9 +1,5 @@
---
search:
exclude: true
---

::: spark_expectations.examples.sample_dq
::: spark_expectations.examples.sample_dq_bigquery
    handler: python
    options:
      filters:
8 changes: 8 additions & 0 deletions docs/api/sample_dq_delta.md
@@ -0,0 +1,8 @@

::: spark_expectations.examples.sample_dq_delta
    handler: python
    options:
      filters:
        - "!^_[^_]"
        - "!^__[^__]"

8 changes: 8 additions & 0 deletions docs/api/sample_dq_iceberg.md
@@ -0,0 +1,8 @@

::: spark_expectations.examples.sample_dq_iceberg
    handler: python
    options:
      filters:
        - "!^_[^_]"
        - "!^__[^__]"

92 changes: 92 additions & 0 deletions docs/bigquery.md
@@ -0,0 +1,92 @@
### Example - Write to BigQuery

Set up the SparkSession for BigQuery to test in your local environment. Configure accordingly for higher environments.
Refer to the examples in [base_setup.py](../spark_expectations/examples/base_setup.py) and
[sample_dq_bigquery.py](../spark_expectations/examples/sample_dq_bigquery.py)

```python title="spark_session"
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.config(
        "spark.jars.packages",
        "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.30.0",
    )
)
spark = builder.getOrCreate()

spark._jsc.hadoopConfiguration().set(
    "fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
)
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "<temp_dataset>")
```

Below is the configuration that can be used to run SparkExpectations and write to BigQuery

```python title="bigquery_write"
import os
from pyspark.sql import DataFrame
from spark_expectations.core.expectations import (
    SparkExpectations,
    WrappedDataFrameWriter,
)
from spark_expectations.config.user_config import Constants as user_config

os.environ[
    "GOOGLE_APPLICATION_CREDENTIALS"
] = "path_to_your_json_credential_file"  # This is needed for spark write to bigquery
writer = (
    WrappedDataFrameWriter().mode("overwrite")
    .format("bigquery")
    .option("createDisposition", "CREATE_IF_NEEDED")
    .option("writeMethod", "direct")
)

se: SparkExpectations = SparkExpectations(
    product_id="your_product",
    rules_df=spark.read.format("bigquery").load(
        "<project_id>.<dataset_id>.<rules_table>"
    ),
    stats_table="<project_id>.<dataset_id>.<stats_table>",
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
    debugger=False,
    stats_streaming_options={user_config.se_enable_streaming: False},
)


# Commented fields are optional or required when notifications are enabled
user_conf = {
    user_config.se_notifications_enable_email: False,
    # user_config.se_notifications_email_smtp_host: "mailhost.com",
    # user_config.se_notifications_email_smtp_port: 25,
    # user_config.se_notifications_email_from: "",
    # user_config.se_notifications_email_to_other_mail_id: "",
    # user_config.se_notifications_email_subject: "spark expectations - data quality - notifications",
    user_config.se_notifications_enable_slack: False,
    # user_config.se_notifications_slack_webhook_url: "",
    # user_config.se_notifications_on_start: True,
    # user_config.se_notifications_on_completion: True,
    # user_config.se_notifications_on_fail: True,
    # user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
    # user_config.se_notifications_on_error_drop_threshold: 15,
}


@se.with_expectations(
    target_table="<project_id>.<dataset_id>.<target_table_name>",
    write_to_table=True,
    user_conf=user_conf,
    target_table_view="<project_id>.<dataset_id>.<target_table_view_name>",
)
def build_new() -> DataFrame:
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    _df_order.createOrReplaceTempView("order")

    return _df_order
```
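A follow-up sketch, assuming the run above completed: the statistics that Spark Expectations records can be read back with the same BigQuery reader used for the rules table (table names are placeholders):

```python
# Sketch: inspect the run statistics written by Spark Expectations.
# The table name must match the stats_table passed to SparkExpectations above.
stats_df = spark.read.format("bigquery").load(
    "<project_id>.<dataset_id>.<stats_table>"
)
stats_df.show(truncate=False)
```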
21 changes: 11 additions & 10 deletions docs/configurations/adoption_versions_comparsion.md
@@ -4,16 +4,17 @@ Please find the difference in the changes with different version, latest three v



| stage | 0.6.0 | 0.7.0 | 0.8.0 |
| :------------------| :----------- | :----- | ------------------ |
| rules table schema changes | refer rule table creation [here](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/getting-started/setup/) | added three additional columns <br> 1.`enable_for_source_dq_validation(boolean)` <br> 2.`enable_for_target_dq_validation(boolean)` <br> 3.`is_active(boolean)` <br> <br> documentation found [here](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/getting-started/setup/) | added two additional columns <br> 1.`enable_error_drop_alert(boolean)` <br> 2.`error_drop_threshold(int)` <br><br> documentation found [here](https://glowing-umbrella-j8jnolr.pages.github.io/0.8.0/getting-started/setup/)|
| rule table creation required | yes | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required |
| stats table schema changes | refer rule table creation [here](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/getting-started/setup/) | added additional columns <br> 1. `source_query_dq_results` <br> 2. `final_query_dq_results` <br> 3. `row_dq_res_summary` <br> 4. `dq_run_time` <br> 5. `dq_rules` <br><br> renamed columns <br> 1. `runtime` to `meta_dq_run_time` <br> 2. `run_date` to `meta_dq_run_date` <br> 3. `run_id` to `meta_dq_run_id` <br><br> documentation found [here](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/getting-started/setup/)| remains same |
| stats table creation required | yes | yes - creation not required if you're upgrading from old version but schema changes required | automated |
| notification config setting | define global notification param, register as env variable and place in the `__init__.py` file for multiple usage, [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | Define a global notification parameter in the `__init__.py` file to be used in multiple instances where the spark_conf parameter needs to be passed within the with_expectations function. [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | remains same |
| secret store and kafka authentication details | not applicable | not applicable | Create a dictionary that contains your secret configuration values and register in `__init__.py` for multiple usage, [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.8.0/examples/) |
| spark expectations initialisation | create SparkExpectations class object using the `SparkExpectations` library and by passing the `product_id` | create spark expectations class object using `SparkExpectations` by passing `product_id` and optional parameter `debugger` [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | create spark expectations class object using `SparkExpectations` by passing `product_id` and additional optional parameters `debugger`, `stats_streaming_options` [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.8.0/examples/) |
| spark expectations decorator | The decorator allows for configuration by passing individual parameters to each decorator. However, registering a DataFrame view within a decorated function is not supported for implementations of query_dq [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | The decorator allows configurations to be logically grouped through a dictionary passed as a parameter to the decorator. Additionally, registering a DataFrame view within a decorated function is supported for implementations of query_dq. [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | remains same |

| stage | 0.8.0 | 1.0.0 |
|:----------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------|
| rules table schema changes | added two additional columns <br> 1.`enable_error_drop_alert(boolean)` <br> 2.`error_drop_threshold(int)` <br><br> documentation found [here](https://engineering.nike.com/spark-expectations/0.8.1/getting-started/setup/) | Remains same |
| rule table creation required | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required |
| stats table schema changes | remains same | Remains Same |
| stats table creation required | automated | Remains Same |
| notification config setting | remains same | Remains Same |
| secret store and kafka authentication details | Create a dictionary that contains your secret configuration values and register in `__init__.py` for multiple usage, [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | Remains Same. You can disable streaming if needed in the `SparkExpectations` class |
| spark expectations initialisation | create spark expectations class object using `SparkExpectations` by passing `product_id` and additional optional parameters `debugger`, `stats_streaming_options` [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) |
| with_expectations decorator | remains same | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) |
| WrappedDataFrameWriter | Doesn't exist | This is new and users need to provide the writer object to record the Spark configuration that needs to be used while writing - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/); see the sketch after this table |
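The last row above refers to the new `WrappedDataFrameWriter`. A minimal sketch of building the writer object, mirroring the Delta and BigQuery examples elsewhere in this change (the option values are illustrative):

```python
from spark_expectations.core.expectations import WrappedDataFrameWriter

# The writer object only records the mode/format/options; Spark Expectations
# applies them when writing the target, error and stats tables.
delta_writer = WrappedDataFrameWriter().mode("append").format("delta")

bigquery_writer = (
    WrappedDataFrameWriter()
    .mode("overwrite")
    .format("bigquery")
    .option("createDisposition", "CREATE_IF_NEEDED")
)
```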


