Skip to content

Commit

Permalink
fix: parallel download (#15)
Browse files: browse the repository at this point in the history
* fix: use http1

* chore: bump version

* refactor: add support for redirect

* fix: add accidentally deleted line

* fix: properly parse quotes in content-disposition

* test: change R params

* fix: bad refactor

* test: add multiplex

* test: revert multiplex, increase host conn

* test: host_con to 100

* revert max connections to 20

* try multiplex again

* host_con = 20

* host_con=50

* tweak buffer size

* try curl default pool options

* add print for testing

* 256k buffer

* remove buffer size

* cleanup
  • Loading branch information
imathews authored Mar 19, 2024
1 parent e914426 commit 544d8b7
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 8 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: redivis
Type: Package
Title: R interface for Redivis
Version: 0.6.8
Version: 0.6.9
Author: Redivis Inc.
Maintainer: Redivis <[email protected]>
Description: Supports working with Redivis datasets and tables through R.
Expand Down
4 changes: 2 additions & 2 deletions R/File.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ File <- setRefClass(
stream_callback <- NULL
if (is_dir){
get_download_path_callback <- function(headers){
name <- headers$'x-redivis-filename'
name <- sub(".*filename=", "", headers$'content-disposition')
file_name <- file.path(path, name)
if (!overwrite && base::file.exists(file_name)){
stop(str_interp("File already exists at '${file_name}'. Set parameter overwrite=TRUE to overwrite existing files."))
Expand All @@ -54,7 +54,7 @@ File <- setRefClass(
on.exit(close(con))
}

res <- make_request(method="GET", path=str_interp("/rawFiles/${id}"), parse_response=FALSE, get_download_path_callback=get_download_path_callback, stream_callback=stream_callback)
res <- make_request(method="GET", path=str_interp("/rawFiles/${id}"), query=list(allowRedirect="true"), parse_response=FALSE, get_download_path_callback=get_download_path_callback, stream_callback=stream_callback)
if (is_dir){
return(res)
} else {
Expand Down
8 changes: 5 additions & 3 deletions R/Table.R
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ Table <- setRefClass("Table",
if (progress){
progressr::with_progress(perform_table_parallel_file_download(df[[file_id_variable]], path, overwrite))
} else {
perform_parallel_file_download(df[[file_id_variable]], path, overwrite)
perform_table_parallel_file_download(df[[file_id_variable]], path, overwrite)
}
}
)
Expand All @@ -304,16 +304,18 @@ Table <- setRefClass("Table",
#' @importFrom future plan multicore multisession sequential
#' @importFrom parallelly supportsMulticore
#' @importFrom progressr progressor
#' @importFrom purrr map
perform_table_parallel_file_download <- function(vec, path, overwrite){
pb <- progressr::progressor(steps = length(vec))
download_paths <- list()
get_download_path_from_headers <- function(headers){
file_path <- base::file.path(path, headers$'x-redivis-filename')
name <- gsub('^"|"$', '', sub(".*filename=", "", headers$'content-disposition'))
file_path <- base::file.path(path, name)
download_paths <<- append(download_paths, file_path)
return(file_path)
}
perform_parallel_download(
purrr::map(vec, function(id){str_interp("/rawFiles/${id}")}),
purrr::map(vec, function(id){str_interp("/rawFiles/${id}?allowRedirect=true")}),
overwrite=overwrite,
get_download_path_from_headers=get_download_path_from_headers,
on_finish=function(){pb(1)}
Expand Down
5 changes: 3 additions & 2 deletions R/api_request.R
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ make_request <- function(method='GET', query=NULL, payload = NULL, parse_respons
}
}

#' @importFrom purrr map set_names
parse_curl_headers <- function(res_data){
vec <- curl::parse_headers(res_data$headers)

Expand All @@ -133,14 +134,14 @@ parse_curl_headers <- function(res_data){

#' @import curl
perform_parallel_download <- function(paths, overwrite, get_download_path_from_headers, on_finish, stop_on_error=TRUE){
pool <- curl::new_pool(total_con = 100, host_con = 20, multiplex = TRUE)
pool <- curl::new_pool()
handles = list()
for (path in paths){
h <- curl::new_handle()
url <- generate_api_url(path)
auth = get_authorization_header()
curl::handle_setheaders(h, "Authorization"=auth[[1]])
curl::handle_setopt(h, "url"=url, buffersize=1048576) # 1MB buffer
curl::handle_setopt(h, "url"=url)

fail_fn <- function(e){
print(e)
Expand Down

0 comments on commit 544d8b7

Please sign in to comment.