Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conditionally run withdrawal code #17

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ prod:
PII_COLS_TO_DROP: syn52523394
DEID_VALS_TO_REVIEW: syn52409518
STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION: ./temp_staging_to_archive/
POST_WITHDRAW_LOCATION: ./temp_post_withdraw

staging:
inherits: prod
Expand Down
105 changes: 56 additions & 49 deletions remove_withdrawn_participants.R
Original file line number Diff line number Diff line change
Expand Up @@ -44,53 +44,60 @@ participants_to_withdraw <-
dplyr::pull(ParticipantIdentifier) %>%
unique()

# Store list of datasets that do not contain ParticipantIdentifier column.
# Each parquet dataset's embedded Spark row metadata is a string describing its
# schema, so a substring match on the column name detects its presence.
# NOTE(review): `sapply` + `enframe` yields a name/value tibble keyed by the
# dataset directory path; `value == FALSE` keeps only datasets missing the
# participant column.
contains_pid_false <-
sapply(list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = F), function(x) {
grepl("ParticipantIdentifier", open_dataset(x)$metadata$org.apache.spark.sql.parquet.row.metadata)
}) %>%
tibble::enframe() %>%
dplyr::filter(value==FALSE) %>%
dplyr::select(name)

# Store mapping ID var name for corresponding datasets.
# Each dataset lacking ParticipantIdentifier is linked to participants through
# a dataset-specific key column; case_when uses the FIRST matching pattern.
# NOTE(review): datasets matching none of these patterns get an NA mappingID —
# confirm that every such dataset is covered by one of these five patterns.
contains_pid_false$mappingID <-
dplyr::case_when(
grepl("fitbitsleeplogs", contains_pid_false$name) == TRUE ~ "LogId",
grepl("healthkitv2electrocardiogram", contains_pid_false$name) == TRUE ~ "HealthKitECGSampleKey",
grepl("healthkitv2heartbeat", contains_pid_false$name) == TRUE ~ "HealthKitHeartbeatSampleKey",
grepl("healthkitv2workout", contains_pid_false$name) == TRUE ~ "HealthKitWorkoutKey",
grepl("symptomlog_value", contains_pid_false$name) == TRUE ~ "DataPointKey"
)

# Get values of mapping ID vars for participants to withdraw.
# For each dataset, fetch the concrete key values (e.g. LogId values) that
# belong to withdrawn participants; stored as list-columns since each entry is
# a vector.
# NOTE(review): the "healthkitv2workout" pattern deliberately maps to the
# plural "dataset_healthkitv2workouts" source — confirm that dataset name
# against the project's dataset registry.
contains_pid_false$participants_to_withdraw <-
dplyr::case_when(
grepl("fitbitsleeplogs", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_fitbitsleeplogs", "LogId")),
grepl("healthkitv2electrocardiogram", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_healthkitv2electrocardiogram", "HealthKitECGSampleKey")),
grepl("healthkitv2heartbeat", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_healthkitv2heartbeat", "HealthKitHeartbeatSampleKey")),
grepl("healthkitv2workout", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_healthkitv2workouts", "HealthKitWorkoutKey")),
grepl("symptomlog_value", contains_pid_false$name) == TRUE ~ list(get_mappingID_vals_to_withdraw("dataset_symptomlog", "DataPointKey"))
)

# Remove data for withdrawn participants from parquet datasets based on mapping ID variables
lapply(list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = F), function(x) {
if (x %in% contains_pid_false$name) {
tmpret <- unlist(contains_pid_false$participants_to_withdraw[x == contains_pid_false$name])
d <-
arrow::open_dataset(x) %>%
filter(!(!!(as.symbol(contains_pid_false$mappingID[x == contains_pid_false$name]))) %in% tmpret)
} else {
d <-
arrow::open_dataset(x) %>%
filter(!ParticipantIdentifier %in% participants_to_withdraw)
}
d %>%
arrow::write_dataset(
path = x,
max_rows_per_file = 100000,
partitioning = "cohort",
existing_data_behavior = 'delete_matching',
basename_template = paste0("part-0000{i}.", as.character("parquet"))
# Only rewrite the parquet datasets when there is at least one participant to
# withdraw; otherwise the downloaded data is left untouched (the point of this
# change: skip the expensive rewrite on a no-op withdrawal list).
if (length(participants_to_withdraw) > 0) {
  # Identify datasets that do NOT contain a ParticipantIdentifier column by
  # inspecting the Spark row metadata string embedded in each parquet dataset.
  contains_pid_false <-
    sapply(list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = FALSE), function(x) {
      grepl(
        "ParticipantIdentifier",
        arrow::open_dataset(x)$metadata$org.apache.spark.sql.parquet.row.metadata
      )
    }) %>%
    tibble::enframe() %>%
    dplyr::filter(value == FALSE) %>%
    dplyr::select(name)

  # Map each such dataset to the key column that links its rows back to a
  # participant. case_when uses the first matching pattern per row.
  # NOTE(review): datasets matching none of these patterns get an NA mappingID
  # and would fail in the filter below — confirm the patterns are exhaustive.
  contains_pid_false$mappingID <-
    dplyr::case_when(
      grepl("fitbitsleeplogs", contains_pid_false$name) ~ "LogId",
      grepl("healthkitv2electrocardiogram", contains_pid_false$name) ~ "HealthKitECGSampleKey",
      grepl("healthkitv2heartbeat", contains_pid_false$name) ~ "HealthKitHeartbeatSampleKey",
      grepl("healthkitv2workout", contains_pid_false$name) ~ "HealthKitWorkoutKey",
      grepl("symptomlog_value", contains_pid_false$name) ~ "DataPointKey"
    )

  # Collect, per dataset, the key values belonging to withdrawn participants
  # (stored as list-columns since each entry is a vector of IDs).
  # NOTE(review): "healthkitv2workout" intentionally maps to the plural
  # "dataset_healthkitv2workouts" source name — verify against the registry.
  contains_pid_false$participants_to_withdraw <-
    dplyr::case_when(
      grepl("fitbitsleeplogs", contains_pid_false$name) ~ list(get_mappingID_vals_to_withdraw("dataset_fitbitsleeplogs", "LogId")),
      grepl("healthkitv2electrocardiogram", contains_pid_false$name) ~ list(get_mappingID_vals_to_withdraw("dataset_healthkitv2electrocardiogram", "HealthKitECGSampleKey")),
      grepl("healthkitv2heartbeat", contains_pid_false$name) ~ list(get_mappingID_vals_to_withdraw("dataset_healthkitv2heartbeat", "HealthKitHeartbeatSampleKey")),
      grepl("healthkitv2workout", contains_pid_false$name) ~ list(get_mappingID_vals_to_withdraw("dataset_healthkitv2workouts", "HealthKitWorkoutKey")),
      grepl("symptomlog_value", contains_pid_false$name) ~ list(get_mappingID_vals_to_withdraw("dataset_symptomlog", "DataPointKey"))
    )

  # Remove data for withdrawn participants from every parquet dataset.
  # Datasets without a ParticipantIdentifier column are filtered on their
  # mapping ID column instead. Filtered output goes to a scratch location
  # (POST_WITHDRAW_LOCATION) so we never read and overwrite the same dataset
  # in a single pass.
  lapply(list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = FALSE), function(x) {
    if (x %in% contains_pid_false$name) {
      ids_to_drop <- unlist(contains_pid_false$participants_to_withdraw[x == contains_pid_false$name])
      d <-
        arrow::open_dataset(x) %>%
        # !!as.symbol(...) injects the dataset-specific key column name;
        # `!A %in% B` parses as `!(A %in% B)` (keep rows NOT in the drop set).
        dplyr::filter(!(!!(as.symbol(contains_pid_false$mappingID[x == contains_pid_false$name]))) %in% ids_to_drop)
    } else {
      d <-
        arrow::open_dataset(x) %>%
        dplyr::filter(!ParticipantIdentifier %in% participants_to_withdraw)
    }
    d %>%
      arrow::write_dataset(
        path = file.path(POST_WITHDRAW_LOCATION, basename(x)),
        max_rows_per_file = 5000000,
        partitioning = "cohort",
        existing_data_behavior = "delete_matching",
        basename_template = "part-0000{i}.parquet"
      )
  })

  # Swap the filtered copy into place, then discard the scratch directory.
  # NOTE(review): `cp -r` assumes a POSIX shell on the PATH — confirm the
  # deployment environment (this is not portable to Windows).
  unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = TRUE, force = TRUE)
  system(glue::glue("cp -r {POST_WITHDRAW_LOCATION} {AWS_PARQUET_DOWNLOAD_LOCATION}"))
  unlink(POST_WITHDRAW_LOCATION, recursive = TRUE, force = TRUE)
}

Loading