diff --git a/config.yml b/config.yml
index c38082a..eb14883 100644
--- a/config.yml
+++ b/config.yml
@@ -21,6 +21,7 @@ prod:
   SYNAPSE_FILEVIEW_ID: syn52504776
   PII_COLS_TO_DROP: syn52523394
   DEID_VALS_TO_REVIEW: syn52409518
+  STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION: ./temp_staging_to_archive/
 
 staging:
   inherits: prod
diff --git a/staging_to_archive.R b/staging_to_archive.R
new file mode 100644
index 0000000..76c3d0f
--- /dev/null
+++ b/staging_to_archive.R
@@ -0,0 +1,130 @@
+library(synapser)
+library(tidyverse)
+
+#' Replace equal sign with underscore
+#'
+#' This function renames a directory path by replacing equal signs with underscores.
+#' If a replacement is performed, it logs the change.
+#'
+#' @param directory_path The path of the directory to rename.
+#'
+#' @examples
+#' replace_equal_with_underscore("path_with=equals")
+#'
+replace_equal_with_underscore <- function(directory_path) {
+  new_directory_path <- gsub("=", "_", directory_path)
+  if (directory_path != new_directory_path) {
+    file.rename(directory_path, new_directory_path)
+    return(cat("Renamed:", directory_path, "to", new_directory_path, "\n"))
+  }
+}
+
+synapser::synLogin(authToken = Sys.getenv('SYNAPSE_AUTH_TOKEN'))
+
+base_s3_uri_staging <-
+  paste0('s3://',
+         config::get("PARQUET_BUCKET_EXTERNAL", "staging"),
+         '/',
+         config::get("PARQUET_BUCKET_BASE_KEY_ARCHIVE", "staging"))
+
+rm(list = names(config::get(config = "staging")))
+config::get(config = "prod") %>% list2env(envir = .GlobalEnv)
+
+base_s3_uri_archive <-
+  paste0('s3://',
+         PARQUET_BUCKET_EXTERNAL,
+         '/',
+         PARQUET_BUCKET_BASE_KEY_ARCHIVE)
+
+validated_date <- readline("Enter name of validated staging folder in yyyy-mm-dd format: ")
+
+# Index files in Synapse --------------------------------------------------
+if (!is.null(synFindEntityId(validated_date, config::get("PARQUET_FOLDER_ARCHIVE", "staging")))) {
+  sync_cmd <- glue::glue("aws s3 --profile service-catalog sync {base_s3_uri_staging}{validated_date}/ {STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION} --exclude '*owner.txt*' --exclude '*archive*'")
+  system(sync_cmd)
+  rm(sync_cmd)
+  sync_cmd <- glue::glue("aws s3 --profile service-catalog sync {STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION} {base_s3_uri_archive}{validated_date}/ --exclude '*owner.txt*' --exclude '*archive*'")
+  system(sync_cmd)
+
+  rm(sync_cmd, validated_date)
+
+  # Sync entire bucket to local
+  unlink(STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION, recursive = T, force = T)
+  unlink(AWS_ARCHIVE_DOWNLOAD_LOCATION, recursive = T, force = T)
+  sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {base_s3_uri_archive} {AWS_ARCHIVE_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
+  system(sync_cmd)
+
+  # Modify cohort identifier in dir name
+  junk <- sapply(list.dirs(AWS_ARCHIVE_DOWNLOAD_LOCATION), replace_equal_with_underscore)
+
+  # Generate manifest of existing files
+  SYNAPSE_AUTH_TOKEN <- Sys.getenv('SYNAPSE_AUTH_TOKEN')
+  manifest_cmd <- glue::glue('SYNAPSE_AUTH_TOKEN="{SYNAPSE_AUTH_TOKEN}" synapse manifest --parent-id {PARQUET_FOLDER_ARCHIVE} --manifest ./current_manifest.tsv {AWS_ARCHIVE_DOWNLOAD_LOCATION}')
+  system(manifest_cmd)
+
+  # Get a list of all files to upload and their synapse locations (parentId)
+  STR_LEN_PARQUET_FINAL_LOCATION <- stringr::str_length(AWS_ARCHIVE_DOWNLOAD_LOCATION)
+
+  ## List all local files present (from manifest)
+  synapse_manifest <-
+    read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) %>%
+    dplyr::filter(!grepl('owner.txt', path)) %>%
+    dplyr::rowwise() %>%
+    dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_PARQUET_FINAL_LOCATION+2)) %>%
+    dplyr::mutate(s3_file_key = paste0(PARQUET_BUCKET_BASE_KEY_ARCHIVE, file_key)) %>%
+    dplyr::mutate(md5_hash = as.character(tools::md5sum(path))) %>%
+    dplyr::ungroup() %>%
+    dplyr::mutate(file_key = gsub("cohort_", "cohort=", file_key),
+                  s3_file_key = gsub("cohort_", "cohort=", s3_file_key))
+
+
+  # List all files currently indexed in Synapse
+  synapse_fileview <-
+    synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
+    read.csv()
+
+  # Find the files in the manifest that are not yet indexed in Synapse
+  if (nrow(synapse_fileview) > 0) {
+    synapse_manifest_to_upload <-
+      synapse_manifest %>%
+      dplyr::anti_join(
+        synapse_fileview %>%
+          dplyr::select(parent = parentId,
+                        s3_file_key = dataFileKey,
+                        md5_hash = dataFileMD5Hex))
+  } else {
+    synapse_manifest_to_upload <- synapse_manifest
+  }
+
+  # Index each file in Synapse
+  latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-external")
+  latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree")
+
+  if (nrow(synapse_manifest_to_upload) > 0) {
+    for (file_number in seq_len(nrow(synapse_manifest_to_upload))) {
+      tmp <- synapse_manifest_to_upload[file_number, c("path", "parent", "s3_file_key")]
+
+      absolute_file_path <- tools::file_path_as_absolute(tmp$path)
+
+      temp_syn_obj <-
+        synapser::synCreateExternalS3FileHandle(
+          bucket_name = PARQUET_BUCKET_EXTERNAL,
+          s3_file_key = tmp$s3_file_key,
+          file_path = absolute_file_path,
+          parent = tmp$parent)
+
+      new_fileName <- stringr::str_replace_all(temp_syn_obj$fileName, ':', '_colon_')
+
+      f <- File(dataFileHandleId = temp_syn_obj$id,
+                parentId = tmp$parent,
+                name = new_fileName)
+
+      f <- synStore(f,
+                    activityName = "Indexing",
+                    activityDescription = "Indexing external parquet datasets",
+                    used = PARQUET_FOLDER_INTERNAL,
+                    executed = latest_commit_tree_url)
+
+    }
+  }
+}
diff --git a/sts_synindex_external.R b/sts_synindex_external.R
index fd09035..abfdc38 100644
--- a/sts_synindex_external.R
+++ b/sts_synindex_external.R
@@ -92,7 +92,6 @@ replace_equal_with_underscore <- function(directory_path) {
 # Setup -------------------------------------------------------------------
 synapser::synLogin(authToken = Sys.getenv('SYNAPSE_AUTH_TOKEN'))
 config::get(config = "staging") %>% list2env(envir = .GlobalEnv)
-# config::get(config = "prod") %>% list2env(envir = .GlobalEnv)
 
 
 # Get STS credentials for input data bucket -------------------------------
@@ -146,16 +145,6 @@ date <- lubridate::today()
 sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {PARQUET_FINAL_LOCATION} {base_s3_uri_archive}{date}/ --exclude "*owner.txt*" --exclude "*archive*"')
 system(sync_cmd)
 
-
-# Recreate directory tree of parquet datasets bucket location in S --------
-# existing_dirs <- synGetChildren(PARQUET_FOLDER_ARCHIVE) %>% as.list()
-#
-# if(length(existing_dirs)>0) {
-#   for (i in seq_along(existing_dirs)) {
-#     synDelete(existing_dirs[[i]]$id)
-#   }
-# }
-
 # Sync entire bucket to local
 unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T)
 unlink(AWS_ARCHIVE_DOWNLOAD_LOCATION, recursive = T, force = T)
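
For context on the new script, the dplyr::anti_join() step in staging_to_archive.R is what decides which parquet files still need Synapse file handles. Below is a minimal, self-contained R sketch of that logic; the tibbles, Synapse IDs, S3 keys, and hashes are made up for illustration, but the column names mirror the script (parent/s3_file_key/md5_hash on the manifest side, parentId/dataFileKey/dataFileMD5Hex on the file view side).

library(dplyr)

# Stand-in for the manifest built from the locally synced archive copy
synapse_manifest <- tibble::tribble(
  ~path,                        ~parent,       ~s3_file_key,                                  ~md5_hash,
  "./dataset_a/part-0.parquet", "syn00000001", "archive/2023-01-01/dataset_a/part-0.parquet", "aaa111",
  "./dataset_b/part-0.parquet", "syn00000002", "archive/2023-01-01/dataset_b/part-0.parquet", "bbb222"
)

# Stand-in for the Synapse file view: dataset_a is already indexed
synapse_fileview <- tibble::tribble(
  ~parentId,     ~dataFileKey,                                  ~dataFileMD5Hex,
  "syn00000001", "archive/2023-01-01/dataset_a/part-0.parquet", "aaa111"
)

# Keep only manifest rows whose (parent, s3_file_key, md5_hash) combination
# is not already indexed -- here, only the dataset_b row survives
synapse_manifest %>%
  anti_join(
    synapse_fileview %>%
      select(parent = parentId,
             s3_file_key = dataFileKey,
             md5_hash = dataFileMD5Hex),
    by = c("parent", "s3_file_key", "md5_hash")
  )

A file whose contents change shows up again because its md5_hash no longer matches the file view, so it is re-indexed on the next run; unchanged files are skipped.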