rewrote generate_summarised_row_dq_res (#66)
* rewrote generate_summarised_row_dq_res

* make fmt

* edit test
IMC07 authored Jan 22, 2024
1 parent 1c87e21 commit 373f4ac
Showing 2 changed files with 27 additions and 25 deletions.
46 changes: 24 additions & 22 deletions spark_expectations/sinks/utils/writer.py
@@ -12,6 +12,7 @@
     create_map,
     explode,
     to_json,
+    col,
 )
 from spark_expectations import _log
 from spark_expectations.core.exceptions import (
@@ -440,32 +441,33 @@ def generate_summarised_row_dq_res(self, df: DataFrame, rule_type: str) -> None:
         """
         try:
-            def update_dict(accumulator: dict) -> dict:  # pragma: no cover
-                if accumulator.get("failed_row_count") is None:  # pragma: no cover
-                    accumulator["failed_row_count"] = str(2)  # pragma: no cover
-                else:  # pragma: no cover
-                    accumulator["failed_row_count"] = str(  # pragma: no cover
-                        int(accumulator["failed_row_count"]) + 1  # pragma: no cover
-                    )  # pragma: no cover
-
-                return accumulator  # pragma: no cover
-
-            summarised_row_dq_dict: Dict[str, Dict[str, str]] = (
-                df.select(explode(f"meta_{rule_type}_results").alias("row_dq_res"))
-                .rdd.map(
-                    lambda rule_meta_dict: (
-                        rule_meta_dict[0]["rule"],
-                        {**rule_meta_dict[0], "failed_row_count": 1},
-                    )
-                )
-                .reduceByKey(lambda acc, itr: update_dict(acc))
-            ).collectAsMap()
-
-            self._context.set_summarised_row_dq_res(
-                list(summarised_row_dq_dict.values())
-            )
+            df_exploded = df.select(
+                explode(f"meta_{rule_type}_results").alias("row_dq_res")
+            )
+
+            keys = (
+                df_exploded.select(explode("row_dq_res"))
+                .select("key")
+                .distinct()
+                .rdd.flatMap(lambda x: x)
+                .collect()
+            )
+            nested_keys = [col("row_dq_res").getItem(k).alias(k) for k in keys]
+
+            df_select = df_exploded.select(*nested_keys)
+            df_pivot = (
+                df_select.groupBy(df_select.columns)
+                .count()
+                .withColumnRenamed("count", "failed_row_count")
+            )
+
+            keys += ["failed_row_count"]
+            summarised_row_dq_list = df_pivot.rdd.map(
+                lambda x: {i: x[i] for i in keys}
+            ).collect()
+
+            self._context.set_summarised_row_dq_res(summarised_row_dq_list)

         except Exception as e:
             raise SparkExpectationsMiscException(
                 f"error occurred created summarised row dq statistics {e}"
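The rewrite replaces the RDD reduceByKey accumulator with a DataFrame explode / groupBy / count pipeline. For readers skimming the diff, here is a minimal standalone sketch of that pattern, assuming a local SparkSession and a hand-built meta_row_dq_results column; it is illustrative only, not the library's entry point:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

spark = SparkSession.builder.getOrCreate()

# Hypothetical input: one array-of-maps column, shaped like the row-dq results.
df = spark.createDataFrame(
    [
        ([{"rule": "rule1"}],),
        ([{"rule": "rule1"}],),
        ([{"rule": "rule2"}],),
    ],
    ["meta_row_dq_results"],
)

# 1. One output row per rule-result map.
df_exploded = df.select(explode("meta_row_dq_results").alias("row_dq_res"))

# 2. Discover the map keys present in the data
#    (exploding a map column yields `key`/`value` columns).
keys = (
    df_exploded.select(explode("row_dq_res"))
    .select("key")
    .distinct()
    .rdd.flatMap(lambda x: x)
    .collect()
)

# 3. Promote each map key to a column, then count identical rows.
df_select = df_exploded.select(
    *[col("row_dq_res").getItem(k).alias(k) for k in keys]
)
summary = (
    df_select.groupBy(df_select.columns)
    .count()
    .withColumnRenamed("count", "failed_row_count")
)

summary.show()  # expect rule1 -> 2, rule2 -> 1 (row order may vary)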
6 changes: 3 additions & 3 deletions tests/sinks/utils/test_writer.py
@@ -635,8 +635,8 @@ def test_write_error_records_final_dependent(save_df_as_table,
                 {"meta_row_dq_results": [{"rule": "rule2"}]},
             ],
             [
-                {"rule": "rule1", "failed_row_count": "2"},
-                {"rule": "rule2", "failed_row_count": "2"},
+                {"rule": "rule1", "failed_row_count": 2},
+                {"rule": "rule2", "failed_row_count": 2},
             ]
         ),
         (
@@ -645,7 +645,7 @@ def test_write_error_records_final_dependent(save_df_as_table,
                 {"meta_row_dq_results": [{"rule": "rule1"}]},
             ],
             [
-                {"rule": "rule1", "failed_row_count": "2"},
+                {"rule": "rule1", "failed_row_count": 2},
             ]
         )
     ])
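The expected values change from "2" to 2 because the old update_dict path stored counts as strings via str(...), while groupBy().count() produces integer counts. A quick standalone sanity check of that behaviour (hypothetical, not part of the test suite):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("rule1",), ("rule1",)], ["rule"])
row = df.groupBy("rule").count().first()
assert row["count"] == 2 and isinstance(row["count"], int)  # an int, not "2"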
