
GC StringViewArray in CoalesceBatchesStream #11587

Merged: 10 commits, Jul 25, 2024
44 changes: 44 additions & 0 deletions datafusion/physical-plan/src/coalesce_batches.rs
@@ -29,9 +29,11 @@ use crate::{
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};

use arrow::array::{AsArray, StringViewBuilder};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use arrow_array::{Array, ArrayRef};
use datafusion_common::Result;
use datafusion_execution::TaskContext;

@@ -216,6 +218,8 @@ impl CoalesceBatchesStream {
match input_batch {
Poll::Ready(x) => match x {
Some(Ok(batch)) => {
let batch = gc_string_view_batch(&batch);
Contributor

So here, inside gc, the string buffer will be copied once, and (below) in concat_batches() the string buffer will be copied again. It seems possible to copy only once by changing the internal implementation of concat_batches().
But I think we can do that later, once there is a good benchmark to assess the impact; at least the extra copy in coalesce_batches() does not have a significant performance impact on TPCH right now.

Contributor

This is an excellent point

I think, given how concat is implemented for StringView, it will only copy the fixed-size views (not the actual string data)

Perhaps what we could do is implement a wrapper around arrow::concat_batches that has the datafusion specific GC trigger for sparse arrays, and falls back to concat for other types: https://docs.rs/arrow-select/52.1.0/src/arrow_select/concat.rs.html#150

/// Wrapper around [`arrow::compute::concat`] that applies the DataFusion-specific
/// GC trigger for sparse `StringView` arrays
pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
    // loop over columns here and handle StringView specially,
    // or fall back to arrow's concat
}

Contributor

I filed #11628 to track this idea


if batch.num_rows() >= self.target_batch_size
&& self.buffer.is_empty()
{
@@ -290,6 +294,46 @@ pub fn concat_batches(
arrow::compute::concat_batches(schema, batches)
}

/// A `StringViewArray` references the raw, Parquet-decoded buffers, which reduces copying but prevents those buffers from being released.
/// When a `StringViewArray`'s cardinality drops significantly (e.g., after `FilterExec`, `HashJoinExec`, or many other operators),
/// we should consider consolidating it, so that the underlying buffers can be released to reduce memory usage and string locality improves for better performance.
fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
let new_columns: Vec<ArrayRef> = batch
.columns()
.iter()
.map(|c| {
// Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
if let Some(s) = c.as_string_view_opt() {
Contributor

A minor comment here is that I think you can reduce the level of nesting by using syntax like

let Some(s) = c.as_string_view_opt() else {
    return Arc::clone(c);
};

let view_cnt = s.views().len();
let buffer_size = s.get_buffer_memory_size();

// Re-creating the array copies data and can be time consuming.
// We only do it if the array is sparse; below is a heuristic for deciding whether it is.
if buffer_size > (view_cnt * 32) {
// We use a block size of 2MB (instead of 8KB) to reduce the number of buffers to track.
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
let mut builder = StringViewBuilder::with_capacity(s.len())
.with_block_size(1024 * 1024 * 2);

for v in s.iter() {
builder.append_option(v);
}

let gc_string = builder.finish();

Arc::new(gc_string)
} else {
Arc::clone(c)
}
} else {
Arc::clone(c)
}
})
.collect();
RecordBatch::try_new(batch.schema(), new_columns)
.expect("Failed to re-create the gc'ed record batch")
}
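As an aside (not part of the diff), the sparseness check above can be read in isolation: each view in a `StringViewArray` is a fixed 16-byte struct, and the heuristic treats the array as sparse once its data buffers hold more than 32 bytes per view on average, i.e. most of the buffered bytes are likely no longer referenced by any view. A minimal standalone sketch of that heuristic (hypothetical helper name, plain Rust, no arrow dependency):

```rust
// Sketch of the sparseness heuristic used in gc_string_view_batch:
// rebuild the array only when the buffers hold > 32 bytes per view.
fn is_sparse(view_cnt: usize, buffer_size: usize) -> bool {
    buffer_size > view_cnt * 32
}

fn main() {
    // e.g. a filter kept 10 of many rows, but those 10 views still pin 64 KiB
    // of decoded buffer: rebuilding releases the unused bytes
    assert!(is_sparse(10, 64 * 1024));

    // 1000 views over 16 KiB of buffer is dense enough to leave as-is
    assert!(!is_sparse(1000, 16 * 1024));

    println!("heuristic behaves as expected");
}
```

The `* 32` threshold is the constant chosen in this PR; tuning it trades GC copy cost against how long decoded buffers stay pinned.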

#[cfg(test)]
mod tests {
use super::*;