From acbd51f9af7b383d3137812795c6dcfc1c08a05a Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Thu, 26 Oct 2023 22:16:04 +0000 Subject: [PATCH 01/10] Get cols to drop from Synapse instead of params.R --- filtering.R | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/filtering.R b/filtering.R index a96a935..c5452c8 100644 --- a/filtering.R +++ b/filtering.R @@ -31,7 +31,13 @@ drop_cols_datasets <- function(dataset, columns=c(), output=PARQUET_FILTERED_LOC # unlink(PARQUET_FILTERED_LOCATION, recursive = T, force = T) -lapply(seq_along(datasets_to_filter), function(i) { - cat("Dropping", cols_to_drop[[i]], "from", datasets_to_filter[i], "\n") - drop_cols_datasets(dataset = datasets_to_filter[i], columns = cols_to_drop[[i]]) -}) %>% invisible() +synLogin() + +pii_to_drop <- synGet('syn52523394')$path %>% read.csv() + +lapply(seq_len(nrow(pii_to_drop)), function(i) { + cat("Dropping", pii_to_drop$column_to_be_dropped[[i]], "from", pii_to_drop$dataset[[i]], "\n") + drop_cols_datasets(dataset = pii_to_drop$dataset[[i]], columns = pii_to_drop$column_to_be_dropped[[i]]) +}) + +rm(pii_to_drop) From 07fbb31b22fec41388a48a32cba350eeb84aa9de Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Thu, 26 Oct 2023 22:17:42 +0000 Subject: [PATCH 02/10] Use input variable instead of global variable in function --- filtering.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/filtering.R b/filtering.R index c5452c8..e2ec964 100644 --- a/filtering.R +++ b/filtering.R @@ -15,9 +15,9 @@ dob2age <- function(dataset, column, output=PARQUET_FILTERED_LOCATION) { dob2age("dataset_enrolledparticipants", "DateOfBirth") # Drop columns with potentially identifying info -------------------------- -drop_cols_datasets <- function(dataset, columns=c(), output=PARQUET_FILTERED_LOCATION) { - if (dataset %in% list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION, full.names = F)) { - input_path <- paste0(AWS_PARQUET_DOWNLOAD_LOCATION, '/', dataset) +drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOAD_LOCATION, output=PARQUET_FILTERED_LOCATION) { + if (dataset %in% list.dirs(input, full.names = F)) { + input_path <- paste0(input, '/', dataset) final_path <- paste0(output, '/', dataset, '/') arrow::open_dataset(sources = input_path) %>% From 856f90050c5a0adcfbdc7ec5204ec6582c5b1488 Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Thu, 26 Oct 2023 22:18:33 +0000 Subject: [PATCH 03/10] Unlink PARQUET_FILTERED_LOCATION prior to applying filtering --- filtering.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filtering.R b/filtering.R index e2ec964..097f6ac 100644 --- a/filtering.R +++ b/filtering.R @@ -29,7 +29,7 @@ drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOA } } -# unlink(PARQUET_FILTERED_LOCATION, recursive = T, force = T) +unlink(PARQUET_FILTERED_LOCATION, recursive = T, force = T) synLogin() From 374ed294cb2d45d3a1d7f1c652675efa491355d2 Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Thu, 26 Oct 2023 22:42:12 +0000 Subject: [PATCH 04/10] Use input variable in function instead of global variable --- filtering.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/filtering.R b/filtering.R index 097f6ac..c4ef27c 100644 --- a/filtering.R +++ b/filtering.R @@ -1,7 +1,7 @@ # Calculate age from DoB ----------------------------------------------------- -dob2age <- function(dataset, column, output=PARQUET_FILTERED_LOCATION) { - if (dataset %in% list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION, full.names = F)) { - input_path <- paste0(AWS_PARQUET_DOWNLOAD_LOCATION, '/', dataset) +dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, output=PARQUET_FILTERED_LOCATION) { + if (dataset %in% list.dirs(input, full.names = F)) { + input_path <- paste0(input, '/', dataset) arrow::open_dataset(sources = input_path) %>% dplyr::mutate(age = lubridate::year(lubridate::today())-lubridate::year(lubridate::as_date(!!sym(column)))) %>% From 2b53da392edce73a77a3427f5dd9feb34a18d8ad Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Thu, 26 Oct 2023 22:43:48 +0000 Subject: [PATCH 05/10] Organize code --- filtering.R | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/filtering.R b/filtering.R index c4ef27c..56281b3 100644 --- a/filtering.R +++ b/filtering.R @@ -1,4 +1,5 @@ -# Calculate age from DoB ----------------------------------------------------- +# Functions --------------------------------------------------------------- +# Calculate age from DoB dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, output=PARQUET_FILTERED_LOCATION) { if (dataset %in% list.dirs(input, full.names = F)) { input_path <- paste0(input, '/', dataset) @@ -12,9 +13,7 @@ dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, outp } } -dob2age("dataset_enrolledparticipants", "DateOfBirth") - -# Drop columns with potentially identifying info -------------------------- +# Drop columns with potentially identifying info drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOAD_LOCATION, output=PARQUET_FILTERED_LOCATION) { if (dataset %in% list.dirs(input, full.names = F)) { input_path <- paste0(input, '/', dataset) @@ -29,6 +28,10 @@ drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOA } } + +# Filtering --------------------------------------------------------------- +dob2age("dataset_enrolledparticipants", "DateOfBirth") + unlink(PARQUET_FILTERED_LOCATION, recursive = T, force = T) synLogin() From c11552d21c801bfac79278ca708f0c3ae99827d8 Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Thu, 26 Oct 2023 22:53:55 +0000 Subject: [PATCH 06/10] Add roxygen2 style documentation for functions --- filtering.R | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/filtering.R b/filtering.R index 56281b3..f1126de 100644 --- a/filtering.R +++ b/filtering.R @@ -1,5 +1,18 @@ # Functions --------------------------------------------------------------- -# Calculate age from DoB +#' Calculate age from Date of Birth +#' +#' This function calculates the age of individuals based on their date of birth. +#' +#' @param dataset The name of the dataset to process. +#' @param column The name of the column in the dataset that contains Date of Birth (DoB) information. +#' @param input The location where the Parquet dataset is stored. Default is AWS_PARQUET_DOWNLOAD_LOCATION. +#' @param output The location where the filtered Parquet dataset will be saved. Default is PARQUET_FILTERED_LOCATION. +#' +#' @return None (invisibly returns the filtered dataset) +#' +#' @examples +#' dob2age("my_dataset", "date_of_birth_column") +#' dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, output=PARQUET_FILTERED_LOCATION) { if (dataset %in% list.dirs(input, full.names = F)) { input_path <- paste0(input, '/', dataset) @@ -13,6 +26,20 @@ dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, outp } } +#' Drop columns with potentially identifying information +#' +#' This function removes specified columns from a dataset to eliminate potentially identifying information. +#' +#' @param dataset The name of the dataset to process. +#' @param columns A character vector of column names to be dropped from the dataset. +#' @param input The location where the Parquet dataset is stored. Default is AWS_PARQUET_DOWNLOAD_LOCATION. +#' @param output The location where the filtered Parquet dataset will be saved. Default is PARQUET_FILTERED_LOCATION. +#' +#' @return None (invisibly returns the filtered dataset) +#' +#' @examples +#' drop_cols_datasets("my_dataset", c("column1", "column2")) +#' # Drop columns with potentially identifying info drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOAD_LOCATION, output=PARQUET_FILTERED_LOCATION) { if (dataset %in% list.dirs(input, full.names = F)) { From b9aedd045385cb42d098a8fc294e0c8064e3115a Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Thu, 26 Oct 2023 23:30:12 +0000 Subject: [PATCH 07/10] Store lapply output for neater output/printing --- filtering.R | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/filtering.R b/filtering.R index f1126de..2ad639e 100644 --- a/filtering.R +++ b/filtering.R @@ -38,7 +38,7 @@ dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, outp #' @return None (invisibly returns the filtered dataset) #' #' @examples -#' drop_cols_datasets("my_dataset", c("column1", "column2")) +#' drop_cols_datasets("my_dataset", c("column1", "column2"), input = "./temp1", output = "./temp2") #' # Drop columns with potentially identifying info drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOAD_LOCATION, output=PARQUET_FILTERED_LOCATION) { @@ -65,9 +65,10 @@ synLogin() pii_to_drop <- synGet('syn52523394')$path %>% read.csv() -lapply(seq_len(nrow(pii_to_drop)), function(i) { - cat("Dropping", pii_to_drop$column_to_be_dropped[[i]], "from", pii_to_drop$dataset[[i]], "\n") - drop_cols_datasets(dataset = pii_to_drop$dataset[[i]], columns = pii_to_drop$column_to_be_dropped[[i]]) -}) +tmp <- + lapply(seq_len(nrow(pii_to_drop)), function(i) { + cat(i, "Dropping", pii_to_drop$column_to_be_dropped[[i]], "from", pii_to_drop$dataset[[i]], "\n") + drop_cols_datasets(dataset = pii_to_drop$dataset[[i]], columns = pii_to_drop$column_to_be_dropped[[i]]) + }) rm(pii_to_drop) From 45f80dda11e9bce347b4631f4f85b6241976bf68 Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Thu, 26 Oct 2023 23:33:37 +0000 Subject: [PATCH 08/10] Initial commit --- tests/testthat/test-filtering.R | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/testthat/test-filtering.R diff --git a/tests/testthat/test-filtering.R b/tests/testthat/test-filtering.R new file mode 100644 index 0000000..e69de29 From 8bfb6074f3c6ce51adcdccf7711756e13f63f451 Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Fri, 27 Oct 2023 00:15:41 +0000 Subject: [PATCH 09/10] Use partitions argument in function defitions --- filtering.R | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/filtering.R b/filtering.R index 2ad639e..a8397d0 100644 --- a/filtering.R +++ b/filtering.R @@ -6,14 +6,13 @@ #' @param dataset The name of the dataset to process. #' @param column The name of the column in the dataset that contains Date of Birth (DoB) information. #' @param input The location where the Parquet dataset is stored. Default is AWS_PARQUET_DOWNLOAD_LOCATION. -#' @param output The location where the filtered Parquet dataset will be saved. Default is PARQUET_FILTERED_LOCATION. #' #' @return None (invisibly returns the filtered dataset) #' #' @examples #' dob2age("my_dataset", "date_of_birth_column") #' -dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, output=PARQUET_FILTERED_LOCATION) { +dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, partitions = NULL) { if (dataset %in% list.dirs(input, full.names = F)) { input_path <- paste0(input, '/', dataset) @@ -21,7 +20,7 @@ dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, outp dplyr::mutate(age = lubridate::year(lubridate::today())-lubridate::year(lubridate::as_date(!!sym(column)))) %>% arrow::write_dataset(path = input_path, max_rows_per_file = 100000, - partitioning = c('cohort'), + partitioning = partitions, existing_data_behavior = 'delete_matching') } } @@ -41,7 +40,7 @@ dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, outp #' drop_cols_datasets("my_dataset", c("column1", "column2"), input = "./temp1", output = "./temp2") #' # Drop columns with potentially identifying info -drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOAD_LOCATION, output=PARQUET_FILTERED_LOCATION) { +drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOAD_LOCATION, output=PARQUET_FILTERED_LOCATION, partitions = NULL) { if (dataset %in% list.dirs(input, full.names = F)) { input_path <- paste0(input, '/', dataset) final_path <- paste0(output, '/', dataset, '/') @@ -50,7 +49,7 @@ drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOA dplyr::select(!columns) %>% arrow::write_dataset(path = final_path, max_rows_per_file = 100000, - partitioning = c('cohort'), + partitioning = partitions, existing_data_behavior = 'delete_matching') } } From 1da30bd0096d6fc2f70b70ec046873c1e5074d6a Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Fri, 27 Oct 2023 00:15:54 +0000 Subject: [PATCH 10/10] Add unit tests for functions --- tests/testthat/test-filtering.R | 38 +++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/testthat/test-filtering.R b/tests/testthat/test-filtering.R index e69de29..f3c7cca 100644 --- a/tests/testthat/test-filtering.R +++ b/tests/testthat/test-filtering.R @@ -0,0 +1,38 @@ +library(testthat) + +test_that("dob2age correctly calculates age from Date of Birth", { + test_dataset <- data.frame( + date_of_birth = as.Date(c("1990-01-15", "1985-05-10", "2000-12-30")) + ) + + arrow::write_dataset(test_dataset, path = "test_dob2age") + + dob2age("test_dob2age", "date_of_birth", input = ".", partitions = NULL) + + modified_dataset <- arrow::open_dataset("test_dob2age") + + expect_true("age" %in% names(modified_dataset)) + expect_equal((modified_dataset %>% collect %>% pull(age)), c(33, 38, 23)) + + unlink("test_dob2age/part-0.parquet") +}) + +test_that("drop_cols_datasets correctly drops specified columns", { + test_dataset <- data.frame( + column1 = c(1, 2, 3), + column2 = c("A", "B", "C"), + column3 = c(0.1, 0.2, 0.3) + ) + + arrow::write_dataset(test_dataset, path = "test_drop_cols") + + columns_to_drop <- c("column1", "column2") + + drop_cols_datasets("test_drop_cols", columns = columns_to_drop, input = ".", output = ".") + + modified_dataset <- arrow::open_dataset("test_drop_cols") + + expect_true(all(!names(modified_dataset) %in% columns_to_drop)) + + unlink("test_drop_cols/part-0.parquet") +})