From 0fb5f7b80642bda60d50d06407d2b21e196389a1 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Sun, 21 Jul 2024 13:53:05 -0400 Subject: [PATCH 1/9] gc string view when appropriate --- .../physical-plan/src/coalesce_batches.rs | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index b9bdfcdee712..9cf271ba6ab5 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -29,9 +29,11 @@ use crate::{ DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, }; +use arrow::array::{AsArray, StringViewBuilder}; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; +use arrow_array::Array; use datafusion_common::Result; use datafusion_execution::TaskContext; @@ -216,6 +218,41 @@ impl CoalesceBatchesStream { match input_batch { Poll::Ready(x) => match x { Some(Ok(batch)) => { + let new_columns: Vec<Arc<dyn Array>> = batch + .columns() + .iter() + .map(|c| { + // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long. + if let Some(s) = c.as_string_view_opt() { + let view_cnt = s.views().len(); + let buffer_size = s.get_buffer_memory_size(); + + // Re-creating the array copies data and can be time consuming. + // We only do it if the array is sparse, below is a heuristic to determine if the array is sparse. + if buffer_size > (view_cnt * 32) { + // We use a block size of 2MB (instead of 8KB) to reduce the number of buffers to track. + // See https://github.com/apache/arrow-rs/issues/6094 for more details. 
+ let mut builder = + StringViewBuilder::with_capacity(s.len()) + .with_block_size(1024 * 1024 * 2); + + for v in s.iter() { + builder.append_option(v); + } + + let gc_string = builder.finish(); + + Arc::new(gc_string) + } else { + c.clone() + } + } else { + c.clone() + } + }) + .collect(); + let batch = RecordBatch::try_new(batch.schema(), new_columns)?; + if batch.num_rows() >= self.target_batch_size && self.buffer.is_empty() { From 1340a636df848bd18ef12cbbe48349e86876ebd2 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Sun, 21 Jul 2024 14:07:26 -0400 Subject: [PATCH 2/9] make clippy happy --- datafusion/physical-plan/src/coalesce_batches.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 9cf271ba6ab5..ac0bada38caa 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -244,10 +244,10 @@ impl CoalesceBatchesStream { Arc::new(gc_string) } else { - c.clone() + Arc::clone(c) } } else { - c.clone() + Arc::clone(c) } }) .collect(); From a55810ee3e506524231aba168a62799ed6f6014d Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Mon, 22 Jul 2024 09:03:54 -0400 Subject: [PATCH 3/9] address comments --- .../physical-plan/src/coalesce_batches.rs | 77 ++++++++++--------- 1 file changed, 42 insertions(+), 35 deletions(-) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index ac0bada38caa..a37b49c1e2a4 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -33,7 +33,7 @@ use arrow::array::{AsArray, StringViewBuilder}; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; -use arrow_array::Array; +use arrow_array::{Array, ArrayRef}; use datafusion_common::Result; use datafusion_execution::TaskContext; 
@@ -218,40 +218,7 @@ impl CoalesceBatchesStream { match input_batch { Poll::Ready(x) => match x { Some(Ok(batch)) => { - let new_columns: Vec<Arc<dyn Array>> = batch - .columns() - .iter() - .map(|c| { - // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long. - if let Some(s) = c.as_string_view_opt() { - let view_cnt = s.views().len(); - let buffer_size = s.get_buffer_memory_size(); - - // Re-creating the array copies data and can be time consuming. - // We only do it if the array is sparse, below is a heuristic to determine if the array is sparse. - if buffer_size > (view_cnt * 32) { - // We use a block size of 2MB (instead of 8KB) to reduce the number of buffers to track. - // See https://github.com/apache/arrow-rs/issues/6094 for more details. - let mut builder = - StringViewBuilder::with_capacity(s.len()) - .with_block_size(1024 * 1024 * 2); - - for v in s.iter() { - builder.append_option(v); - } - - let gc_string = builder.finish(); - - Arc::new(gc_string) - } else { - Arc::clone(c) - } - } else { - Arc::clone(c) - } - }) - .collect(); - let batch = RecordBatch::try_new(batch.schema(), new_columns)?; + let batch = gc_string_view_batch(&batch); if batch.num_rows() >= self.target_batch_size && self.buffer.is_empty() @@ -327,6 +294,46 @@ pub fn concat_batches( arrow::compute::concat_batches(schema, batches) } +/// [`StringViewArray`] reference to the raw parquet decoded buffer, which reduces copy but prevents those buffer from being released. +/// When `StringViewArray`'s cardinality significantly drops (e.g., after `FilterExec` or `HashJoinExec` or many others), +/// we should consider consolidating it so that we can release the buffer to reduce memory usage and improve string locality for better performance. +fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { + let new_columns: Vec<ArrayRef> = batch + .columns() + .iter() + .map(|c| { + // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long. 
+ if let Some(s) = c.as_string_view_opt() { + let view_cnt = s.views().len(); + let buffer_size = s.get_buffer_memory_size(); + + // Re-creating the array copies data and can be time consuming. + // We only do it if the array is sparse, below is a heuristic to determine if the array is sparse. + if buffer_size > (view_cnt * 32) { + // We use a block size of 2MB (instead of 8KB) to reduce the number of buffers to track. + // See https://github.com/apache/arrow-rs/issues/6094 for more details. + let mut builder = StringViewBuilder::with_capacity(s.len()) + .with_block_size(1024 * 1024 * 2); + + for v in s.iter() { + builder.append_option(v); + } + + let gc_string = builder.finish(); + + Arc::new(gc_string) + } else { + Arc::clone(c) + } + } else { + Arc::clone(c) + } + }) + .collect(); + RecordBatch::try_new(batch.schema(), new_columns) + .expect("Failed to re-create the gc'ed record batch") +} + #[cfg(test)] mod tests { use super::*; From f886d5032dc68a6a2fbb42be5d378485c8b2ab26 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Mon, 22 Jul 2024 09:19:05 -0400 Subject: [PATCH 4/9] make doc happy --- datafusion/physical-plan/src/coalesce_batches.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index a37b49c1e2a4..f4af7be38517 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -294,7 +294,7 @@ pub fn concat_batches( arrow::compute::concat_batches(schema, batches) } -/// [`StringViewArray`] reference to the raw parquet decoded buffer, which reduces copy but prevents those buffer from being released. +/// `StringViewArray` reference to the raw parquet decoded buffer, which reduces copy but prevents those buffer from being released. 
/// When `StringViewArray`'s cardinality significantly drops (e.g., after `FilterExec` or `HashJoinExec` or many others), /// we should consider consolidating it so that we can release the buffer to reduce memory usage and improve string locality for better performance. fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { From b3979449612dc9e478632c5817e34e4607260c54 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Tue, 23 Jul 2024 14:54:16 -0400 Subject: [PATCH 5/9] update style --- .../physical-plan/src/coalesce_batches.rs | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index f4af7be38517..828ad15be242 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -303,28 +303,27 @@ fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { .iter() .map(|c| { // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long. - if let Some(s) = c.as_string_view_opt() { - let view_cnt = s.views().len(); - let buffer_size = s.get_buffer_memory_size(); - - // Re-creating the array copies data and can be time consuming. - // We only do it if the array is sparse, below is a heuristic to determine if the array is sparse. - if buffer_size > (view_cnt * 32) { - // We use a block size of 2MB (instead of 8KB) to reduce the number of buffers to track. - // See https://github.com/apache/arrow-rs/issues/6094 for more details. - let mut builder = StringViewBuilder::with_capacity(s.len()) - .with_block_size(1024 * 1024 * 2); - - for v in s.iter() { - builder.append_option(v); - } + let Some(s) = c.as_string_view_opt() else { + return Arc::clone(c); + }; + let view_cnt = s.views().len(); + let buffer_size = s.get_buffer_memory_size(); + + // Re-creating the array copies data and can be time consuming. 
+ // We only do it if the array is sparse, below is a heuristic to determine if the array is sparse. + if buffer_size > (view_cnt * 32) { + // We use a block size of 2MB (instead of 8KB) to reduce the number of buffers to track. + // See https://github.com/apache/arrow-rs/issues/6094 for more details. + let mut builder = StringViewBuilder::with_capacity(s.len()) + .with_block_size(1024 * 1024 * 2); + + for v in s.iter() { + builder.append_option(v); + } - let gc_string = builder.finish(); + let gc_string = builder.finish(); - Arc::new(gc_string) - } else { - Arc::clone(c) - } + Arc::new(gc_string) } else { Arc::clone(c) } From 1fac94a996013441efe8241b0e485513abab9810 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 24 Jul 2024 13:44:23 -0400 Subject: [PATCH 6/9] Add comments and tests for gc_string_view_batch --- .../physical-plan/src/coalesce_batches.rs | 122 +++++++++++++++++- 1 file changed, 118 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 828ad15be242..81fa62f94f99 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -294,9 +294,26 @@ pub fn concat_batches( arrow::compute::concat_batches(schema, batches) } -/// `StringViewArray` reference to the raw parquet decoded buffer, which reduces copy but prevents those buffer from being released. -/// When `StringViewArray`'s cardinality significantly drops (e.g., after `FilterExec` or `HashJoinExec` or many others), -/// we should consider consolidating it so that we can release the buffer to reduce memory usage and improve string locality for better performance. +/// Heuristically compact [`StringViewArray`]s to reduce memory usage, if needed +/// +/// This function decides when to consolidate the StringView into a new buffer +/// to reduce memory usage and improve string locality for better performance. 
+/// +/// This differs from [`StringViewArray::gc`] because: +/// 1. It may not compact the array depending on a heuristic. +/// 2. It uses a larger default block size (2MB) to reduce the number of buffers to track. +/// +/// # Heuristic +/// +/// If the average size of each view is larger than 32 bytes, we compact the array. +/// +/// `StringViewArray` include pointers to buffer that hold the underlying data. +/// One of the great benefits of `StringViewArray` is that many operations +/// (e.g., `filter`) can be done without copying the underlying data. +/// +/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the +/// `StringViewArray` may only refer to a small portion of the buffer, +/// significantly increasing memory usage. fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { let new_columns: Vec<ArrayRef> = batch .columns() @@ -339,7 +356,8 @@ mod tests { use crate::{memory::MemoryExec, repartition::RepartitionExec, Partitioning}; use arrow::datatypes::{DataType, Field, Schema}; - use arrow_array::UInt32Array; + use arrow_array::builder::ArrayBuilder; + use arrow_array::{StringViewArray, UInt32Array}; #[tokio::test(flavor = "multi_thread")] async fn test_concat_batches() -> Result<()> { @@ -412,4 +430,100 @@ mod tests { ) .unwrap() } + + #[test] + fn test_gc_string_view_batch_small_no_compact() { + // view with only short strings (no buffers) --> no need to compact + let array = StringViewTest { + rows: 1000, + strings: vec![Some("a"), Some("b"), Some("c")], + } + .build(); + + let gc_array = do_gc(array.clone()); + compare_string_array_values(&array, &gc_array); + assert_eq!(array.data_buffers().len(), 0); + assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction + } + + #[test] + fn test_gc_string_view_batch_large_no_compact() { + // view with large strings (has buffers) but full --> no need to compact + let array = StringViewTest { + rows: 1000, + strings: vec![Some("This string is longer than 12 bytes")], 
+ } + .build(); + + let gc_array = do_gc(array.clone()); + compare_string_array_values(&array, &gc_array); + assert_eq!(array.data_buffers().len(), 5); + // TODO this is failing now (it always compacts) + assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction + } + + #[test] + fn test_gc_string_view_batch_large_slice_compact() { + // view with large strings (has buffers) and only partially used --> no need to compact + let array = StringViewTest { + rows: 1000, + strings: vec![Some("this string is longer than 12 bytes")], + } + .build(); + + // slice only 11 rows, so most of the buffer is not used + let array = array.slice(11, 22); + + let gc_array = do_gc(array.clone()); + compare_string_array_values(&array, &gc_array); + assert_eq!(array.data_buffers().len(), 5); + assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer + } + + /// Compares the values of two string view arrays + fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) { + assert_eq!(arr1.len(), arr2.len()); + for (s1, s2) in arr1.iter().zip(arr2.iter()) { + assert_eq!(s1, s2); + } + } + + /// runs garbage collection on string view array + /// and ensures the number of rows are the same + fn do_gc(array: StringViewArray) -> StringViewArray { + let batch = + RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap(); + let gc_batch = gc_string_view_batch(&batch); + assert_eq!(batch.num_rows(), gc_batch.num_rows()); + assert_eq!(batch.schema(), gc_batch.schema()); + gc_batch + .column(0) + .as_any() + .downcast_ref::<StringViewArray>() + .unwrap() + .clone() + } + + /// Describes parameters for creating a `StringViewArray` + struct StringViewTest { + /// The number of rows in the array + rows: usize, + /// The strings to use in the array (repeated over and over + strings: Vec<Option<&'static str>>, + } + + impl StringViewTest { + /// Create a `StringViewArray` with the parameters specified in this struct + fn build(self) -> StringViewArray 
{ + let mut builder = StringViewBuilder::with_capacity(100); + loop { + for &v in self.strings.iter() { + builder.append_option(v); + if builder.len() >= self.rows { + return builder.finish(); + } + } + } + } + } } From de0c84a274eda5b5e5a3d3d67d5f4e210a8f42d3 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 25 Jul 2024 11:25:26 -0400 Subject: [PATCH 7/9] better herustic --- .../physical-plan/src/coalesce_batches.rs | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 828ad15be242..daec37c904ae 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -306,16 +306,27 @@ fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { let Some(s) = c.as_string_view_opt() else { return Arc::clone(c); }; - let view_cnt = s.views().len(); - let buffer_size = s.get_buffer_memory_size(); + let ideal_buffer_size: usize = s + .views() + .iter() + .map(|v| { + let len = (*v as u32) as usize; + if len > 12 { + len + } else { + 0 + } + }) + .sum(); + let actual_buffer_size = s.get_buffer_memory_size(); // Re-creating the array copies data and can be time consuming. - // We only do it if the array is sparse, below is a heuristic to determine if the array is sparse. - if buffer_size > (view_cnt * 32) { - // We use a block size of 2MB (instead of 8KB) to reduce the number of buffers to track. + // We only do it if the array is sparse + if actual_buffer_size > (ideal_buffer_size * 2) { + // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches. // See https://github.com/apache/arrow-rs/issues/6094 for more details. 
let mut builder = StringViewBuilder::with_capacity(s.len()) - .with_block_size(1024 * 1024 * 2); + .with_block_size(ideal_buffer_size as u32); for v in s.iter() { builder.append_option(v); @@ -323,6 +334,8 @@ fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { let gc_string = builder.finish(); + debug_assert_eq!(gc_string.data_buffers().len(), 1); + Arc::new(gc_string) } else { Arc::clone(c) From 8ab087cb56d5d593030b5f0eb22ff57bc0356c88 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 25 Jul 2024 12:05:02 -0400 Subject: [PATCH 8/9] update test --- datafusion/physical-plan/src/coalesce_batches.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index ae52fb1d7695..98187c7dd7a3 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -351,7 +351,7 @@ fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { let gc_string = builder.finish(); - debug_assert_eq!(gc_string.data_buffers().len(), 1); + debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0 Arc::new(gc_string) } else { From 8e0eaa6ccc0b20b81a4f834e9fc1b057b15dc405 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 25 Jul 2024 12:10:54 -0400 Subject: [PATCH 9/9] Update datafusion/physical-plan/src/coalesce_batches.rs Co-authored-by: Andrew Lamb --- datafusion/physical-plan/src/coalesce_batches.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 98187c7dd7a3..33f4b2653a11 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -471,7 +471,6 @@ mod tests { let gc_array = do_gc(array.clone()); compare_string_array_values(&array, &gc_array); assert_eq!(array.data_buffers().len(), 5); - // 
TODO this is failing now (it always compacts) assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction }