From 70070d0474ba68c52a156fc0ff42997e24b40c4d Mon Sep 17 00:00:00 2001 From: sudeep7978 <111543327+sudeep7978@users.noreply.github.com> Date: Mon, 18 Nov 2024 01:48:07 +0530 Subject: [PATCH] Feature: Added new column to stats_detailed table for enhanced observability. --- tests/utils/test_actions.py | 573 +++++++++++++++++++----------------- 1 file changed, 300 insertions(+), 273 deletions(-) diff --git a/tests/utils/test_actions.py b/tests/utils/test_actions.py index 5ae34efa..79ab1000 100644 --- a/tests/utils/test_actions.py +++ b/tests/utils/test_actions.py @@ -1,3 +1,4 @@ + from unittest.mock import Mock from unittest.mock import patch @@ -13,6 +14,7 @@ spark = get_spark_session() + @pytest.fixture(name="_fixture_df") def fixture_df(): # Create a sample input dataframe @@ -23,7 +25,7 @@ def fixture_df(): {"row_id": 2, "col1": 3, "col2": "c"}, ] ) - + return _fixture_df @@ -32,7 +34,7 @@ def fixture_mock_context(): # fixture for mock context mock_object = Mock(spec=SparkExpectationsContext) mock_object.product_id = "product1" - mock_object.spark=spark + mock_object.spark = spark mock_object.get_row_dq_rule_type_name = "row_dq" mock_object.get_agg_dq_rule_type_name = "agg_dq" mock_object.get_query_dq_rule_type_name = "query_dq" @@ -40,21 +42,22 @@ def fixture_mock_context(): mock_object.get_query_dq_detailed_stats_status = True mock_object.get_querydq_secondary_queries = { - 'product_1|test_table|table_row_count_gt_1' : + 'product_1|test_table|table_row_count_gt_1': - { - 'source_f1': 'select count(*) from query_test_table','target_f1': 'select count(*) from query_test_table_target' - }, - - 'product_1|test_table|table_distinct_count' : - - { - 'source_f1': 'select distinct col1, col2 from query_test_table','target_f1': 'elect distinct col1, col2 from query_test_table_target' - } + { + 'source_f1': 'select count(*) from query_test_table', + 'target_f1': 'select count(*) from query_test_table_target' + }, + 'product_1|test_table|table_distinct_count': + { + 'source_f1': 'select distinct col1, col2 from query_test_table', + 'target_f1': 'elect distinct col1, col2 from query_test_table_target' } + } + mock_object.get_supported_df_query_dq = spark.createDataFrame( [ { @@ -70,7 +73,7 @@ def fixture_mock_context_without_detailed_stats(): # fixture for mock context without_detailed_stats mock_object = Mock(spec=SparkExpectationsContext) mock_object.product_id = "product1" - mock_object.spark=spark + mock_object.spark = spark mock_object.get_row_dq_rule_type_name = "row_dq" mock_object.get_agg_dq_rule_type_name = "agg_dq" mock_object.get_query_dq_rule_type_name = "query_dq" @@ -86,57 +89,59 @@ def fixture_mock_context_without_detailed_stats(): return mock_object - @pytest.fixture(name="_fixture_agg_dq_rule") def fixture_agg_dq_rule(): # Define the expectations for the data quality rules return { - "rule_type": "agg_dq", - "rule": "col1_sum_gt_eq_6", - "expectation": "sum(col1)>=6", - "action_if_failed": "ignore", - "table_name": "test_table", - "tag": "validity", - "enable_for_source_dq_validation": True, - "description": "col1 sum gt 1", - "product_id": "product_1" - } + "rule_type": "agg_dq", + "rule": "col1_sum_gt_eq_6", + "column_name": "col1", + "expectation": "sum(col1)>=6", + "action_if_failed": "ignore", + "table_name": "test_table", + "tag": "validity", + "enable_for_source_dq_validation": True, + "description": "col1 sum gt 1", + "product_id": "product_1" + } @pytest.fixture(name="_fixture_agg_dq_rule_type_range") def _fixture_agg_dq_rule_type_range(): # Define the expectations for the data quality rules - return { - "rule_type": "agg_dq", - "rule": "col1_sum_gt_6_and_lt_10", - "expectation": "sum(col1)>6 and sum(col1)<10", - "action_if_failed": "ignore", - "table_name": "test_table", - "tag": "validity", - "enable_for_source_dq_validation": True, - "description": "sum of col1 is greater than 6 and sum of col1 is less than 10", - "product_id": "product_1" - } - + return { + "rule_type": "agg_dq", + "rule": "col1_sum_gt_6_and_lt_10", + "column_name": "col1", + "expectation": "sum(col1)>6 and sum(col1)<10", + "action_if_failed": "ignore", + "table_name": "test_table", + "tag": "validity", + "enable_for_source_dq_validation": True, + "description": "sum of col1 is greater than 6 and sum of col1 is less than 10", + "product_id": "product_1" + } + @pytest.fixture(name="_fixture_query_dq_rule") def fixture_query_dq_rule(): # Define the expectations for the data quality rules return { - "product_id": "product_1", - "rule_type": "query_dq", - "rule": "table_row_count_gt_1", - "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>1", - "action_if_failed": "ignore", - "table_name": "test_table", - "tag": "validity", - "enable_for_target_dq_validation": True, - "enable_for_source_dq_validation": True, - "enable_querydq_custom_output": True, - "expectation_source_f1": "select count(*) from query_test_table", - "expectation_target_f1": "select count(*) from query_test_table_target", - "description": "table count should be greater than 1" - } + "product_id": "product_1", + "rule_type": "query_dq", + "rule": "table_row_count_gt_1", + "column_name": "col1", + "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>1", + "action_if_failed": "ignore", + "table_name": "test_table", + "tag": "validity", + "enable_for_target_dq_validation": True, + "enable_for_source_dq_validation": True, + "enable_querydq_custom_output": True, + "expectation_source_f1": "select count(*) from query_test_table", + "expectation_target_f1": "select count(*) from query_test_table_target", + "description": "table count should be greater than 1" + } @pytest.fixture(name="_fixture_expectations") @@ -148,6 +153,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "row_dq", "rule": "col1_gt_eq_1", + "column_name": "col1", "expectation": "col1 >=1", "action_if_failed": "ignore", "table_name": "test_table", @@ -158,6 +164,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "row_dq", "rule": "col1_gt_eq_2", + "column_name": "col1", "expectation": "col1 >= 2", "action_if_failed": "drop", "table_name": "test_table", @@ -168,6 +175,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "row_dq", "rule": "col1_gt_eq_3", + "column_name": "col1", "expectation": "col1 >= 3", "action_if_failed": "fail", "table_name": "test_table", @@ -180,6 +188,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "agg_dq", "rule": "col1_sum_gt_eq_6", + "column_name": "col1", "expectation": "sum(col1)>=6", "action_if_failed": "ignore", "table_name": "test_table", @@ -192,6 +201,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "agg_dq", "rule": "col2_unique_value_gt_3", + "column_name": "col1", "expectation": "count(distinct col2)>3", "action_if_failed": "fail", "table_name": "test_table", @@ -204,6 +214,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "agg_dq", "rule": "col1_sum_gt_6_and_lt_10", + "column_name": "col1", "expectation": "sum(col1)>6 and sum(col1)<10", "action_if_failed": "fail", "table_name": "test_table", @@ -218,6 +229,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "query_dq", "rule": "table_row_count_gt_1", + "column_name": "col1", "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>1", "enable_querydq_custom_output": True, "action_if_failed": "ignore", @@ -233,6 +245,7 @@ def fixture_expectations(): "product_id": "product_1", "rule_type": "query_dq", "rule": "table_distinct_count", + "column_name": "col1", "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>3", "enable_querydq_custom_output": False, "action_if_failed": "fail", @@ -253,66 +266,67 @@ def fixture_expectations(): def fixture_agg_dq_detailed_expected_result(): # define the expected result for row dq operations return { - "result": + "result": { "product_id": "product_1", "table_name": "test_table", "rule_type": "agg_dq", "rule": "col1_sum_gt_eq_6", + "column_name": "col1", "expectation": "sum(col1)>=6", "tag": "validity", "status": "pass", "description": "col1 sum gt 1", - "actual_value" : 6, - "expected_value" : '>=6' - + "actual_value": 6, + "expected_value": '>=6' + }, - "result_query_dq": + "result_query_dq": { "product_id": "product_1", "table_name": "test_table", "rule_type": "query_dq", "rule": "table_row_count_gt_1", + "column_name": "col1", "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>1", "tag": "validity", "status": "fail", "description": "table count should be greater than 1", - "actual_value" : 0, - "expected_value" : '>1' - + "actual_value": 0, + "expected_value": '>1' + }, - "result_without_context": + "result_without_context": { "product_id": "product_1", "table_name": "test_table", "rule_type": "agg_dq", "rule": "col1_sum_gt_eq_6", + "column_name": "col1", "expectation": "sum(col1)>=6", "tag": "validity", "status": None, "description": "col1 sum gt 1", - "actual_value" : None, - "expected_value" : None - + "actual_value": None, + "expected_value": None + }, - "result_without_context1": + "result_without_context1": { "product_id": "product_1", "table_name": "test_table", "rule_type": "agg_dq", - "rule":"col1_sum_gt_6_and_lt_10", + "rule": "col1_sum_gt_6_and_lt_10", + "column_name": "col1", "expectation": "sum(col1)>6 and sum(col1)<10", "tag": "validity", "status": "fail", "description": "sum of col1 is greater than 6 and sum of col1 is less than 10", - "actual_value" : 6, - "expected_value" : '6>6 and 6<10' - - } - } - - + "actual_value": 6, + "expected_value": '6>6 and 6<10' + } + } @pytest.fixture(name="_fixture_row_dq_expected_result") @@ -369,13 +383,13 @@ def fixture_agg_dq_expected_result(): "tag": "accuracy", "description": "col2 unique value grater than 3" }, - { - "rule_type": "agg_dq", - "rule": "col1_sum_gt_6_and_lt_10", - "action_if_failed": "fail", - "tag": "accuracy", - "description": "sum of col1 value grater than 6 and less than 10" - } + { + "rule_type": "agg_dq", + "rule": "col1_sum_gt_6_and_lt_10", + "action_if_failed": "fail", + "tag": "accuracy", + "description": "sum of col1 value grater than 6 and less than 10" + } ] } @@ -386,22 +400,22 @@ def fixture_query_dq_expected_result(): return { "result": [ - { - 'rule': 'table_row_count_gt_1', - 'description': 'table count should be greater than 1', - 'rule_type': 'query_dq', - 'tag': 'validity', - 'action_if_failed': 'ignore' - }, - { - 'rule': 'table_distinct_count', - 'description': 'table distinct row count should be greater than 3', - 'rule_type': 'query_dq', - 'tag': 'accuracy', - 'action_if_failed': 'fail' - } - ] - } + { + 'rule': 'table_row_count_gt_1', + 'description': 'table count should be greater than 1', + 'rule_type': 'query_dq', + 'tag': 'validity', + 'action_if_failed': 'ignore' + }, + { + 'rule': 'table_distinct_count', + 'description': 'table distinct row count should be greater than 3', + 'rule_type': 'query_dq', + 'tag': 'accuracy', + 'action_if_failed': 'fail' + } + ] + } import pytest @@ -474,140 +488,146 @@ def compare_result(_actual_output, _expected_output): compare_result(actual_output, expected_output) - - @pytest.mark.parametrize("_query_dq_rule, query_dq_detailed_expected_result, _source_dq_status,_target_dq_status", [ # expectations rule ({ - "product_id": "product_1", - "rule_type": "query_dq", - "rule": "table_row_count_gt_1", - "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)", - "enable_querydq_custom_output": True, - "action_if_failed": "ignore", - "table_name": "test_table", - "tag": "validity", - "enable_for_target_dq_validation": True, - "enable_for_source_dq_validation": True, - "description": "table count should be greater than 1", - "expectation_source_f1": "select count(*) from query_test_table", - "expectation_target_f1": "select count(*) from query_test_table_target" - }, + "product_id": "product_1", + "rule_type": "query_dq", + "rule": "table_row_count_gt_1", + "column_name": "col1", + "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)", + "enable_querydq_custom_output": True, + "action_if_failed": "ignore", + "table_name": "test_table", + "tag": "validity", + "enable_for_target_dq_validation": True, + "enable_for_source_dq_validation": True, + "description": "table count should be greater than 1", + "expectation_source_f1": "select count(*) from query_test_table", + "expectation_target_f1": "select count(*) from query_test_table_target" + }, # result in spark col object { - "product_id": "product_1", - "table_name": "test_table", - "rule_type": "query_dq", - "rule": "table_row_count_gt_1", - "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)", - "tag": "validity", - "status": "fail", - "description": "table count should be greater than 1", - "actual_value" : 0, - "expected_value" : '>3' - },True,False), + "product_id": "product_1", + "table_name": "test_table", + "rule_type": "query_dq", + "rule": "table_row_count_gt_1", + "column_name": "col1", + "expectation": "((select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)", + "tag": "validity", + "status": "fail", + "description": "table count should be greater than 1", + "actual_value": 0, + "expected_value": '>3' + }, True, False), # expectations rule ({ - "product_id": "product_1", - "rule_type": "query_dq", - "rule": "table_distinct_count", - "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))", - "enable_querydq_custom_output": False, - "action_if_failed": "fail", - "table_name": "test_table", - "tag": "accuracy", - "enable_for_target_dq_validation": True, - "enable_for_source_dq_validation": True, - "description": "table distinct row count should be greater than 3", - "expectation_source_f1": "select count(*) from (select distinct col1, col2 from query_test_table)", - "expectation_target_f1": "select count(*) from (select distinct col1, col2 from query_test_table_target)" - }, + "product_id": "product_1", + "rule_type": "query_dq", + "rule": "table_distinct_count", + "column_name": "col1", + "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))", + "enable_querydq_custom_output": False, + "action_if_failed": "fail", + "table_name": "test_table", + "tag": "accuracy", + "enable_for_target_dq_validation": True, + "enable_for_source_dq_validation": True, + "description": "table distinct row count should be greater than 3", + "expectation_source_f1": "select count(*) from (select distinct col1, col2 from query_test_table)", + "expectation_target_f1": "select count(*) from (select distinct col1, col2 from query_test_table_target)" + }, # result in spark col object { - "product_id": "product_1", - "table_name": "test_table", - "rule_type": "query_dq", - "rule": "table_distinct_count", - "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))", - "tag": "accuracy", - "status": "fail", - "description": "table distinct row count should be greater than 3", - "actual_value" : 0, - "expected_value" : '>3' - },False, True - ), + "product_id": "product_1", + "table_name": "test_table", + "rule_type": "query_dq", + "rule": "table_distinct_count", + "column_name": "col1", + "expectation": "((select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))", + "tag": "accuracy", + "status": "fail", + "description": "table distinct row count should be greater than 3", + "actual_value": 0, + "expected_value": '>3' + }, False, True + ), ]) def test_agg_query_dq_detailed_result_with_querdq_v2(_fixture_df, - _query_dq_rule, - query_dq_detailed_expected_result, - _fixture_mock_context,_source_dq_status,_target_dq_status): - + _query_dq_rule, + query_dq_detailed_expected_result, + _fixture_mock_context, _source_dq_status, _target_dq_status): _fixture_df.createOrReplaceTempView("query_test_table") _fixture_df.createOrReplaceTempView("query_test_table_target") - result_out,result_output = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, _query_dq_rule,_fixture_df,[],_source_dq_status=_source_dq_status,_target_dq_status=_target_dq_status - ) - print("result_df:",result_output) - print("query_dq_detailed_expected_result:",query_dq_detailed_expected_result) - + result_out, result_output = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, + _query_dq_rule, _fixture_df, [], + _source_dq_status=_source_dq_status, + _target_dq_status=_target_dq_status + ) + print("result_df:", result_output) + print("query_dq_detailed_expected_result:", query_dq_detailed_expected_result) assert result_output[1] == query_dq_detailed_expected_result.get("product_id") assert result_output[2] == query_dq_detailed_expected_result.get("table_name") assert result_output[3] == query_dq_detailed_expected_result.get("rule_type") assert result_output[4] == query_dq_detailed_expected_result.get("rule") - assert result_output[5] == query_dq_detailed_expected_result.get("expectation") - assert result_output[6] == query_dq_detailed_expected_result.get("tag") - assert result_output[7] == query_dq_detailed_expected_result.get("description") - assert result_output[8] == query_dq_detailed_expected_result.get("status") - - assert result_output[9] == query_dq_detailed_expected_result.get("actual_value") - assert result_output[10] == query_dq_detailed_expected_result.get("expected_value") + assert result_output[5] == query_dq_detailed_expected_result.get("column_name") + assert result_output[6] == query_dq_detailed_expected_result.get("expectation") + assert result_output[7] == query_dq_detailed_expected_result.get("tag") + assert result_output[8] == query_dq_detailed_expected_result.get("description") + assert result_output[9] == query_dq_detailed_expected_result.get("status") + assert result_output[10] == query_dq_detailed_expected_result.get("actual_value") + assert result_output[11] == query_dq_detailed_expected_result.get("expected_value") @pytest.mark.parametrize("_query_dq_rule_exception", [ # expectations rule ({ - "product_id": "product_1", - "rule_type": "query_dq", - "rule": "table_row_count_gt_1", - "expectation": "(select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)", - "enable_querydq_custom_output": True, - "action_if_failed": "ignore", - "table_name": "test_table", - "tag": "validity", - "enable_for_target_dq_validation": True, - "description": "table count should be greater than 1", - "expectation_source_f1": "select count(*) from query_test_table", - "expectation_target_f1": "select count(*) from query_test_table_target" + "product_id": "product_1", + "rule_type": "query_dq", + "rule": "table_row_count_gt_1", + "column_name": "col1", + "expectation": "(select count(*) from query_test_table)-(select count(*) from query_test_table_target))>(select count(*) from query_test_table)", + "enable_querydq_custom_output": True, + "action_if_failed": "ignore", + "table_name": "test_table", + "tag": "validity", + "enable_for_target_dq_validation": True, + "description": "table count should be greater than 1", + "expectation_source_f1": "select count(*) from query_test_table", + "expectation_target_f1": "select count(*) from query_test_table_target" } ), # expectations rule ({ - "product_id": "product_1", - "rule_type": "query_dq", - "rule": "table_distinct_count", - "expectation": "(select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))", - "enable_querydq_custom_output": False, - "action_if_failed": "fail", - "table_name": "test_table", - "tag": "accuracy", - "enable_for_target_dq_validation": True, - "enable_for_source_dq_validation": True, - "description": "table distinct row count should be greater than 3", - "expectation_source_f1": "select count(*) from (select distinct col1, col2 from query_test_table)", - "expectation_target_f1": "select count(*) from (select distinct col1, col2 from query_test_table_target)" + "product_id": "product_1", + "rule_type": "query_dq", + "rule": "table_distinct_count", + "column_name": "col1", + "expectation": "(select count(*) from (select distinct col1, col2 from query_test_table))-(select count(*) from (select distinct col1, col2 from query_test_table_target)))>(select count(*) from (select distinct col1, col2 from query_test_table_target))", + "enable_querydq_custom_output": False, + "action_if_failed": "fail", + "table_name": "test_table", + "tag": "accuracy", + "enable_for_target_dq_validation": True, + "enable_for_source_dq_validation": True, + "description": "table distinct row count should be greater than 3", + "expectation_source_f1": "select count(*) from (select distinct col1, col2 from query_test_table)", + "expectation_target_f1": "select count(*) from (select distinct col1, col2 from query_test_table_target)" } - ), - + ), + ]) def test_agg_query_dq_detailed_result_exception_v2(_fixture_df, - _query_dq_rule_exception,_fixture_mock_context): + _query_dq_rule_exception, _fixture_mock_context): # faulty user input is given to test the exception functionality of the agg_query_dq_detailed_result _fixture_df.createOrReplaceTempView("query_test_table") _fixture_df.createOrReplaceTempView("query_test_table_target") with pytest.raises(SparkExpectationsMiscException, match=r"(error occurred while running agg_query_dq_detailed_result Sql query is invalid. *)|(error occurred while running agg_query_dq_detailed_result Regex match not found. *)"): - SparkExpectationsActions().agg_query_dq_detailed_result(_fixture_mock_context, _query_dq_rule_exception,_fixture_df,[] ) + SparkExpectationsActions().agg_query_dq_detailed_result(_fixture_mock_context, _query_dq_rule_exception, + _fixture_df, []) @pytest.mark.parametrize("input_df, rule_type_name, expected_output", @@ -645,7 +665,8 @@ def test_create_agg_dq_results(input_df, rule_type_name, expected_output, _fixture_mock_context): # unit test case on create_agg_dq_results - assert SparkExpectationsActions().create_agg_dq_results(_fixture_mock_context,input_df, rule_type_name, ) == expected_output + assert SparkExpectationsActions().create_agg_dq_results(_fixture_mock_context, input_df, + rule_type_name, ) == expected_output @pytest.mark.parametrize("input_df", @@ -666,100 +687,107 @@ def test_create_agg_dq_results_exception(input_df, def test_agg_query_dq_detailed_result_exception(_fixture_df, - _fixture_query_dq_rule): + _fixture_query_dq_rule): _mock_object_context = Mock(spec=SparkExpectationsContext) # faulty user input is given to test the exception functionality of the agg_query_dq_detailed_result - + with pytest.raises(SparkExpectationsMiscException, match=r"error occurred while running agg_query_dq_detailed_result .*"): - SparkExpectationsActions().agg_query_dq_detailed_result(_mock_object_context, "_fixture_query_dq_rule","",[] ) + SparkExpectationsActions().agg_query_dq_detailed_result(_mock_object_context, "_fixture_query_dq_rule", "", + []) def test_agg_query_dq_detailed_result(_fixture_df, - _fixture_agg_dq_rule, - _fixture_agg_dq_detailed_expected_result, - _fixture_mock_context): - result_out,result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, _fixture_agg_dq_rule,_fixture_df,[] - ) - - + _fixture_agg_dq_rule, + _fixture_agg_dq_detailed_expected_result, + _fixture_mock_context): + result_out, result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, + _fixture_agg_dq_rule, _fixture_df, [] + ) + assert result_df[1] == _fixture_agg_dq_detailed_expected_result.get("result").get("product_id") assert result_df[2] == _fixture_agg_dq_detailed_expected_result.get("result").get("table_name") assert result_df[3] == _fixture_agg_dq_detailed_expected_result.get("result").get("rule_type") assert result_df[4] == _fixture_agg_dq_detailed_expected_result.get("result").get("rule") - assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result").get("expectation") - assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result").get("tag") - assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result").get("description") - assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result").get("status") - - assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result").get("actual_value") - assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result").get("expected_value") + assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result").get("column_name") + assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result").get("expectation") + assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result").get("tag") + assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result").get("description") + assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result").get("status") + + assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result").get("actual_value") + assert result_df[11] == _fixture_agg_dq_detailed_expected_result.get("result").get("expected_value") def test_agg_query_dq_detailed_result_with_range_rule_type(_fixture_df, - _fixture_agg_dq_rule_type_range, - _fixture_agg_dq_detailed_expected_result, - _fixture_mock_context): - result_out,result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, _fixture_agg_dq_rule_type_range,_fixture_df,[] - ) - + _fixture_agg_dq_rule_type_range, + _fixture_agg_dq_detailed_expected_result, + _fixture_mock_context): + result_out, result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, + _fixture_agg_dq_rule_type_range, + _fixture_df, [] + ) + assert result_df[1] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("product_id") assert result_df[2] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("table_name") assert result_df[3] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("rule_type") assert result_df[4] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("rule") - assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("expectation") - assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("tag") - assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("description") - assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("status") - - assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("actual_value") - assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("expected_value") + assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("column_name") + assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("expectation") + assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("tag") + assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("description") + assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("status") + + assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get("actual_value") + assert result_df[11] == _fixture_agg_dq_detailed_expected_result.get("result_without_context1").get( + "expected_value") def test_agg_query_dq_detailed_result_with_querdq(_fixture_df, - _fixture_query_dq_rule, - _fixture_agg_dq_detailed_expected_result, - _fixture_mock_context): - + _fixture_query_dq_rule, + _fixture_agg_dq_detailed_expected_result, + _fixture_mock_context): _fixture_df.createOrReplaceTempView("query_test_table") _fixture_df.createOrReplaceTempView("query_test_table_target") - result_out,result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, _fixture_query_dq_rule,_fixture_df,[] - ) - + result_out, result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context, + _fixture_query_dq_rule, _fixture_df, + [] + ) + assert result_df[1] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("product_id") assert result_df[2] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("table_name") assert result_df[3] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("rule_type") assert result_df[4] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("rule") - assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("expectation") - assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("tag") - assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("description") - assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("status") - - assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("actual_value") - assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("expected_value") + assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("column_name") + assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("expectation") + assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("tag") + assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("description") + assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("status") + + assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("actual_value") + assert result_df[11] == _fixture_agg_dq_detailed_expected_result.get("result_query_dq").get("expected_value") def test_agg_query_dq_detailed_result_without_detailed_context(_fixture_df, - _fixture_agg_dq_rule, - _fixture_agg_dq_detailed_expected_result, - _fixture_mock_context_without_detailed_stats): - result_out,result_df = SparkExpectationsActions.agg_query_dq_detailed_result(_fixture_mock_context_without_detailed_stats, _fixture_agg_dq_rule,_fixture_df,[] - ) - + _fixture_agg_dq_rule, + _fixture_agg_dq_detailed_expected_result, + _fixture_mock_context_without_detailed_stats): + result_out, result_df = SparkExpectationsActions.agg_query_dq_detailed_result( + _fixture_mock_context_without_detailed_stats, _fixture_agg_dq_rule, _fixture_df, [] + ) - assert result_df[1] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("product_id") assert result_df[2] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("table_name") assert result_df[3] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("rule_type") assert result_df[4] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("rule") - assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("expectation") - assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("tag") - assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("description") - assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("status") - - assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("actual_value") - assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("expected_value") + assert result_df[5] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("column_name") + assert result_df[6] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("expectation") + assert result_df[7] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("tag") + assert result_df[8] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("description") + assert result_df[9] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("status") + assert result_df[10] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("actual_value") + assert result_df[11] == _fixture_agg_dq_detailed_expected_result.get("result_without_context").get("expected_value") def test_run_dq_rules_row(_fixture_df, @@ -790,14 +818,14 @@ def test_run_dq_rules_row(_fixture_df, (True, False), (False, True), ]) - def test_run_dq_rules_agg(_fixture_df, _fixture_expectations, _fixture_agg_dq_expected_result, - _fixture_mock_context,agg_dq_source_dq_status,agg_dq_target_dq_status): + _fixture_mock_context, agg_dq_source_dq_status, agg_dq_target_dq_status): # Apply the data quality rules - result_df = SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _fixture_expectations,"agg_dq",agg_dq_source_dq_status,agg_dq_target_dq_status) + result_df = SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _fixture_expectations, + "agg_dq", agg_dq_source_dq_status, agg_dq_target_dq_status) # Assert that the result dataframe has the expected number of columns assert len(result_df.columns) == 1 @@ -809,8 +837,6 @@ def test_run_dq_rules_agg(_fixture_df, assert row.meta_agg_dq_results == _fixture_agg_dq_expected_result.get("result") - - @pytest.mark.parametrize("query_dq_source_dq_status,query_dq_target_dq_status", [ (True, False), (False, True), @@ -818,12 +844,13 @@ def test_run_dq_rules_agg(_fixture_df, def test_run_dq_rules_query(_fixture_df, _fixture_expectations, _fixture_query_dq_expected_result, - _fixture_mock_context,query_dq_source_dq_status,query_dq_target_dq_status): + _fixture_mock_context, query_dq_source_dq_status, query_dq_target_dq_status): # Apply the data quality rules _fixture_df.createOrReplaceTempView("query_test_table") _fixture_df.createOrReplaceTempView("query_test_table_target") - - result_df = SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _fixture_expectations,"query_dq",query_dq_source_dq_status,query_dq_target_dq_status) + + result_df = SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _fixture_expectations, + "query_dq", query_dq_source_dq_status, query_dq_target_dq_status) # Assert that the result dataframe has the expected number of columns assert len(result_df.columns) == 1 @@ -1534,6 +1561,7 @@ def test_run_dq_rules_condition_expression_exception(_fixture_df, { "rule_type": "query_dq", "rule": "table_row_count_gt_1", + "column_name": "col1", "expectation": "(select count(*) from query_test_table)>1", "action_if_failed": "ignore", "table_name": "test_table", @@ -1541,26 +1569,25 @@ def test_run_dq_rules_condition_expression_exception(_fixture_df, "enable_for_target_dq_validation": False, "description": "table count should be greater than 1" }, - - ], - } + + ], + } _fixture_df.createOrReplaceTempView("query_test_table") with pytest.raises(SparkExpectationsMiscException, match=r"error occurred while running expectations .*"): - SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _expectations, - "query_dq", False, True) + "query_dq", False, True) @pytest.mark.parametrize("_rule_test", [ - + ({"rule_type": "query"}), - + ]) def test_run_dq_rules_condition_expression_dynamic_exception(_fixture_df, - _fixture_query_dq_expected_result, - _fixture_mock_context,_rule_test): + _fixture_query_dq_expected_result, + _fixture_mock_context, _rule_test): # Apply the data quality rules _expectations = {"query_rules": [ { @@ -1573,13 +1600,13 @@ def test_run_dq_rules_condition_expression_dynamic_exception(_fixture_df, "enable_for_target_dq_validation": False, "description": "table count should be greater than 1" }, - - ], - } + + ], + } _fixture_df.createOrReplaceTempView("query_test_table") with pytest.raises(SparkExpectationsMiscException, match=r"error occurred while running expectations .*"): - _rule_type= _rule_test.get("rule_type") + _rule_type = _rule_test.get("rule_type") SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _expectations, - _rule_type, False, True) \ No newline at end of file + _rule_type, False, True)