From 1fac94a996013441efe8241b0e485513abab9810 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 24 Jul 2024 13:44:23 -0400 Subject: [PATCH] Add comments and tests for gc_string_view_batch --- .../physical-plan/src/coalesce_batches.rs | 122 +++++++++++++++++- 1 file changed, 118 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 828ad15be242..81fa62f94f99 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -294,9 +294,26 @@ pub fn concat_batches( arrow::compute::concat_batches(schema, batches) } -/// `StringViewArray` reference to the raw parquet decoded buffer, which reduces copy but prevents those buffer from being released. -/// When `StringViewArray`'s cardinality significantly drops (e.g., after `FilterExec` or `HashJoinExec` or many others), -/// we should consider consolidating it so that we can release the buffer to reduce memory usage and improve string locality for better performance. +/// Heuristically compact [`StringViewArray`]s to reduce memory usage, if needed +/// +/// This function decides when to consolidate the StringView into a new buffer +/// to reduce memory usage and improve string locality for better performance. +/// +/// This differs from [`StringViewArray::gc`] because: +/// 1. It may not compact the array depending on a heuristic. +/// 2. It uses a larger default block size (2MB) to reduce the number of buffers to track. +/// +/// # Heuristic +/// +/// If the average size of each view is larger than 32 bytes, we compact the array. +/// +/// `StringViewArray` include pointers to buffer that hold the underlying data. +/// One of the great benefits of `StringViewArray` is that many operations +/// (e.g., `filter`) can be done without copying the underlying data. +/// +/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the +/// `StringViewArray` may only refer to a small portion of the buffer, +/// significantly increasing memory usage. fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { let new_columns: Vec = batch .columns() @@ -339,7 +356,8 @@ mod tests { use crate::{memory::MemoryExec, repartition::RepartitionExec, Partitioning}; use arrow::datatypes::{DataType, Field, Schema}; - use arrow_array::UInt32Array; + use arrow_array::builder::ArrayBuilder; + use arrow_array::{StringViewArray, UInt32Array}; #[tokio::test(flavor = "multi_thread")] async fn test_concat_batches() -> Result<()> { @@ -412,4 +430,100 @@ mod tests { ) .unwrap() } + + #[test] + fn test_gc_string_view_batch_small_no_compact() { + // view with only short strings (no buffers) --> no need to compact + let array = StringViewTest { + rows: 1000, + strings: vec![Some("a"), Some("b"), Some("c")], + } + .build(); + + let gc_array = do_gc(array.clone()); + compare_string_array_values(&array, &gc_array); + assert_eq!(array.data_buffers().len(), 0); + assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction + } + + #[test] + fn test_gc_string_view_batch_large_no_compact() { + // view with large strings (has buffers) but full --> no need to compact + let array = StringViewTest { + rows: 1000, + strings: vec![Some("This string is longer than 12 bytes")], + } + .build(); + + let gc_array = do_gc(array.clone()); + compare_string_array_values(&array, &gc_array); + assert_eq!(array.data_buffers().len(), 5); + // TODO this is failing now (it always compacts) + assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction + } + + #[test] + fn test_gc_string_view_batch_large_slice_compact() { + // view with large strings (has buffers) and only partially used --> no need to compact + let array = StringViewTest { + rows: 1000, + strings: vec![Some("this string is longer than 12 bytes")], + } + .build(); + + // slice only 11 rows, so most of the buffer is not used + let array = array.slice(11, 22); + + let gc_array = do_gc(array.clone()); + compare_string_array_values(&array, &gc_array); + assert_eq!(array.data_buffers().len(), 5); + assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer + } + + /// Compares the values of two string view arrays + fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) { + assert_eq!(arr1.len(), arr2.len()); + for (s1, s2) in arr1.iter().zip(arr2.iter()) { + assert_eq!(s1, s2); + } + } + + /// runs garbage collection on string view array + /// and ensures the number of rows are the same + fn do_gc(array: StringViewArray) -> StringViewArray { + let batch = + RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap(); + let gc_batch = gc_string_view_batch(&batch); + assert_eq!(batch.num_rows(), gc_batch.num_rows()); + assert_eq!(batch.schema(), gc_batch.schema()); + gc_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .clone() + } + + /// Describes parameters for creating a `StringViewArray` + struct StringViewTest { + /// The number of rows in the array + rows: usize, + /// The strings to use in the array (repeated over and over + strings: Vec>, + } + + impl StringViewTest { + /// Create a `StringViewArray` with the parameters specified in this struct + fn build(self) -> StringViewArray { + let mut builder = StringViewBuilder::with_capacity(100); + loop { + for &v in self.strings.iter() { + builder.append_option(v); + if builder.len() >= self.rows { + return builder.finish(); + } + } + } + } + } }