Merge pull request #76 from Sage-Bionetworks/etl-533
[ETL-533] Add `cohort` to data as partition
philerooski authored Sep 7, 2023
2 parents f729901 + c4922d2 commit faa1350
Showing 17 changed files with 300 additions and 71 deletions.
2 changes: 2 additions & 0 deletions config/develop/namespaced/glue-workflow.yaml
@@ -6,11 +6,13 @@ dependencies:
- develop/namespaced/glue-job-S3ToJsonS3.yaml
- develop/namespaced/glue-job-JSONToParquet.yaml
- develop/namespaced/glue-job-compare-parquet.yaml
- develop/glue-job-role.yaml
parameters:
Namespace: {{ stack_group_config.namespace }}
JsonBucketName: {{ stack_group_config.intermediate_bucket_name }}
ParquetBucketName: {{ stack_group_config.processed_data_bucket_name }}
GlueDatabase: !stack_output_external "{{ stack_group_config.namespace }}-glue-tables::DatabaseName"
CrawlerRole: !stack_output_external glue-job-role::RoleArn
S3ToJsonJobName: !stack_output_external "{{ stack_group_config.namespace }}-glue-job-S3ToJsonS3::JobName"
CompareParquetStagingNamespace: {{ stack_group_config.namespace }}
CompareParquetMainNamespace: "main"
2 changes: 2 additions & 0 deletions config/prod/namespaced/glue-workflow.yaml
@@ -6,11 +6,13 @@ dependencies:
- prod/namespaced/glue-job-S3ToJsonS3.yaml
- prod/namespaced/glue-job-JSONToParquet.yaml
- prod/namespaced/glue-job-compare-parquet.yaml
- prod/glue-job-role.yaml
parameters:
Namespace: {{ stack_group_config.namespace }}
JsonBucketName: {{ stack_group_config.intermediate_bucket_name }}
ParquetBucketName: {{ stack_group_config.processed_data_bucket_name }}
GlueDatabase: !stack_output_external "{{ stack_group_config.namespace }}-glue-tables::DatabaseName"
CrawlerRole: !stack_output_external glue-job-role::RoleArn
S3ToJsonJobName: !stack_output_external "{{ stack_group_config.namespace }}-glue-job-S3ToJsonS3::JobName"
CompareParquetStagingNamespace: "staging"
CompareParquetMainNamespace: "main"
28 changes: 17 additions & 11 deletions src/glue/jobs/json_to_parquet.py
@@ -1,14 +1,13 @@
"""
This script runs as a Glue job and converts a collection of JSON files
(whose schema is defined by a Glue table), to a parquet dataset partitioned by
assessmentid / year / month / day. Additionally, if the table has nested data,
cohort. Additionally, if the table has nested data,
it will be separated into its own dataset with a predictable name. For example,
the info table (derived from info.json) has a field called "files" which is an
array of objects. We will write out two parquet datasets in this case, an `info`
dataset and an `info_files` dataset.
the healthkitv2heartbeat data type has a field called "SubSamples" which is an
array of objects. We will write out two parquet datasets in this case, a `healthkitv2heartbeat`
dataset and a `healthkitv2heartbeat_subsamples` dataset.
Before writing our tables to parquet datasets, we add the recordid,
assessmentid, year, month, and day to each record in each table.
"""

import os
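
To make the split described in this docstring concrete, here is an illustrative sketch of what the nested-array separation produces. The record shape, field names, and the `id` back-reference are assumptions for illustration, not the job's exact output.

```python
# Illustrative only: how a nested array becomes its own dataset (record shape invented).
parent_record = {
    "recordid": "r1",
    "cohort": "adults_v1",
    "SubSamples": [{"Value": 61}, {"Value": 63}],
}
# The `healthkitv2heartbeat` dataset keeps the top-level fields:
#   {"recordid": "r1", "cohort": "adults_v1", ...}
# The `healthkitv2heartbeat_subsamples` dataset holds one row per array element,
# each carrying an identifier back to its parent record:
#   {"id": "r1", "Value": 61}
#   {"id": "r1", "Value": 63}
```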
@@ -108,6 +107,8 @@ def get_table(
Duplicate samples are dropped by referencing the `InsertedDate`
field, keeping the more recent sample. For any data types which
don't have this field, we drop duplicates by referencing `export_end_date`.
Additionally, we drop any superfluous partition_* fields which are
added by Glue.
Args:
table_name (str): The name of the Glue table.
@@ -125,6 +126,7 @@ def get_table(
glue_context.create_dynamic_frame.from_catalog(
database=database_name,
table_name=table_name,
additional_options={"groupFiles": "inPartition"},
transformation_ctx="create_dynamic_frame"
)
.resolveChoice(
@@ -135,9 +137,11 @@
)
if table.count() == 0:
return table
field_names = [field.name for field in table.schema().fields]
spark_df = table.toDF()
if "InsertedDate" in field_names:
for c in spark_df.columns:
if "partition_" in c: # superfluous field added by Glue
spark_df = spark_df.drop(c)
if "InsertedDate" in spark_df.columns:
sorted_spark_df = spark_df.sort(spark_df.InsertedDate.desc())
else:
sorted_spark_df = spark_df.sort(spark_df.export_end_date.desc())
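
The hunk above drops the `partition_*` columns Glue adds and sorts by `InsertedDate` (or `export_end_date`) ahead of the duplicate drop, which is below the fold. A standalone sketch of the same idea follows; it uses a window function rather than the job's own dedup call, and `recordid` as the dedup key is an assumption for illustration.

```python
# Standalone sketch (not the job's code): drop Glue's partition_* columns and keep
# the most recent record per key. "recordid" as the key is an assumption.
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("s3://example-bucket/example-dataset/")  # hypothetical input

# Drop the superfluous partition_* columns Glue adds when reading a partitioned table
df = df.drop(*[c for c in df.columns if "partition_" in c])

# Prefer InsertedDate when the data type has it, otherwise fall back to export_end_date
order_col = "InsertedDate" if "InsertedDate" in df.columns else "export_end_date"
window = Window.partitionBy("recordid").orderBy(F.col(order_col).desc())
deduped = (
    df.withColumn("_rn", F.row_number().over(window))
      .filter(F.col("_rn") == 1)
      .drop("_rn")
)
```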
@@ -322,7 +326,8 @@ def write_table_to_s3(
frame = dynamic_frame,
connection_type = "s3",
connection_options = {
"path": s3_write_path
"path": s3_write_path,
"partitionKeys": ["cohort"]
},
format = "parquet",
transformation_ctx="write_dynamic_frame")
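
The new `"partitionKeys": ["cohort"]` option makes Glue write Hive-style `cohort=<value>` subdirectories under the write path. A minimal local PySpark analogue of that layout (paths and data are hypothetical, not the job's code):

```python
# Local analogue (not Glue): a partitioned parquet write produces
# .../cohort=adults_v1/... and .../cohort=pediatric_v1/... subdirectories.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("r1", "adults_v1"), ("r2", "pediatric_v1")],
    schema=["recordid", "cohort"],
)
df.write.partitionBy("cohort").parquet("/tmp/dataset_example", mode="overwrite")
```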
@@ -390,8 +395,9 @@ def add_index_to_table(
logger.info(f"Adding index to {original_field_name}")
parent_index = (parent_table
.select(
[selectable_original_field_name] + INDEX_FIELD_MAP[table_data_type])
.distinct())
([selectable_original_field_name, "cohort"]
+ INDEX_FIELD_MAP[table_data_type])
).distinct())
this_index = parent_index.withColumnRenamed(original_field_name, "id")
df_with_index = this_table.join(
this_index,
60 changes: 48 additions & 12 deletions src/glue/jobs/s3_to_json.py
@@ -19,8 +19,13 @@

DATA_TYPES_WITH_SUBTYPE = ["HealthKitV2Samples", "HealthKitV2Statistics"]

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(levelname)s:%(name)s:%(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)

def transform_object_to_array_of_objects(
json_obj_to_replace: dict,
@@ -96,13 +101,15 @@ def transform_object_to_array_of_objects(
def transform_json(
json_obj: dict,
dataset_identifier: str,
cohort: str,
metadata: dict,) -> dict:
"""
Perform the following transformations:
For every JSON:
- Add an export_start_date property (may be None)
- Add an export_end_date property (may be None)
- Add a cohort property
For JSON whose data types have a subtype:
- Add subtype as "Type" property
@@ -120,6 +127,7 @@ def transform_json(
Args:
json_obj (str): A JSON object sourced from the JSON file of this data type.
dataset_identifier (str): The data type of `json_obj`.
cohort (str): The cohort which this data is associated with.
metadata (dict): Metadata derived from the file basename.
Returns:
@@ -130,6 +138,7 @@ def transform_json(
else:
json_obj["export_start_date"] = None
json_obj["export_end_date"] = metadata.get("end_date").isoformat()
json_obj["cohort"] = cohort
if dataset_identifier in DATA_TYPES_WITH_SUBTYPE:
# This puts the `Type` property back where Apple intended it to be
json_obj["Type"] = metadata["subtype"]
@@ -269,6 +278,7 @@ def get_output_filename(metadata: dict, part_number: int) -> str:
def transform_block(
input_json: typing.IO,
dataset_identifier: str,
cohort: str,
metadata: dict,
block_size: int=10000):
"""
@@ -283,6 +293,7 @@ def transform_block(
input_json (typing.IO): A file-like object of the JSON to be transformed.
dataset_identifier (str): The data type of `input_json`.
metadata (dict): Metadata derived from the file basename. See `get_metadata`.
cohort (str): The cohort which this data is associated with.
block_size (int, optional): The number of records to process in each block.
Default is 10000.
@@ -295,6 +306,7 @@ def transform_block(
json_obj = transform_json(
json_obj=json_obj,
dataset_identifier=dataset_identifier,
cohort=cohort,
metadata=metadata
)
block.append(json_obj)
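
`transform_block` streams records and yields them in fixed-size blocks rather than loading the whole file at once. A generic sketch of that chunking pattern, assuming newline-delimited JSON input; this is not the job's exact code.

```python
# Generic sketch of the block-yielding pattern (assumes newline-delimited JSON input).
import json
import typing

def yield_in_blocks(lines: typing.Iterable[str], block_size: int = 10000) -> typing.Iterator[list]:
    """Parse JSON records and yield them in blocks of at most `block_size`."""
    block = []
    for line in lines:
        block.append(json.loads(line))
        if len(block) == block_size:
            yield block
            block = []
    if block:  # flush the final, partially filled block
        yield block
```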
@@ -308,6 +320,7 @@ def write_file_to_json_dataset(
z: zipfile.ZipFile,
json_path: str,
dataset_identifier: str,
cohort: str,
metadata: dict,
workflow_run_properties: dict,
delete_upon_successful_upload: bool=True,
@@ -325,6 +338,7 @@ def write_file_to_json_dataset(
z (zipfile.Zipfile): The zip archive as provided by the data provider.
json_path (str): A JSON path relative to the root of `z`.
dataset_identifier (str): The data type of `json_path`.
cohort (str): The cohort which this data is associated with.
metadata (dict): Metadata derived from the file basename.
workflow_run_properties (dict): The workflow arguments
delete_upon_successful_upload (bool): Whether to delete the local
@@ -337,7 +351,9 @@ def write_file_to_json_dataset(
list: A list of files uploaded to S3
"""
s3_client = boto3.client("s3")
os.makedirs(dataset_identifier, exist_ok=True)
part_dir = os.path.join(
f"dataset={dataset_identifier}", f"cohort={cohort}")
os.makedirs(part_dir, exist_ok=True)
s3_metadata = metadata.copy()
if s3_metadata["start_date"] is None:
s3_metadata.pop("start_date")
@@ -348,37 +364,36 @@ def write_file_to_json_dataset(
output_path = get_part_path(
metadata=metadata,
part_number=part_number,
dataset_identifier=dataset_identifier,
part_dir=part_dir,
touch=True
)
with z.open(json_path, "r") as input_json:
current_output_path = output_path
for transformed_block in transform_block(
input_json=input_json,
dataset_identifier=dataset_identifier,
cohort=cohort,
metadata=metadata
):
current_file_size = os.path.getsize(current_output_path)
if current_file_size > file_size_limit:
part_number += 1
print(f"!!! File is too large, creating new part {part_number}")
current_output_path = get_part_path(
metadata=metadata,
part_number=part_number,
dataset_identifier=dataset_identifier,
part_dir=part_dir,
touch=True
)
with open(current_output_path, "a") as f_out:
for transformed_record in transformed_block:
f_out.write("{}\n".format(json.dumps(transformed_record)))
uploaded_files = []
for part_file in os.listdir(dataset_identifier):
output_path = os.path.join(dataset_identifier, part_file)
for part_file in os.listdir(part_dir):
output_path = os.path.join(part_dir, part_file)
s3_output_key = os.path.join(
workflow_run_properties["namespace"],
workflow_run_properties["json_prefix"],
f"dataset={dataset_identifier}",
part_file
output_path
)
logger.debug(
"Uploading %s to %s",
Expand All @@ -398,7 +413,11 @@ def write_file_to_json_dataset(
os.remove(output_path)
return uploaded_files
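
With the new `dataset=<identifier>/cohort=<cohort>` part directory, the uploaded S3 keys gain a cohort partition. An illustration of the resulting key layout; the namespace, prefix, and part file name below are hypothetical.

```python
# Illustration of the key layout (namespace, prefix, and file names are hypothetical).
import os

namespace = "main"
json_prefix = "json"
part_dir = os.path.join("dataset=HealthKitV2Samples", "cohort=adults_v1")
part_file = "HealthKitV2Samples_20230904-20230911.part0.ndjson"  # hypothetical name
output_path = os.path.join(part_dir, part_file)
s3_output_key = os.path.join(namespace, json_prefix, output_path)
print(s3_output_key)
# main/json/dataset=HealthKitV2Samples/cohort=adults_v1/HealthKitV2Samples_20230904-20230911.part0.ndjson
```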

def get_part_path(metadata: dict, part_number: int, dataset_identifier: str, touch: bool):
def get_part_path(
metadata: dict,
part_number: int,
part_dir: str,
touch: bool,):
"""
A helper function for `write_file_to_json_dataset`
@@ -408,7 +427,7 @@ def get_part_path(metadata: dict, part_number: int, dataset_identifier: str, tou
Args:
metadata (dict): Metadata derived from the file basename.
part_number (int): Which part we need a file name for.
dataset_identifier (str): The data type of `json_path`.
part_dir (str): The directory to which we write the part file.
touch (bool): Whether to create an empty file at the part path
Returns:
@@ -422,8 +441,9 @@ def get_part_path(metadata: dict, part_number: int, dataset_identifier: str, tou
metadata=metadata,
part_number=part_number
)
output_path = os.path.join(dataset_identifier, output_filename)
output_path = os.path.join(part_dir, output_filename)
if touch:
os.makedirs(part_dir, exist_ok=True)
with open(output_path, "x") as initial_file:
# create file
pass
@@ -478,6 +498,7 @@ def get_metadata(basename: str) -> dict:

def process_record(
s3_obj: dict,
cohort: str,
workflow_run_properties: dict) -> None:
"""
Write the contents of a .zip archive stored on S3 to their respective
@@ -488,6 +509,7 @@ def process_record(
Args:
s3_obj (dict): An S3 object as returned by `boto3.get_object`.
cohort (str): The cohort which this data is associated with.
workflow_run_properties (dict): The workflow arguments
Returns:
@@ -511,6 +533,7 @@ def process_record(
z=z,
json_path=json_path,
dataset_identifier=dataset_identifier,
cohort=cohort,
metadata=metadata,
workflow_run_properties=workflow_run_properties)
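
`process_record` now threads `cohort` through to `write_file_to_json_dataset` for every JSON entry in the archive. A generic sketch of iterating a zip archive's JSON entries; the archive name and filter are illustrative, not the job's code.

```python
# Generic sketch: walk a zip archive's JSON entries (archive name and filter invented).
import zipfile

with zipfile.ZipFile("export.zip") as z:  # hypothetical archive
    json_paths = [name for name in z.namelist() if name.endswith(".json")]
    for json_path in json_paths:
        with z.open(json_path, "r") as input_json:
            first_line = input_json.readline()  # assuming one JSON record per line
```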

@@ -544,8 +567,21 @@ def main() -> None:
Key = message["source_key"]
)
s3_obj["Body"] = s3_obj["Body"].read()
cohort = None
if "adults_v1" in message["source_key"]:
cohort = "adults_v1"
elif "pediatric_v1" in message["source_key"]:
cohort = "pediatric_v1"
else:
logger.warning(
"Could not determine the cohort of object at %s"
"This file will not be written to a JSON dataset.",
f"s3://{message['source_bucket']}/{message['source_key']}. "
)
continue
process_record(
s3_obj=s3_obj,
cohort=cohort,
workflow_run_properties=workflow_run_properties
)
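
The cohort is inferred from the object key, and files without a recognizable cohort are skipped with a warning. One way to factor that check out into a helper; this function is hypothetical and not part of the PR.

```python
# Hypothetical helper (not part of this PR) for the cohort check shown above.
import typing

def infer_cohort(source_key: str) -> typing.Optional[str]:
    """Return the cohort encoded in an S3 key, or None if it cannot be determined."""
    for cohort in ("adults_v1", "pediatric_v1"):
        if cohort in source_key:
            return cohort
    return None

assert infer_cohort("main/adults_v1/export.zip") == "adults_v1"
assert infer_cohort("main/other/export.zip") is None
```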
