Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor into proper package and use furrr package #121

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
8 changes: 6 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ License: MIT + file LICENSE
Encoding: UTF-8
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.3
Depends:
Depends:
ParallelLogger
Imports:
data.table,
Expand All @@ -25,7 +25,11 @@ Imports:
tidyr,
yaml,
zoo,
arrow
arrow,
furrr,
progressr,
tictoc,
magrittr
Suggests:
testthat (>= 3.0.0)
Config/testthat/edition: 3
Expand Down
18 changes: 18 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# Generated by roxygen2: do not edit by hand

export("%>%")
export(calculate_most_frequent)
export(convert_to)
export(correct_decimal_sign)
export(create_table_longitudinal_data)
export(create_table_patient_data_monthly)
export(create_table_patient_data_static)
export(create_table_product_data)
export(cut_numeric_value)
export(export_data_as_parquet)
export(extract_patient_data)
export(extract_unit_capacity)
export(extract_year_from_age)
Expand All @@ -14,15 +20,27 @@ export(fix_id)
export(fix_sex)
export(fix_t1d_diagnosis_age)
export(fix_testing_frequency)
export(get_allowed_provinces)
export(get_files)
export(get_synonyms)
export(get_tracker_year)
export(harmonize_patient_data_columns)
export(id_2_county_hospisal)
export(init_paths)
export(link_product_patient)
export(process_patient_data)
export(process_patient_file)
export(process_product_data)
export(process_product_file)
export(process_tracker_file)
export(read_cleaned_patient_data)
export(read_column_synonyms)
export(report_empty_intersections)
export(sanitize_str)
export(select_A4D_directory)
export(set_a4d_data_root)
export(setup_file_logger)
export(setup_logger)
export(with_file_logger)
importFrom(data.table,"%like%")
importFrom(magrittr,"%>%")
2 changes: 2 additions & 0 deletions R/create_table_patient_data.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#' @param patient_data_files list of CSV files with cleaned patient data from step 2.
#' @param input_root root directory of the input CSV files.
#' @param output_root root directory of the output folder.
#'
#' @export
create_table_patient_data_monthly <- function(patient_data_files, input_root, output_root) {
logInfo("Start creating single csv for table patient_data_monthly.")

Expand Down
2 changes: 2 additions & 0 deletions R/create_table_patient_data_changes_only.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#' @param output_root root directory of the output folder.
#' @param variable name of the column that should be exported.
#' @param name name used to create the export file name.
#'
#' @export
create_table_longitudinal_data <-
function(patient_data_files,
input_root,
Expand Down
2 changes: 2 additions & 0 deletions R/create_table_patient_data_static.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#' @param patient_data_files list of CSV files with cleaned patient data from step 2.
#' @param input_root root directory of the input CSV files.
#' @param output_root root directory of the output folder.
#'
#' @export
create_table_patient_data_static <- function(patient_data_files, input_root, output_root) {
logInfo("Start creating single csv for table patient_data_static.")

Expand Down
1 change: 1 addition & 0 deletions R/create_table_product_data.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#'
#' @return This function does not return a value. It writes the merged data to a new CSV file
#' (with reordered columns according to the list of fields) in the output_root directory.
#' @export
#'
#' @examples
#' \dontrun{
Expand Down
7 changes: 6 additions & 1 deletion R/helper_main.R
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#' @param delete If TRUE, delete all files under output.
#'
#' @return A list with tracker_root_path and output_root path
#' @export
init_paths <- function(names, output_dir_name = "output", delete = FALSE) {
paths <- list()
tracker_root_path <- select_A4D_directory()
Expand Down Expand Up @@ -57,6 +58,7 @@ init_paths <- function(names, output_dir_name = "output", delete = FALSE) {
#' @param pattern The search pattern to filter files.
#'
#' @return A vector with file names.
#' @export
get_files <- function(tracker_root, pattern = "\\.xlsx$") {
tracker_files <- list.files(path = tracker_root, recursive = T, pattern = pattern)
tracker_files <-
Expand All @@ -70,6 +72,7 @@ get_files <- function(tracker_root, pattern = "\\.xlsx$") {
#' Read in all defined synonyms from the YAML files inside the synonyms folder.
#'
#' @return A list with both patient and product data synonyms as tibble.
#' @export
get_synonyms <- function() {
## Extract synonyms for products and patients
## If you encounter new columns, just add the synonyms to these YAML files
Expand Down Expand Up @@ -173,6 +176,7 @@ export_data <- function(data, filename, output_root, suffix) {
#' suffix = "_product_data"
#' )
#' }
#' @export
export_data_as_parquet <- function(data, filename, output_root, suffix) {
logDebug("Start export_data. Suffix = ", suffix, ".")
data %>%
Expand Down Expand Up @@ -213,8 +217,9 @@ read_raw_csv <- function(file) {
#' Read in all provinces from a YAML file inside the provinces folder.
#'
#' @return A named character vector with all allowed provinces.
#' @export
get_allowed_provinces <- function() {
## Should new countries and provinces be added, update the YAML file
provinces <- yaml::read_yaml("reference_data/provinces/allowed_provinces.yaml") %>% unlist()
provinces <- yaml::read_yaml("reference_data/provinces/allowed_provinces.yaml") |> unlist()
return(provinces)
}
151 changes: 151 additions & 0 deletions R/helper_script_1.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#' @title Extract raw patient and product data from one tracker file.
#'
#' @description
#' Runs `process_patient_data()` and then `process_product_data()` on the same
#' tracker, each wrapped in its own `with_file_logger()` call. Errors and
#' warnings signalled by either step are caught and written to the log, so a
#' failure in the patient step does not prevent the product step from running.
#'
#' @param tracker_file Path of the tracker file, relative to
#'   `paths$tracker_root`.
#' @param paths A list providing `tracker_root`, `patient_data_raw`,
#'   `product_data_raw` and `output_root`.
#' @param synonyms A list whose `patient` and `product` elements hold the
#'   header-name synonyms for the respective data.
#'
#' @export
process_tracker_file <- function(tracker_file, paths, synonyms) {
  # File name without directory and extension; used for log and export names.
  file_stem <- tools::file_path_sans_ext(basename(tracker_file))
  workbook_path <- file.path(paths$tracker_root, tracker_file)

  logDebug("Start process_tracker_file.")
  logInfo("Current file: ", file_stem)

  # Log names handed to with_file_logger(), one per data kind.
  patient_log <- paste0(file_stem, "_", "patient")
  product_log <- paste0(file_stem, "_", "product")

  # NOTE(review): tryCatch() unwinds on the first warning, so a warning raised
  # inside a step aborts that step rather than merely being recorded.
  with_file_logger(
    patient_log,
    {
      tryCatch(
        process_patient_data(
          tracker_name = file_stem,
          tracker_data_file = workbook_path,
          output_root = paths$patient_data_raw,
          synonyms_patient = synonyms$patient
        ),
        error = function(e) {
          logError("Could not process patient data. Error = ", e$message, ".")
        },
        warning = function(w) {
          logWarn("Could not process patient data. Warning = ", w$message, ".")
        }
      )
    },
    output_root = paths$output_root
  )

  with_file_logger(
    product_log,
    {
      tryCatch(
        process_product_data(
          tracker_name = file_stem,
          tracker_data_file = workbook_path,
          output_root = paths$product_data_raw,
          synonyms_product = synonyms$product
        ),
        error = function(e) {
          logError("Could not process product data. Error = ", e$message, ".")
        },
        warning = function(w) {
          logWarn("Could not process product data. Warning = ", w$message, ".")
        }
      )
    },
    output_root = paths$output_root
  )

  logDebug("Finish process_tracker_file.")
}


#' @title Read the raw patient data of one tracker and export it as parquet.
#'
#' @description
#' Reads the patient data with `reading_patient_data()`, adds a `file_name`
#' column holding `tracker_name`, logs the resulting dimensions and hands the
#' data to `export_data_as_parquet()` with the suffix `"_patient_raw"`.
#'
#' @param tracker_name Tracker file name without extension; stored in the
#'   `file_name` column and used as the export file name.
#' @param tracker_data_file Path of the tracker file to read.
#' @param output_root Directory the exported patient data is written to.
#' @param synonyms_patient Synonyms for patient data header names.
#'
#' @export
process_patient_data <- function(tracker_name,
                                 tracker_data_file,
                                 output_root,
                                 synonyms_patient) {
  logDebug("Start process_patient_data.")

  # Read the tracker and tag every row with the tracker it came from.
  patient_tbl <- dplyr::mutate(
    reading_patient_data(
      tracker_data_file = tracker_data_file,
      columns_synonyms = synonyms_patient
    ),
    file_name = tracker_name
  )

  patient_dims <- dim(patient_tbl) %>% as.data.frame()
  logDebug("df_raw_patient dim: ", patient_dims, ".")

  export_data_as_parquet(
    data = patient_tbl,
    filename = tracker_name,
    output_root = output_root,
    suffix = "_patient_raw"
  )

  logDebug("Finish process_patient_data.")
}


#' @title Read the raw product data of one tracker and export it as parquet.
#'
#' @description
#' Reads the product data with `reading_product_data_step1()`. When data is
#' returned, a `file_name` column holding `tracker_name` is added and the
#' result is handed to `export_data_as_parquet()` with the suffix
#' `"_product_raw"`. When the reader returns `NULL`, nothing is exported and a
#' warning is logged instead.
#'
#' @param tracker_name Tracker file name without extension; stored in the
#'   `file_name` column and used as the export file name.
#' @param tracker_data_file Path of the tracker file to read.
#' @param output_root Directory the exported product data is written to.
#' @param synonyms_product Synonyms for product data header names.
#'
#' @export
process_product_data <- function(tracker_name,
                                 tracker_data_file,
                                 output_root,
                                 synonyms_product) {
  logDebug("Start process_product_data.")

  product_tbl <- reading_product_data_step1(
    tracker_data_file = tracker_data_file,
    columns_synonyms = synonyms_product
  )

  # The reader signals "no product data" by returning NULL.
  has_product_data <- !is.null(product_tbl)

  if (has_product_data) {
    # Tag every row with the tracker it came from.
    product_tbl <- dplyr::mutate(product_tbl, file_name = tracker_name)
  } else {
    logDebug("Empty product data")
  }

  product_dims <- dim(product_tbl) %>% as.data.frame()
  logDebug("df_raw_product dim: ", product_dims, ".")

  # Only write a parquet file when there is something to write.
  if (has_product_data) {
    export_data_as_parquet(
      data = product_tbl,
      filename = tracker_name,
      output_root = output_root,
      suffix = "_product_raw"
    )
  } else {
    logWarn("No product data in the file")
  }

  logDebug("Finish process_product_data.")
}
Loading