Skip to content


apacheGH-34437: [R] Use FetchNode and OrderByNode (apache#34685)
Browse files Browse the repository at this point in the history
### Rationale for this change

See also apache#32991. By using the new nodes, we're closer to having all dplyr query business happening inside the ExecPlan. Unfortunately, there are still two cases where we have to apply operations in R after running a query:

* apache#34941: Taking head/tail on unordered data, which has non-deterministic results but that should be possible, in the case where the user wants to see a slice of the result, any slice
* apache#34942: Implementing tail in the FetchNode or similar would enable removing more hacks and workarounds.

Once those are resolved, we can simply further and then move to the new Declaration class.

### What changes are included in this PR?

This removes the use of different SinkNodes and many R-specific workarounds to support sorting and head/tail, so *almost* 
everything we do in a query should be represented in an ExecPlan. 

### Are these changes tested?

Yes. This is mostly an internal refactor, but behavior changes are accompanied by test updates.

### Are there any user-facing changes?

The `show_query()` method will print slightly different ExecPlans. In many cases, they will be more informative. 

`tail()` now actually returns the tail of the data in cases where the data has an implicit order (currently only in-memory tables). Previously it was non-deterministic (and would return the head or some other slice of the data).

When printing query objects that include `summarize()` when the `arrow.summarize.sort = TRUE` option is set, the sorting is correctly printed.

It's unclear if there should be changes in performance; running benchmarks would be good but it's also not clear that our benchmarks cover all affected scenarios. 

* Closes: apache#34437
* Closes: apache#31980
* Closes: apache#31982

Authored-by: Neal Richardson <[email protected]>
Signed-off-by: Nic Crane <[email protected]>
  • Loading branch information
nealrichardson authored and liujiacheng777 committed May 11, 2023
1 parent 8db5c04 commit 590f98c
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 228 deletions.
16 changes: 14 additions & 2 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions r/R/dplyr-summarize.R
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ do_arrow_summarize <- function(.data, ..., .groups = NULL) {
stop(paste("Invalid .groups argument:", .groups))
out$drop_empty_groups <- .data$drop_empty_groups
if (getOption("arrow.summarise.sort", FALSE)) {
# Add sorting instructions for the rows to match dplyr
out$arrange_vars <- .data$selected_columns[.data$group_by_vars]
out$arrange_desc <- rep(FALSE, length(.data$group_by_vars))
Expand Down
35 changes: 22 additions & 13 deletions r/R/dplyr.R
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,7 @@ tail.arrow_dplyr_query <- function(x, n = 6L, ...) {
#' mutate(x = gear / carb) %>%
#' show_exec_plan()
show_exec_plan <- function(x) {
adq <- as_adq(x)

# do not show the plan if we have a nested query (as this will force the
# evaluation of the inner query/queries)
# TODO see if we can remove after ARROW-16628
if (is_collapsed(x) && has_head_tail(x$.data)) {
warn("The `ExecPlan` cannot be printed for a nested query.")

result <- as_record_batch_reader(adq)
result <- as_record_batch_reader(as_adq(x))
plan <- result$Plan()
Expand Down Expand Up @@ -419,6 +409,25 @@ query_can_stream <- function(x) {

is_collapsed <- function(x) inherits(x$.data, "arrow_dplyr_query")

has_head_tail <- function(x) {
!is.null(x$head) || !is.null(x$tail) || (is_collapsed(x) && has_head_tail(x$.data))
has_unordered_head <- function(x) {
if (is.null(x$head %||% x$tail)) {
# no head/tail

has_order <- function(x) {
length(x$arrange_vars) > 0 ||
has_implicit_order(x) ||
(is_collapsed(x) && has_order(x$.data))

has_implicit_order <- function(x) {
# Approximate what ExecNode$has_ordered_batches() would return (w/o building ExecPlan)
# An in-memory table has an implicit order
# TODO(GH-34698): FileSystemDataset and RecordBatchReader will have implicit order
inherits(x$.data, "ArrowTabular") &&
# But joins, aggregations, etc. will result in non-deterministic order
is.null(x$aggregations) && is.null(x$join) && is.null(x$union_all)
158 changes: 77 additions & 81 deletions r/R/query-engine.R
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ ExecPlan <- R6Class("ExecPlan",

if (is_collapsed(.data)) {
# We have a nested query.
if (has_head_tail(.data$.data)) {
# head and tail are not ExecNodes; at best we can handle them via
# SinkNode, so if there are any steps done after head/tail, we need to
# evaluate the query up to then and then do a new query for the rest.
# as_record_batch_reader() will build and run an ExecPlan
if (has_unordered_head(.data$.data)) {
# TODO(GH-34941): FetchNode should do non-deterministic fetch
# Instead, we need to evaluate the query up to here,
# and then do a new query for the rest.
# as_record_batch_reader() will build and run an ExecPlan and do head() on it
reader <- as_record_batch_reader(.data$.data)
node <- self$SourceNode(reader)
Expand Down Expand Up @@ -126,15 +126,6 @@ ExecPlan <- R6Class("ExecPlan",
options = .data$aggregations,
key_names = group_vars

if (grouped && getOption("arrow.summarise.sort", FALSE)) {
# Add sorting instructions for the rows too to match dplyr
# (see below about why sorting isn't itself a Node)
node$extras$sort <- list(
names = group_vars,
orders = rep(0L, length(group_vars))
} else {
# If any columns are derived, reordered, or renamed we need to Project
# If there are aggregations, the projection was already handled above.
Expand Down Expand Up @@ -166,82 +157,81 @@ ExecPlan <- R6Class("ExecPlan",

# Apply sorting: this is currently not an ExecNode itself, it is a
# sink node option.
# TODO: handle some cases:
# (1) arrange > summarize > arrange
# (2) ARROW-13779: arrange then operation where order matters (e.g. cumsum)
# Apply sorting and head/tail
head_or_tail <- .data$head %||% .data$tail
if (length(.data$arrange_vars)) {
node$extras$sort <- list(
if (!is.null(.data$tail)) {
# Handle tail first: Reverse sort, take head
# TODO(GH-34942): FetchNode support for tail
node <- node$OrderBy(list(
names = names(.data$arrange_vars),
orders = as.integer(!.data$arrange_desc)
node <- node$Fetch(.data$tail)
# Apply sorting
node <- node$OrderBy(list(
names = names(.data$arrange_vars),
orders = .data$arrange_desc,
temp_columns = names(.data$temp_columns)
# This is only safe because we are going to evaluate queries that end
# with head/tail first, then evaluate any subsequent query as a new query
if (!is.null(.data$head)) {
node$extras$head <- .data$head
if (!is.null(.data$tail)) {
node$extras$tail <- .data$tail
orders = as.integer(.data$arrange_desc)

if (length(.data$temp_columns)) {
# If we sorted on ad-hoc derived columns, Project to drop them
temp_schema <- node$schema
cols_to_keep <- setdiff(names(temp_schema), names(.data$temp_columns))
node <- node$Project(make_field_refs(cols_to_keep))

if (!is.null(.data$head)) {
# Take the head now
node <- node$Fetch(.data$head)
} else if (!is.null(head_or_tail)) {
# Unsorted head/tail
# Handle a couple of special cases here:
if (node$has_ordered_batches()) {
# Data that has order, even implicit order from an in-memory table, is supported
# in FetchNode
if (!is.null(.data$head)) {
node <- node$Fetch(.data$head)
} else {
# TODO(GH-34942): FetchNode support for tail
# FetchNode currently doesn't support tail, but it has limit + offset
# So if we know how many rows the query will result in, we can offset
data_without_tail <- .data
data_without_tail$tail <- NULL
row_count <- nrow(data_without_tail)
if (! {
node <- node$Fetch(.data$tail, offset = row_count - .data$tail)
} else {
# Workaround: non-deterministic tail
node$extras$slice_size <- head_or_tail
} else {
# TODO(GH-34941): non-deterministic FetchNode
# Data has non-deterministic order, so head/tail means "just show me any N rows"
# FetchNode does not support non-deterministic scans, so we have to handle outside
node$extras$slice_size <- head_or_tail
Run = function(node) {
assert_is(node, "ExecNode")

# Sorting and head/tail (if sorted) are handled in the SinkNode,
# created in ExecPlan_build
sorting <- node$extras$sort %||% list()
select_k <- node$extras$head %||% -1L
has_sorting <- length(sorting) > 0
if (has_sorting) {
if (!is.null(node$extras$tail)) {
# Reverse the sort order and take the top K, then after we'll reverse
# the resulting rows so that it is ordered as expected
sorting$orders <- !sorting$orders
select_k <- node$extras$tail
sorting$orders <- as.integer(sorting$orders)

out <- ExecPlan_run(

if (!has_sorting) {
# Since ExecPlans don't scan in deterministic order, head/tail are both
if (!is.null(node$extras$slice_size)) {
# For non-deterministic scans, head/tail are
# essentially taking a random slice from somewhere in the dataset.
# And since the head() implementation is way more efficient than tail(),
# just use it to take the random slice
# TODO(ARROW-16628): handle limit in ExecNode
slice_size <- node$extras$head %||% node$extras$tail
if (!is.null(slice_size)) {
out <- head(out, slice_size)
} else if (!is.null(node$extras$tail)) {
# TODO(ARROW-16630): proper BottomK support
# Reverse the row order to get back what we expect
out <- as_arrow_table(out)
out <- out[rev(seq_len(nrow(out))), , drop = FALSE]
out <- as_record_batch_reader(out)

# If arrange() created $temp_columns, make sure to omit them from the result
# We can't currently handle this in ExecPlan_run itself because sorting
# happens in the end (SinkNode) so nothing comes after it.
# TODO(ARROW-16631): move into ExecPlan
if (length(node$extras$sort$temp_columns) > 0) {
tab <- as_arrow_table(out)
tab <- tab[, setdiff(names(tab), node$extras$sort$temp_columns), drop = FALSE]
out <- as_record_batch_reader(tab)
out <- head(out, node$extras$slice_size)

Write = function(node, ...) {
Expand Down Expand Up @@ -272,13 +262,8 @@ ExecNode <- R6Class("ExecNode",
inherit = ArrowObject,
public = list(
extras = list(
# `sort` is a slight hack to be able to keep around arrange() params,
# which don't currently yield their own ExecNode but rather are consumed
# in the SinkNode (in ExecPlan$run())
sort = NULL,
# Similar hacks for head and tail
head = NULL,
tail = NULL,
# Workaround for non-deterministic head/tail
slice_size = NULL,
# `source_schema` is put here in Scan() so that at Run/Write, we can
# extract the relevant metadata and keep it in the result
source_schema = NULL
Expand All @@ -295,6 +280,7 @@ ExecNode <- R6Class("ExecNode",
old_meta$r <- get_r_metadata_from_old_schema(self$schema, old_schema)
has_ordered_batches = function() ExecNode_has_ordered_batches(self),
Project = function(cols) {
if (length(cols)) {
assert_is_list_of(cols, "Expression")
Expand Down Expand Up @@ -336,6 +322,16 @@ ExecNode <- R6Class("ExecNode",
Union = function(right_node) {
self$preserve_extras(ExecNode_Union(self, right_node))
Fetch = function(limit, offset = 0L) {
ExecNode_Fetch(self, offset, limit)
OrderBy = function(sorting) {
ExecNode_OrderBy(self, sorting)
active = list(
Expand Down

0 comments on commit 590f98c

Please sign in to comment.