Commit

make consistent naming

rxu17 committed Sep 6, 2024
1 parent 2ad2f61 commit 3cd0422
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions src/glue/jobs/run_great_expectations_on_parquet.py
@@ -249,21 +249,21 @@ def get_batch_request(
 def read_json(
     s3: boto3.client,
     s3_bucket: str,
-    expectations_key_prefix: str,
+    key_prefix: str,
 ) -> Dict[str, str]:
     """Reads in a json object
 
     Args:
         s3 (boto3.client): s3 client connection
         s3_bucket (str): name of the s3 bucket to read from
-        expectations_key_prefix (str): s3 key prefix of the
-            location of the expectations json to read from
+        key_prefix (str): s3 key prefix of the
+            location of the json to read from
 
     Returns:
-        Dict[str, str]: the expectations suite read in from json
+        Dict[str, str]: the data read in from json
     """
     # read in the json filelist
-    s3_response_object = s3.get_object(Bucket=s3_bucket, Key=expectations_key_prefix)
+    s3_response_object = s3.get_object(Bucket=s3_bucket, Key=key_prefix)
     json_content = s3_response_object["Body"].read().decode("utf-8")
     expectations = json.loads(json_content)
     return expectations
@@ -352,22 +352,31 @@ def add_validation_results_to_store(
 
 
 def main():
-    args = read_args()
+    # args = read_args()
+    args = {
+        "parquet_bucket": "recover-dev-processed-data",
+        "shareable_artifacts_bucket": "recover-dev-shareable-artifacts-vpn",
+        "cfn_bucket": "recover-dev-cloudformation",
+        "namespace": "etl-616",
+        "data_type": "healthkitv2workouts",
+        "expectation_suite_key_prefix": "etl-616/src/glue/resources/data_values_expectations.json",
+    }
+
     run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
     expectation_suite_name = f"{args['data_type']}_expectations"
     s3 = boto3.client("s3")
     context = create_context(
         s3_bucket=args["shareable_artifacts_bucket"],
         namespace=args["namespace"],
-        report_prefix=f"great_expectation_reports/{args['data_type']}/parquet/",
+        key_prefix=f"great_expectation_reports/{args['data_type']}/parquet/",
     )
     glue_context = GlueContext(SparkContext.getOrCreate())
     logger.info("get_spark_df")
     spark_df = get_spark_df(
         glue_context=glue_context,
         parquet_bucket=args["parquet_bucket"],
         namespace=args["namespace"],
-        datatype=args["data_type"],
+        data_type=args["data_type"],
     )
     logger.info("isNull")
     null_rows = spark_df.ParticipantIdentifier.isNull()
@@ -385,12 +394,12 @@ def main():
     expectations_data = read_json(
         s3=s3,
         s3_bucket=args["cfn_bucket"],
-        expectations_key_prefix=args["expectation_suite_key_prefix"],
+        key_prefix=args["expectation_suite_key_prefix"],
     )
     logger.info("adds_expectations_from_json")
     add_expectations_from_json(
         expectations_data=expectations_data,
-        data_context=context,
+        context=context,
         data_type=args["data_type"],
     )
     logger.info("get_validator")
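For illustration, a minimal standalone sketch of calling the renamed read_json after this commit. The function body matches the diff; the script framing (imports, __main__ guard, the final print) and the reuse of the dev bucket/key values hardcoded above are assumptions for the example, not part of the module.

import json
from typing import Dict

import boto3


def read_json(s3, s3_bucket: str, key_prefix: str) -> Dict[str, str]:
    """Reads a JSON object from S3, as renamed in this commit."""
    # The parameter is now the generic `key_prefix` rather than
    # `expectations_key_prefix`, since the helper is not expectations-specific.
    s3_response_object = s3.get_object(Bucket=s3_bucket, Key=key_prefix)
    json_content = s3_response_object["Body"].read().decode("utf-8")
    return json.loads(json_content)


if __name__ == "__main__":
    s3 = boto3.client("s3")
    # Bucket and key mirror the hardcoded dev values in the diff above.
    expectations_data = read_json(
        s3=s3,
        s3_bucket="recover-dev-cloudformation",
        key_prefix="etl-616/src/glue/resources/data_values_expectations.json",
    )
    print(sorted(expectations_data))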
