From ca472e5ee187a188f0217ce1ef21400a106bf078 Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Wed, 31 Jan 2024 22:48:29 +0000 Subject: [PATCH] Only run withdraw participants code if there are participants to withdraw; write modified datasets to new temp location --- config.yml | 1 + remove_withdrawn_participants.R | 105 +++++++++++++++++--------------- 2 files changed, 57 insertions(+), 49 deletions(-) diff --git a/config.yml b/config.yml index eb14883..48d48a8 100644 --- a/config.yml +++ b/config.yml @@ -22,6 +22,7 @@ prod: PII_COLS_TO_DROP: syn52523394 DEID_VALS_TO_REVIEW: syn52409518 STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION: ./temp_staging_to_archive/ + POST_WITHDRAW_LOCATION: ./temp_post_withdraw staging: inherits: prod diff --git a/remove_withdrawn_participants.R b/remove_withdrawn_participants.R index bed4329..3bf642b 100644 --- a/remove_withdrawn_participants.R +++ b/remove_withdrawn_participants.R @@ -44,53 +44,60 @@ participants_to_withdraw <- dplyr::pull(ParticipantIdentifier) %>% unique() -# Store list of datasets that do not contain ParticipantIdentifier column -contains_pid_false <- - sapply(list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = F), function(x) { - grepl("ParticipantIdentifier", open_dataset(x)$metadata$org.apache.spark.sql.parquet.row.metadata) - }) %>% - tibble::enframe() %>% - dplyr::filter(value==FALSE) %>% - dplyr::select(name) - -# Store mapping ID var name for corresponding datasets -contains_pid_false$mappingID <- - dplyr::case_when( - grepl("fitbitsleeplogs", contains_pid_false$name) == TRUE ~ "LogId", - grepl("healthkitv2electrocardiogram", contains_pid_false$name) == TRUE ~ "HealthKitECGSampleKey", - grepl("healthkitv2heartbeat", contains_pid_false$name) == TRUE ~ "HealthKitHeartbeatSampleKey", - grepl("healthkitv2workout", contains_pid_false$name) == TRUE ~ "HealthKitWorkoutKey", - grepl("symptomlog_value", contains_pid_false$name) == TRUE ~ "DataPointKey" - ) - -# Get values of mapping ID vars for participants to withdraw -contains_pid_false$participants_to_withdraw <- - dplyr::case_when( - grepl("fitbitsleeplogs", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_fitbitsleeplogs", "LogId")), - grepl("healthkitv2electrocardiogram", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_healthkitv2electrocardiogram", "HealthKitECGSampleKey")), - grepl("healthkitv2heartbeat", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_healthkitv2heartbeat", "HealthKitHeartbeatSampleKey")), - grepl("healthkitv2workout", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_healthkitv2workouts", "HealthKitWorkoutKey")), - grepl("symptomlog_value", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_symptomlog", "DataPointKey")) - ) - -# Remove data for withdrawn participants from parquet datasets based on mapping ID variables -lapply(list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = F), function(x) { - if (x %in% contains_pid_false$name) { - tmpret <- unlist(contains_pid_false$participants_to_withdraw[x == contains_pid_false$name]) - d <- - arrow::open_dataset(x) %>% - filter(!(!!(as.symbol(contains_pid_false$mappingID[x == contains_pid_false$name]))) %in% tmpret) - } else { - d <- - arrow::open_dataset(x) %>% - filter(!ParticipantIdentifier %in% participants_to_withdraw) - } - d %>% - arrow::write_dataset( - path = x, - max_rows_per_file = 100000, - partitioning = "cohort", - existing_data_behavior = 'delete_matching', - basename_template = paste0("part-0000{i}.", as.character("parquet")) +if (length(participants_to_withdraw) > 0) { + # Store list of datasets that do not contain ParticipantIdentifier column + contains_pid_false <- + sapply(list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = F), function(x) { + grepl("ParticipantIdentifier", open_dataset(x)$metadata$org.apache.spark.sql.parquet.row.metadata) + }) %>% + tibble::enframe() %>% + dplyr::filter(value==FALSE) %>% + dplyr::select(name) + + # Store mapping ID var name for corresponding datasets + contains_pid_false$mappingID <- + dplyr::case_when( + grepl("fitbitsleeplogs", contains_pid_false$name) == TRUE ~ "LogId", + grepl("healthkitv2electrocardiogram", contains_pid_false$name) == TRUE ~ "HealthKitECGSampleKey", + grepl("healthkitv2heartbeat", contains_pid_false$name) == TRUE ~ "HealthKitHeartbeatSampleKey", + grepl("healthkitv2workout", contains_pid_false$name) == TRUE ~ "HealthKitWorkoutKey", + grepl("symptomlog_value", contains_pid_false$name) == TRUE ~ "DataPointKey" ) -}) + + # Get values of mapping ID vars for participants to withdraw + contains_pid_false$participants_to_withdraw <- + dplyr::case_when( + grepl("fitbitsleeplogs", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_fitbitsleeplogs", "LogId")), + grepl("healthkitv2electrocardiogram", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_healthkitv2electrocardiogram", "HealthKitECGSampleKey")), + grepl("healthkitv2heartbeat", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_healthkitv2heartbeat", "HealthKitHeartbeatSampleKey")), + grepl("healthkitv2workout", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_healthkitv2workouts", "HealthKitWorkoutKey")), + grepl("symptomlog_value", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_symptomlog", "DataPointKey")) + ) + + # Remove data for withdrawn participants from parquet datasets based on mapping ID variables + lapply(list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = F), function(x) { + if (x %in% contains_pid_false$name) { + tmpret <- unlist(contains_pid_false$participants_to_withdraw[x == contains_pid_false$name]) + d <- + arrow::open_dataset(x) %>% + filter(!(!!(as.symbol(contains_pid_false$mappingID[x == contains_pid_false$name]))) %in% tmpret) + } else { + d <- + arrow::open_dataset(x) %>% + filter(!ParticipantIdentifier %in% participants_to_withdraw) + } + d %>% + arrow::write_dataset( + path = file.path(POST_WITHDRAW_LOCATION, basename(x)), + max_rows_per_file = 5000000, + partitioning = "cohort", + existing_data_behavior = 'delete_matching', + basename_template = paste0("part-0000{i}.", as.character("parquet")) + ) + }) + + unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T) + system(glue::glue("cp -r {POST_WITHDRAW_LOCATION} {AWS_PARQUET_DOWNLOAD_LOCATION}")) + unlink(POST_WITHDRAW_LOCATION, recursive = T, force = T) +} +