diff --git a/src/glue/jobs/json_to_parquet.py b/src/glue/jobs/json_to_parquet.py index 7ebda420..470ba466 100644 --- a/src/glue/jobs/json_to_parquet.py +++ b/src/glue/jobs/json_to_parquet.py @@ -254,6 +254,10 @@ def drop_deleted_healthkit_data( try: glue_client.get_table(DatabaseName=glue_database, Name=deleted_table_name) except glue_client.exceptions.EntityNotFoundException: + logger.warn( + f"Did not find table with name '{deleted_table_name}' ", + f"in database {glue_database}." + ) return table deleted_table_logger_context = deepcopy(logger_context) deleted_table_logger_context["labels"]["glue_table_name"] = deleted_table_name @@ -265,6 +269,9 @@ def drop_deleted_healthkit_data( record_counts=record_counts, logger_context=deleted_table_logger_context, ) + if deleted_table_raw.count() == 0: + logger.info(f"The table for data type {deleted_data_type} did not contain any records.") + return table # we use `data_type` rather than `deleted_data_type` here because they share # an index (we don't bother including `deleted_data_type` in `INDEX_FIELD_MAP`). deleted_table = drop_table_duplicates( diff --git a/src/glue/jobs/s3_to_json.py b/src/glue/jobs/s3_to_json.py index ec5fb789..130c1a88 100644 --- a/src/glue/jobs/s3_to_json.py +++ b/src/glue/jobs/s3_to_json.py @@ -18,7 +18,12 @@ import ecs_logging from awsglue.utils import getResolvedOptions -DATA_TYPES_WITH_SUBTYPE = ["HealthKitV2Samples", "HealthKitV2Statistics"] +DATA_TYPES_WITH_SUBTYPE = [ + "HealthKitV2Samples", + "HealthKitV2Statistics", + "HealthKitV2Samples_Deleted", + "HealthKitV2Statistics_Deleted" +] logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -761,16 +766,7 @@ def get_metadata(basename: str) -> dict: datetime.datetime.strptime(basename_components[-1], "%Y%m%d") if metadata["type"] in DATA_TYPES_WITH_SUBTYPE: metadata["subtype"] = basename_components[1] - if ( - metadata["type"] - in [ - "HealthKitV2Samples", - "HealthKitV2Heartbeat", - "HealthKitV2Electrocardiogram", - "HealthKitV2Workouts", - ] - and basename_components[-2] == "Deleted" - ): + if "HealthKitV2" in metadata["type"] and basename_components[-2] == "Deleted": metadata["type"] = "{}_Deleted".format(metadata["type"]) return metadata diff --git a/src/glue/resources/table_columns.yaml b/src/glue/resources/table_columns.yaml index 91e4d5ae..ed6d105e 100644 --- a/src/glue/resources/table_columns.yaml +++ b/src/glue/resources/table_columns.yaml @@ -67,6 +67,25 @@ tables: partition_keys: - Name: cohort Type: string + HealthKitV2Statistics_Deleted: + columns: + - Name: HealthKitStatisticKey + Type: string + - Name: ParticipantIdentifier + Type: string + - Name: ParticipantID + Type: string + - Name: Type + Type: string + - Name: DeletedDate + Type: string + - Name: export_start_date + Type: string + - Name: export_end_date + Type: string + partition_keys: + - Name: cohort + Type: string HealthKitV2Samples: columns: - Name: HealthKitSampleKey @@ -108,6 +127,8 @@ tables: Type: string - Name: ParticipantID Type: string + - Name: Type + Type: string - Name: DeletedDate Type: string - Name: export_start_date @@ -150,6 +171,23 @@ tables: partition_keys: - Name: cohort Type: string + HealthKitV2ActivitySummaries_Deleted: + columns: + - Name: HealthKitActivitySummaryKey + Type: string + - Name: ParticipantIdentifier + Type: string + - Name: ParticipantID + Type: string + - Name: DeletedDate + Type: string + - Name: export_start_date + Type: string + - Name: export_end_date + Type: string + partition_keys: + - Name: cohort + Type: string HealthKitV2Electrocardiogram: columns: - Name: HealthKitECGSampleKey @@ -189,6 +227,23 @@ tables: partition_keys: - Name: cohort Type: string + HealthKitV2Electrocardiogram_Deleted: + columns: + - Name: HealthKitECGSampleKey + Type: string + - Name: ParticipantIdentifier + Type: string + - Name: ParticipantID + Type: string + - Name: DeletedDate + Type: string + - Name: export_start_date + Type: string + - Name: export_end_date + Type: string + partition_keys: + - Name: cohort + Type: string HealthKitV2Workouts: columns: - Name: HealthKitWorkoutKey @@ -220,6 +275,23 @@ tables: partition_keys: - Name: cohort Type: string + HealthKitV2Workouts_Deleted: + columns: + - Name: HealthKitWorkoutKey + Type: string + - Name: ParticipantIdentifier + Type: string + - Name: ParticipantID + Type: string + - Name: DeletedDate + Type: string + - Name: export_start_date + Type: string + - Name: export_end_date + Type: string + partition_keys: + - Name: cohort + Type: string HealthKitV2Heartbeat: columns: - Name: HealthKitHeartbeatSampleKey @@ -249,6 +321,23 @@ tables: partition_keys: - Name: cohort Type: string + HealthKitV2Heartbeat_Deleted: + columns: + - Name: HealthKitHeartbeatSampleKey + Type: string + - Name: ParticipantIdentifier + Type: string + - Name: ParticipantID + Type: string + - Name: DeletedDate + Type: string + - Name: export_start_date + Type: string + - Name: export_end_date + Type: string + partition_keys: + - Name: cohort + Type: string FitbitActivityLogs: columns: - Name: ParticipantIdentifier diff --git a/tests/test_json_to_parquet.py b/tests/test_json_to_parquet.py index 60512e28..0f6c6292 100644 --- a/tests/test_json_to_parquet.py +++ b/tests/test_json_to_parquet.py @@ -126,6 +126,11 @@ def flat_data_type(glue_flat_table_name): flat_data_type = glue_flat_table_name.split("_")[1] return flat_data_type +@pytest.fixture() +def nested_data_type(glue_nested_table_name): + nested_data_type = glue_nested_table_name.split("_")[1] + return nested_data_type + @pytest.fixture() def sample_table( spark_session, sample_table_data, glue_context, glue_flat_table_name @@ -154,7 +159,6 @@ def sample_table_inserted_date( ) return sample_table_inserted_date - @pytest.fixture(scope="class") def glue_database_name(namespace): return f"{namespace}-pytest-database" @@ -184,6 +188,9 @@ def glue_flat_inserted_date_table_name(): def glue_deleted_table_name(glue_flat_table_name): return f"{glue_flat_table_name}_deleted" +def glue_deleted_nested_table_name(glue_nested_table_name) -> str: + return f"{glue_nested_table_name}_deleted" + @pytest.fixture(scope="class") def workspace_key_prefix(namespace, artifact_bucket): @@ -381,6 +388,13 @@ def glue_deleted_table_location(glue_database_path, glue_deleted_table_name): ) return glue_deleted_table_location +@pytest.fixture(scope="class") +def glue_deleted_nested_table_location(glue_database_path, glue_deleted_nested_table_name): + glue_deleted_nested_table_location = ( + os.path.join(glue_database_path, glue_deleted_nested_table_name.replace("_", "=", 1)) + + "/" + ) + return glue_deleted_nested_table_location @pytest.fixture(scope="class") def glue_deleted_table( @@ -415,6 +429,38 @@ def glue_deleted_table( ) return glue_table +@pytest.fixture(scope="class") +def glue_deleted_nested_table( + glue_database_name, glue_deleted_nested_table_name, glue_deleted_nested_table_location +): + glue_client = boto3.client("glue") + glue_table = glue_client.create_table( + DatabaseName=glue_database_name, + TableInput={ + "Name": glue_deleted_nested_table_name, + "Description": "An empty table for pytest unit tests.", + "Retention": 0, + "TableType": "EXTERNAL_TABLE", + "StorageDescriptor": { + "Location": glue_deleted_nested_table_location, + "InputFormat": "org.apache.hadoop.mapred.TextInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", + "Compressed": False, + "StoredAsSubDirectories": False, + "Columns": [ + {"Name": "GlobalKey", "Type": "string"}, + {"Name": "export_end_date", "Type": "string"}, + ], + }, + "PartitionKeys": [{"Name": "cohort", "Type": "string"}], + "Parameters": { + "classification": "json", + "compressionType": "none", + "typeOfData": "file", + }, + }, + ) + return glue_table def upload_test_data_to_s3(path, s3_bucket, table_location, data_cohort): s3_client = boto3.client("s3") @@ -923,7 +969,7 @@ def test_archive_existing_datasets_delete( Delete={"Objects": [{"Key": obj["Key"]} for obj in archived_objects]}, ) - def test_drop_deleted_healthkit_data( + def test_drop_deleted_healthkit_data_nonempty( self, glue_context, glue_flat_table_name, @@ -949,6 +995,34 @@ def test_drop_deleted_healthkit_data( ) assert table_after_drop.count() == 0 + def test_drop_deleted_healthkit_data_empty( + self, + glue_context, + glue_nested_table_name, + nested_data_type, + glue_database_name, + logger_context, + ): + # We do not upload any data for the deleted nested data type + # So we should receive our nested table back unaltered. + table = json_to_parquet.get_table( + table_name=glue_nested_table_name, + database_name=glue_database_name, + glue_context=glue_context, + record_counts=defaultdict(list), + logger_context=logger_context, + ) + table_after_drop = json_to_parquet.drop_deleted_healthkit_data( + glue_context=glue_context, + table=table.toDF(), + table_name=table.name, + data_type=nested_data_type, + glue_database=glue_database_name, + record_counts=defaultdict(list), + logger_context=logger_context, + ) + assert table_after_drop.count() == table.count() + def test_count_records_for_event_empty_table(self, sample_table, logger_context): spark_sample_table = sample_table.toDF() empty_table = spark_sample_table.filter(spark_sample_table.city == "Atlantis") diff --git a/tests/test_s3_to_json.py b/tests/test_s3_to_json.py index b9d8cde5..3b5a2a63 100644 --- a/tests/test_s3_to_json.py +++ b/tests/test_s3_to_json.py @@ -83,13 +83,17 @@ def json_file_basenames_dict(self): "FitbitSleepLogs": "FitbitSleepLogs_20220111-20230103.json", "GoogleFitSamples": "GoogleFitSamples_20220111-20230103.json", "HealthKitV2ActivitySummaries": "HealthKitV2ActivitySummaries_20220111-20230103.json", + "HealthKitV2ActivitySummaries_Deleted": "HealthKitV2ActivitySummaries_Deleted_20220111-20230103.json", "HealthKitV2Electrocardiogram": "HealthKitV2Electrocardiogram_Samples_20220111-20230103.json", + "HealthKitV2Electrocardiogram_Deleted": "HealthKitV2Electrocardiogram_Samples_Deleted_20220111-20230103.json", "HealthKitV2Heartbeat": "HealthKitV2Heartbeat_Samples_20220401-20230112.json", "HealthKitV2Heartbeat_Deleted": "HealthKitV2Heartbeat_Samples_Deleted_20220401-20230112.json", "HealthKitV2Samples": "HealthKitV2Samples_AbdominalCramps_20220111-20230103.json", "HealthKitV2Samples_Deleted": "HealthKitV2Samples_AbdominalCramps_Deleted_20220111-20230103.json", "HealthKitV2Statistics": "HealthKitV2Statistics_HourlySteps_20201022-20211022.json", + "HealthKitV2Statistics_Deleted": "HealthKitV2Statistics_HourlySteps_Deleted_20201022-20211022.json", "HealthKitV2Workouts": "HealthKitV2Workouts_20220111-20230103.json", + "HealthKitV2Workouts_Deleted": "HealthKitV2Workouts_Deleted_20220111-20230103.json", "SymptomLog": "SymptomLog_20220401-20230112.json", } return json_file_basenames @@ -577,8 +581,12 @@ def test_get_metadata_no_subtype(self, json_file_basenames_dict): subtypes = [ "subtype" in record.keys() for record in metadata - if record["type"] - not in ["HealthKitV2Samples", "HealthKitV2Samples_Deleted", "HealthKitV2Statistics"] + if record["type"] not in [ + "HealthKitV2Samples", + "HealthKitV2Samples_Deleted", + "HealthKitV2Statistics", + "HealthKitV2Statistics_Deleted" + ] ] assert not any(subtypes),\ "Some data types that are not HealthKitV2Samples or HealthKitV2Statistics have the metadata subtype key"