Skip to content

Commit

Permalink
fix: resolve issue with parallelization in R Studio
Browse files Browse the repository at this point in the history
  • Loading branch information
imathews committed Mar 13, 2024
1 parent 05b1017 commit 5e224fd
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 55 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: redivis
Type: Package
Title: R interface for Redivis
Version: 0.6.6
Version: 0.6.7
Author: Redivis Inc.
Maintainer: Redivis <[email protected]>
Description: Supports working with Redivis datasets and tables through R.
Expand Down
36 changes: 17 additions & 19 deletions R/Query.R
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ Query <- setRefClass("Query",
make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'arrow_dataset',
schema = params$schema,
variables = params$variables,
progress = progress,
coerce_schema = params$coerce_schema,
batch_preprocessor = batch_preprocessor
Expand All @@ -39,9 +39,9 @@ Query <- setRefClass("Query",
make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'arrow_table',
schema = params$schema,
variables = params$variables,
progress=progress,
coerce_schema = params$coerce_schema,
batch_preprocessor = batch_preprocessor
Expand All @@ -54,10 +54,10 @@ Query <- setRefClass("Query",
make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'arrow_stream',
progress=progress,
schema = params$schema,
variables = params$variables,
progress = progress,
coerce_schema = params$coerce_schema
)
Expand All @@ -73,10 +73,10 @@ Query <- setRefClass("Query",
df <- make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'tibble',
progress=progress,
schema = params$schema,
variables = params$variables,
coerce_schema = params$coerce_schema,
batch_preprocessor = batch_preprocessor
)
Expand All @@ -98,9 +98,9 @@ Query <- setRefClass("Query",
df <- make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'tibble',
schema = params$schema,
variables = params$variables,
progress=progress,
coerce_schema = params$coerce_schema,
batch_preprocessor = batch_preprocessor
Expand All @@ -115,10 +115,10 @@ Query <- setRefClass("Query",
make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'data_frame',
progress=progress,
schema = params$schema,
variables = params$variables,
coerce_schema = params$coerce_schema,
batch_preprocessor = batch_preprocessor
)
Expand All @@ -130,10 +130,10 @@ Query <- setRefClass("Query",
make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'data_table',
progress=progress,
schema = params$schema,
variables = params$variables,
coerce_schema = params$coerce_schema,
batch_preprocessor = batch_preprocessor
)
Expand Down Expand Up @@ -163,8 +163,7 @@ get_query_request_params = function(self, max_results, variables, geography_vari
)
}

selected_variables <- if (is.null(variables)) NULL else Map(function(variable_name) variable_name, variables)
schema <- get_arrow_schema(variables_list)
selected_variable_names <- if (is.null(variables)) NULL else Map(function(variable_name) variable_name, variables)

if (!is.null(geography_variable) && geography_variable == ''){
geography_variable = NULL
Expand All @@ -179,9 +178,8 @@ get_query_request_params = function(self, max_results, variables, geography_vari
list(
"max_results" = max_results,
"uri" = uri,
"selected_variables" = selected_variables,
"variables_list" = variables_list,
"schema"=schema,
"selected_variable_names" = selected_variable_names,
"variables" = variables_list,
"geography_variable"=geography_variable,
"coerce_schema"=FALSE
)
Expand Down
44 changes: 21 additions & 23 deletions R/Table.R
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ Table <- setRefClass("Table",
make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'arrow_dataset',
schema = params$schema,
variables = params$variables,
progress = progress,
coerce_schema = params$coerce_schema,
batch_preprocessor = batch_preprocessor
Expand All @@ -139,10 +139,10 @@ Table <- setRefClass("Table",
make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'arrow_table',
progress=progress,
schema = params$schema,
variables = params$variables,
progress = progress,
coerce_schema = params$coerce_schema,
batch_preprocessor = batch_preprocessor
Expand All @@ -155,9 +155,9 @@ Table <- setRefClass("Table",
make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'arrow_stream',
schema = params$schema,
variables = params$variables,
progress = progress,
coerce_schema = params$coerce_schema
)
Expand All @@ -173,9 +173,9 @@ Table <- setRefClass("Table",
df <- make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'tibble',
schema = params$schema,
variables = params$variables,
progress = progress,
coerce_schema = params$coerce_schema,
batch_preprocessor = batch_preprocessor
Expand All @@ -198,9 +198,9 @@ Table <- setRefClass("Table",
df <- make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'tibble',
schema = params$schema,
variables = params$variables,
progress = progress,
coerce_schema = params$coerce_schema,
batch_preprocessor = batch_preprocessor
Expand All @@ -215,9 +215,9 @@ Table <- setRefClass("Table",
make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'data_frame',
schema = params$schema,
variables = params$variables,
progress = progress,
coerce_schema = params$coerce_schema,
batch_preprocessor = batch_preprocessor
Expand All @@ -230,9 +230,9 @@ Table <- setRefClass("Table",
make_rows_request(
uri=params$uri,
max_results=params$max_results,
selected_variables = params$selected_variables,
selected_variable_names = params$selected_variable_names,
type = 'data_table',
schema = params$schema,
variables = params$variables,
progress = progress,
coerce_schema = params$coerce_schema,
batch_preprocessor = batch_preprocessor
Expand All @@ -254,9 +254,9 @@ Table <- setRefClass("Table",
df <- make_rows_request(
uri=.self$uri,
max_results=max_results,
selected_variables = list(file_id_variable),
selected_variable_names = list(file_id_variable),
type='data_table',
schema=arrow::schema(arrow::field(file_id_variable, string())),
variables=list(list(name=file_id_variable, type="string")),
progress=FALSE
)
purrr::map(df[[file_id_variable]], function(id) {
Expand All @@ -283,9 +283,9 @@ Table <- setRefClass("Table",
df <- make_rows_request(
uri=.self$uri,
max_results=max_results,
selected_variables = list(file_id_variable),
selected_variable_names = list(file_id_variable),
type='data_table',
schema=arrow::schema(arrow::field(file_id_variable, string())),
variables=list(list(name=file_id_variable, type="string")),
progress=FALSE
)

Expand Down Expand Up @@ -352,8 +352,7 @@ get_table_request_params = function(self, max_results, variables, geography_vari
self$get()
}

selected_variables <- if (is.null(variables)) NULL else Map(function(variable_name) variable_name, variables)
schema <- get_arrow_schema(variables_list)
selected_variable_names <- if (is.null(variables)) NULL else Map(function(variable_name) variable_name, variables)

if (!is.null(geography_variable) && geography_variable == ''){
geography_variable = NULL
Expand All @@ -368,9 +367,8 @@ get_table_request_params = function(self, max_results, variables, geography_vari
list(
"max_results" = max_results,
"uri" = self$uri,
"selected_variables" = selected_variables,
"variables_list" = variables_list,
"schema"=schema,
"selected_variable_names" = selected_variable_names,
"variables"=variables_list,
"geography_variable"=geography_variable,
"coerce_schema"=self$properties$container$kind == 'dataset'
)
Expand Down
32 changes: 20 additions & 12 deletions R/api_request.R
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,14 @@ RedivisBatchReader <- setRefClass(
#' @importFrom arrow open_dataset Scanner
#' @importFrom uuid UUIDgenerate
#' @importFrom progressr progress with_progress
make_rows_request <- function(uri, max_results=NULL, selected_variables = NULL, type = 'tibble', schema = NULL, progress = TRUE, coerce_schema=FALSE, batch_preprocessor=NULL){
make_rows_request <- function(uri, max_results=NULL, selected_variable_names = NULL, type = 'tibble', variables = NULL, progress = TRUE, coerce_schema=FALSE, batch_preprocessor=NULL){
read_session <- make_request(
method="post",
path=str_interp("${uri}/readSessions"),
parse_response=TRUE,
payload=list(
"maxResults"=max_results,
"selectedVariables"=selected_variables,
"selectedVariables"=selected_variable_names,
"format"="arrow",
"requestedStreamCount"=if (type == 'arrow_stream') 1 else parallelly::availableCores()
)
Expand All @@ -347,7 +347,7 @@ make_rows_request <- function(uri, max_results=NULL, selected_variables = NULL,
current_stream_index=1,
date_variables=list(),
time_variables=list(),
custom_classes=list(schema=schema, progressor=p),
custom_classes=list(schema=get_arrow_schema(variables), progressor=p),
last_progressed_time=Sys.time(),
current_progress_rows=0
)
Expand All @@ -363,12 +363,12 @@ make_rows_request <- function(uri, max_results=NULL, selected_variables = NULL,
}

if (progress){
progressr::with_progress(parallel_stream_arrow(folder, read_session$streams, max_results=read_session$numRows, schema, coerce_schema, batch_preprocessor))
progressr::with_progress(parallel_stream_arrow(folder, read_session$streams, max_results=read_session$numRows, variables, coerce_schema, batch_preprocessor))
} else {
parallel_stream_arrow(folder, read_session$streams, max_results=read_session$numRows, schema, coerce_schema, batch_preprocessor)
parallel_stream_arrow(folder, read_session$streams, max_results=read_session$numRows, variables, coerce_schema, batch_preprocessor)
}

arrow_dataset <- arrow::open_dataset(folder, format = "feather", schema = if (is.null(batch_preprocessor)) schema else NULL)
arrow_dataset <- arrow::open_dataset(folder, format = "feather", schema = if (is.null(batch_preprocessor)) get_arrow_schema(variables) else NULL)

# TODO: remove head() once BE is sorted
if (type == 'arrow_dataset'){
Expand Down Expand Up @@ -407,28 +407,38 @@ get_authorization_header <- function(){
#' @importFrom stringr str_c
#' @import arrow
#' @import dplyr
parallel_stream_arrow <- function(folder, streams, max_results, schema, coerce_schema, batch_preprocessor){
parallel_stream_arrow <- function(folder, streams, max_results, variables, coerce_schema, batch_preprocessor){
pb <- progressr::progressor(steps = max_results)
# pb <- txtProgressBar(0, max_results, style = 3)
headers <- get_authorization_header()
worker_count <- length(streams)

is_multisession <- FALSE

if (parallelly::supportsMulticore()){
oplan <- future::plan(future::multicore, workers = worker_count)
} else {
# oplan <- future::plan(future::sequential)
is_multisession <- TRUE
oplan <- future::plan(future::multisession, workers = worker_count)
}

# Restore the previous future plan on exit, so that any strategy set by the user is not permanently overwritten
on.exit(plan(oplan), add = TRUE)
base_url = generate_api_url('/readStreams')

schema <- NULL

# IMPORTANT: an arrow schema can't be serialized across processes under multisession, so we need to generate it within each worker instead
if (is_multisession == FALSE){
schema <- get_arrow_schema(variables)
}

furrr::future_map(streams, function(stream){
# for (stream in streams){
output_file_path <- str_interp('${folder}/${stream$id}')

if (is_multisession){
schema <- get_arrow_schema(variables)
}

# This ensures the url method doesn't time out after 60s. Only applies to this function, doesn't set globally
options(timeout=3600)
con <- url(str_interp('${base_url}/${stream$id}'), open = "rb", headers = headers, blocking=FALSE)
Expand Down Expand Up @@ -506,10 +516,8 @@ parallel_stream_arrow <- function(folder, streams, max_results, schema, coerce_s
}

pb(amount = current_progress_rows)
# setTxtProgressBar(pb, current_progress_rows)

stream_writer$close()
output_file$close()
# }
})
}

0 comments on commit 5e224fd

Please sign in to comment.