Commit

Merge pull request #3 from pranavanba/change-index-to-manifest
Update indexing code
pranavanba authored Apr 9, 2024
2 parents 40b7aab + 20d2975 commit 2e5d3a5
Showing 2 changed files with 89 additions and 54 deletions.
2 changes: 2 additions & 0 deletions sts_params_internal.R
@@ -5,6 +5,8 @@ PARQUET_BUCKET <- 'recover-processed-data'

PARQUET_BUCKET_BASE_KEY <- 'main/parquet/'

+PARQUET_BUCKET_BASE_KEY_ARCHIVE <- 'main/archive/'
+
PARQUET_FOLDER <- 'syn51406699'

# Local location where parquet bucket files are synced to
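The diff does not show where the new PARQUET_BUCKET_BASE_KEY_ARCHIVE constant is consumed. A minimal sketch of one plausible use, mirroring how base_s3_uri is assembled from PARQUET_BUCKET and PARQUET_BUCKET_BASE_KEY in sts_synindex_internal.R below (the archive_s3_uri name is hypothetical, not part of this commit):

# Hypothetical usage: build the S3 URI for the archive prefix, so archived
# parquet data can be addressed (or excluded) separately from the main data
archive_s3_uri <- paste0('s3://', PARQUET_BUCKET, '/', PARQUET_BUCKET_BASE_KEY_ARCHIVE)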
141 changes: 87 additions & 54 deletions sts_synindex_internal.R
@@ -1,15 +1,34 @@
library(synapser)
library(arrow)
library(dplyr)
library(synapserutils)
library(rjson)
+library(magrittr)

+# Functions ---------------------------------------------------------------

+#' Replace equal sign with underscore
+#'
+#' This function renames a directory path by replacing equal signs with underscores.
+#' If a replacement is performed, it logs the change.
+#'
+#' @param directory_path The path of the directory to rename.
+#'
+#' @examples
+#' replace_equal_with_underscore("path_with=equals")
+#'
+replace_equal_with_underscore <- function(directory_path) {
+  new_directory_path <- gsub("=", "_", directory_path)
+  if (directory_path != new_directory_path) {
+    file.rename(directory_path, new_directory_path)
+    return(cat("Renamed:", directory_path, "to", new_directory_path, "\n"))
+  }
+}


+# Setup -------------------------------------------------------------------

synapser::synLogin(authToken = Sys.getenv('SYNAPSE_AUTH_TOKEN'))
source('~/recover-parquet-internal/sts_params_internal.R')

-#### Get STS token for bucket in order to sync to local dir ####

-# Get STS credentials
+# Get STS credentials for processed data bucket ---------------------------

token <- synapser::synGetStsStorageToken(
  entity = PARQUET_FOLDER,
  permission = "read_only",
@@ -21,86 +40,100 @@ if (PARQUET_BUCKET==token$bucket && PARQUET_BUCKET_BASE_KEY==token$baseKey) {
  base_s3_uri <- paste0('s3://', PARQUET_BUCKET, '/', PARQUET_BUCKET_BASE_KEY)
}

-# configure the environment with AWS token
+# Configure the environment with AWS token --------------------------------

Sys.setenv('AWS_ACCESS_KEY_ID'=token$accessKeyId,
           'AWS_SECRET_ACCESS_KEY'=token$secretAccessKey,
           'AWS_SESSION_TOKEN'=token$sessionToken)

-#### Sync bucket to local dir ####
+# Sync bucket to local dir ------------------------------------------------

unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T)
sync_cmd <- glue::glue('aws s3 sync {base_s3_uri} {AWS_PARQUET_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
system(sync_cmd)

-#### Index S3 Objects in Synapse ####
-existing_dirs <- synGetChildren(PARQUET_FOLDER) %>% as.list()
-
-if(length(existing_dirs)>0) {
-  for (i in seq_along(existing_dirs)) {
-    synDelete(existing_dirs[[i]]$id)
-  }
-}
+# Modify cohort identifier in dir name ------------------------------------
+
+junk <- invisible(lapply(list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION), replace_equal_with_underscore))


-# Generate manifest of existing files
+# Generate manifest of existing files -------------------------------------
SYNAPSE_AUTH_TOKEN <- Sys.getenv('SYNAPSE_AUTH_TOKEN')
manifest_cmd <- glue::glue('SYNAPSE_AUTH_TOKEN="{SYNAPSE_AUTH_TOKEN}" synapse manifest --parent-id {SYNAPSE_PARENT_ID} --manifest ./current_manifest.tsv {AWS_PARQUET_DOWNLOAD_LOCATION}')
system(manifest_cmd)

-## Get a list of all files to upload and their synapse locations (parentId)
+# Index files in Synapse --------------------------------------------------
+
+# Get a list of all files to upload and their synapse locations (parentId)
STR_LEN_AWS_PARQUET_DOWNLOAD_LOCATION <- stringr::str_length(AWS_PARQUET_DOWNLOAD_LOCATION)

-## All files present locally from manifest
-synapse_manifest <- read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) %>%
-  dplyr::filter(path != paste0(AWS_PARQUET_DOWNLOAD_LOCATION,'/owner.txt')) %>% # no dataFileHandleId needed for owner.txt
-  dplyr::rowwise() %>%
-  dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_AWS_PARQUET_DOWNLOAD_LOCATION+2)) %>% # file location relative to the S3 bucket home folder
-  dplyr::mutate(s3_file_key = paste0('main/parquet/', file_key)) %>% # files in the S3 bucket are namespaced under s3://bucket/main/
-  dplyr::mutate(md5_hash = as.character(tools::md5sum(path))) %>%
-  dplyr::ungroup()
+# List all local files present (from manifest)
+synapse_manifest <-
+  read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) %>%
+  dplyr::filter(!grepl('owner.txt', path)) %>%
+  dplyr::rowwise() %>%
+  dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_AWS_PARQUET_DOWNLOAD_LOCATION+2)) %>%
+  dplyr::mutate(s3_file_key = paste0(PARQUET_BUCKET_BASE_KEY, file_key)) %>%
+  dplyr::mutate(md5_hash = as.character(tools::md5sum(path))) %>%
+  dplyr::ungroup() %>%
+  dplyr::mutate(file_key = gsub("cohort_", "cohort=", file_key),
+                s3_file_key = gsub("cohort_", "cohort=", s3_file_key))

-## All currently indexed files in Synapse
-synapse_fileview <- synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>% read.csv()
+# List all files currently indexed in Synapse
+synapse_fileview <-
+  synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
+  read.csv()

-## find those files that are not in the fileview - files that need to be indexed
+# Find the files in the manifest that are not yet indexed in Synapse
if (nrow(synapse_fileview)>0) {
  synapse_manifest_to_upload <-
    synapse_manifest %>%
    dplyr::anti_join(
      synapse_fileview %>%
        dplyr::select(parent = parentId,
                      s3_file_key = dataFileKey,
                      md5_hash = dataFileMD5Hex))
} else {
  synapse_manifest_to_upload <- synapse_manifest
}

-## Index in Synapse
-## For each file, index it in Synapse given a parent Synapse folder
-if(nrow(synapse_manifest_to_upload) > 0){ # there are some files to upload
-  for(file_number in seq(nrow(synapse_manifest_to_upload))){
-
-    # file and related synapse parent id
-    file_ = synapse_manifest_to_upload$path[file_number]
-    parent_id = synapse_manifest_to_upload$parent[file_number]
-    s3_file_key = synapse_manifest_to_upload$s3_file_key[file_number]
-    # this is the location of the file in the S3 bucket; locally it is at {AWS_PARQUET_DOWNLOAD_LOCATION}/
+# Get script details for synStore() provenance
+latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-internal")
+latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree")
+
+# Index each file in Synapse
+if(nrow(synapse_manifest_to_upload) > 0){
+  for(file_number in seq_len(nrow(synapse_manifest_to_upload))){
+    tmp <- synapse_manifest_to_upload[file_number, c("path", "parent", "s3_file_key")]

-    absolute_file_path <- tools::file_path_as_absolute(file_) # local absolute path
+    absolute_file_path <- tools::file_path_as_absolute(tmp$path)

-    temp_syn_obj <- synapser::synCreateExternalS3FileHandle(
-      bucket_name = PARQUET_BUCKET,
-      s3_file_key = s3_file_key,
-      file_path = absolute_file_path,
-      parent = parent_id
-    )
+    temp_syn_obj <-
+      synapser::synCreateExternalS3FileHandle(
+        bucket_name = PARQUET_BUCKET,
+        s3_file_key = tmp$s3_file_key,
+        file_path = absolute_file_path,
+        parent = tmp$parent)

    # Synapse does not accept ':' (colon) in filenames, so replace it with '_colon_'
    new_fileName <- stringr::str_replace_all(temp_syn_obj$fileName, ':', '_colon_')

-    f <- File(dataFileHandleId=temp_syn_obj$id,
-              parentId=parent_id,
-              name = new_fileName) ## set the new file name
+    f <- File(dataFileHandleId = temp_syn_obj$id,
+              parentId = tmp$parent,
+              name = new_fileName)

-    f <- synStore(f)
+    f <- synStore(f,
+                  activityName = "Indexing",
+                  activityDescription = "Indexing internal parquet datasets",
+                  used = PARQUET_FOLDER,
+                  executed = latest_commit_tree_url)

  }
}
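The heart of this change is the move from delete-and-reindex to a manifest diff: a file is (re)indexed only if its (parent, S3 key, MD5) triple is absent from the Synapse fileview. A self-contained sketch of that anti-join step, using toy data frames in place of the real manifest and fileview (all names and values below are illustrative, not from the commit):

library(dplyr)

# Toy stand-ins for the real manifest and fileview
synapse_manifest <- data.frame(
  parent      = c("syn100", "syn100", "syn200"),
  s3_file_key = c("main/parquet/a.parquet", "main/parquet/b.parquet", "main/parquet/c.parquet"),
  md5_hash    = c("aaa", "bbb", "ccc"))

synapse_fileview <- data.frame(
  parentId       = "syn100",
  dataFileKey    = "main/parquet/a.parquet",
  dataFileMD5Hex = "aaa")

# Keep only manifest rows with no matching (parent, key, md5) in the fileview;
# these are the files that still need to be indexed in Synapse
to_upload <- synapse_manifest %>%
  anti_join(synapse_fileview %>%
              select(parent = parentId,
                     s3_file_key = dataFileKey,
                     md5_hash = dataFileMD5Hex))

nrow(to_upload)  # 2: b.parquet and c.parquet remain to be indexed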
