
Commit

fix: implement retry handling when streaming rows to handle broken connections
imathews committed Apr 24, 2024
1 parent db7bb53 commit 58e0a4f
Showing 2 changed files with 126 additions and 140 deletions.
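
The substantive change is in R/api_request.R: each worker's streaming read is wrapped in a tryCatch, and when the connection drops ("cannot read from connection") the read stream is reopened at the row offset already consumed, up to roughly 20 retries. A minimal standalone sketch of that pattern, using hypothetical helper names (stream_with_retry, open_stream, read_next_batch) rather than the package's actual process_arrow_stream():

stream_with_retry <- function(open_stream, read_next_batch, rows_read = 0, retry_count = 0) {
  tryCatch({
    # Resume where the previous attempt stopped; the diff passes this offset as
    # a query parameter (?offset=) when reopening the read stream.
    con <- open_stream(offset = rows_read)
    on.exit(close(con))
    repeat {
      batch <- read_next_batch(con)          # assumed to return NULL at end of stream
      if (is.null(batch)) break
      rows_read <- rows_read + nrow(batch)   # track progress for a possible retry
    }
    rows_read
  }, error = function(cond) {
    if (grepl("cannot read from connection", conditionMessage(cond)) && retry_count <= 20) {
      Sys.sleep(1)   # brief pause, then resume from the recorded offset
      stream_with_retry(open_stream, read_next_batch, rows_read, retry_count + 1)
    } else {
      stop(conditionMessage(cond))
    }
  })
}

The actual implementation additionally threads the partially built output (the in-memory batches or the file stream writer) through the retry call so that rows already processed are not written twice.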
2 changes: 1 addition & 1 deletion DESCRIPTION
@@ -1,7 +1,7 @@
Package: redivis
Type: Package
Title: R interface for Redivis
Version: 0.6.16
Version: 0.6.17
Author: Redivis Inc.
Maintainer: Redivis <[email protected]>
Description: Supports working with Redivis datasets and tables through R.
264 changes: 125 additions & 139 deletions R/api_request.R
@@ -382,12 +382,9 @@ parallel_stream_arrow <- function(folder, streams, max_results, variables, coerc
headers <- get_authorization_header()
worker_count <- length(streams)

is_multisession <- FALSE

if (parallelly::supportsMulticore()){
oplan <- future::plan(future::multicore, workers = worker_count)
} else {
is_multisession <- TRUE
oplan <- future::plan(future::multisession, workers = worker_count)
# Helpful for testing in dev
# oplan <- future::plan(future::sequential)
@@ -397,14 +394,8 @@ parallel_stream_arrow <- function(folder, streams, max_results, variables, coerc
on.exit(plan(oplan), add = TRUE)
base_url = generate_api_url('/readStreams')

# schema <- get_arrow_schema(variables)

results <- furrr::future_map(streams, function(stream){
# IMPORTANT: we can't serialize an arrow schema across processes under multisession, so need to generate in each worker instead
# if (is_multisession){
process_arrow_stream <- function(stream, in_memory_batches=c(), stream_writer=NULL, output_file=NULL, stream_rows_read=0, retry_count=0){
schema <- get_arrow_schema(variables)
# }

schema_field_uncased_name_map <- sapply(schema$names, tolower)
schema_field_uncased_name_map <- set_names(names(schema_field_uncased_name_map), schema_field_uncased_name_map)

@@ -420,163 +411,158 @@
# open(con, "rb")

# This ensures the url method doesn't time out after 60s. Only applies to this function, doesn't set globally
options(timeout=3600)
con <- url(str_interp('${base_url}/${stream$id}'), open = "rb", headers = headers, blocking=FALSE)

on.exit(close(con))
stream_reader <- arrow::RecordBatchStreamReader$create(getNamespace("arrow")$MakeRConnectionInputStream(con))

output_file <- NULL
in_memory_batches <- NULL
if (is.null(folder)){
in_memory_batches <- c()
} else {
output_file <- arrow::FileOutputStream$create(str_interp('${folder}/${stream$id}'))
}
tryCatch({
options(timeout=3600)
con <- url(str_interp('${base_url}/${stream$id}?offset=${stream_rows_read}'), open = "rb", headers = headers, blocking=FALSE)
on.exit(close(con))
stream_reader <- arrow::RecordBatchStreamReader$create(getNamespace("arrow")$MakeRConnectionInputStream(con))

fields_to_rename <- list()
fields_to_add <- list()
should_reorder_fields <- FALSE
time_variables_to_coerce = c()

i <- 0
# Make sure to first only get the fields in the reader, to handle when reading from an unreleased table made up of uploads with inconsistent variables
for (field in stream_reader$schema$fields){
i <- i+1
final_schema_field_name <- schema_field_uncased_name_map[[tolower(field$name)]]

if (final_schema_field_name != field$name){
stream_reader_schema <- stream_reader$schema
rename_config <- set_names(list(list(new_name=final_schema_field_name, index=i)), c(final_schema_field_name))
fields_to_rename <- append(fields_to_rename, rename_config)
}
if (coerce_schema && is(field$type, "Time64")){
time_variables_to_coerce <- append(time_variables_to_coerce, field$name)
if (!is.null(folder) && is.null(output_file)){
output_file <- arrow::FileOutputStream$create(str_interp('${folder}/${stream$id}'))
}
if (!should_reorder_fields && i != match(final_schema_field_name, names(schema))){
should_reorder_fields <- TRUE

fields_to_rename <- list()
fields_to_add <- list()
should_reorder_fields <- FALSE
time_variables_to_coerce = c()

i <- 0
# Make sure to first only get the fields in the reader, to handle when reading from an unreleased table made up of uploads with inconsistent variables
for (field in stream_reader$schema$fields){
i <- i+1
final_schema_field_name <- schema_field_uncased_name_map[[tolower(field$name)]]

if (final_schema_field_name != field$name){
stream_reader_schema <- stream_reader$schema
rename_config <- set_names(list(list(new_name=final_schema_field_name, index=i)), c(final_schema_field_name))
fields_to_rename <- append(fields_to_rename, rename_config)
}
if (coerce_schema && is(field$type, "Time64")){
time_variables_to_coerce <- append(time_variables_to_coerce, field$name)
}
if (!should_reorder_fields && i != match(final_schema_field_name, names(schema))){
should_reorder_fields <- TRUE
}
}
}

for (field_name in schema$names){
if (is.null(stream_reader$schema$GetFieldByName(field_name)) && is.null(fields_to_rename[[field_name]])){
fields_to_add <- append(fields_to_add, schema$GetFieldByName(field_name))
for (field_name in schema$names){
if (is.null(stream_reader$schema$GetFieldByName(field_name)) && is.null(fields_to_rename[[field_name]])){
fields_to_add <- append(fields_to_add, schema$GetFieldByName(field_name))
}
}
}

if (should_reorder_fields && length(fields_to_add)){
should_reorder_fields <- TRUE
}
if (should_reorder_fields && length(fields_to_add)){
should_reorder_fields <- TRUE
}

stream_writer <- NULL
last_measured_time <- Sys.time()
current_progress_rows <- 0

current_progress_rows <- 0
last_measured_time <- Sys.time()
while (TRUE){
batch <- stream_reader$read_next_batch()
if (is.null(batch)){
break
} else {
current_progress_rows = current_progress_rows + batch$num_rows
stream_rows_read = stream_rows_read + batch$num_rows

# We need to coerce_schema for all dataset tables, since their underlying storage type is always a string
if (coerce_schema){
# Note: this approach is much more performant than using %>% mutate(across())
# TODO: in the future, Arrow may support native conversion from time string to their type
# To test if supported: arrow::arrow_array(rep('10:30:04.123', 2))$cast(arrow::time64(unit="us"))
for (time_variable in time_variables_to_coerce){
batch[[time_variable]] <- arrow::arrow_array(stringr::str_c('2000-01-01T', batch[[time_variable]]$as_vector()))$cast(arrow::timestamp(unit='us'))
}
for (rename_args in fields_to_rename){
names(batch)[[rename_args$index]] <- rename_args$new_name
}
# TODO: this is a significant bottleneck. Can we make it faster, maybe call all at once?
for (field in fields_to_add){
if (is(field$type, "Date32")){
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA_integer_, batch$num_rows))$cast(arrow::date32()))
} else if (is(field$type, "Time64")){
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA_integer_, batch$num_rows))$cast(arrow::int64())$cast(arrow::time64()))
} else if (is(field$type, "Float64")){
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA_real_, batch$num_rows)))
} else if (is(field$type, "Boolean")){
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA, batch$num_rows)))
} else if (is(field$type, "Int64")){
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA_integer_, batch$num_rows))$cast(arrow::int64()))
} else if (is(field$type, "Timestamp")){
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA_integer_, batch$num_rows))$cast(arrow::int64())$cast(arrow::timestamp(unit="us", timezone="")))
} else {
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA_character_, batch$num_rows)))
}
}

while (TRUE){
batch <- stream_reader$read_next_batch()
if (is.null(batch)){
break
} else {
current_progress_rows = current_progress_rows + batch$num_rows
if (should_reorder_fields){
batch <- batch[, names(schema)] # reorder fields
}

# We need to coerce_schema for all dataset tables, since their underlying storage type is always a string
if (coerce_schema){
# Note: this approach is much more performant than using %>% mutate(across())
# TODO: in the future, Arrow may support native conversion from time string to their type
# To test if supported: arrow::arrow_array(rep('10:30:04.123', 2))$cast(arrow::time64(unit="us"))
for (time_variable in time_variables_to_coerce){
batch[[time_variable]] <- arrow::arrow_array(stringr::str_c('2000-01-01T', batch[[time_variable]]$as_vector()))$cast(arrow::timestamp(unit='us'))
batch <- arrow::as_record_batch(batch, schema=schema)
} else if (should_reorder_fields){
batch <- batch[, names(schema)] # reorder fields
batch <- arrow::as_record_batch(batch, schema=schema)
}
for (rename_args in fields_to_rename){
names(batch)[[rename_args$index]] <- rename_args$new_name

if (!is.null(batch_preprocessor)){
batch <- batch_preprocessor(batch)
}
# TODO: this is a significant bottleneck. Can we make it faster, maybe call all at once?
for (field in fields_to_add){
if (is(field$type, "Date32")){
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA_integer_, batch$num_rows))$cast(arrow::date32()))
} else if (is(field$type, "Time64")){
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA_integer_, batch$num_rows))$cast(arrow::int64())$cast(arrow::time64()))
} else if (is(field$type, "Float64")){
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA_real_, batch$num_rows)))
} else if (is(field$type, "Boolean")){
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA, batch$num_rows)))
} else if (is(field$type, "Int64")){
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA_integer_, batch$num_rows))$cast(arrow::int64()))
} else if (is(field$type, "Timestamp")){
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA_integer_, batch$num_rows))$cast(arrow::int64())$cast(arrow::timestamp(unit="us", timezone="")))

if (!is.null(batch)){
if (!is.null(output_file)){
if (is.null(stream_writer)){
stream_writer <- arrow::RecordBatchFileWriter$create(output_file, schema=if (is.null(batch_preprocessor)) schema else batch$schema)
}
stream_writer$write_batch(batch)
} else {
batch <- batch$AddColumn(0, field, arrow::arrow_array(rep(NA_character_, batch$num_rows)))
in_memory_batches <- c(in_memory_batches, batch)
}
}

if (should_reorder_fields){
batch <- batch[, names(schema)] # reorder fields
if (Sys.time() - last_measured_time > 0.2){
pb(amount = current_progress_rows)
current_progress_rows <- 0
last_measured_time = Sys.time()
}

batch <- arrow::as_record_batch(batch, schema=schema)
} else if (should_reorder_fields){
batch <- batch[, names(schema)] # reorder fields
batch <- arrow::as_record_batch(batch, schema=schema)
}
}

if (!is.null(batch_preprocessor)){
batch <- batch_preprocessor(batch)
}
pb(amount = current_progress_rows)

if (!is.null(batch)){
if (!is.null(output_file)){
if (is.null(stream_writer)){
stream_writer <- arrow::RecordBatchFileWriter$create(output_file, schema=if (is.null(batch_preprocessor)) schema else batch$schema)
}
stream_writer$write_batch(batch)
} else {
in_memory_batches <- c(in_memory_batches, batch)
# in_memory_batches <- append(in_memory_batches, list(batch$serialize()))
}
if (is.null(output_file)){
# Need to serialize the table to pass between threads
table <- do.call(arrow::arrow_table, in_memory_batches)
serialized <- arrow::write_to_raw(table, format="stream") # stream is much faster to read
return(serialized)
} else {
if (!is.null(stream_writer)){
stream_writer$close()
}

if (Sys.time() - last_measured_time > 0.2){
pb(amount = current_progress_rows)
current_progress_rows <- 0
last_measured_time = Sys.time()
output_file$close()
}
}, error = function(cond){
if (grepl("cannot read from connection", conditionMessage(cond))){
if (retry_count > 20){
message("Download connection failed after too many retries, giving up.")
stop(conditionMessage(cond))
}
Sys.sleep(1)
return(process_arrow_stream(stream, in_memory_batches, stream_writer, output_file, stream_rows_read, retry_count=retry_count+1))
} else {
stop(conditionMessage(cond))
}
}

pb(amount = current_progress_rows)

if (is.null(output_file)){
# Need to serialize the table to pass between threads
# t = Sys.time()
# print("serializing")
})

# TODO: it would be better to call batch$serialize() on all the individual batches,
# but can't figure out a performant way to repeatedly increment a large raw vector, and also to deserialize these batches on the other end.
# This would spread the overhead of serialization across each read operation, where the bottleneck is network throughput anyway.
# serialized <- unlist(in_memory_batches, FALSE, FALSE)

table <- do.call(arrow::arrow_table, in_memory_batches)
serialized <- arrow::write_to_raw(table, format="stream") # stream is much faster to read

# print(Sys.time() - t)
}

return(serialized)
} else {
if (!is.null(stream_writer)){
stream_writer$close()
}
output_file$close()
}
results <- furrr::future_map(streams, function(stream){
process_arrow_stream(stream)
})

if (is.null(folder)){
# t = Sys.time()
# print("got records")
# table <- do.call(arrow::arrow_table, sapply(results, function(x) arrow::record_batch(x, schema=schema)))
table <- do.call(arrow::concat_tables, sapply(results, function(x) read_ipc_stream(x, as_data_frame = FALSE)))
# print(Sys.time() - t)
return(table)
return(do.call(arrow::concat_tables, sapply(results, function(x) read_ipc_stream(x, as_data_frame = FALSE))))
}

}
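
A side note on how results cross process boundaries: the comments in the diff point out that an Arrow schema can't be serialized across processes under a multisession plan, and that each worker's in-memory result has to be serialized to pass back to the parent. That is why a worker writes its table to a raw Arrow IPC stream (write_to_raw(..., format = "stream")) and the parent reassembles everything with read_ipc_stream() and concat_tables(). A small self-contained sketch of that hand-off, with made-up example data and plain lapply() standing in for furrr::future_map():

library(arrow)

# Each "worker" builds a table and returns it as a raw IPC stream, which is an
# ordinary raw vector and therefore safe to send back from another process.
make_chunk <- function(offset) {
  tbl <- arrow_table(id = offset + 1:3, value = letters[offset + 1:3])
  write_to_raw(tbl, format = "stream")
}

raw_chunks <- lapply(c(0L, 3L), make_chunk)

# The parent deserializes each chunk without converting to a data frame, then
# concatenates the tables into one.
combined <- do.call(
  concat_tables,
  lapply(raw_chunks, function(x) read_ipc_stream(x, as_data_frame = FALSE))
)
print(combined)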
