Skip to content

Commit

Permalink
ETL-580 code review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
philerooski committed Dec 14, 2023
1 parent 0d3984e commit 8cef26c
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 73 deletions.
75 changes: 40 additions & 35 deletions src/glue/jobs/json_to_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
the healthkitv2heartbeat data type has a field called "SubSamples" which is an
array of objects. We will write out two parquet datasets in this case, a `healthkitv2heartbeat`
dataset and an `healthkitv2heartbeat_subsamples` dataset.
"""

import logging
import os
import sys
from collections import defaultdict
from copy import deepcopy
from enum import Enum

import boto3
import ecs_logging
Expand Down Expand Up @@ -120,7 +119,8 @@ def get_table(
logger_context: dict,
) -> DynamicFrame:
"""
Return a table as a DynamicFrame with an unambiguous schema.
Return a table as a DynamicFrame with an unambiguous schema. Additionally,
we drop any superfluous partition_* fields which are added by Glue.
Args:
table_name (str): The name of the Glue table.
Expand Down Expand Up @@ -149,11 +149,11 @@ def get_table(
for field in table.schema():
if "partition_" in field.name: # superfluous field added by Glue
partition_fields.append(field.name)
if len(partition_fields):
if len(partition_fields) > 0:
table = table.drop_fields(paths=partition_fields)
count_records_for_event(
table=table.toDF(),
event="READ",
event=CountEventType.READ,
record_counts=record_counts,
logger_context=logger_context,
)
Expand All @@ -172,8 +172,7 @@ def drop_table_duplicates(
Duplicate samples are dropped by referencing the `InsertedDate`
field, keeping the more recent sample. For any data types which
don't have this field, we drop duplicates by referencing `export_end_date`.
Additionally, we drop any superfluous partition_* fields which are
added by Glue.
Args:
table (DynamicFrame): The table from which to drop duplicates
Expand Down Expand Up @@ -207,7 +206,7 @@ def drop_table_duplicates(
)
count_records_for_event(
table=table.toDF(),
event="DROP_DUPLICATES",
event=CountEventType.DROP_DUPLICATES,
record_counts=record_counts,
logger_context=logger_context,
)
Expand Down Expand Up @@ -285,7 +284,7 @@ def drop_deleted_healthkit_data(
)
count_records_for_event(
table=table_with_deleted_samples_removed.toDF(),
event="DROP_DELETED_SAMPLES",
event=CountEventType.DROP_DELETED_SAMPLES,
record_counts=record_counts,
logger_context=logger_context,
)
Expand Down Expand Up @@ -414,9 +413,35 @@ def write_table_to_s3(
format = "parquet",
transformation_ctx="write_dynamic_frame")

class CountEventType(Enum):
"""The event associated with a count."""

"""
This table has just now been read from the Glue table catalog
and has not yet had any transformations done to it.
"""
READ = "READ"

"""
This table has just now had duplicate records dropped
(see the function `drop_table_duplicates`).
"""
DROP_DUPLICATES = "DROP_DUPLICATES"

"""
This table has just now had records which are present in its respective
"Deleted" table dropped (see the function `drop_deleted_healthkit_data`).
"""
DROP_DELETED_SAMPLES = "DROP_DELETED_SAMPLES"

"""
This table has just now been written to S3.
"""
WRITE = "WRITE"

def count_records_for_event(
table: "pyspark.sql.dataframe.DataFrame",
event: str,
event: CountEventType,
record_counts: dict[str,list],
logger_context: dict,
) -> dict[str,list]:
Expand All @@ -428,17 +453,8 @@ def count_records_for_event(
Args:
table (pyspark.sql.dataframe.DataFrame): The dataframe to derive counts from.
event (str): The event associated with this count. An event
is restricted to the following set of values:
READ - This table has just now been read from the Glue table catalog
and has not yet had any transformations done to it.
DROP_DUPLICATES - This table has just now had duplicate records dropped
(see the function `drop_table_duplicates`).
DROP_DELETED_SAMPLES - This table has just now had records which are
present in its respective "Deleted" table dropped (see the function
`drop_deleted_healthkit_data`).
WRITE - This table has just now been written to S3.
event (CountEventType): The event associated with this count. See class
`CountEventType` for allowed values and their descriptions.
record_counts (dict[str,list]): A dict mapping data types to a list
of counts, each of which corresponds to an `event` type. This object
is built up cumulatively by this function over the course of this job.
Expand All @@ -460,14 +476,6 @@ def count_records_for_event(
"""
if table.count() == 0:
return record_counts
allowed_events = {
"READ", "DROP_DUPLICATES", "DROP_DELETED_SAMPLES", "WRITE"
}
if event not in allowed_events:
raise ValueError(
f"Argument `event` = {event} not in allowed "
f"`event` values: {allowed_events}"
)
table_counts = (
table
.groupby(["export_end_date"])
Expand All @@ -476,7 +484,7 @@ def count_records_for_event(
)
table_counts["workflow_run_id"] = logger_context["process.parent.pid"]
table_counts["data_type"] = logger_context["labels"]["type"]
table_counts["event"] = event
table_counts["event"] = event.value
record_counts[logger_context["labels"]["type"]].append(table_counts)
return record_counts

Expand Down Expand Up @@ -539,7 +547,6 @@ def add_index_to_table(
table_name: str,
processed_tables: dict[str, DynamicFrame],
unprocessed_tables: dict[str, DynamicFrame],
glue_context: GlueContext,
) -> "pyspark.sql.dataframe.DataFrame":
"""Add partition and index fields to a DynamicFrame.
Expand All @@ -564,7 +571,6 @@ def add_index_to_table(
that a child table may always reference the index of its parent table.
unprocessed_tables (dict): A mapping from table keys to DynamicFrames which
don't yet have an index.
glue_context (GlueContext): The glue context
Returns:
awsglue.DynamicFrame with index columns
Expand Down Expand Up @@ -688,7 +694,6 @@ def main() -> None:
table_name=table_name,
processed_tables=tables_with_index,
unprocessed_tables=table_relationalized,
glue_context=glue_context
)
for t in tables_with_index:
clean_name = t.replace(".", "_").lower()
Expand All @@ -711,7 +716,7 @@ def main() -> None:
)
count_records_for_event(
table=tables_with_index[ordered_keys[0]],
event="WRITE",
event=CountEventType.WRITE,
record_counts=record_counts,
logger_context=logger_context,
)
Expand All @@ -729,7 +734,7 @@ def main() -> None:
)
count_records_for_event(
table=table.toDF(),
event="WRITE",
event=CountEventType.WRITE,
record_counts=record_counts,
logger_context=logger_context,
)
Expand Down
87 changes: 51 additions & 36 deletions src/glue/jobs/s3_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import boto3
import ecs_logging
from awsglue.utils import getResolvedOptions
from typing import Generator

DATA_TYPES_WITH_SUBTYPE = ["HealthKitV2Samples", "HealthKitV2Statistics"]

Expand Down Expand Up @@ -132,25 +131,33 @@ def _log_error_transform_object_to_array_of_objects(
value_error = "Failed to cast %s to %s."
logger.error(
value_error, value, value_type,
extra={
**logger_context,
"error.message": repr(error),
"error.type": type(error).__name__,
"event.kind": "alert",
"event.category": ["configuration"],
"event.type": ["change"],
"event.outcome": "failure"
}
extra=dict(
merge_dicts(
logger_context,
{
"error.message": repr(error),
"error.type": type(error).__name__,
"event.kind": "alert",
"event.category": ["configuration"],
"event.type": ["change"],
"event.outcome": "failure"
}
)
)
)
logger.warning(
"Setting %s to None", value,
extra={
**logger_context,
"event.kind": "alert",
"event.category": ["configuration"],
"event.type": ["deletion"],
"event.outcome": "success"
}
extra=dict(
merge_dicts(
logger_context,
{
"event.kind": "alert",
"event.category": ["configuration"],
"event.type": ["deletion"],
"event.outcome": "success"
}
)
)
)

def transform_json(
Expand Down Expand Up @@ -309,15 +316,19 @@ def _cast_custom_fields_to_array(json_obj: dict, logger_context: dict) -> dict:
logger.error(
(f"Problem CustomFields.{field_name}: "
f"{json_obj['CustomFields'][field_name]}"),
extra={
**logger_context,
"error.message": repr(error),
"error.type": "json.JSONDecodeError",
"event.kind": "alert",
"event.category": ["change"],
"event.type": ["error"],
"event.outcome": "failure",
}
extra=dict(
merge_dicts(
logger_context,
{
"error.message": repr(error),
"error.type": "json.JSONDecodeError",
"event.kind": "alert",
"event.category": ["change"],
"event.type": ["error"],
"event.outcome": "failure",
}
)
)
)
json_obj["CustomFields"][field_name] = []
else:
Expand Down Expand Up @@ -646,7 +657,7 @@ def _upload_file_to_json_dataset(
os.remove(file_path)
return s3_output_key

def merge_dicts(x: dict, y: dict) -> Generator:
def merge_dicts(x: dict, y: dict) -> typing.Generator:
"""
Merge two dictionaries recursively.
Expand Down Expand Up @@ -829,15 +840,19 @@ def process_record(
}
logger.info(
"Input file attributes",
extra={
**logger_context,
"file.size": sys.getsizeof(json_path),
"file.LineCount": line_count,
"event.kind": "metric",
"event.category": ["file"],
"event.type": ["info", "access"],
"event.action": "list-file-properties",
}
extra=dict(
merge_dicts(
logger_context,
{
"file.size": sys.getsizeof(json_path),
"file.LineCount": line_count,
"event.kind": "metric",
"event.category": ["file"],
"event.type": ["info", "access"],
"event.action": "list-file-properties",
}
)
)
)
write_file_to_json_dataset(
z=z,
Expand Down
4 changes: 2 additions & 2 deletions tests/test_json_to_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ def test_count_records_for_event_empty_table(self, sample_table, logger_context)
empty_table = spark_sample_table.filter(spark_sample_table.city == "Atlantis")
record_counts = json_to_parquet.count_records_for_event(
table=empty_table,
event="READ",
event=json_to_parquet.CountEventType.READ,
record_counts=defaultdict(list),
logger_context=logger_context
)
Expand All @@ -969,7 +969,7 @@ def test_count_records(self, sample_table, logger_context):
spark_sample_table = sample_table.toDF()
record_counts = json_to_parquet.count_records_for_event(
table=spark_sample_table,
event="READ",
event=json_to_parquet.CountEventType.READ,
record_counts=defaultdict(list),
logger_context=logger_context
)
Expand Down

0 comments on commit 8cef26c

Please sign in to comment.