From 544d8b765d28f29261559d148437f35530059eae Mon Sep 17 00:00:00 2001 From: Ian Mathews Date: Tue, 19 Mar 2024 13:54:55 -0600 Subject: [PATCH] fix: parallel download (#15) * fix: use http1 * chore: bump version * refactor: add support for redirect * fix: add accidentally deleted line * fix: properly parse quotes in content-disposition * test: change R params * fix: bad refactor * test: add multiplex * test: revert multiplex, increase host conn * test: host_con to 100 * revert max connections to 20 * try multiplex again * host_con = 20 * host_con=50 * tweak buffer size * try curl default pool options * add print for testing * 256k buffer * remove buffer size * cleanup --- DESCRIPTION | 2 +- R/File.R | 4 ++-- R/Table.R | 8 +++++--- R/api_request.R | 5 +++-- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 99f204a..eeb30c3 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: redivis Type: Package Title: R interface for Redivis -Version: 0.6.8 +Version: 0.6.9 Author: Redivis Inc. Maintainer: Redivis Description: Supports working with Redivis datasets and tables through R. diff --git a/R/File.R b/R/File.R index 1211f67..18ef384 100644 --- a/R/File.R +++ b/R/File.R @@ -36,7 +36,7 @@ File <- setRefClass( stream_callback <- NULL if (is_dir){ get_download_path_callback <- function(headers){ - name <- headers$'x-redivis-filename' + name <- sub(".*filename=", "", headers$'content-disposition') file_name <- file.path(path, name) if (!overwrite && base::file.exists(file_name)){ stop(str_interp("File already exists at '${file_name}'. Set parameter overwrite=TRUE to overwrite existing files.")) @@ -54,7 +54,7 @@ File <- setRefClass( on.exit(close(con)) } - res <- make_request(method="GET", path=str_interp("/rawFiles/${id}"), parse_response=FALSE, get_download_path_callback=get_download_path_callback, stream_callback=stream_callback) + res <- make_request(method="GET", path=str_interp("/rawFiles/${id}"), query=list(allowRedirect="true"), parse_response=FALSE, get_download_path_callback=get_download_path_callback, stream_callback=stream_callback) if (is_dir){ return(res) } else { diff --git a/R/Table.R b/R/Table.R index 0868899..8768321 100644 --- a/R/Table.R +++ b/R/Table.R @@ -295,7 +295,7 @@ Table <- setRefClass("Table", if (progress){ progressr::with_progress(perform_table_parallel_file_download(df[[file_id_variable]], path, overwrite)) } else { - perform_parallel_file_download(df[[file_id_variable]], path, overwrite) + perform_table_parallel_file_download(df[[file_id_variable]], path, overwrite) } } ) @@ -304,16 +304,18 @@ Table <- setRefClass("Table", #' @importFrom future plan multicore multisession sequential #' @importFrom parallelly supportsMulticore #' @importFrom progressr progressor +#' @importFrom purrr map perform_table_parallel_file_download <- function(vec, path, overwrite){ pb <- progressr::progressor(steps = length(vec)) download_paths <- list() get_download_path_from_headers <- function(headers){ - file_path <- base::file.path(path, headers$'x-redivis-filename') + name <- gsub('^"|"$', '', sub(".*filename=", "", headers$'content-disposition')) + file_path <- base::file.path(path, name) download_paths <<- append(download_paths, file_path) return(file_path) } perform_parallel_download( - purrr::map(vec, function(id){str_interp("/rawFiles/${id}")}), + purrr::map(vec, function(id){str_interp("/rawFiles/${id}?allowRedirect=true")}), overwrite=overwrite, get_download_path_from_headers=get_download_path_from_headers, on_finish=function(){pb(1)} diff --git a/R/api_request.R b/R/api_request.R index 8261477..83f522c 100644 --- a/R/api_request.R +++ b/R/api_request.R @@ -119,6 +119,7 @@ make_request <- function(method='GET', query=NULL, payload = NULL, parse_respons } } +#' @importFrom purrr map set_names parse_curl_headers <- function(res_data){ vec <- curl::parse_headers(res_data$headers) @@ -133,14 +134,14 @@ parse_curl_headers <- function(res_data){ #' @import curl perform_parallel_download <- function(paths, overwrite, get_download_path_from_headers, on_finish, stop_on_error=TRUE){ - pool <- curl::new_pool(total_con = 100, host_con = 20, multiplex = TRUE) + pool <- curl::new_pool() handles = list() for (path in paths){ h <- curl::new_handle() url <- generate_api_url(path) auth = get_authorization_header() curl::handle_setheaders(h, "Authorization"=auth[[1]]) - curl::handle_setopt(h, "url"=url, buffersize=1048576) # 1MB buffer + curl::handle_setopt(h, "url"=url) fail_fn <- function(e){ print(e)