Skip to content

Commit

Permalink
Feature aplad 9194 (#18)
Browse files Browse the repository at this point in the history
* added error threshold capturing functionality and fixed input count issue, target_table_name issue

* added test cases

* corrected docs
  • Loading branch information
Umeshsp22 authored Aug 23, 2023
1 parent 78c4759 commit a13208a
Show file tree
Hide file tree
Showing 11 changed files with 609 additions and 117 deletions.
2 changes: 1 addition & 1 deletion docs/configurations/adoption_versions_comparsion.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Please find the difference in the changes with different version, latest three v
| stats table schema changes | refer rule table creation [here](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/getting-started/setup/) | added additional columns <br> 1. `source_query_dq_results` <br> 2. `final_query_dq_results` <br> 3. `row_dq_res_summary` <br> 4. `dq_run_time` <br> 5. `dq_rules` <br><br> renamed columns <br> 1. `runtime` to `meta_dq_run_time` <br> 2. `run_date` to `meta_dq_run_date` <br> 3. `run_id` to `meta_dq_run_id` <br><br> documentation found [here](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/getting-started/setup/)| remains same |
| stats table creation required | yes | yes - creation not required if you're upgrading from old version but schema changes required | automated |
| notification config setting | define global notification param, register as env variable and place in the `__init__.py` file for multiple usage, [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | Define a global notification parameter in the `__init__.py` file to be used in multiple instances where the spark_conf parameter needs to be passed within the with_expectations function. [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | remains same |
| secret store and kafka/ nsp authentication details | not applicable | not applicable | Create a dictionary that contains your secret configuration values and register in `__init__.py` for multiple usage, [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.8.0/examples/) |
| secret store and kafka authentication details | not applicable | not applicable | Create a dictionary that contains your secret configuration values and register in `__init__.py` for multiple usage, [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.8.0/examples/) |
| spark expectations initialisation | create SparkExpectations class object using the `SparkExpectations` library and by passing the `product_id` | create spark expectations class object using `SparkExpectations` by passing `product_id` and optional parameter `debugger` [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | create spark expectations class object using `SparkExpectations` by passing `product_id` and additional optional parameter `debugger`, `stats_streaming_options` [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.8.0/examples/) |
| spark expectations decorator | The decorator allows for configuration by passing individual parameters to each decorator. However, registering a DataFrame view within a decorated function is not supported for implementations of query_dq [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | The decorator allows configurations to be logically grouped through a dictionary passed as a parameter to the decorator. Additionally, registering a DataFrame view within a decorated function is supported for implementations of query_dq. [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | remains same |

Expand Down
1 change: 1 addition & 0 deletions docs/getting-started/setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ create table if not exists `catalog`.`schema`.`dq_stats` (
source_query_dq_results array<map<string, string>>,
final_query_dq_results array<map<string, string>>,
row_dq_res_summary array<map<string, string>>,
row_dq_error_threshold array<map<string, string>>,
dq_status map<string, string>,
dq_run_time map<string, float>,
dq_rules map<string, map<string,int>>,
Expand Down
25 changes: 17 additions & 8 deletions spark_expectations/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def __post_init__(self) -> None:
}
self._num_dq_rules: int = 0
self._summarised_row_dq_res: Optional[List[Dict[str, str]]] = None
self._rules_error_per: Optional[List[dict]] = None

@property
def get_run_id(self) -> str:
Expand Down Expand Up @@ -570,12 +571,7 @@ def get_input_count(self) -> int:
int: Returns _input_count(int)
"""
if self._input_count:
return self._input_count
raise SparkExpectationsMiscException(
"""The spark expectations context is not set completely, please assign '_input_count' before
accessing it"""
)
return self._input_count

def set_error_count(self, error_count: int = 0) -> None:
self._error_count = error_count
Expand Down Expand Up @@ -730,7 +726,7 @@ def get_error_percentage(self) -> float:
Returns:
float: error percentage
"""
if self._input_count:
if self._input_count > 0:
return round((self.get_error_count / self.get_input_count) * 100, 2)
return 0.0

Expand All @@ -742,7 +738,7 @@ def get_output_percentage(self) -> float:
float: output percentage
"""
if self._input_count:
if self._input_count > 0:
return round((self.get_output_count / self.get_input_count) * 100, 2)
return 0.0

Expand Down Expand Up @@ -1460,3 +1456,16 @@ def get_summarised_row_dq_res(self) -> Optional[List[Dict[str, str]]]:
"""
return self._summarised_row_dq_res

def set_rules_exceeds_threshold(self, rules: Optional[List[dict]] = None) -> None:
    """
    Store the per-rule error-threshold summary on the context.

    Args:
        rules: list of dicts, one per row-dq rule, each expected to carry the
            rule's configured error-drop threshold and the observed error
            percentage; ``None`` clears any previously stored summary
    """
    self._rules_error_per = rules

@property
def get_rules_exceeds_threshold(self) -> Optional[List[dict]]:
    """
    Return the per-rule error-threshold summary previously stored via
    ``set_rules_exceeds_threshold`` (``None`` when it was never set).
    """
    return self._rules_error_per
2 changes: 1 addition & 1 deletion spark_expectations/core/expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
_log.info("The function dataframe is getting created")
# _df: DataFrame = func(*args, **kwargs)
_df: DataFrame = func(*args, **kwargs)
table_name: str = expectations.pop("target_table_name")
table_name: str = self._context.get_table_name

_input_count = _df.count()
_output_count: int = 0
Expand Down
72 changes: 67 additions & 5 deletions spark_expectations/sinks/utils/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
when,
array,
to_timestamp,
round,
round as sql_round,
create_map,
explode,
)
Expand Down Expand Up @@ -184,6 +184,7 @@ def write_error_stats(self) -> None:
if final_query_dq_result and len(final_query_dq_result) > 0
else None,
self._context.get_summarised_row_dq_res,
self._context.get_rules_exceeds_threshold,
{
"run_status": self._context.get_dq_run_status,
"source_agg_dq": self._context.get_source_agg_dq_status,
Expand Down Expand Up @@ -266,6 +267,11 @@ def write_error_stats(self) -> None:
ArrayType(MapType(StringType(), StringType())),
True,
),
StructField(
"row_dq_error_threshold",
ArrayType(MapType(StringType(), StringType())),
True,
),
StructField("dq_status", MapType(StringType(), StringType()), True),
StructField(
"dq_run_time", MapType(StringType(), FloatType()), True
Expand All @@ -287,9 +293,9 @@ def write_error_stats(self) -> None:
self._context.print_dataframe_with_debugger(df)

df = (
df.withColumn("output_percentage", round(df.output_percentage, 2))
.withColumn("success_percentage", round(df.success_percentage, 2))
.withColumn("error_percentage", round(df.error_percentage, 2))
df.withColumn("output_percentage", sql_round(df.output_percentage, 2))
.withColumn("success_percentage", sql_round(df.success_percentage, 2))
.withColumn("error_percentage", sql_round(df.error_percentage, 2))
)
_log.info(
"Writing metrics to the stats table: %s, started",
Expand Down Expand Up @@ -456,7 +462,7 @@ def update_dict(accumulator: dict) -> dict: # pragma: no cover
.rdd.map(
lambda rule_meta_dict: (
rule_meta_dict[0]["rule"],
rule_meta_dict[0],
{**rule_meta_dict[0], "failed_row_count": 1},
)
)
.reduceByKey(lambda acc, itr: update_dict(acc))
Expand All @@ -470,3 +476,59 @@ def update_dict(accumulator: dict) -> dict: # pragma: no cover
raise SparkExpectationsMiscException(
f"error occurred created summarised row dq statistics {e}"
)

def generate_rules_exceeds_threshold(self, rules: dict) -> None:
    """
    Build the per-rule error-drop summary and store it on the context.

    For every row-dq rule in the supplied expectations, compute the percentage
    of input rows that failed the rule and record it — together with the
    rule's configured ``error_drop_threshold`` — via
    ``set_rules_exceeds_threshold``, so it can be written to the stats table.
    Rules with zero failed rows are skipped; if no rule failed, nothing is
    stored.

    Args:
        rules: expectations dict; row-dq rule metadata is read from the
            ``f"{row_dq_rule_type_name}_rules"`` key

    Returns:
        None (returns early when no summarised row-dq results are available)

    Raises:
        SparkExpectationsMiscException: if anything fails while building
            the error threshold list
    """
    try:
        # Without the summarised row-dq results there is nothing to compute.
        if self._context.get_summarised_row_dq_res is None:
            return None

        # Map rule name -> number of rows that failed that rule.
        rules_failed_row_count: Dict[str, int] = {
            itr["rule"]: int(itr["failed_row_count"])
            for itr in self._context.get_summarised_row_dq_res
        }

        error_threshold_list = []
        for rule in rules[f"{self._context.get_row_dq_rule_type_name}_rules"]:
            rule_name = rule["rule"]
            failed_row_count = int(rules_failed_row_count.get(rule_name, 0))

            # Only rules that actually dropped rows are reported.
            if failed_row_count <= 0:
                continue

            error_drop_percentage = round(
                (failed_row_count / self._context.get_input_count) * 100, 2
            )
            error_threshold_list.append(
                {
                    "rule_name": rule_name,
                    "action_if_failed": rule["action_if_failed"],
                    "description": rule["description"],
                    "rule_type": rule["rule_type"],
                    "error_drop_threshold": str(rule["error_drop_threshold"]),
                    "error_drop_percentage": str(error_drop_percentage),
                }
            )

        if error_threshold_list:
            self._context.set_rules_exceeds_threshold(error_threshold_list)

    except Exception as e:
        raise SparkExpectationsMiscException(
            f"An error occurred while creating error threshold list : {e}"
        )
4 changes: 3 additions & 1 deletion spark_expectations/utils/regulate_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ def func_process(
spark_conf,
options_error_table,
)
_notification.notify_rules_exceeds_threshold(expectations)
if _context.get_summarised_row_dq_res:
_notification.notify_rules_exceeds_threshold(expectations)
_writer.generate_rules_exceeds_threshold(expectations)

_context.print_dataframe_with_debugger(_error_df)

Expand Down
57 changes: 50 additions & 7 deletions tests/core/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,13 +567,13 @@ def test_get_table_name_expection():
context.get_table_name


def test_get_input_count():
context = SparkExpectationsContext(product_id="product1")
context._input_count = 0
with pytest.raises(SparkExpectationsMiscException,
match="The spark expectations context is not set completely, please assign "
"'_input_count' before \n accessing it"):
context.get_input_count
# def test_get_input_count():
# context = SparkExpectationsContext(product_id="product1")
# context._input_count = 0
# with pytest.raises(SparkExpectationsMiscException,
# match="The spark expectations context is not set completely, please assign "
# "'_input_count' before \n accessing it"):
# context.get_input_count


# def test_get_nsp_stats_topic_name_exception():
Expand Down Expand Up @@ -1311,3 +1311,46 @@ def test_get_se_streaming_stats_topic_name_exception():
'_se_streaming_stats_topic_name' before
accessing it"""):
context.get_se_streaming_stats_topic_name

def test_set_rules_exceeds_threshold():
    """The setter should store the given rule list, retrievable via the getter."""
    threshold_rules = [
        {
            "rule_name": "rule_1",
            "action_if_failed": "ignore",
            "description": "description1",
            "rule_type": "row_dq",
            "error_drop_threshold": "10",
            "error_drop_percentage": "10.0",
        }
    ]
    context = SparkExpectationsContext(product_id="product1")
    context.set_rules_exceeds_threshold(threshold_rules)
    assert context.get_rules_exceeds_threshold == threshold_rules

def test_get_rules_exceeds_threshold():
    """The getter property should return whatever is stored in _rules_error_per."""
    threshold_rules = [
        {
            "rule_name": "rule_1",
            "action_if_failed": "ignore",
            "description": "description1",
            "rule_type": "row_dq",
            "error_drop_threshold": "10",
            "error_drop_percentage": "10.0",
        }
    ]
    context = SparkExpectationsContext(product_id="product1")
    # set the backing attribute directly to test the getter in isolation
    context._rules_error_per = threshold_rules

    assert context.get_rules_exceeds_threshold == threshold_rules
Loading

0 comments on commit a13208a

Please sign in to comment.