From 3cd0422ae657895ec67a26485892f862828c0000 Mon Sep 17 00:00:00 2001
From: rxu17 <26471741+rxu17@users.noreply.github.com>
Date: Thu, 5 Sep 2024 19:47:31 -0700
Subject: [PATCH] make consistent naming

---
 .../jobs/run_great_expectations_on_parquet.py | 29 ++++++++++++-------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/src/glue/jobs/run_great_expectations_on_parquet.py b/src/glue/jobs/run_great_expectations_on_parquet.py
index 1773476..136cda3 100644
--- a/src/glue/jobs/run_great_expectations_on_parquet.py
+++ b/src/glue/jobs/run_great_expectations_on_parquet.py
@@ -249,21 +249,21 @@ def get_batch_request(
 def read_json(
     s3: boto3.client,
     s3_bucket: str,
-    expectations_key_prefix: str,
+    key_prefix: str,
 ) -> Dict[str, str]:
     """Reads in a json object
 
     Args:
         s3 (boto3.client): s3 client connection
         s3_bucket (str): name of the s3 bucket to read from
-        expectations_key_prefix (str): s3 key prefix of the
-            location of the expectations json to read from
+        key_prefix (str): s3 key prefix of the
+            location of the json to read from
 
     Returns:
-        Dict[str, str]: the expectations suite read in from json
+        Dict[str, str]: the data read in from json
     """
     # read in the json filelist
-    s3_response_object = s3.get_object(Bucket=s3_bucket, Key=expectations_key_prefix)
+    s3_response_object = s3.get_object(Bucket=s3_bucket, Key=key_prefix)
     json_content = s3_response_object["Body"].read().decode("utf-8")
     expectations = json.loads(json_content)
     return expectations
@@ -352,14 +352,23 @@ def add_validation_results_to_store(
 
 
 def main():
-    args = read_args()
+    # args = read_args()
+    args = {
+        "parquet_bucket": "recover-dev-processed-data",
+        "shareable_artifacts_bucket": "recover-dev-shareable-artifacts-vpn",
+        "cfn_bucket": "recover-dev-cloudformation",
+        "namespace": "etl-616",
+        "data_type": "healthkitv2workouts",
+        "expectation_suite_key_prefix": "etl-616/src/glue/resources/data_values_expectations.json",
+    }
+
     run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
     expectation_suite_name = f"{args['data_type']}_expectations"
     s3 = boto3.client("s3")
     context = create_context(
         s3_bucket=args["shareable_artifacts_bucket"],
         namespace=args["namespace"],
-        report_prefix=f"great_expectation_reports/{args['data_type']}/parquet/",
+        key_prefix=f"great_expectation_reports/{args['data_type']}/parquet/",
     )
     glue_context = GlueContext(SparkContext.getOrCreate())
     logger.info("get_spark_df")
@@ -367,7 +376,7 @@
         glue_context=glue_context,
         parquet_bucket=args["parquet_bucket"],
         namespace=args["namespace"],
-        datatype=args["data_type"],
+        data_type=args["data_type"],
     )
     logger.info("isNull")
     null_rows = spark_df.ParticipantIdentifier.isNull()
@@ -385,12 +394,12 @@
     expectations_data = read_json(
         s3=s3,
         s3_bucket=args["cfn_bucket"],
-        expectations_key_prefix=args["expectation_suite_key_prefix"],
+        key_prefix=args["expectation_suite_key_prefix"],
     )
     logger.info("adds_expectations_from_json")
     add_expectations_from_json(
         expectations_data=expectations_data,
-        data_context=context,
+        context=context,
         data_type=args["data_type"],
     )
     logger.info("get_validator")