From cf46e205a0f61cca7bd85cb03e4fbc72175e9396 Mon Sep 17 00:00:00 2001
From: Pranav Anbarasu
Date: Fri, 10 Nov 2023 20:32:04 +0000
Subject: [PATCH 1/9] Remove unused, commented code

---
 sts_synindex_external.R | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/sts_synindex_external.R b/sts_synindex_external.R
index fd09035..abfdc38 100644
--- a/sts_synindex_external.R
+++ b/sts_synindex_external.R
@@ -92,7 +92,6 @@ replace_equal_with_underscore <- function(directory_path) {
 # Setup -------------------------------------------------------------------
 synapser::synLogin(authToken = Sys.getenv('SYNAPSE_AUTH_TOKEN'))
 config::get(config = "staging") %>% list2env(envir = .GlobalEnv)
-# config::get(config = "prod") %>% list2env(envir = .GlobalEnv)
 
 
 # Get STS credentials for input data bucket -------------------------------
@@ -146,16 +145,6 @@ date <- lubridate::today()
 sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {PARQUET_FINAL_LOCATION} {base_s3_uri_archive}{date}/ --exclude "*owner.txt*" --exclude "*archive*"')
 system(sync_cmd)
 
-
-# Recreate directory tree of parquet datasets bucket location in S --------
-# existing_dirs <- synGetChildren(PARQUET_FOLDER_ARCHIVE) %>% as.list()
-#
-# if(length(existing_dirs)>0) {
-#   for (i in seq_along(existing_dirs)) {
-#     synDelete(existing_dirs[[i]]$id)
-#   }
-# }
-
 # Sync entire bucket to local
 unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T)
 unlink(AWS_ARCHIVE_DOWNLOAD_LOCATION, recursive = T, force = T)

From b67215072a68fc6af205b78ce0844adb3f9166b5 Mon Sep 17 00:00:00 2001
From: Pranav Anbarasu
Date: Fri, 10 Nov 2023 21:05:56 +0000
Subject: [PATCH 2/9] Initial commit

---
 staging_to_archive.R | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 staging_to_archive.R

diff --git a/staging_to_archive.R b/staging_to_archive.R
new file mode 100644
index 0000000..5e50320
--- /dev/null
+++ b/staging_to_archive.R
@@ -0,0 +1,6 @@
+rm(list = names(config::get(config = "staging")))
+config::get(config = "prod") %>% list2env(envir = .GlobalEnv)
+
+validated_date <- readline("Enter name of validated staging folder in yyyy-mm-dd format: ")
+cmd <- glue::glue("aws s3 --profile service-catalog cp {base_s3_uri_staging}/{validated_date} {base_s3_uri_archive}")
+rm(validated_date)

From 12b642e99107e3d242a470e41367d88bfddf51af Mon Sep 17 00:00:00 2001
From: Pranav Anbarasu
Date: Fri, 10 Nov 2023 21:10:29 +0000
Subject: [PATCH 3/9] Get s3 URIs for staging and archive and replace staging params with prod params

---
 staging_to_archive.R | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/staging_to_archive.R b/staging_to_archive.R
index 5e50320..23fe1d6 100644
--- a/staging_to_archive.R
+++ b/staging_to_archive.R
@@ -1,6 +1,11 @@
+base_s3_uri_staging <- paste0('s3://', PARQUET_BUCKET_EXTERNAL, '/', PARQUET_BUCKET_BASE_KEY_ARCHIVE)
+
 rm(list = names(config::get(config = "staging")))
 config::get(config = "prod") %>% list2env(envir = .GlobalEnv)
 
+base_s3_uri_archive <- paste0('s3://', PARQUET_BUCKET_EXTERNAL, '/', PARQUET_BUCKET_BASE_KEY_ARCHIVE)
+
 validated_date <- readline("Enter name of validated staging folder in yyyy-mm-dd format: ")
-cmd <- glue::glue("aws s3 --profile service-catalog cp {base_s3_uri_staging}/{validated_date} {base_s3_uri_archive}")
+cmd <- glue::glue("aws s3 --profile service-catalog cp {base_s3_uri_staging}{validated_date} {base_s3_uri_archive}")
 rm(validated_date)
+

From 494f131c6003d563e7071e042c48d1b69fd746ee Mon Sep 17 00:00:00 2001
From: Pranav Anbarasu
Date: Fri, 10 Nov 2023 21:16:43 +0000
Subject: [PATCH 4/9] Move selected staging folder to archive and then index in synapse

---
 staging_to_archive.R | 90 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 89 insertions(+), 1 deletion(-)

diff --git a/staging_to_archive.R b/staging_to_archive.R
index 23fe1d6..ea71736 100644
--- a/staging_to_archive.R
+++ b/staging_to_archive.R
@@ -1,3 +1,7 @@
+library(synapser)
+
+synapser::synLogin(authToken = Sys.getenv('SYNAPSE_AUTH_TOKEN'))
+
 base_s3_uri_staging <- paste0('s3://', PARQUET_BUCKET_EXTERNAL, '/', PARQUET_BUCKET_BASE_KEY_ARCHIVE)
 
 rm(list = names(config::get(config = "staging")))
@@ -6,6 +10,90 @@ config::get(config = "prod") %>% list2env(envir = .GlobalEnv)
 base_s3_uri_archive <- paste0('s3://', PARQUET_BUCKET_EXTERNAL, '/', PARQUET_BUCKET_BASE_KEY_ARCHIVE)
 
 validated_date <- readline("Enter name of validated staging folder in yyyy-mm-dd format: ")
-cmd <- glue::glue("aws s3 --profile service-catalog cp {base_s3_uri_staging}{validated_date} {base_s3_uri_archive}")
+cmd <- glue::glue("aws s3 --profile service-catalog cp {base_s3_uri_staging}{validated_date} {base_s3_uri_archive} --exclude '*owner.txt*' --exclude '*archive*'")
 rm(validated_date)
 
+# Sync entire bucket to local
+unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T)
+unlink(AWS_ARCHIVE_DOWNLOAD_LOCATION, recursive = T, force = T)
+sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {base_s3_uri_archive} {AWS_ARCHIVE_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
+system(sync_cmd)
+
+# Modify cohort identifier in dir name
+junk <- sapply(list.dirs(AWS_ARCHIVE_DOWNLOAD_LOCATION), replace_equal_with_underscore)
+
+# Generate manifest of existing files
+SYNAPSE_AUTH_TOKEN <- Sys.getenv('SYNAPSE_AUTH_TOKEN')
+manifest_cmd <- glue::glue('SYNAPSE_AUTH_TOKEN="{SYNAPSE_AUTH_TOKEN}" synapse manifest --parent-id {PARQUET_FOLDER_ARCHIVE} --manifest ./current_manifest.tsv {AWS_ARCHIVE_DOWNLOAD_LOCATION}')
+system(manifest_cmd)
+
+
+# Index files in Synapse --------------------------------------------------
+# Get a list of all files to upload and their synapse locations (parentId)
+STR_LEN_PARQUET_FINAL_LOCATION <- stringr::str_length(AWS_ARCHIVE_DOWNLOAD_LOCATION)
+
+## List all local files present (from manifest)
+synapse_manifest <-
+  read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) %>%
+  dplyr::filter(!grepl('owner.txt', path)) %>%
+  dplyr::rowwise() %>%
+  dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_PARQUET_FINAL_LOCATION+2)) %>%
+  dplyr::mutate(s3_file_key = paste0(PARQUET_BUCKET_BASE_KEY_ARCHIVE, file_key)) %>%
+  dplyr::mutate(md5_hash = as.character(tools::md5sum(path))) %>%
+  dplyr::ungroup() %>%
+  dplyr::mutate(file_key = gsub("cohort_", "cohort=", file_key),
+                s3_file_key = gsub("cohort_", "cohort=", s3_file_key))
+
+
+# List all files currently indexed in Synapse
+synapse_fileview <-
+  synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
+  read.csv()
+synapse_fileview <-
+  synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
+  read.csv()
+
+# Find the files in the manifest that are not yet indexed in Synapse
+if (nrow(synapse_fileview)>0) {
+  synapse_manifest_to_upload <-
+    synapse_manifest %>%
+    dplyr::anti_join(
+      synapse_fileview %>%
+        dplyr::select(parent = parentId,
+                      s3_file_key = dataFileKey,
+                      md5_hash = dataFileMD5Hex))
+} else {
+  synapse_manifest_to_upload <- synapse_manifest
+}
+
+# Index each file in Synapse
+latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-external")
+latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree")
+
+if(nrow(synapse_manifest_to_upload) > 0){
+  for(file_number in seq_len(nrow(synapse_manifest_to_upload))){
+    tmp <- synapse_manifest_to_upload[file_number, c("path", "parent", "s3_file_key")]
+
+    absolute_file_path <- tools::file_path_as_absolute(tmp$path)
+
+    temp_syn_obj <-
+      synapser::synCreateExternalS3FileHandle(
+        bucket_name = PARQUET_BUCKET_EXTERNAL,
+        s3_file_key = tmp$s3_file_key,
+        file_path = absolute_file_path,
+        parent = tmp$parent)
+
+    new_fileName <- stringr::str_replace_all(temp_syn_obj$fileName, ':', '_colon_')
+
+    f <- File(dataFileHandleId = temp_syn_obj$id,
+              parentId = tmp$parent,
+              name = new_fileName)
+
+    f <- synStore(f,
+                  activityName = "Indexing",
+                  activityDescription = "Indexing external parquet datasets",
+                  used = PARQUET_FOLDER_INTERNAL,
+                  executed = latest_commit_tree_url)
+
+  }
+}

From a18ea1deec9cc73c558c3c7e675d6c83dfbdbc88 Mon Sep 17 00:00:00 2001
From: Pranav Anbarasu
Date: Fri, 10 Nov 2023 21:21:02 +0000
Subject: [PATCH 5/9] Get staging params directly from config file

---
 staging_to_archive.R | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/staging_to_archive.R b/staging_to_archive.R
index ea71736..ded8b1e 100644
--- a/staging_to_archive.R
+++ b/staging_to_archive.R
@@ -1,13 +1,22 @@
 library(synapser)
+library(tidyverse)
 
 synapser::synLogin(authToken = Sys.getenv('SYNAPSE_AUTH_TOKEN'))
 
-base_s3_uri_staging <- paste0('s3://', PARQUET_BUCKET_EXTERNAL, '/', PARQUET_BUCKET_BASE_KEY_ARCHIVE)
+base_s3_uri_staging <-
+  paste0('s3://',
+         config::get("PARQUET_BUCKET_EXTERNAL", "staging"),
+         '/',
+         config::get("PARQUET_BUCKET_BASE_KEY_ARCHIVE", "staging"))
 
 rm(list = names(config::get(config = "staging")))
 config::get(config = "prod") %>% list2env(envir = .GlobalEnv)
 
-base_s3_uri_archive <- paste0('s3://', PARQUET_BUCKET_EXTERNAL, '/', PARQUET_BUCKET_BASE_KEY_ARCHIVE)
+base_s3_uri_archive <-
+  paste0('s3://',
+         PARQUET_BUCKET_EXTERNAL,
+         '/',
+         PARQUET_BUCKET_BASE_KEY_ARCHIVE)
 
 validated_date <- readline("Enter name of validated staging folder in yyyy-mm-dd format: ")
 cmd <- glue::glue("aws s3 --profile service-catalog cp {base_s3_uri_staging}{validated_date} {base_s3_uri_archive} --exclude '*owner.txt*' --exclude '*archive*'")
 rm(validated_date)

From 9a47179c860203e177210506ef592ff0fff7a293 Mon Sep 17 00:00:00 2001
From: Pranav Anbarasu
Date: Fri, 10 Nov 2023 21:25:17 +0000
Subject: [PATCH 6/9] Check if selected folder exists in staging folder in Synapse before continuing

---
 staging_to_archive.R | 171 ++++++++++++++++++++++---------------------
 1 file changed, 87 insertions(+), 84 deletions(-)

diff --git a/staging_to_archive.R b/staging_to_archive.R
index ded8b1e..b335456 100644
--- a/staging_to_archive.R
+++ b/staging_to_archive.R
@@ -19,90 +19,93 @@ base_s3_uri_archive <-
          PARQUET_BUCKET_BASE_KEY_ARCHIVE)
 
 validated_date <- readline("Enter name of validated staging folder in yyyy-mm-dd format: ")
-cmd <- glue::glue("aws s3 --profile service-catalog cp {base_s3_uri_staging}{validated_date} {base_s3_uri_archive} --exclude '*owner.txt*' --exclude '*archive*'")
-rm(validated_date)
-
-# Sync entire bucket to local
-unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T)
-unlink(AWS_ARCHIVE_DOWNLOAD_LOCATION, recursive = T, force = T)
-sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {base_s3_uri_archive} {AWS_ARCHIVE_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
-system(sync_cmd)
-
-# Modify cohort identifier in dir name
-junk <- sapply(list.dirs(AWS_ARCHIVE_DOWNLOAD_LOCATION), replace_equal_with_underscore)
-
-# Generate manifest of existing files
-SYNAPSE_AUTH_TOKEN <- Sys.getenv('SYNAPSE_AUTH_TOKEN')
-manifest_cmd <- glue::glue('SYNAPSE_AUTH_TOKEN="{SYNAPSE_AUTH_TOKEN}" synapse manifest --parent-id {PARQUET_FOLDER_ARCHIVE} --manifest ./current_manifest.tsv {AWS_ARCHIVE_DOWNLOAD_LOCATION}')
-system(manifest_cmd)
-
-
-# Index files in Synapse --------------------------------------------------
-# Get a list of all files to upload and their synapse locations (parentId)
-STR_LEN_PARQUET_FINAL_LOCATION <- stringr::str_length(AWS_ARCHIVE_DOWNLOAD_LOCATION)
-
-## List all local files present (from manifest)
-synapse_manifest <-
-  read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) %>%
-  dplyr::filter(!grepl('owner.txt', path)) %>%
-  dplyr::rowwise() %>%
-  dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_PARQUET_FINAL_LOCATION+2)) %>%
-  dplyr::mutate(s3_file_key = paste0(PARQUET_BUCKET_BASE_KEY_ARCHIVE, file_key)) %>%
-  dplyr::mutate(md5_hash = as.character(tools::md5sum(path))) %>%
-  dplyr::ungroup() %>%
-  dplyr::mutate(file_key = gsub("cohort_", "cohort=", file_key),
-                s3_file_key = gsub("cohort_", "cohort=", s3_file_key))
-
-
-# List all files currently indexed in Synapse
-synapse_fileview <-
-  synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
-  read.csv()
-synapse_fileview <-
-  synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
-  read.csv()
-
-# Find the files in the manifest that are not yet indexed in Synapse
-if (nrow(synapse_fileview)>0) {
-  synapse_manifest_to_upload <-
-    synapse_manifest %>%
-    dplyr::anti_join(
-      synapse_fileview %>%
-        dplyr::select(parent = parentId,
-                      s3_file_key = dataFileKey,
-                      md5_hash = dataFileMD5Hex))
-} else {
-  synapse_manifest_to_upload <- synapse_manifest
-}
-
-# Index each file in Synapse
-latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-external")
-latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree")
-
-if(nrow(synapse_manifest_to_upload) > 0){
-  for(file_number in seq_len(nrow(synapse_manifest_to_upload))){
-    tmp <- synapse_manifest_to_upload[file_number, c("path", "parent", "s3_file_key")]
-
-    absolute_file_path <- tools::file_path_as_absolute(tmp$path)
-
-    temp_syn_obj <-
-      synapser::synCreateExternalS3FileHandle(
-        bucket_name = PARQUET_BUCKET_EXTERNAL,
-        s3_file_key = tmp$s3_file_key,
-        file_path = absolute_file_path,
-        parent = tmp$parent)
-
-    new_fileName <- stringr::str_replace_all(temp_syn_obj$fileName, ':', '_colon_')
-
-    f <- File(dataFileHandleId = temp_syn_obj$id,
-              parentId = tmp$parent,
-              name = new_fileName)
-
-    f <- synStore(f,
-                  activityName = "Indexing",
-                  activityDescription = "Indexing external parquet datasets",
-                  used = PARQUET_FOLDER_INTERNAL,
-                  executed = latest_commit_tree_url)
-
+if (!is.null(synFindEntityId(validated_date, config::get("PARQUET_FOLDER_ARCHIVE", "staging")))) {
+  cmd <- glue::glue("aws s3 --profile service-catalog cp {base_s3_uri_staging}{validated_date} {base_s3_uri_archive} --exclude '*owner.txt*' --exclude '*archive*'")
+  rm(validated_date)
+
+  # Sync entire bucket to local
+  unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T)
+  unlink(AWS_ARCHIVE_DOWNLOAD_LOCATION, recursive = T, force = T)
+  sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {base_s3_uri_archive} {AWS_ARCHIVE_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
+  system(sync_cmd)
+
+  # Modify cohort identifier in dir name
+  junk <- sapply(list.dirs(AWS_ARCHIVE_DOWNLOAD_LOCATION), replace_equal_with_underscore)
+
+  # Generate manifest of existing files
+  SYNAPSE_AUTH_TOKEN <- Sys.getenv('SYNAPSE_AUTH_TOKEN')
+  manifest_cmd <- glue::glue('SYNAPSE_AUTH_TOKEN="{SYNAPSE_AUTH_TOKEN}" synapse manifest --parent-id {PARQUET_FOLDER_ARCHIVE} --manifest ./current_manifest.tsv {AWS_ARCHIVE_DOWNLOAD_LOCATION}')
+  system(manifest_cmd)
+
+
+  # Index files in Synapse --------------------------------------------------
+  # Get a list of all files to upload and their synapse locations (parentId)
+  STR_LEN_PARQUET_FINAL_LOCATION <- stringr::str_length(AWS_ARCHIVE_DOWNLOAD_LOCATION)
+
+  ## List all local files present (from manifest)
+  synapse_manifest <-
+    read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) %>%
+    dplyr::filter(!grepl('owner.txt', path)) %>%
+    dplyr::rowwise() %>%
+    dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_PARQUET_FINAL_LOCATION+2)) %>%
+    dplyr::mutate(s3_file_key = paste0(PARQUET_BUCKET_BASE_KEY_ARCHIVE, file_key)) %>%
+    dplyr::mutate(md5_hash = as.character(tools::md5sum(path))) %>%
+    dplyr::ungroup() %>%
+    dplyr::mutate(file_key = gsub("cohort_", "cohort=", file_key),
+                  s3_file_key = gsub("cohort_", "cohort=", s3_file_key))
+
+
+  # List all files currently indexed in Synapse
+  synapse_fileview <-
+    synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
+    read.csv()
+  synapse_fileview <-
+    synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
+    read.csv()
+
+  # Find the files in the manifest that are not yet indexed in Synapse
+  if (nrow(synapse_fileview)>0) {
+    synapse_manifest_to_upload <-
+      synapse_manifest %>%
+      dplyr::anti_join(
+        synapse_fileview %>%
+          dplyr::select(parent = parentId,
+                        s3_file_key = dataFileKey,
+                        md5_hash = dataFileMD5Hex))
+  } else {
+    synapse_manifest_to_upload <- synapse_manifest
+  }
+
+  # Index each file in Synapse
+  latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-external")
+  latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree")
+
+  if(nrow(synapse_manifest_to_upload) > 0){
+    for(file_number in seq_len(nrow(synapse_manifest_to_upload))){
+      tmp <- synapse_manifest_to_upload[file_number, c("path", "parent", "s3_file_key")]
+
+      absolute_file_path <- tools::file_path_as_absolute(tmp$path)
+
+      temp_syn_obj <-
+        synapser::synCreateExternalS3FileHandle(
+          bucket_name = PARQUET_BUCKET_EXTERNAL,
+          s3_file_key = tmp$s3_file_key,
+          file_path = absolute_file_path,
+          parent = tmp$parent)
+
+      new_fileName <- stringr::str_replace_all(temp_syn_obj$fileName, ':', '_colon_')
+
+      f <- File(dataFileHandleId = temp_syn_obj$id,
+                parentId = tmp$parent,
+                name = new_fileName)
+
+      f <- synStore(f,
+                    activityName = "Indexing",
+                    activityDescription = "Indexing external parquet datasets",
+                    used = PARQUET_FOLDER_INTERNAL,
+                    executed = latest_commit_tree_url)
+
+    }
   }
 }

From 80787bd827713df1431a9f7d74da7ca2883b06df Mon Sep 17 00:00:00 2001
From: Pranav Anbarasu
Date: Fri, 10 Nov 2023 21:50:34 +0000
Subject: [PATCH 7/9] Sync validated date s3 folder to local to archive s3 folder; remove temporary dirs and vars

---
 staging_to_archive.R | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/staging_to_archive.R b/staging_to_archive.R
index b335456..1199627 100644
--- a/staging_to_archive.R
+++ b/staging_to_archive.R
@@ -21,11 +21,16 @@ base_s3_uri_archive <-
 validated_date <- readline("Enter name of validated staging folder in yyyy-mm-dd format: ")
 
 if (!is.null(synFindEntityId(validated_date, config::get("PARQUET_FOLDER_ARCHIVE", "staging")))) {
-  cmd <- glue::glue("aws s3 --profile service-catalog cp {base_s3_uri_staging}{validated_date} {base_s3_uri_archive} --exclude '*owner.txt*' --exclude '*archive*'")
-  rm(validated_date)
+  sync_cmd <- glue::glue("aws s3 --profile service-catalog sync {base_s3_uri_staging}{validated_date}/ {STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION} --exclude '*owner.txt*' --exclude '*archive*'")
+  system(sync_cmd)
+  rm(sync_cmd)
+  sync_cmd <- glue::glue("aws s3 --profile service-catalog sync {STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION} {base_s3_uri_archive}{validated_date}/ --exclude '*owner.txt*' --exclude '*archive*'")
+  system(sync_cmd)
+
+  rm(sync_cmd, validated_date)
 
   # Sync entire bucket to local
-  unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T)
+  unlink(STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION, recursive = T, force = T)
   unlink(AWS_ARCHIVE_DOWNLOAD_LOCATION, recursive = T, force = T)
   sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {base_s3_uri_archive} {AWS_ARCHIVE_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
   system(sync_cmd)

From 3fc72f1e3bc6d71fa3cca3b9b669987e30aaa32b Mon Sep 17 00:00:00 2001
From: Pranav Anbarasu
Date: Fri, 10 Nov 2023 21:51:45 +0000
Subject: [PATCH 8/9] New param for path to use for syncing to/from for staging to archive ops

---
 config.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/config.yml b/config.yml
index c38082a..eb14883 100644
--- a/config.yml
+++ b/config.yml
@@ -21,6 +21,7 @@ prod:
   SYNAPSE_FILEVIEW_ID: syn52504776
   PII_COLS_TO_DROP: syn52523394
   DEID_VALS_TO_REVIEW: syn52409518
+  STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION: ./temp_staging_to_archive/
 
 staging:
   inherits: prod

From 6a223cf71fa8dcce2a90d220baeec91a60a3b00f Mon Sep 17 00:00:00 2001
From: Pranav Anbarasu
Date: Fri, 10 Nov 2023 22:02:21 +0000
Subject: [PATCH 9/9] Add necessary function

---
 staging_to_archive.R | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/staging_to_archive.R b/staging_to_archive.R
index 1199627..76c3d0f 100644
--- a/staging_to_archive.R
+++ b/staging_to_archive.R
@@ -1,6 +1,24 @@
 library(synapser)
 library(tidyverse)
 
+#' Replace equal sign with underscore
+#'
+#' This function renames a directory path by replacing equal signs with underscores.
+#' If a replacement is performed, it logs the change.
+#'
+#' @param directory_path The path of the directory to rename.
+#'
+#' @examples
+#' replace_equal_with_underscore("path_with=equals")
+#'
+replace_equal_with_underscore <- function(directory_path) {
+  new_directory_path <- gsub("=", "_", directory_path)
+  if (directory_path != new_directory_path) {
+    file.rename(directory_path, new_directory_path)
+    return(cat("Renamed:", directory_path, "to", new_directory_path, "\n"))
+  }
+}
+
 synapser::synLogin(authToken = Sys.getenv('SYNAPSE_AUTH_TOKEN'))
 
 base_s3_uri_staging <-
@@ -20,6 +38,7 @@ base_s3_uri_archive <-
 
 validated_date <- readline("Enter name of validated staging folder in yyyy-mm-dd format: ")
 
+# Index files in Synapse --------------------------------------------------
 if (!is.null(synFindEntityId(validated_date, config::get("PARQUET_FOLDER_ARCHIVE", "staging")))) {
   sync_cmd <- glue::glue("aws s3 --profile service-catalog sync {base_s3_uri_staging}{validated_date}/ {STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION} --exclude '*owner.txt*' --exclude '*archive*'")
   system(sync_cmd)
@@ -43,8 +62,6 @@ if (!is.null(synFindEntityId(validated_date, config::get("PARQUET_FOLDER_ARCHIVE
   manifest_cmd <- glue::glue('SYNAPSE_AUTH_TOKEN="{SYNAPSE_AUTH_TOKEN}" synapse manifest --parent-id {PARQUET_FOLDER_ARCHIVE} --manifest ./current_manifest.tsv {AWS_ARCHIVE_DOWNLOAD_LOCATION}')
   system(manifest_cmd)
 
-
-  # Index files in Synapse --------------------------------------------------
   # Get a list of all files to upload and their synapse locations (parentId)
   STR_LEN_PARQUET_FINAL_LOCATION <- stringr::str_length(AWS_ARCHIVE_DOWNLOAD_LOCATION)