diff --git a/sts_params_internal.R b/sts_params_internal.R
index c72b3c9..4d9f167 100644
--- a/sts_params_internal.R
+++ b/sts_params_internal.R
@@ -5,6 +5,8 @@
 PARQUET_BUCKET <- 'recover-processed-data'
 PARQUET_BUCKET_BASE_KEY <- 'main/parquet/'
+PARQUET_BUCKET_BASE_KEY_ARCHIVE <- 'main/archive/'
+
 PARQUET_FOLDER <- 'syn51406699'
 
 # Local location where parquet bucket files are synced to
diff --git a/sts_synindex_internal.R b/sts_synindex_internal.R
index bc58a6a..205b698 100644
--- a/sts_synindex_internal.R
+++ b/sts_synindex_internal.R
@@ -1,15 +1,34 @@
-library(synapser)
-library(arrow)
-library(dplyr)
-library(synapserutils)
-library(rjson)
+library(magrittr)
+
+# Functions ---------------------------------------------------------------
+
+#' Replace equal sign with underscore
+#'
+#' This function renames a directory path by replacing equal signs with underscores.
+#' If a replacement is performed, it logs the change.
+#'
+#' @param directory_path The path of the directory to rename.
+#'
+#' @examples
+#' replace_equal_with_underscore("path_with=equals")
+#'
+replace_equal_with_underscore <- function(directory_path) {
+  new_directory_path <- gsub("=", "_", directory_path)
+  if (directory_path != new_directory_path) {
+    file.rename(directory_path, new_directory_path)
+    return(cat("Renamed:", directory_path, "to", new_directory_path, "\n"))
+  }
+}
+
+
+# Setup -------------------------------------------------------------------
 
 synapser::synLogin(authToken = Sys.getenv('SYNAPSE_AUTH_TOKEN'))
 source('~/recover-parquet-internal/sts_params_internal.R')
 
-#### Get STS token for bucket in order to sync to local dir ####
-# Get STS credentials
+# Get STS credentials for processed data bucket ---------------------------
+
 token <- synapser::synGetStsStorageToken(
   entity = PARQUET_FOLDER,
   permission = "read_only",
@@ -21,51 +40,64 @@ if (PARQUET_BUCKET==token$bucket && PARQUET_BUCKET_BASE_KEY==token$baseKey) {
   base_s3_uri <- paste0('s3://', PARQUET_BUCKET, '/', PARQUET_BUCKET_BASE_KEY)
 }
 
-# configure the environment with AWS token
+
+# Configure the environment with AWS token --------------------------------
+
 Sys.setenv('AWS_ACCESS_KEY_ID'=token$accessKeyId,
            'AWS_SECRET_ACCESS_KEY'=token$secretAccessKey,
            'AWS_SESSION_TOKEN'=token$sessionToken)
 
-#### Sync bucket to local dir####
+
+# Sync bucket to local dir ------------------------------------------------
+
 unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T)
 sync_cmd <- glue::glue('aws s3 sync {base_s3_uri} {AWS_PARQUET_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
 system(sync_cmd)
 
-#### Index S3 Objects in Synapse ####
-existing_dirs <- synGetChildren(PARQUET_FOLDER) %>% as.list()
-if(length(existing_dirs)>0) {
-  for (i in seq_along(existing_dirs)) {
-    synDelete(existing_dirs[[i]]$id)
-  }
-}
+# Modify cohort identifier in dir name ------------------------------------
+
+junk <- invisible(lapply(list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION), replace_equal_with_underscore))
+
+
+# Generate manifest of existing files -------------------------------------
 
-# Generate manifest of existing files
 SYNAPSE_AUTH_TOKEN <- Sys.getenv('SYNAPSE_AUTH_TOKEN')
 manifest_cmd <- glue::glue('SYNAPSE_AUTH_TOKEN="{SYNAPSE_AUTH_TOKEN}" synapse manifest --parent-id {SYNAPSE_PARENT_ID} --manifest ./current_manifest.tsv {AWS_PARQUET_DOWNLOAD_LOCATION}')
 system(manifest_cmd)
 
-## Get a list of all files to upload and their synapse locations(parentId)
+
+# Index files in Synapse --------------------------------------------------
+
+# Get a list of all files to upload and their synapse locations (parentId)
 STR_LEN_AWS_PARQUET_DOWNLOAD_LOCATION <- stringr::str_length(AWS_PARQUET_DOWNLOAD_LOCATION)
 
-## All files present locally from manifest
-synapse_manifest <- read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) %>%
-  dplyr::filter(path != paste0(AWS_PARQUET_DOWNLOAD_LOCATION,'/owner.txt')) %>% # need not create a dataFileHandleId for owner.txt
-  dplyr::rowwise() %>%
-  dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_AWS_PARQUET_DOWNLOAD_LOCATION+2)) %>% # location of file from home folder of S3 bucket
-  dplyr::mutate(s3_file_key = paste0('main/parquet/', file_key)) %>% # the namespace for files in the S3 bucket is S3::bucket/main/
-  dplyr::mutate(md5_hash = as.character(tools::md5sum(path))) %>%
-  dplyr::ungroup()
+# List all local files present (from manifest)
+synapse_manifest <-
+  read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) %>%
+  dplyr::filter(!grepl('owner.txt', path)) %>%
+  dplyr::rowwise() %>%
+  dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_AWS_PARQUET_DOWNLOAD_LOCATION+2)) %>%
+  dplyr::mutate(s3_file_key = paste0(PARQUET_BUCKET_BASE_KEY, file_key)) %>%
+  dplyr::mutate(md5_hash = as.character(tools::md5sum(path))) %>%
+  dplyr::ungroup() %>%
+  dplyr::mutate(file_key = gsub("cohort_", "cohort=", file_key),
+                s3_file_key = gsub("cohort_", "cohort=", s3_file_key))
 
-## All currently indexed files in Synapse
-synapse_fileview <- synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>% read.csv()
+# List all files currently indexed in Synapse
+synapse_fileview <-
+  synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
+  read.csv()
 
-## find those files that are not in the fileview - files that need to be indexed
+# Find the files in the manifest that are not yet indexed in Synapse
 if (nrow(synapse_fileview)>0) {
-  synapse_manifest_to_upload <-
-    synapse_manifest %>%
+  synapse_manifest_to_upload <-
+    synapse_manifest %>%
     dplyr::anti_join(
-      synapse_fileview %>%
+      synapse_fileview %>%
         dplyr::select(parent = parentId,
                       s3_file_key = dataFileKey,
                       md5_hash = dataFileMD5Hex))
@@ -73,34 +105,35 @@ if (nrow(synapse_fileview)>0) {
   synapse_manifest_to_upload <- synapse_manifest
 }
 
-## Index in Synapse
-## For each file index it in Synapse given a parent synapse folder
-if(nrow(synapse_manifest_to_upload) > 0){ # there are some files to upload
-  for(file_number in seq(nrow(synapse_manifest_to_upload))){
-
-    # file and related synapse parent id
-    file_= synapse_manifest_to_upload$path[file_number]
-    parent_id = synapse_manifest_to_upload$parent[file_number]
-    s3_file_key = synapse_manifest_to_upload$s3_file_key[file_number]
-    # this would be the location of the file in the S3 bucket, in the local it is at {AWS_PARQUET_DOWNLOAD_LOCATION}/
+# Get script details for SynStore() provenance
+latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-internal")
+latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree")
+
+# Index each file in Synapse
+if(nrow(synapse_manifest_to_upload) > 0){
+  for(file_number in seq_len(nrow(synapse_manifest_to_upload))){
+    tmp <- synapse_manifest_to_upload[file_number, c("path", "parent", "s3_file_key")]
 
-    absolute_file_path <- tools::file_path_as_absolute(file_) # local absolute path
+    absolute_file_path <- tools::file_path_as_absolute(tmp$path)
 
-    temp_syn_obj <- synapser::synCreateExternalS3FileHandle(
-      bucket_name = PARQUET_BUCKET,
-      s3_file_key = s3_file_key, #
-      file_path = absolute_file_path,
-      parent = parent_id
-    )
+    temp_syn_obj <-
+      synapser::synCreateExternalS3FileHandle(
+        bucket_name = PARQUET_BUCKET,
+        s3_file_key = tmp$s3_file_key,
+        file_path = absolute_file_path,
+        parent = tmp$parent)
 
-    # synapse does not accept ':' (colon) in filenames, so replacing it with '_colon_'
     new_fileName <- stringr::str_replace_all(temp_syn_obj$fileName, ':', '_colon_')
 
-    f <- File(dataFileHandleId=temp_syn_obj$id,
-              parentId=parent_id,
-              name = new_fileName) ## set the new file name
+    f <- File(dataFileHandleId = temp_syn_obj$id,
+              parentId = tmp$parent,
+              name = new_fileName)
 
-    f <- synStore(f)
+    f <- synStore(f,
+                  activityName = "Indexing",
+                  activityDescription = "Indexing internal parquet datasets",
+                  used = PARQUET_FOLDER,
+                  executed = latest_commit_tree_url)
 
   }
 }
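
Reviewer note: the local directory renaming and the later "cohort_" -> "cohort=" substitution are two halves of one round trip: '=' is stripped from the synced copy so the manifest tooling is happy, then restored in the S3 key so it still matches the Hive-style partition layout in the bucket. A minimal sketch of that round trip, using a made-up dataset/cohort name and assuming replace_equal_with_underscore() and the params above are sourced:

tmp_dir <- file.path(tempdir(), "dataset_example", "cohort=adults_v1")  # hypothetical partition dir
dir.create(tmp_dir, recursive = TRUE)
replace_equal_with_underscore(tmp_dir)                                  # local copy becomes .../cohort_adults_v1

file_key    <- "dataset_example/cohort_adults_v1/part-0.parquet"        # key as it appears on local disk
s3_file_key <- paste0("main/parquet/", gsub("cohort_", "cohort=", file_key))
# "main/parquet/dataset_example/cohort=adults_v1/part-0.parquet" -- matches the key actually stored in the bucket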
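
The anti_join is what keeps re-runs incremental: a manifest row is uploaded only if no fileview row shares its parent, S3 key, and MD5. A toy illustration with made-up Synapse IDs and keys:

library(dplyr)

synapse_manifest <- tibble::tibble(
  parent      = c("syn111", "syn111"),
  s3_file_key = c("main/parquet/dataset_example/cohort=adults_v1/part-0.parquet",
                  "main/parquet/dataset_example/cohort=adults_v1/part-1.parquet"),
  md5_hash    = c("aaa", "bbb"))

synapse_fileview <- tibble::tibble(
  parentId       = "syn111",
  dataFileKey    = "main/parquet/dataset_example/cohort=adults_v1/part-0.parquet",
  dataFileMD5Hex = "aaa")

synapse_manifest %>%
  anti_join(synapse_fileview %>%
              select(parent = parentId, s3_file_key = dataFileKey, md5_hash = dataFileMD5Hex))
# only the part-1.parquet row remains, so files already indexed with the same MD5 are skipped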