Merge pull request #14 from pranavanba/staging-to-archive
Add new workflow to move validated data from a staging location to an archive location
pranavanba authored Nov 10, 2023
2 parents 791c9a4 + 6a223cf commit c2383c1
Showing 3 changed files with 134 additions and 11 deletions.
1 change: 1 addition & 0 deletions config.yml
@@ -21,6 +21,7 @@ prod:
  SYNAPSE_FILEVIEW_ID: syn52504776
  PII_COLS_TO_DROP: syn52523394
  DEID_VALS_TO_REVIEW: syn52409518
  STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION: ./temp_staging_to_archive/

staging:
  inherits: prod
133 changes: 133 additions & 0 deletions staging_to_archive.R
@@ -0,0 +1,133 @@
library(synapser)
library(tidyverse)

#' Replace equal sign with underscore
#'
#' This function renames a directory path by replacing equal signs with underscores.
#' If a replacement is performed, it logs the change.
#'
#' @param directory_path The path of the directory to rename.
#'
#' @examples
#' replace_equal_with_underscore("path_with=equals")
#'
replace_equal_with_underscore <- function(directory_path) {
  new_directory_path <- gsub("=", "_", directory_path)
  if (directory_path != new_directory_path) {
    file.rename(directory_path, new_directory_path)
    return(cat("Renamed:", directory_path, "to", new_directory_path, "\n"))
  }
}
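
# Example (hypothetical path): replace_equal_with_underscore("data/cohort=adults")
# renames that directory to "data/cohort_adults" and prints the change; a path
# containing no "=" triggers no rename and no message.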

synapser::synLogin(authToken = Sys.getenv('SYNAPSE_AUTH_TOKEN'))

# Build the staging URI with explicit lookups against the "staging" config
# (the global environment is switched to the prod config just below)
base_s3_uri_staging <-
  paste0('s3://',
         config::get("PARQUET_BUCKET_EXTERNAL", "staging"),
         '/',
         config::get("PARQUET_BUCKET_BASE_KEY_ARCHIVE", "staging"))

# Clear any staging config values already bound in the global environment,
# then load the prod config so unqualified variables below resolve to prod
rm(list = intersect(names(config::get(config = "staging")), ls()))
config::get(config = "prod") %>% list2env(envir = .GlobalEnv)
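
# Sketch of the config swap (hypothetical YAML values): given a config.yml like
#   prod:
#     PARQUET_BUCKET_EXTERNAL: example-prod-bucket
#   staging:
#     inherits: prod
#     PARQUET_BUCKET_EXTERNAL: example-staging-bucket
# list2env() binds each prod value to a same-named global variable, so a bare
# PARQUET_BUCKET_EXTERNAL below refers to the prod bucket.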

base_s3_uri_archive <-
  paste0('s3://',
         PARQUET_BUCKET_EXTERNAL,
         '/',
         PARQUET_BUCKET_BASE_KEY_ARCHIVE)

validated_date <- readline("Enter name of validated staging folder in yyyy-mm-dd format: ")
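
# Illustrative values (bucket names hypothetical): with the configs above, the
# two URIs might evaluate to
#   base_s3_uri_staging -> "s3://example-staging-bucket/archive/"
#   base_s3_uri_archive -> "s3://example-prod-bucket/archive/"
# and validated_date is the name of the staging folder to promote, e.g.
# "2023-11-10".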

# Index files in Synapse --------------------------------------------------
if (!is.null(synFindEntityId(validated_date, config::get("PARQUET_FOLDER_ARCHIVE", "staging")))) {
  # Pull the validated staging snapshot to a local scratch directory, then
  # push it to the prod archive location
  sync_cmd <- glue::glue("aws s3 --profile service-catalog sync {base_s3_uri_staging}{validated_date}/ {STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION} --exclude '*owner.txt*' --exclude '*archive*'")
  system(sync_cmd)
  rm(sync_cmd)
  sync_cmd <- glue::glue("aws s3 --profile service-catalog sync {STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION} {base_s3_uri_archive}{validated_date}/ --exclude '*owner.txt*' --exclude '*archive*'")
  system(sync_cmd)

  rm(sync_cmd, validated_date)
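
  # Illustrative expansion of the first glue template above (bucket name and
  # date hypothetical):
  #   aws s3 --profile service-catalog sync \
  #     s3://example-staging-bucket/archive/2023-11-10/ \
  #     ./temp_staging_to_archive/ \
  #     --exclude '*owner.txt*' --exclude '*archive*'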

  # Sync entire archive bucket to local
  unlink(STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION, recursive = TRUE, force = TRUE)
  unlink(AWS_ARCHIVE_DOWNLOAD_LOCATION, recursive = TRUE, force = TRUE)
  sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {base_s3_uri_archive} {AWS_ARCHIVE_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
  system(sync_cmd)

  # Modify cohort identifier in dir names ("=" cannot appear in Synapse entity names)
  junk <- sapply(list.dirs(AWS_ARCHIVE_DOWNLOAD_LOCATION), replace_equal_with_underscore)

  # Generate manifest of existing files
  SYNAPSE_AUTH_TOKEN <- Sys.getenv('SYNAPSE_AUTH_TOKEN')
  manifest_cmd <- glue::glue('SYNAPSE_AUTH_TOKEN="{SYNAPSE_AUTH_TOKEN}" synapse manifest --parent-id {PARQUET_FOLDER_ARCHIVE} --manifest ./current_manifest.tsv {AWS_ARCHIVE_DOWNLOAD_LOCATION}')
  system(manifest_cmd)
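
  # Assumed shape of current_manifest.tsv (only the columns consumed below are
  # relied on): one row per local file, with `path` holding the local file path
  # and `parent` the Synapse folder ID it maps to under PARQUET_FOLDER_ARCHIVE.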

  # Get a list of all files to upload and their Synapse locations (parentId)
  STR_LEN_PARQUET_FINAL_LOCATION <- stringr::str_length(AWS_ARCHIVE_DOWNLOAD_LOCATION)

  ## List all local files present (from manifest)
  synapse_manifest <-
    read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = FALSE) %>%
    dplyr::filter(!grepl('owner.txt', path)) %>%
    dplyr::rowwise() %>%
    dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_PARQUET_FINAL_LOCATION + 2)) %>%
    dplyr::mutate(s3_file_key = paste0(PARQUET_BUCKET_BASE_KEY_ARCHIVE, file_key)) %>%
    dplyr::mutate(md5_hash = as.character(tools::md5sum(path))) %>%
    dplyr::ungroup() %>%
    dplyr::mutate(file_key = gsub("cohort_", "cohort=", file_key),
                  s3_file_key = gsub("cohort_", "cohort=", s3_file_key))
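
  # Worked example (all values hypothetical): with AWS_ARCHIVE_DOWNLOAD_LOCATION
  # = "./temp_archive" and PARQUET_BUCKET_BASE_KEY_ARCHIVE = "archive/", a
  # manifest row with
  #   path        = "./temp_archive/dataset_fitbit/cohort_adults/part-0.parquet"
  # ends up as
  #   file_key    = "dataset_fitbit/cohort=adults/part-0.parquet"
  #   s3_file_key = "archive/dataset_fitbit/cohort=adults/part-0.parquet"
  # after str_sub() drops the local prefix and gsub() restores the "cohort="
  # hive-style partition name that the local rename changed to "cohort_".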


  # List all files currently indexed in Synapse
  synapse_fileview <-
    synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
    read.csv()

  # Find the files in the manifest that are not yet indexed in Synapse
  if (nrow(synapse_fileview) > 0) {
    synapse_manifest_to_upload <-
      synapse_manifest %>%
      dplyr::anti_join(
        synapse_fileview %>%
          dplyr::select(parent = parentId,
                        s3_file_key = dataFileKey,
                        md5_hash = dataFileMD5Hex))
  } else {
    synapse_manifest_to_upload <- synapse_manifest
  }
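
  # The anti-join drops manifest rows whose parent, s3_file_key, and md5_hash
  # all match a fileview row, so only new or content-changed files are
  # re-indexed; e.g. a file whose md5_hash differs from the indexed copy is
  # kept for upload.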

  # Index each file in Synapse
  latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-external")
  latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree")
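
  # Illustrative value (commit SHA hypothetical): html_url has the form
  #   https://github.com/Sage-Bionetworks/recover-parquet-external/commit/abc1234
  # and the str_replace() turns it into the browsable tree URL recorded as the
  # `executed` provenance of every file indexed below.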

  if (nrow(synapse_manifest_to_upload) > 0) {
    for (file_number in seq_len(nrow(synapse_manifest_to_upload))) {
      tmp <- synapse_manifest_to_upload[file_number, c("path", "parent", "s3_file_key")]

      absolute_file_path <- tools::file_path_as_absolute(tmp$path)

      temp_syn_obj <-
        synapser::synCreateExternalS3FileHandle(
          bucket_name = PARQUET_BUCKET_EXTERNAL,
          s3_file_key = tmp$s3_file_key,
          file_path = absolute_file_path,
          parent = tmp$parent)

      new_fileName <- stringr::str_replace_all(temp_syn_obj$fileName, ':', '_colon_')

      f <- File(dataFileHandleId = temp_syn_obj$id,
                parentId = tmp$parent,
                name = new_fileName)

      f <- synStore(f,
                    activityName = "Indexing",
                    activityDescription = "Indexing external parquet datasets",
                    used = PARQUET_FOLDER_INTERNAL,
                    executed = latest_commit_tree_url)
    }
  }
}
11 changes: 0 additions & 11 deletions sts_synindex_external.R
@@ -92,7 +92,6 @@ replace_equal_with_underscore <- function(directory_path) {
# Setup -------------------------------------------------------------------
synapser::synLogin(authToken = Sys.getenv('SYNAPSE_AUTH_TOKEN'))
config::get(config = "staging") %>% list2env(envir = .GlobalEnv)
# config::get(config = "prod") %>% list2env(envir = .GlobalEnv)


# Get STS credentials for input data bucket -------------------------------
@@ -146,16 +145,6 @@ date <- lubridate::today()
sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {PARQUET_FINAL_LOCATION} {base_s3_uri_archive}{date}/ --exclude "*owner.txt*" --exclude "*archive*"')
system(sync_cmd)


# Recreate directory tree of parquet datasets bucket location in S --------
# existing_dirs <- synGetChildren(PARQUET_FOLDER_ARCHIVE) %>% as.list()
#
# if(length(existing_dirs)>0) {
# for (i in seq_along(existing_dirs)) {
# synDelete(existing_dirs[[i]]$id)
# }
# }

# Sync entire bucket to local
unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T)
unlink(AWS_ARCHIVE_DOWNLOAD_LOCATION, recursive = T, force = T)
