Commit
Merge pull request #7 from pranavanba/get-filtering-params-from-synapse
RMHDR-200 Get filtering params from synapse
pranavanba authored Nov 1, 2023
2 parents 04fd456 + 1da30bd commit 1222b8c
Showing 2 changed files with 91 additions and 17 deletions.
70 changes: 53 additions & 17 deletions filtering.R
@@ -1,37 +1,73 @@
-# Calculate age from DoB -----------------------------------------------------
-dob2age <- function(dataset, column, output=PARQUET_FILTERED_LOCATION) {
-  if (dataset %in% list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION, full.names = F)) {
-    input_path <- paste0(AWS_PARQUET_DOWNLOAD_LOCATION, '/', dataset)
+# Functions ---------------------------------------------------------------
+#' Calculate age from Date of Birth
+#'
+#' This function calculates the age of individuals based on their date of birth.
+#'
+#' @param dataset The name of the dataset to process.
+#' @param column The name of the column in the dataset that contains Date of Birth (DoB) information.
+#' @param input The location where the Parquet dataset is stored. Default is AWS_PARQUET_DOWNLOAD_LOCATION.
+#'
+#' @return None (invisibly returns the filtered dataset)
+#'
+#' @examples
+#' dob2age("my_dataset", "date_of_birth_column")
+#'
+dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, partitions = NULL) {
+  if (dataset %in% list.dirs(input, full.names = F)) {
+    input_path <- paste0(input, '/', dataset)

     arrow::open_dataset(sources = input_path) %>%
       dplyr::mutate(age = lubridate::year(lubridate::today())-lubridate::year(lubridate::as_date(!!sym(column)))) %>%
       arrow::write_dataset(path = input_path,
                            max_rows_per_file = 100000,
-                           partitioning = c('cohort'),
+                           partitioning = partitions,
                            existing_data_behavior = 'delete_matching')
   }
 }

-dob2age("dataset_enrolledparticipants", "DateOfBirth")
-
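Note: partitions defaults to NULL, so dob2age now rewrites the dataset unpartitioned unless a partitioning column is passed explicitly. A minimal sketch of reproducing the previous cohort partitioning (a hypothetical call, not part of this commit):

# Restore the pre-change behavior of partitioning the rewritten data by cohort
dob2age("dataset_enrolledparticipants", "DateOfBirth",
        input = AWS_PARQUET_DOWNLOAD_LOCATION,
        partitions = c("cohort"))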
-# Drop columns with potentially identifying info --------------------------
-drop_cols_datasets <- function(dataset, columns=c(), output=PARQUET_FILTERED_LOCATION) {
-  if (dataset %in% list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION, full.names = F)) {
-    input_path <- paste0(AWS_PARQUET_DOWNLOAD_LOCATION, '/', dataset)
+#' Drop columns with potentially identifying information
+#'
+#' This function removes specified columns from a dataset to eliminate potentially identifying information.
+#'
+#' @param dataset The name of the dataset to process.
+#' @param columns A character vector of column names to be dropped from the dataset.
+#' @param input The location where the Parquet dataset is stored. Default is AWS_PARQUET_DOWNLOAD_LOCATION.
+#' @param output The location where the filtered Parquet dataset will be saved. Default is PARQUET_FILTERED_LOCATION.
+#'
+#' @return None (invisibly returns the filtered dataset)
+#'
+#' @examples
+#' drop_cols_datasets("my_dataset", c("column1", "column2"), input = "./temp1", output = "./temp2")
+#'
+# Drop columns with potentially identifying info
+drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOAD_LOCATION, output=PARQUET_FILTERED_LOCATION, partitions = NULL) {
+  if (dataset %in% list.dirs(input, full.names = F)) {
+    input_path <- paste0(input, '/', dataset)
     final_path <- paste0(output, '/', dataset, '/')

     arrow::open_dataset(sources = input_path) %>%
       dplyr::select(!columns) %>%
       arrow::write_dataset(path = final_path,
                            max_rows_per_file = 100000,
-                           partitioning = c('cohort'),
+                           partitioning = partitions,
                            existing_data_behavior = 'delete_matching')
   }
 }

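Note: dplyr::select(!columns) asks tidyselect to resolve an external character vector, which recent tidyselect releases flag as ambiguous. A safer spelling of the same drop is the all_of() helper; a sketch under that assumption, not what this commit ships:

# Drop exactly the columns named in the character vector `columns`
arrow::open_dataset(sources = input_path) %>%
  dplyr::select(!dplyr::all_of(columns)) %>%
  arrow::write_dataset(path = final_path,
                       max_rows_per_file = 100000,
                       partitioning = partitions,
                       existing_data_behavior = 'delete_matching')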
-# unlink(PARQUET_FILTERED_LOCATION, recursive = T, force = T)

-lapply(seq_along(datasets_to_filter), function(i) {
-  cat("Dropping", cols_to_drop[[i]], "from", datasets_to_filter[i], "\n")
-  drop_cols_datasets(dataset = datasets_to_filter[i], columns = cols_to_drop[[i]])
-}) %>% invisible()
+# Filtering ---------------------------------------------------------------
+dob2age("dataset_enrolledparticipants", "DateOfBirth")
+
+unlink(PARQUET_FILTERED_LOCATION, recursive = T, force = T)
+
+synLogin()
+
+pii_to_drop <- synGet('syn52523394')$path %>% read.csv()
+
+tmp <-
+  lapply(seq_len(nrow(pii_to_drop)), function(i) {
+    cat(i, "Dropping", pii_to_drop$column_to_be_dropped[[i]], "from", pii_to_drop$dataset[[i]], "\n")
+    drop_cols_datasets(dataset = pii_to_drop$dataset[[i]], columns = pii_to_drop$column_to_be_dropped[[i]])
+  })
+
+rm(pii_to_drop)
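Note: the loop above is the Synapse-driven part of this change. It assumes the file behind syn52523394 parses, via read.csv, into a data frame with at least the columns dataset and column_to_be_dropped, one drop instruction per row. Illustrative shape only (these values are hypothetical, not the real file contents):

dataset,column_to_be_dropped
dataset_enrolledparticipants,FirstName
dataset_symptomlog,FreeTextNotes

One observation to verify: each drop_cols_datasets() call re-reads the unfiltered input and rewrites the output with existing_data_behavior = 'delete_matching', so if the same dataset appeared on several rows, the last call's output would replace the earlier ones; presumably the CSV is shaped so that each dataset needs only one pass.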
38 changes: 38 additions & 0 deletions tests/testthat/test-filtering.R
@@ -0,0 +1,38 @@
+library(testthat)
+
+test_that("dob2age correctly calculates age from Date of Birth", {
+  test_dataset <- data.frame(
+    date_of_birth = as.Date(c("1990-01-15", "1985-05-10", "2000-12-30"))
+  )
+
+  arrow::write_dataset(test_dataset, path = "test_dob2age")
+
+  dob2age("test_dob2age", "date_of_birth", input = ".", partitions = NULL)
+
+  modified_dataset <- arrow::open_dataset("test_dob2age")
+
+  expect_true("age" %in% names(modified_dataset))
+  expect_equal((modified_dataset %>% collect %>% pull(age)), c(33, 38, 23))
+
+  unlink("test_dob2age/part-0.parquet")
+})
+
+test_that("drop_cols_datasets correctly drops specified columns", {
+  test_dataset <- data.frame(
+    column1 = c(1, 2, 3),
+    column2 = c("A", "B", "C"),
+    column3 = c(0.1, 0.2, 0.3)
+  )
+
+  arrow::write_dataset(test_dataset, path = "test_drop_cols")
+
+  columns_to_drop <- c("column1", "column2")
+
+  drop_cols_datasets("test_drop_cols", columns = columns_to_drop, input = ".", output = ".")
+
+  modified_dataset <- arrow::open_dataset("test_drop_cols")
+
+  expect_true(all(!names(modified_dataset) %in% columns_to_drop))
+
+  unlink("test_drop_cols/part-0.parquet")
+})
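
Note: a sketch of running these tests locally, assuming the functions from filtering.R are already in the session (the test file does not source them itself, and sourcing filtering.R as-is would also execute its top-level Synapse calls):

library(testthat)
library(dplyr)    # the tests call collect and pull without a namespace prefix
test_file("tests/testthat/test-filtering.R")

Also note the expected ages c(33, 38, 23) come from lubridate::year(lubridate::today()), so they hold only for runs in 2023, the year of this commit.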
