Skip to content

Commit

Permalink
Add corresponding deleted data type for all HealthKit data types
Browse files Browse the repository at this point in the history
  • Loading branch information
philerooski committed Jun 3, 2024
1 parent fc290e3 commit e796e1f
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 17 deletions.
11 changes: 9 additions & 2 deletions src/glue/jobs/json_to_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,12 @@ def drop_deleted_healthkit_data(
deleted_data_type = f"{data_type}_deleted"
try:
    glue_client.get_table(DatabaseName=glue_database, Name=deleted_table_name)
except glue_client.exceptions.EntityNotFoundException:
    # Bug fix: `logger.error` takes one message plus lazy %-style args.
    # The original passed two comma-separated f-strings, so the second
    # f-string became an unused format argument and the "in database ..."
    # half of the message was never emitted. Concatenate adjacent literals
    # into a single message instead.
    logger.error(
        f"Did not find table with name '{deleted_table_name}' "
        f"in database {glue_database}."
    )
    # Bare `raise` re-raises the active exception with its original
    # traceback (idiomatic inside an `except` block; `raise(error)` is not).
    raise
deleted_table_logger_context = deepcopy(logger_context)
deleted_table_logger_context["labels"]["glue_table_name"] = deleted_table_name
deleted_table_logger_context["labels"]["type"] = deleted_data_type
Expand All @@ -265,6 +269,9 @@ def drop_deleted_healthkit_data(
record_counts=record_counts,
logger_context=deleted_table_logger_context,
)
if deleted_table_raw.count() == 0:
logger.info(f"The table for data type {deleted_data_type} did not contain any records.")
return table
# we use `data_type` rather than `deleted_data_type` here because they share
# an index (we don't bother including `deleted_data_type` in `INDEX_FIELD_MAP`).
deleted_table = drop_table_duplicates(
Expand Down
18 changes: 7 additions & 11 deletions src/glue/jobs/s3_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
import ecs_logging
from awsglue.utils import getResolvedOptions

DATA_TYPES_WITH_SUBTYPE = ["HealthKitV2Samples", "HealthKitV2Statistics"]
# Data types whose export file basenames carry a subtype token
# (e.g. "HealthKitV2Samples_AbdominalCramps_20220111-20230103.json").
# The deleted-record variants share the same basename scheme.
DATA_TYPES_WITH_SUBTYPE = [
    "HealthKitV2Samples",
    "HealthKitV2Statistics",
    "HealthKitV2Samples_Deleted",
    "HealthKitV2Statistics_Deleted",
]

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -761,16 +766,7 @@ def get_metadata(basename: str) -> dict:
datetime.datetime.strptime(basename_components[-1], "%Y%m%d")
if metadata["type"] in DATA_TYPES_WITH_SUBTYPE:
metadata["subtype"] = basename_components[1]
if (
metadata["type"]
in [
"HealthKitV2Samples",
"HealthKitV2Heartbeat",
"HealthKitV2Electrocardiogram",
"HealthKitV2Workouts",
]
and basename_components[-2] == "Deleted"
):
if "HealthKitV2" in metadata["type"] and basename_components[-2] == "Deleted":
metadata["type"] = "{}_Deleted".format(metadata["type"])
return metadata

Expand Down
89 changes: 89 additions & 0 deletions src/glue/resources/table_columns.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,25 @@ tables:
partition_keys:
- Name: cohort
Type: string
# Tombstone table for HealthKitV2Statistics: each row presumably identifies a
# previously exported record (via HealthKitStatisticKey) that was deleted
# upstream — TODO confirm against the exporter's contract.
HealthKitV2Statistics_Deleted:
columns:
- Name: HealthKitStatisticKey
Type: string
- Name: ParticipantIdentifier
Type: string
- Name: ParticipantID
Type: string
- Name: Type
Type: string
- Name: DeletedDate
Type: string
- Name: export_start_date
Type: string
- Name: export_end_date
Type: string
partition_keys:
- Name: cohort
Type: string
HealthKitV2Samples:
columns:
- Name: HealthKitSampleKey
Expand Down Expand Up @@ -108,6 +127,8 @@ tables:
Type: string
- Name: ParticipantID
Type: string
- Name: Type
Type: string
- Name: DeletedDate
Type: string
- Name: export_start_date
Expand Down Expand Up @@ -150,6 +171,23 @@ tables:
partition_keys:
- Name: cohort
Type: string
# Tombstone table for HealthKitV2ActivitySummaries; keyed on
# HealthKitActivitySummaryKey like its non-deleted counterpart.
HealthKitV2ActivitySummaries_Deleted:
columns:
- Name: HealthKitActivitySummaryKey
Type: string
- Name: ParticipantIdentifier
Type: string
- Name: ParticipantID
Type: string
- Name: DeletedDate
Type: string
- Name: export_start_date
Type: string
- Name: export_end_date
Type: string
partition_keys:
- Name: cohort
Type: string
HealthKitV2Electrocardiogram:
columns:
- Name: HealthKitECGSampleKey
Expand Down Expand Up @@ -189,6 +227,23 @@ tables:
partition_keys:
- Name: cohort
Type: string
# Tombstone table for HealthKitV2Electrocardiogram; keyed on
# HealthKitECGSampleKey like its non-deleted counterpart.
HealthKitV2Electrocardiogram_Deleted:
columns:
- Name: HealthKitECGSampleKey
Type: string
- Name: ParticipantIdentifier
Type: string
- Name: ParticipantID
Type: string
- Name: DeletedDate
Type: string
- Name: export_start_date
Type: string
- Name: export_end_date
Type: string
partition_keys:
- Name: cohort
Type: string
HealthKitV2Workouts:
columns:
- Name: HealthKitWorkoutKey
Expand Down Expand Up @@ -220,6 +275,23 @@ tables:
partition_keys:
- Name: cohort
Type: string
# Tombstone table for HealthKitV2Workouts; keyed on HealthKitWorkoutKey
# like its non-deleted counterpart.
HealthKitV2Workouts_Deleted:
columns:
- Name: HealthKitWorkoutKey
Type: string
- Name: ParticipantIdentifier
Type: string
- Name: ParticipantID
Type: string
- Name: DeletedDate
Type: string
- Name: export_start_date
Type: string
- Name: export_end_date
Type: string
partition_keys:
- Name: cohort
Type: string
HealthKitV2Heartbeat:
columns:
- Name: HealthKitHeartbeatSampleKey
Expand Down Expand Up @@ -249,6 +321,23 @@ tables:
partition_keys:
- Name: cohort
Type: string
# Tombstone table for HealthKitV2Heartbeat; keyed on
# HealthKitHeartbeatSampleKey like its non-deleted counterpart.
HealthKitV2Heartbeat_Deleted:
columns:
- Name: HealthKitHeartbeatSampleKey
Type: string
- Name: ParticipantIdentifier
Type: string
- Name: ParticipantID
Type: string
- Name: DeletedDate
Type: string
- Name: export_start_date
Type: string
- Name: export_end_date
Type: string
partition_keys:
- Name: cohort
Type: string
FitbitActivityLogs:
columns:
- Name: ParticipantIdentifier
Expand Down
78 changes: 76 additions & 2 deletions tests/test_json_to_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ def flat_data_type(glue_flat_table_name):
flat_data_type = glue_flat_table_name.split("_")[1]
return flat_data_type

@pytest.fixture()
def nested_data_type(glue_nested_table_name):
    """Data type encoded in the nested Glue table name.

    The table name follows a `<namespace>_<data_type>` convention, so the
    second underscore-delimited token is the data type.
    """
    return glue_nested_table_name.split("_")[1]

@pytest.fixture()
def sample_table(
spark_session, sample_table_data, glue_context, glue_flat_table_name
Expand Down Expand Up @@ -154,7 +159,6 @@ def sample_table_inserted_date(
)
return sample_table_inserted_date


@pytest.fixture(scope="class")
def glue_database_name(namespace):
return f"{namespace}-pytest-database"
Expand Down Expand Up @@ -184,6 +188,9 @@ def glue_flat_inserted_date_table_name():
def glue_deleted_table_name(glue_flat_table_name):
return f"{glue_flat_table_name}_deleted"

@pytest.fixture(scope="class")
def glue_deleted_nested_table_name(glue_nested_table_name) -> str:
    """Name of the Glue table holding deleted records for the nested data type.

    Bug fix: the ``@pytest.fixture`` decorator was missing, so pytest could
    not resolve this name when it is requested by dependent fixtures
    (``glue_deleted_nested_table_location`` and ``glue_deleted_nested_table``);
    ``scope="class"`` matches the sibling table-name fixtures in this module.
    """
    return f"{glue_nested_table_name}_deleted"


@pytest.fixture(scope="class")
def workspace_key_prefix(namespace, artifact_bucket):
Expand Down Expand Up @@ -381,6 +388,13 @@ def glue_deleted_table_location(glue_database_path, glue_deleted_table_name):
)
return glue_deleted_table_location

@pytest.fixture(scope="class")
def glue_deleted_nested_table_location(glue_database_path, glue_deleted_nested_table_name):
    """Trailing-slash storage location for the deleted nested-data-type table."""
    # `dataset_foo` -> `dataset=foo`: only the first underscore is replaced,
    # yielding a Hive-style partition path segment.
    partition_style_name = glue_deleted_nested_table_name.replace("_", "=", 1)
    return os.path.join(glue_database_path, partition_style_name) + "/"

@pytest.fixture(scope="class")
def glue_deleted_table(
Expand Down Expand Up @@ -415,6 +429,38 @@ def glue_deleted_table(
)
return glue_table

@pytest.fixture(scope="class")
def glue_deleted_nested_table(
    glue_database_name, glue_deleted_nested_table_name, glue_deleted_nested_table_location
):
    """Create an empty Glue table for the deleted nested data type.

    Returns the ``create_table`` API response.
    """
    # Build the TableInput payload up front so the API call reads cleanly.
    storage_descriptor = {
        "Location": glue_deleted_nested_table_location,
        "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
        "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
        "Compressed": False,
        "StoredAsSubDirectories": False,
        "Columns": [
            {"Name": "GlobalKey", "Type": "string"},
            {"Name": "export_end_date", "Type": "string"},
        ],
    }
    table_input = {
        "Name": glue_deleted_nested_table_name,
        "Description": "An empty table for pytest unit tests.",
        "Retention": 0,
        "TableType": "EXTERNAL_TABLE",
        "StorageDescriptor": storage_descriptor,
        "PartitionKeys": [{"Name": "cohort", "Type": "string"}],
        "Parameters": {
            "classification": "json",
            "compressionType": "none",
            "typeOfData": "file",
        },
    }
    glue_client = boto3.client("glue")
    return glue_client.create_table(
        DatabaseName=glue_database_name,
        TableInput=table_input,
    )

def upload_test_data_to_s3(path, s3_bucket, table_location, data_cohort):
s3_client = boto3.client("s3")
Expand Down Expand Up @@ -923,7 +969,7 @@ def test_archive_existing_datasets_delete(
Delete={"Objects": [{"Key": obj["Key"]} for obj in archived_objects]},
)

def test_drop_deleted_healthkit_data(
def test_drop_deleted_healthkit_data_nonempty(
self,
glue_context,
glue_flat_table_name,
Expand All @@ -949,6 +995,34 @@ def test_drop_deleted_healthkit_data(
)
assert table_after_drop.count() == 0

def test_drop_deleted_healthkit_data_empty(
    self,
    glue_context,
    glue_nested_table_name,
    nested_data_type,
    glue_database_name,
    logger_context,
):
    """No data was uploaded for the deleted nested data type, so
    `drop_deleted_healthkit_data` should return the table unaltered
    (same record count)."""
    nested_table = json_to_parquet.get_table(
        table_name=glue_nested_table_name,
        database_name=glue_database_name,
        glue_context=glue_context,
        record_counts=defaultdict(list),
        logger_context=logger_context,
    )
    result = json_to_parquet.drop_deleted_healthkit_data(
        glue_context=glue_context,
        table=nested_table.toDF(),
        table_name=nested_table.name,
        data_type=nested_data_type,
        glue_database=glue_database_name,
        record_counts=defaultdict(list),
        logger_context=logger_context,
    )
    assert result.count() == nested_table.count()

def test_count_records_for_event_empty_table(self, sample_table, logger_context):
spark_sample_table = sample_table.toDF()
empty_table = spark_sample_table.filter(spark_sample_table.city == "Atlantis")
Expand Down
12 changes: 10 additions & 2 deletions tests/test_s3_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,17 @@ def json_file_basenames_dict(self):
"FitbitSleepLogs": "FitbitSleepLogs_20220111-20230103.json",
"GoogleFitSamples": "GoogleFitSamples_20220111-20230103.json",
"HealthKitV2ActivitySummaries": "HealthKitV2ActivitySummaries_20220111-20230103.json",
"HealthKitV2ActivitySummaries_Deleted": "HealthKitV2ActivitySummaries_Deleted_20220111-20230103.json",
"HealthKitV2Electrocardiogram": "HealthKitV2Electrocardiogram_Samples_20220111-20230103.json",
"HealthKitV2Electrocardiogram_Deleted": "HealthKitV2Electrocardiogram_Samples_Deleted_20220111-20230103.json",
"HealthKitV2Heartbeat": "HealthKitV2Heartbeat_Samples_20220401-20230112.json",
"HealthKitV2Heartbeat_Deleted": "HealthKitV2Heartbeat_Samples_Deleted_20220401-20230112.json",
"HealthKitV2Samples": "HealthKitV2Samples_AbdominalCramps_20220111-20230103.json",
"HealthKitV2Samples_Deleted": "HealthKitV2Samples_AbdominalCramps_Deleted_20220111-20230103.json",
"HealthKitV2Statistics": "HealthKitV2Statistics_HourlySteps_20201022-20211022.json",
"HealthKitV2Statistics_Deleted": "HealthKitV2Statistics_HourlySteps_Deleted_20201022-20211022.json",
"HealthKitV2Workouts": "HealthKitV2Workouts_20220111-20230103.json",
"HealthKitV2Workouts_Deleted": "HealthKitV2Workouts_Deleted_20220111-20230103.json",
"SymptomLog": "SymptomLog_20220401-20230112.json",
}
return json_file_basenames
Expand Down Expand Up @@ -577,8 +581,12 @@ def test_get_metadata_no_subtype(self, json_file_basenames_dict):
subtypes = [
"subtype" in record.keys()
for record in metadata
if record["type"]
not in ["HealthKitV2Samples", "HealthKitV2Samples_Deleted", "HealthKitV2Statistics"]
if record["type"] not in [
"HealthKitV2Samples",
"HealthKitV2Samples_Deleted",
"HealthKitV2Statistics",
"HealthKitV2Statistics_Deleted"
]
]
assert not any(subtypes),\
"Some data types that are not HealthKitV2Samples or HealthKitV2Statistics have the metadata subtype key"
Expand Down

0 comments on commit e796e1f

Please sign in to comment.