
Commit

fix: parallel download (#13)
* chore: add logs for debugging

* fix: trailing slash in path

* chore: add debug print

* chore: add more debug logs

* fix: always lowercase curl headers

* chore: remove console.log

* chore: bump version
imathews authored Mar 12, 2024
1 parent 832e9f5 commit 56c4247
Showing 3 changed files with 37 additions and 39 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
@@ -1,7 +1,7 @@
 Package: redivis
 Type: Package
 Title: R interface for Redivis
-Version: 0.6.5
+Version: 0.6.6
 Author: Redivis Inc.
 Maintainer: Redivis <[email protected]>
 Description: Supports working with Redivis datasets and tables through R.
4 changes: 2 additions & 2 deletions R/Table.R
@@ -265,8 +265,8 @@ Table <- setRefClass("Table",
     },

     download_files = function(path = getwd(), overwrite = FALSE, max_results = NULL, file_id_variable = NULL, progress=TRUE){
-      if (!endsWith(path, '/')) {
-        path = str_interp("${path}/")
+      if (endsWith(path, '/')) {
+        path <- str_sub(path, 1, nchar(path) - 1) # remove trailing "/", as this screws up file.path()
       }

       if (is.null(file_id_variable)){
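Why the trailing slash mattered: file.path() joins its arguments with a single "/" separator, so a directory path that already ends in "/" produces a doubled separator in the resulting file names. A quick illustration with made-up values:

file.path("downloads/", "file.csv")  # "downloads//file.csv" (doubled separator)
file.path("downloads", "file.csv")   # "downloads/file.csv"

Stripping the slash up front keeps every later file.path() call clean.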
70 changes: 34 additions & 36 deletions R/api_request.R
@@ -123,7 +123,7 @@ parse_curl_headers <- function(res_data){
   vec <- curl::parse_headers(res_data$headers)

   header_names <- purrr::map(vec, function(header) {
-    strsplit(header, ':')[[1]][[1]]
+    tolower(strsplit(header, ':')[[1]][[1]])
   })
   headers <- purrr::map(vec, function(header) {
     split = strsplit(header, ':\\s+')[[1]]
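Why the tolower() wrapper: HTTP header names are case-insensitive, so a server may send Content-Disposition or content-disposition interchangeably, and lookups against the parsed names only work reliably once they are normalized. A minimal sketch of the parsing step (the header string here is made up):

tolower(strsplit("Content-Disposition: attachment", ':')[[1]][[1]])
# "content-disposition", whatever casing the server used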
@@ -142,49 +142,47 @@ perform_parallel_download <- function(paths, overwrite, get_download_path_from_h
     curl::handle_setheaders(h, "Authorization"=auth[[1]])
     curl::handle_setopt(h, "url"=url)

-    make_data_fn <- function(h, url, get_download_path_from_headers, overwrite, on_finish){
-      file_con <- NULL
-      handle <- h
-      path <- url
-      return(function(chunk, final){
-        if (is.null(file_con)){
-          res_data <- curl::handle_data(handle)
-          status_code <- res_data$status_code
-          if (status_code >= 400){
-            if (stop_on_error){
-              stop(str_interp("Received HTTP status ${status_code} for path ${path}"))
-            } else {
-              return(NULL)
-            }
-          }
-
-          headers <- parse_curl_headers(res_data)
-
-          download_path <- get_download_path_from_headers(headers)
-          if (!overwrite && base::file.exists(download_path)){
-            stop(str_interp("File already exists at '${download_path}'. Set parameter overwrite=TRUE to overwrite existing files."))
-          }
-          file_con <<- base::file(download_path, "w+b")
-        }
-        if (length(chunk)){
-          writeBin(chunk, file_con)
-        }
-        if (final){
-          on_finish()
-          close(file_con)
-        }
-      })
-    }
-
     fail_fn <- function(e){
       print(e)
       stop(e)
     }
-    curl::multi_add(h, fail = fail_fn, data = make_data_fn(h, url, get_download_path_from_headers, overwrite, on_finish), pool = pool)
+    curl::multi_add(h, fail = fail_fn, data = parallel_download_data_cb_factory(h, url, get_download_path_from_headers, overwrite, on_finish), pool = pool)
   }
   curl::multi_run(pool=pool)
 }

+parallel_download_data_cb_factory <- function(h, url, get_download_path_from_headers, overwrite, on_finish){
+  file_con <- NULL
+  handle <- h
+  path <- url
+  return(function(chunk, final){
+    if (is.null(file_con)){
+      res_data <- curl::handle_data(handle)
+      status_code <- res_data$status_code
+      if (status_code >= 400){
+        if (stop_on_error){
+          stop(str_interp("Received HTTP status ${status_code} for path ${path}"))
+        } else {
+          return(NULL)
+        }
+      }
+
+      headers <- parse_curl_headers(res_data)
+      download_path <- get_download_path_from_headers(headers)
+      if (!overwrite && base::file.exists(download_path)){
+        stop(str_interp("File already exists at '${download_path}'. Set parameter overwrite=TRUE to overwrite existing files."))
+      }
+      file_con <<- base::file(download_path, "w+b")
+    }
+    if (length(chunk)){
+      writeBin(chunk, file_con)
+    }
+    if (final){
+      on_finish()
+      close(file_con)
+    }
+  })
+}

 make_paginated_request <- function(path, query=list(), page_size=100, max_results=NULL){
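A note on the extracted callback factory: every call to parallel_download_data_cb_factory() creates a fresh closure environment, so per-download state such as file_con stays private to one curl handle instead of leaking across parallel transfers. A standalone sketch of the same closure-factory pattern, with simplified names that are illustrative rather than the package's API:

make_writer <- function(dest){
  con <- NULL                                    # state private to this closure
  function(chunk, final){
    # open lazily on the first chunk; <<- assigns into the factory's environment
    if (is.null(con)) con <<- file(dest, "w+b")
    if (length(chunk)) writeBin(chunk, con)
    if (final) close(con)
  }
}
cb_a <- make_writer("a.bin")
cb_b <- make_writer("b.bin")  # cb_a and cb_b each hold their own con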
