Parameterized Rules (#76)
* rules update dynamically

* Update test_expectations.py

* Update test_context.py

* Update reader.py

* Update test_expectations.py

* dq rules parameterized and documentation updated

---------

Co-authored-by: krishnam Jagadapi <[email protected]>
jskrajareddy21 and krishnam Jagadapi authored Mar 13, 2024
1 parent 7bf3247 commit a725aaa
Showing 20 changed files with 257 additions and 52 deletions.
2 changes: 2 additions & 0 deletions docs/bigquery.md
@@ -71,6 +71,8 @@ user_conf = {
# user_config.se_notifications_on_fail: True,
# user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
# user_config.se_notifications_on_error_drop_threshold: 15,
# user_config.se_enable_error_table: True,
# user_config.se_dq_rules_params: { "env": "local", "table": "product", },
}
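The new `se_dq_rules_params` entry supplies values for placeholders such as `{env}` and `{table}` used in the DQ rules. As a rough sketch of the idea, assuming substitution behaves like Python's `str.format` (the library's internal mechanism may differ):

```python
# Hedged illustration only; assumes placeholder substitution works like str.format.
dq_rules_params = {"env": "local", "table": "product"}

rule_template = "(select count(distinct category) from {table}) < 5"
resolved_rule = rule_template.format(**dq_rules_params)

print(resolved_rule)  # (select count(distinct category) from product) < 5
```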


6 changes: 3 additions & 3 deletions docs/configurations/configure_rules.md
@@ -46,7 +46,7 @@ false, false, true)
```


-Please set up rules for checking the quality of the columns in the artificially order table, using the specified format
+Please set up rules for checking the quality of the columns in the artificial order table, using the specified format

```sql
insert into `catalog`.`schema`.`{product}_rules` (product_id, table_name, rule_type, rule, column_name, expectation,
@@ -79,15 +79,15 @@ action_if_failed, tag, description) values
--The query dq rule is established to check product_id difference between two tables if the difference is more than 20%
--from source table, the metadata of the rule will be captured in the statistics table as "action_if_failed" is "ignore"
,('apla_nd', '`catalog`.`schema`.customer_order', 'query_dq', 'product_missing_count_threshold', '*',
-'((select count(distinct product_id) from product) - (select count(distinct product_id) from order))>
+'((select count(distinct product_id) from {table}) - (select count(distinct product_id) from order))>
(select count(distinct product_id) from product)*0.2', 'ignore', 'validity', 'row count threshold difference must
be less than 20%', true, true, true)
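--Illustration (not part of the original rules file): assuming user_config.se_dq_rules_params is
--set to { "env": "local", "table": "product" }, the {table} placeholder in the rule above resolves
--to the product table, i.e. the expectation becomes:
--((select count(distinct product_id) from product) - (select count(distinct product_id) from order))>
--(select count(distinct product_id) from product)*0.2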

--The query dq rule is established to check distinct product_id in the product table is less than 5, if not the
--metadata of the rule will be captured in the statistics table along with fails the job as "action_if_failed" is
--"fail" and enabled for source dataset
,('apla_nd', '`catalog`.`schema`.customer_order', 'query_dq', 'product_category', '*', '(select count(distinct category)
-from product) < 5', 'fail', 'validity', 'distinct product category must be less than 5', true, False, true)
+from {table}) < 5', 'fail', 'validity', 'distinct product category must be less than 5', true, False, true)

--The query dq rule is established to check count of the dataset should be less than 10000 otherwise the metadata
--of the rule will be captured in the statistics table as "action_if_failed" is "ignore" and enabled only for target dataset
4 changes: 2 additions & 2 deletions docs/configurations/databricks_setup_guide.md
@@ -5,6 +5,6 @@ This section provides instructions on how to set up a sample notebook in the Dat
#### Prerequisite:

1. Recommended Databricks run time environment for better experience - DBS 11.0 and above
-3. Please install the Kafka jar using the path `dbfs:/kafka-jars/databricks-shaded-strimzi-kafka-oauth-client-1.1.jar`, If the jar is not available in the dbfs location, please raise a ticket with GAP Support team to add the jar to your workspace
-2. Please follow the steps provided [here](TODO) to integrate and clone repo from git Databricks
+2. Please install the Kafka jar using the path `dbfs:/kafka-jars/databricks-shaded-strimzi-kafka-oauth-client-1.1.jar`. If the jar is not available in the dbfs location, please raise a ticket with the Platform team to add the jar to your workspace
+3. Please follow the steps provided [here](TODO) to integrate Git with Databricks and clone the repo
4. Please follow the steps to create the webhook URL for the team-specific channel [here](TODO)
22 changes: 11 additions & 11 deletions docs/configurations/migration_versions_comparison.md
@@ -4,17 +4,17 @@ Please find the difference in the changes with different version, latest three v



-| stage | 0.8.0 | 1.0.0 | 1.2.0 |
-|:------|-------|-------|-------|
-| rules table schema changes | added additional two column <br> 1.`enable_error_drop_alert(boolean)` <br> 2.`error_drop_threshold(int)` <br><br> documentation found [here](https://engineering.nike.com/spark-expectations/0.8.1/getting-started/setup/) | Remains same | Remains same |
-| rule table creation required | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required |
-| stats table schema changes | remains same | Remains Same | Remains same. Additionally all row dq rules stats get in row dq rules summary |
-| stats table creation required | automated | Remains Same | Remains same |
-| notification config setting | remains same | Remains Same | Remains same |
-| secret store and Kafka authentication details | Create a dictionary that contains your secret configuration values and register in `__init__.py` for multiple usage, [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | Remains Same. You can disable streaming if needed, in SparkExpectations class | Remains same |
-| spark expectations initialization | create spark expectations class object using `SparkExpectations` by passing `product_id` and additional optional parameter `debugger`, `stats_streaming_options` [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
-| with_expectations decorator | remains same | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
-| WrappedDataFrameWriter | Doesn't exist | This is new and users need to provider the writer object to record the spark conf that need to be used while writing - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
+| stage | 0.8.0 | 1.0.0 | 1.2.0 |
+|:------|-------|-------|-------|
+| rules table schema changes | added additional two column <br> 1.`enable_error_drop_alert(boolean)` <br> 2.`error_drop_threshold(int)` <br><br> documentation found [here](https://engineering.nike.com/spark-expectations/0.8.1/getting-started/setup/) | Remains same | Remains same |
+| rule table creation required | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required | Remains same. Additionally dq rules dynamically updates based on the parameter passed from externally |
+| stats table schema changes | remains same | Remains Same | Remains same. Additionally all row dq rules stats get in row dq rules summary |
+| stats table creation required | automated | Remains Same | Remains same |
+| notification config setting | remains same | Remains Same | Remains same |
+| secret store and Kafka authentication details | Create a dictionary that contains your secret configuration values and register in `__init__.py` for multiple usage, [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | Remains Same. You can disable streaming if needed, in SparkExpectations class | Remains same |
+| spark expectations initialization | create spark expectations class object using `SparkExpectations` by passing `product_id` and additional optional parameter `debugger`, `stats_streaming_options` [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
+| with_expectations decorator | remains same | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
+| WrappedDataFrameWriter | Doesn't exist | This is new and users need to provider the writer object to record the spark conf that need to be used while writing - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |



2 changes: 2 additions & 0 deletions docs/delta.md
@@ -61,6 +61,8 @@ user_conf = {
# user_config.se_notifications_on_fail: True,
# user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
# user_config.se_notifications_on_error_drop_threshold: 15,
# user_config.se_enable_error_table: True,
# user_config.se_dq_rules_params: { "env": "local", "table": "product", },
}


7 changes: 6 additions & 1 deletion docs/examples.md
@@ -20,7 +20,7 @@ se_user_conf = {
user_config.se_notifications_on_fail: True, # (11)!
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True, # (12)!
user_config.se_notifications_on_error_drop_threshold: 15, # (13)!
-user_config.se_enable_error_table: True, # (14)!
+user_config.se_enable_error_table: True, # (14)!
+user_config.se_dq_rules_params: {
+    "env": "local",
+    "table": "product",
+}, # (15)!
}
}
```
@@ -39,6 +43,7 @@ se_user_conf = {
12. When `user_config.se_notifications_on_error_drop_exceeds_threshold_breach` parameter set to `True` enables notification when error threshold reaches above the configured value
13. The `user_config.se_notifications_on_error_drop_threshold` parameter captures error drop threshold value
14. The `user_config.se_enable_error_table` parameter, which controls whether error data is loaded into the error table, is set to true by default
15. The `user_config.se_dq_rules_params` parameter supplies the values required to dynamically update dq rules (see the sketch below)
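For orientation, a hedged sketch of how `se_user_conf` (including `se_dq_rules_params`) is handed to the library follows; the argument names mirror the project's published 1.x examples and the table names are illustrative, so treat both as assumptions here rather than a definitive signature:

```python
from pyspark.sql import DataFrame, SparkSession
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter

spark = SparkSession.builder.getOrCreate()
writer = WrappedDataFrameWriter().mode("append").format("delta")

se = SparkExpectations(
    product_id="your_product",                        # must match product_id in the rules table
    rules_df=spark.table("dq_spark_local.dq_rules"),  # rules may contain {env}/{table} placeholders
    stats_table="dq_spark_local.dq_stats",
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
)

@se.with_expectations(
    target_table="dq_spark_local.customer_order",
    write_to_table=True,
    user_conf=se_user_conf,  # carries user_config.se_dq_rules_params from the snippet above
)
def build_new() -> DataFrame:
    # Return the DataFrame that the rules should be applied to.
    return spark.table("dq_spark_local.customer_order_source")
```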

### Spark Expectations Initialization

2 changes: 2 additions & 0 deletions docs/iceberg.md
@@ -67,6 +67,8 @@ user_conf = {
# user_config.se_notifications_on_fail: True,
# user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
# user_config.se_notifications_on_error_drop_threshold: 15,
# user_config.se_enable_error_table: True,
# user_config.se_dq_rules_params: { "env": "local", "table": "product", },
}


1 change: 1 addition & 0 deletions spark_expectations/config/user_config.py
@@ -41,6 +41,7 @@ class Constants:

se_enable_streaming = "se.enable.streaming"
se_enable_error_table = "se.enable.error.table"
se_dq_rules_params = "se.dq.rules.params"

secret_type = "se.streaming.secret.type"
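For context, `se_dq_rules_params` here is the raw dictionary key that user code references through the `Constants` class; a minimal sketch follows (the `user_config` alias mirrors how the docs refer to it, which is an assumption here):

```python
from spark_expectations.config.user_config import Constants as user_config

user_conf = {
    user_config.se_dq_rules_params: {"env": "local", "table": "product"},
}

# The class attribute resolves to the raw string key added above.
assert user_config.se_dq_rules_params == "se.dq.rules.params"
```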

25 changes: 23 additions & 2 deletions spark_expectations/core/context.py
@@ -38,6 +38,7 @@ def __post_init__(self) -> None:
self._dq_run_status: str = "Failed"
self._dq_expectations: Optional[Dict[str, str]] = None
self._se_enable_error_table: bool = True
self._dq_rules_params: Dict[str, str] = {}

# above configuration variable value has to be set to python
self._dq_project_env_name = "spark_expectations"
@@ -1566,7 +1567,7 @@ def get_stats_table_writer_config(self) -> dict:
"""
return self._stats_table_writer_config

-def set_se_enable_error_table(self, se_enable_error_table: bool) -> None:
+def set_se_enable_error_table(self, _enable_error_table: bool) -> None:
"""
Args:
@@ -1575,7 +1576,7 @@ def set_se_enable_error_table(self, se_enable_error_table: bool) -> None:
Returns:
"""
-self._se_enable_error_table = se_enable_error_table
+self._se_enable_error_table = _enable_error_table

@property
def get_se_enable_error_table(self) -> bool:
@@ -1585,3 +1586,23 @@ def get_se_enable_error_table(self) -> bool:
"""
return self._se_enable_error_table

def set_dq_rules_params(self, _dq_rules_params: dict) -> None:
"""
This function sets the params for dq rules
Args:
_dq_rules_params: dict of placeholder values, e.g. {"env": "local", "table": "product"}
Returns:
"""
self._dq_rules_params = _dq_rules_params

@property
def get_dq_rules_params(self) -> dict:
"""
This function returns the params that are mapped into dq rules
Returns: _dq_rules_params(dict)
"""
return self._dq_rules_params
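A small usage sketch of the new setter and property, assuming `context` is an instance of the context class defined in this module:

```python
# Hedged sketch: `context` is assumed to be an instance of the context class in this module.
context.set_dq_rules_params({"env": "local", "table": "product"})

# get_dq_rules_params is defined as a @property, so it is read without parentheses.
assert context.get_dq_rules_params == {"env": "local", "table": "product"}
```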
