From 405cf401ea3c4b75dc42f1f964e5abb2fd42d5c2 Mon Sep 17 00:00:00 2001 From: Sean Smith Date: Mon, 23 Dec 2024 15:33:01 -0500 Subject: [PATCH] sort merger --- .../src/arrays/array_builder.rs | 12 ++ .../src/arrays/buffer/mod.rs | 74 +++----- .../src/arrays/executor/aggregate/unary.rs | 40 +--- .../src/arrays/executor/scalar/binary.rs | 73 ++------ .../src/arrays/executor/scalar/unary.rs | 53 ++---- crates/rayexec_execution/src/arrays/mod.rs | 1 + .../operators_exp/batch_collection.rs | 16 +- .../operators_exp/physical_sort/merge.rs | 176 ++++++++++++++++++ .../operators_exp/physical_sort/mod.rs | 1 + .../operators_exp/physical_sort/sort_data.rs | 99 +++++----- .../src/expr/physical/column_expr.rs | 18 +- .../src/expr/physical/literal_expr.rs | 6 +- .../src/expr/physical/scalar_function_expr.rs | 8 +- 13 files changed, 330 insertions(+), 247 deletions(-) create mode 100644 crates/rayexec_execution/src/arrays/array_builder.rs create mode 100644 crates/rayexec_execution/src/execution/operators_exp/physical_sort/merge.rs diff --git a/crates/rayexec_execution/src/arrays/array_builder.rs b/crates/rayexec_execution/src/arrays/array_builder.rs new file mode 100644 index 000000000..f2f0d74de --- /dev/null +++ b/crates/rayexec_execution/src/arrays/array_builder.rs @@ -0,0 +1,12 @@ +use iterutil::exact_size::IntoExactSizeIterator; +use rayexec_error::Result; + +use super::array::Array; +use super::buffer::physical_type::PhysicalStorage; +use super::buffer_manager::BufferManager; +use super::datatype::DataType; + +#[derive(Debug)] +pub struct Int8Builder<'a, B: BufferManager> { + pub manager: &'a B, +} diff --git a/crates/rayexec_execution/src/arrays/buffer/mod.rs b/crates/rayexec_execution/src/arrays/buffer/mod.rs index 1e410e5e7..ae9b1507c 100644 --- a/crates/rayexec_execution/src/arrays/buffer/mod.rs +++ b/crates/rayexec_execution/src/arrays/buffer/mod.rs @@ -22,12 +22,7 @@ use physical_type::{ PhysicalUtf8, }; use rayexec_error::{not_implemented, RayexecError, Result}; -use string_view::{ - StringViewHeap, - StringViewMetadataUnion, - StringViewStorage, - StringViewStorageMut, -}; +use string_view::{StringViewHeap, StringViewMetadataUnion, StringViewStorage, StringViewStorageMut}; use struct_buffer::{StructBuffer, StructItemMetadata}; use super::array::Array; @@ -98,11 +93,9 @@ where pub fn try_as_slice(&self) -> Result<&[S::PrimaryBufferType]> { if S::PHYSICAL_TYPE != self.physical_type { - return Err( - RayexecError::new("Attempted to cast buffer to wrong physical type") - .with_field("expected_type", self.physical_type) - .with_field("requested_type", S::PHYSICAL_TYPE), - ); + return Err(RayexecError::new("Attempted to cast buffer to wrong physical type") + .with_field("expected_type", self.physical_type) + .with_field("requested_type", S::PHYSICAL_TYPE)); } let data = unsafe { self.primary.as_slice::() }; @@ -112,11 +105,9 @@ where pub fn try_as_slice_mut(&mut self) -> Result<&mut [S::PrimaryBufferType]> { if S::PHYSICAL_TYPE != self.physical_type { - return Err( - RayexecError::new("Attempted to cast buffer to wrong physical type") - .with_field("expected_type", self.physical_type) - .with_field("requested_type", S::PHYSICAL_TYPE), - ); + return Err(RayexecError::new("Attempted to cast buffer to wrong physical type") + .with_field("expected_type", self.physical_type) + .with_field("requested_type", S::PHYSICAL_TYPE)); } let data = unsafe { self.primary.as_slice_mut::() }; @@ -136,11 +127,9 @@ where pub fn try_as_string_view_storage_mut(&mut self) -> Result> { // TODO: Duplicated, but let's us take each field mutably. if PhysicalUtf8::PHYSICAL_TYPE != self.physical_type { - return Err( - RayexecError::new("Attempted to cast buffer to wrong physical type") - .with_field("expected_type", self.physical_type) - .with_field("requested_type", PhysicalUtf8::PHYSICAL_TYPE), - ); + return Err(RayexecError::new("Attempted to cast buffer to wrong physical type") + .with_field("expected_type", self.physical_type) + .with_field("requested_type", PhysicalUtf8::PHYSICAL_TYPE)); } let metadata = unsafe { self.primary.as_slice_mut::() }; @@ -164,15 +153,9 @@ where } /// Appends data from another buffer into this buffer. - pub fn append_from( - &mut self, - manager: &B, - other: &ArrayBuffer, - ) -> Result<()> { + pub fn append_from(&mut self, manager: &B, other: &ArrayBuffer) -> Result<()> { if !self.secondary.is_none() { - return Err(RayexecError::new( - "Appending secondary buffers not yet supported", - )); + return Err(RayexecError::new("Appending secondary buffers not yet supported")); } let orig_len = self.capacity(); @@ -349,8 +332,8 @@ impl RawBufferParts { } } -pub type Int8Builder = PrimBufferBuilder; -pub type Int32Builder = PrimBufferBuilder; +pub type Int8BufferBuilder = PrimBufferBuilder; +pub type Int32BufferBuilder = PrimBufferBuilder; #[derive(Debug)] pub struct PrimBufferBuilder { @@ -363,8 +346,7 @@ impl PrimBufferBuilder { I: IntoExactSizeIterator, { let iter = iter.into_iter(); - let mut data = - RawBufferParts::try_new::(&NopBufferManager, iter.len())?; + let mut data = RawBufferParts::try_new::(&NopBufferManager, iter.len())?; let data_slice = unsafe { data.as_slice_mut() }; for (idx, val) in iter.enumerate() { @@ -380,17 +362,16 @@ impl PrimBufferBuilder { } #[derive(Debug)] -pub struct StringViewBufferBuilder; +pub struct StringBufferBuilder; -impl StringViewBufferBuilder { +impl StringBufferBuilder { pub fn from_iter(iter: I) -> Result where A: AsRef, I: IntoExactSizeIterator, { let iter = iter.into_iter(); - let mut data = - RawBufferParts::try_new::(&NopBufferManager, iter.len())?; + let mut data = RawBufferParts::try_new::(&NopBufferManager, iter.len())?; let mut heap = StringViewHeap::new(); @@ -451,18 +432,10 @@ impl ListBufferBuilder { // TODO: Move this out. match child_buf.physical_type { - PhysicalType::UntypedNull => { - child_buf.append_from::(&NopBufferManager, &child)? - } - PhysicalType::Int8 => { - child_buf.append_from::(&NopBufferManager, &child)? - } - PhysicalType::Int32 => { - child_buf.append_from::(&NopBufferManager, &child)? - } - PhysicalType::Utf8 => { - child_buf.append_from::(&NopBufferManager, &child)? - } + PhysicalType::UntypedNull => child_buf.append_from::(&NopBufferManager, &child)?, + PhysicalType::Int8 => child_buf.append_from::(&NopBufferManager, &child)?, + PhysicalType::Int32 => child_buf.append_from::(&NopBufferManager, &child)?, + PhysicalType::Utf8 => child_buf.append_from::(&NopBufferManager, &child)?, other => not_implemented!("append from {other}"), } } @@ -542,8 +515,7 @@ mod tests { #[test] fn new_from_strings_iter() { - let buf = - StringViewBufferBuilder::from_iter(["a", "bb", "ccc", "ddddddddddddddd"]).unwrap(); + let buf = StringBufferBuilder::from_iter(["a", "bb", "ccc", "ddddddddddddddd"]).unwrap(); let view_buf = buf.try_as_string_view_storage().unwrap(); assert_eq!("a", view_buf.get(0).unwrap()); diff --git a/crates/rayexec_execution/src/arrays/executor/aggregate/unary.rs b/crates/rayexec_execution/src/arrays/executor/aggregate/unary.rs index 3c408ffa5..48584037d 100644 --- a/crates/rayexec_execution/src/arrays/executor/aggregate/unary.rs +++ b/crates/rayexec_execution/src/arrays/executor/aggregate/unary.rs @@ -53,7 +53,7 @@ mod tests { use super::*; use crate::arrays::buffer::addressable::MutableAddressableStorage; use crate::arrays::buffer::physical_type::{PhysicalI32, PhysicalUtf8}; - use crate::arrays::buffer::{Int32Builder, StringViewBufferBuilder}; + use crate::arrays::buffer::{Int32BufferBuilder, StringBufferBuilder}; use crate::arrays::datatype::DataType; use crate::arrays::executor::PutBuffer; use crate::arrays::validity::Validity; @@ -86,18 +86,9 @@ mod tests { #[test] fn unary_primitive_single_state() { let mut states = [TestSumState::default()]; - let array = Array::new_with_buffer( - DataType::Int32, - Int32Builder::from_iter([1, 2, 3, 4, 5]).unwrap(), - ); + let array = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([1, 2, 3, 4, 5]).unwrap()); - UnaryNonNullUpdater::update::( - &array, - [0, 1, 2, 4], - [0, 0, 0, 0], - &mut states, - ) - .unwrap(); + UnaryNonNullUpdater::update::(&array, [0, 1, 2, 4], [0, 0, 0, 0], &mut states).unwrap(); assert_eq!(11, states[0].val); } @@ -109,18 +100,12 @@ mod tests { validity.set_invalid(0); let array = Array::new_with_validity( DataType::Int32, - Int32Builder::from_iter([1, 2, 3, 4, 5]).unwrap(), + Int32BufferBuilder::from_iter([1, 2, 3, 4, 5]).unwrap(), validity, ) .unwrap(); - UnaryNonNullUpdater::update::( - &array, - [0, 1, 2, 4], - [0, 0, 0, 0], - &mut states, - ) - .unwrap(); + UnaryNonNullUpdater::update::(&array, [0, 1, 2, 4], [0, 0, 0, 0], &mut states).unwrap(); assert_eq!(10, states[0].val); } @@ -128,10 +113,7 @@ mod tests { #[test] fn unary_primitive_multiple_states() { let mut states = [TestSumState::default(), TestSumState::default()]; - let array = Array::new_with_buffer( - DataType::Int32, - Int32Builder::from_iter([1, 2, 3, 4, 5]).unwrap(), - ); + let array = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([1, 2, 3, 4, 5]).unwrap()); UnaryNonNullUpdater::update::( &array, @@ -176,16 +158,10 @@ mod tests { let mut states = [TestStringAgg::default()]; let array = Array::new_with_buffer( DataType::Utf8, - StringViewBufferBuilder::from_iter(["aa", "bbb", "cccc"]).unwrap(), + StringBufferBuilder::from_iter(["aa", "bbb", "cccc"]).unwrap(), ); - UnaryNonNullUpdater::update::( - &array, - [0, 1, 2], - [0, 0, 0], - &mut states, - ) - .unwrap(); + UnaryNonNullUpdater::update::(&array, [0, 1, 2], [0, 0, 0], &mut states).unwrap(); assert_eq!("aabbbcccc", &states[0].val); } diff --git a/crates/rayexec_execution/src/arrays/executor/scalar/binary.rs b/crates/rayexec_execution/src/arrays/executor/scalar/binary.rs index c43651cba..deb211263 100644 --- a/crates/rayexec_execution/src/arrays/executor/scalar/binary.rs +++ b/crates/rayexec_execution/src/arrays/executor/scalar/binary.rs @@ -45,31 +45,19 @@ impl BinaryExecutor { let validity2 = array2.validity(); if validity1.all_valid() && validity2.all_valid() { - for (output_idx, (input1_idx, input2_idx)) in - sel1.into_iter().zip(sel2.into_iter()).enumerate() - { + for (output_idx, (input1_idx, input2_idx)) in sel1.into_iter().zip(sel2.into_iter()).enumerate() { let val1 = input1.get(input1_idx).unwrap(); let val2 = input2.get(input2_idx).unwrap(); - op( - val1, - val2, - PutBuffer::new(output_idx, &mut output, out.validity), - ); + op(val1, val2, PutBuffer::new(output_idx, &mut output, out.validity)); } } else { - for (output_idx, (input1_idx, input2_idx)) in - sel1.into_iter().zip(sel2.into_iter()).enumerate() - { + for (output_idx, (input1_idx, input2_idx)) in sel1.into_iter().zip(sel2.into_iter()).enumerate() { if validity1.is_valid(input1_idx) && validity2.is_valid(input2_idx) { let val1 = input1.get(input1_idx).unwrap(); let val2 = input2.get(input2_idx).unwrap(); - op( - val1, - val2, - PutBuffer::new(output_idx, &mut output, out.validity), - ); + op(val1, val2, PutBuffer::new(output_idx, &mut output, out.validity)); } else { out.validity.set_invalid(output_idx); } @@ -104,25 +92,17 @@ impl BinaryExecutor { let validity2 = &array2.validity; if validity1.all_valid() && validity2.all_valid() { - for (output_idx, (input1_idx, input2_idx)) in - sel1.into_iter().zip(sel2.into_iter()).enumerate() - { + for (output_idx, (input1_idx, input2_idx)) in sel1.into_iter().zip(sel2.into_iter()).enumerate() { let sel1 = array1.selection.get(input1_idx).unwrap(); let sel2 = array2.selection.get(input2_idx).unwrap(); let val1 = input1.get(sel1).unwrap(); let val2 = input2.get(sel2).unwrap(); - op( - val1, - val2, - PutBuffer::new(output_idx, &mut output, out.validity), - ); + op(val1, val2, PutBuffer::new(output_idx, &mut output, out.validity)); } } else { - for (output_idx, (input1_idx, input2_idx)) in - sel1.into_iter().zip(sel2.into_iter()).enumerate() - { + for (output_idx, (input1_idx, input2_idx)) in sel1.into_iter().zip(sel2.into_iter()).enumerate() { let sel1 = array1.selection.get(input1_idx).unwrap(); let sel2 = array2.selection.get(input2_idx).unwrap(); @@ -130,11 +110,7 @@ impl BinaryExecutor { let val1 = input1.get(sel1).unwrap(); let val2 = input2.get(sel2).unwrap(); - op( - val1, - val2, - PutBuffer::new(output_idx, &mut output, out.validity), - ); + op(val1, val2, PutBuffer::new(output_idx, &mut output, out.validity)); } else { out.validity.set_invalid(output_idx); } @@ -150,16 +126,14 @@ mod tests { use super::*; use crate::arrays::buffer::physical_type::{PhysicalI32, PhysicalUtf8}; use crate::arrays::buffer::string_view::StringViewHeap; - use crate::arrays::buffer::{Int32Builder, StringViewBufferBuilder}; + use crate::arrays::buffer::{Int32BufferBuilder, StringBufferBuilder}; use crate::arrays::buffer_manager::NopBufferManager; use crate::arrays::datatype::DataType; #[test] fn binary_simple_add() { - let left = - Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([1, 2, 3]).unwrap()); - let right = - Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([4, 5, 6]).unwrap()); + let left = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([1, 2, 3]).unwrap()); + let right = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([4, 5, 6]).unwrap()); let mut out = ArrayBuffer::with_capacity::(&NopBufferManager, 3).unwrap(); let mut validity = Validity::new_all_valid(3); @@ -184,13 +158,11 @@ mod tests { #[test] fn binary_simple_add_with_selection() { - let mut left = - Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([2]).unwrap()); + let mut left = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([2]).unwrap()); // [2, 2, 2] left.select(&NopBufferManager, [0, 0, 0]).unwrap(); - let right = - Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([4, 5, 6]).unwrap()); + let right = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([4, 5, 6]).unwrap()); let mut out = ArrayBuffer::with_capacity::(&NopBufferManager, 3).unwrap(); let mut validity = Validity::new_all_valid(3); @@ -215,19 +187,15 @@ mod tests { #[test] fn binary_string_repeat() { - let left = - Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([1, 2, 3]).unwrap()); + let left = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([1, 2, 3]).unwrap()); let right = Array::new_with_buffer( DataType::Utf8, - StringViewBufferBuilder::from_iter(["hello", "world", "goodbye!"]).unwrap(), + StringBufferBuilder::from_iter(["hello", "world", "goodbye!"]).unwrap(), ); - let mut out = ArrayBuffer::with_len_and_child_buffer::( - &NopBufferManager, - 3, - StringViewHeap::new(), - ) - .unwrap(); + let mut out = + ArrayBuffer::with_len_and_child_buffer::(&NopBufferManager, 3, StringViewHeap::new()) + .unwrap(); let mut validity = Validity::new_all_valid(3); let mut string_buf = String::new(); @@ -263,13 +231,12 @@ mod tests { left_validity.set_invalid(1); let left = Array::new_with_validity( DataType::Int32, - Int32Builder::from_iter([1, 2, 3]).unwrap(), + Int32BufferBuilder::from_iter([1, 2, 3]).unwrap(), left_validity, ) .unwrap(); - let right = - Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([4, 5, 6]).unwrap()); + let right = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([4, 5, 6]).unwrap()); let mut out = ArrayBuffer::with_capacity::(&NopBufferManager, 3).unwrap(); let mut validity = Validity::new_all_valid(3); diff --git a/crates/rayexec_execution/src/arrays/executor/scalar/unary.rs b/crates/rayexec_execution/src/arrays/executor/scalar/unary.rs index e558c1ae3..9dced01a8 100644 --- a/crates/rayexec_execution/src/arrays/executor/scalar/unary.rs +++ b/crates/rayexec_execution/src/arrays/executor/scalar/unary.rs @@ -174,15 +174,14 @@ mod tests { use super::*; use crate::arrays::buffer::physical_type::{PhysicalI32, PhysicalUtf8}; use crate::arrays::buffer::string_view::{StringViewHeap, StringViewStorageMut}; - use crate::arrays::buffer::{ArrayBuffer, Int32Builder, StringViewBufferBuilder}; + use crate::arrays::buffer::{ArrayBuffer, Int32BufferBuilder, StringBufferBuilder}; use crate::arrays::buffer_manager::NopBufferManager; use crate::arrays::datatype::DataType; use crate::arrays::validity::Validity; #[test] fn int32_inc_by_2() { - let array = - Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([1, 2, 3]).unwrap()); + let array = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([1, 2, 3]).unwrap()); let mut out = ArrayBuffer::with_capacity::(&NopBufferManager, 3).unwrap(); let mut validity = Validity::new_all_valid(3); @@ -204,8 +203,7 @@ mod tests { #[test] fn int32_inc_by_2_using_flat_view() { - let array = - Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([1, 2, 3]).unwrap()); + let array = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([1, 2, 3]).unwrap()); let mut out = ArrayBuffer::with_capacity::(&NopBufferManager, 3).unwrap(); let mut validity = Validity::new_all_valid(3); @@ -229,8 +227,7 @@ mod tests { #[test] fn int32_inc_by_2_in_place() { - let mut array = - Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([1, 2, 3]).unwrap()); + let mut array = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([1, 2, 3]).unwrap()); UnaryExecutor::execute_in_place::(&mut array, |v| *v = *v + 2).unwrap(); @@ -243,7 +240,7 @@ mod tests { // Example with defined function, and allocating a new string every time. let array = Array::new_with_buffer( DataType::Utf8, - StringViewBufferBuilder::from_iter([ + StringBufferBuilder::from_iter([ "a", "bb", "ccc", @@ -254,12 +251,9 @@ mod tests { .unwrap(), ); - let mut out = ArrayBuffer::with_len_and_child_buffer::( - &NopBufferManager, - 6, - StringViewHeap::new(), - ) - .unwrap(); + let mut out = + ArrayBuffer::with_len_and_child_buffer::(&NopBufferManager, 6, StringViewHeap::new()) + .unwrap(); let mut validity = Validity::new_all_valid(6); fn my_string_double(s: &str, buf: PutBuffer) { @@ -287,10 +281,7 @@ mod tests { assert_eq!("cccccc", out.get(2).unwrap()); assert_eq!("dddddddd", out.get(3).unwrap()); assert_eq!("heapafterheapafter", out.get(4).unwrap()); - assert_eq!( - "alongerstringdontinlinealongerstringdontinline", - out.get(5).unwrap() - ); + assert_eq!("alongerstringdontinlinealongerstringdontinline", out.get(5).unwrap()); } #[test] @@ -298,7 +289,7 @@ mod tests { // Same thing, but with closure reusing a string buffer. let array = Array::new_with_buffer( DataType::Utf8, - StringViewBufferBuilder::from_iter([ + StringBufferBuilder::from_iter([ "a", "bb", "ccc", @@ -309,12 +300,9 @@ mod tests { .unwrap(), ); - let mut out = ArrayBuffer::with_len_and_child_buffer::( - &NopBufferManager, - 6, - StringViewHeap::new(), - ) - .unwrap(); + let mut out = + ArrayBuffer::with_len_and_child_buffer::(&NopBufferManager, 6, StringViewHeap::new()) + .unwrap(); let mut validity = Validity::new_all_valid(6); let mut string_buf = String::new(); @@ -345,23 +333,17 @@ mod tests { assert_eq!("cccccc", out.get(2).unwrap()); assert_eq!("dddddddd", out.get(3).unwrap()); assert_eq!("heapafterheapafter", out.get(4).unwrap()); - assert_eq!( - "alongerstringdontinlinealongerstringdontinline", - out.get(5).unwrap() - ); + assert_eq!("alongerstringdontinlinealongerstringdontinline", out.get(5).unwrap()); } #[test] fn string_uppercase_in_place() { let mut array = Array::new_with_buffer( DataType::Utf8, - StringViewBufferBuilder::from_iter(["a", "bb", "ccc"]).unwrap(), + StringBufferBuilder::from_iter(["a", "bb", "ccc"]).unwrap(), ); - UnaryExecutor::execute_in_place::(&mut array, |v| { - v.make_ascii_uppercase() - }) - .unwrap(); + UnaryExecutor::execute_in_place::(&mut array, |v| v.make_ascii_uppercase()).unwrap(); let out = array.data().try_as_string_view_storage().unwrap(); @@ -372,8 +354,7 @@ mod tests { #[test] fn int32_inc_by_2_with_dict() { - let mut array = - Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([1, 2, 3]).unwrap()); + let mut array = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([1, 2, 3]).unwrap()); // [3, 3, 2, 1, 1, 3] array.select(&NopBufferManager, [2, 2, 1, 0, 0, 2]).unwrap(); diff --git a/crates/rayexec_execution/src/arrays/mod.rs b/crates/rayexec_execution/src/arrays/mod.rs index c0e7ee07f..f6c8dce3d 100644 --- a/crates/rayexec_execution/src/arrays/mod.rs +++ b/crates/rayexec_execution/src/arrays/mod.rs @@ -1,4 +1,5 @@ pub mod array; +pub mod array_builder; pub mod batch; pub mod bitmap; pub mod buffer; diff --git a/crates/rayexec_execution/src/execution/operators_exp/batch_collection.rs b/crates/rayexec_execution/src/execution/operators_exp/batch_collection.rs index ff55ce4a0..177a8f0c1 100644 --- a/crates/rayexec_execution/src/execution/operators_exp/batch_collection.rs +++ b/crates/rayexec_execution/src/execution/operators_exp/batch_collection.rs @@ -145,7 +145,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::arrays::buffer::{Int32Builder, StringViewBufferBuilder}; + use crate::arrays::buffer::{Int32BufferBuilder, StringBufferBuilder}; use crate::arrays::buffer_manager::NopBufferManager; use crate::arrays::executor::scalar::unary::UnaryExecutor; @@ -153,9 +153,9 @@ mod tests { fn append_i32() { let mut block = BatchCollectionBlock::new(&NopBufferManager, &[DataType::Int32], 4096).unwrap(); - let array1 = Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([4, 5, 6]).unwrap()); - let array2 = Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([7, 8]).unwrap()); - let array3 = Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([9, 10, 11]).unwrap()); + let array1 = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([4, 5, 6]).unwrap()); + let array2 = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([7, 8]).unwrap()); + let array3 = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([9, 10, 11]).unwrap()); let batch1 = Batch::from_arrays([array1], true).unwrap(); let batch2 = Batch::from_arrays([array2], true).unwrap(); @@ -180,7 +180,7 @@ mod tests { fn append_i32_dictionary() { let mut block = BatchCollectionBlock::new(&NopBufferManager, &[DataType::Int32], 4096).unwrap(); - let mut array = Array::new_with_buffer(DataType::Int32, Int32Builder::from_iter([4, 5, 6]).unwrap()); + let mut array = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([4, 5, 6]).unwrap()); array.select(&NopBufferManager, [0, 0, 2, 2, 1, 1]).unwrap(); let batch = Batch::from_arrays([array], true).unwrap(); @@ -203,12 +203,12 @@ mod tests { let array1 = Array::new_with_buffer( DataType::Utf8, - StringViewBufferBuilder::from_iter(["a", "bb", "ccc"]).unwrap(), + StringBufferBuilder::from_iter(["a", "bb", "ccc"]).unwrap(), ); - let array2 = Array::new_with_buffer(DataType::Utf8, StringViewBufferBuilder::from_iter(["d", "ee"]).unwrap()); + let array2 = Array::new_with_buffer(DataType::Utf8, StringBufferBuilder::from_iter(["d", "ee"]).unwrap()); let array3 = Array::new_with_buffer( DataType::Utf8, - StringViewBufferBuilder::from_iter(["f", "gg", "hhh"]).unwrap(), + StringBufferBuilder::from_iter(["f", "gg", "hhh"]).unwrap(), ); let batch1 = Batch::from_arrays([array1], true).unwrap(); diff --git a/crates/rayexec_execution/src/execution/operators_exp/physical_sort/merge.rs b/crates/rayexec_execution/src/execution/operators_exp/physical_sort/merge.rs new file mode 100644 index 000000000..18a4b8237 --- /dev/null +++ b/crates/rayexec_execution/src/execution/operators_exp/physical_sort/merge.rs @@ -0,0 +1,176 @@ +use std::cmp::{Ordering, Reverse}; +use std::collections::{BinaryHeap, VecDeque}; + +use rayexec_error::Result; + +use super::sort_data::{SortBlock, SortData, SortLayout}; +use crate::arrays::buffer_manager::BufferManager; + +/// A block containing sorted rows that's being merged with other blocks. +#[derive(Debug)] +pub struct MergingSortBlock { + /// The current index in the block that we're comparing. + pub curr_idx: usize, + /// The block we're merging. + pub block: SortBlock, +} + +#[derive(Debug)] +pub struct MergeQueue { + pub exhausted: bool, + pub current: MergingSortBlock, + pub remaining: Vec>, // Pop from back to front. +} + +impl MergeQueue +where + B: BufferManager, +{ + fn prepare_next_row(&mut self) { + if self.exhausted { + return; + } + + self.current.curr_idx += 1; + if self.current.curr_idx >= self.current.block.row_count() { + // Get next block in queue. + loop { + match self.remaining.pop() { + Some(block) => { + if block.block.row_count() == 0 { + // Skip empty blocks. + // TODO: Check if this is even valid. + continue; + } + self.current = MergingSortBlock { curr_idx: 0, block }; + return; + } + None => { + self.exhausted = true; + return; + } + } + } + } + } +} + +#[derive(Debug)] +pub struct Merger { + pub queues: Vec>, + pub layout: SortLayout, + pub out_capacity: usize, +} + +impl Merger +where + B: BufferManager, +{ + /// Do a single round of merging. + pub fn merge_round(&mut self, manager: &B) -> Result>> { + let mut out_block = SortBlock::new(manager, &self.layout, self.out_capacity)?; + + // Min heap containing at most one entry from each queue of blocks we're + // merging. + // + // When we pop an entry, the next element from the queue that the popped + // entry is from will be inserted. + let mut min_heap: BinaryHeap>> = BinaryHeap::with_capacity(self.queues.len()); + + // Init heap. + for queue in &mut self.queues { + if queue.exhausted { + continue; + } + + min_heap.push(Reverse(HeapEntry { + row_idx: queue.current.curr_idx, + queue, + })); + } + + for row_idx in 0..self.out_capacity { + let ent = match min_heap.pop() { + Some(ent) => ent, + None => { + // If heap is empty, we exhausted all queues. If out is + // empty, then just return None. + if out_block.row_count() == 0 { + return Ok(None); + } else { + return Ok(Some(out_block)); + } + } + }; + + // Copy the row to out. + out_block.copy_row_from_other(row_idx, &ent.0.queue.current.block, ent.0.row_idx)?; + + // Get next entry for the queue and put into heap. + let queue = ent.0.queue; + queue.prepare_next_row(); + + if queue.exhausted { + // Do nothing, no more entries from this queue. + continue; + } + + let ent = Reverse(HeapEntry { + row_idx: queue.current.curr_idx, + queue, + }); + min_heap.push(ent); + } + + Ok(Some(out_block)) + } +} + +/// Entry with the heap representing a block's row. +/// +/// Eq and Ord comparisons delegate the key buffer this entry represents. +#[derive(Debug)] +struct HeapEntry<'a, B: BufferManager> { + /// The queue this entry was from. + queue: &'a mut MergeQueue, + /// Row index within the block this entry is for. + row_idx: usize, +} + +impl<'a, B> HeapEntry<'a, B> +where + B: BufferManager, +{ + fn get_sort_key_buf(&self) -> &[u8] { + self.queue.current.block.get_sort_key_buf(self.row_idx) + } +} + +impl<'a, B> PartialEq for HeapEntry<'a, B> +where + B: BufferManager, +{ + fn eq(&self, other: &Self) -> bool { + self.get_sort_key_buf().eq(other.get_sort_key_buf()) + } +} + +impl<'a, B> Eq for HeapEntry<'a, B> where B: BufferManager {} + +impl<'a, B> PartialOrd for HeapEntry<'a, B> +where + B: BufferManager, +{ + fn partial_cmp(&self, other: &Self) -> Option { + self.get_sort_key_buf().partial_cmp(other.get_sort_key_buf()) + } +} + +impl<'a, B> Ord for HeapEntry<'a, B> +where + B: BufferManager, +{ + fn cmp(&self, other: &Self) -> Ordering { + self.partial_cmp(other).unwrap() + } +} diff --git a/crates/rayexec_execution/src/execution/operators_exp/physical_sort/mod.rs b/crates/rayexec_execution/src/execution/operators_exp/physical_sort/mod.rs index 2d73abdcf..6bb9dbd47 100644 --- a/crates/rayexec_execution/src/execution/operators_exp/physical_sort/mod.rs +++ b/crates/rayexec_execution/src/execution/operators_exp/physical_sort/mod.rs @@ -1,6 +1,7 @@ pub mod encode; pub mod partition_state; +mod merge; mod sort_data; use std::task::Context; diff --git a/crates/rayexec_execution/src/execution/operators_exp/physical_sort/sort_data.rs b/crates/rayexec_execution/src/execution/operators_exp/physical_sort/sort_data.rs index 9916009f4..7f881646c 100644 --- a/crates/rayexec_execution/src/execution/operators_exp/physical_sort/sort_data.rs +++ b/crates/rayexec_execution/src/execution/operators_exp/physical_sort/sort_data.rs @@ -20,9 +20,11 @@ pub struct SortData { /// Layout indicating how we're performing the sort. layout: SortLayout, /// Blocks not yet sorted. Can continue to be written to. - unsorted: Vec>, - /// Blocks after sort. - sorted: Vec>, + unsorted: Vec>, + /// Sorted blocks with each block containing sorted rows. + /// + /// The order of blocks themselves is arbitrary. + sorted: Vec>, } impl SortData @@ -94,7 +96,7 @@ where /// rows. Otherwise we allocate a new block. /// /// Pops to satisfy lifetimes more easily. - fn pop_or_allocate_unsorted_block(&mut self, count: usize) -> Result> { + fn pop_or_allocate_unsorted_block(&mut self, count: usize) -> Result> { debug_assert!(count <= self.max_per_block); if let Some(last) = self.unsorted.last() { @@ -103,19 +105,19 @@ where } } - let block = self.layout.new_unsorted_block(&self.manager, self.max_per_block)?; + let block = SortBlock::new(&self.manager, &self.layout, self.max_per_block)?; Ok(block) } } #[derive(Debug)] -struct SortLayout { - input_types: Vec, - key_columns: Vec, - key_sizes: Vec, - key_nulls_first: Vec, - key_desc: Vec, +pub struct SortLayout { + pub input_types: Vec, + pub key_columns: Vec, + pub key_sizes: Vec, + pub key_nulls_first: Vec, + pub key_desc: Vec, } impl SortLayout { @@ -146,40 +148,55 @@ impl SortLayout { key_nulls_first, } } +} - /// Create a new block adhering to this sort layout. - fn new_unsorted_block(&self, manager: &B, capacity: usize) -> Result> { - let total_key_size: usize = self.key_sizes.iter().sum(); +/// Blocks containing unsorted input and encoded keys. +#[derive(Debug)] +pub struct SortBlock { + /// Offsets of the first sort key into the key encode buffer. + pub key_encode_offsets: Vec, + /// Buffer containing all encoded keys. + pub key_encode_buffer: Vec, + /// Collection hold all arrays for sort input. + pub block: BatchCollectionBlock, +} + +impl SortBlock +where + B: BufferManager, +{ + pub fn new(manager: &B, layout: &SortLayout, capacity: usize) -> Result { + let total_key_size: usize = layout.key_sizes.iter().sum(); let offsets = (0..capacity).map(|idx| idx * total_key_size).collect(); let buffer = vec![0; total_key_size * capacity]; // TODO: Get from manager. - let block = BatchCollectionBlock::new(manager, &self.input_types, capacity)?; + let block = BatchCollectionBlock::new(manager, &layout.input_types, capacity)?; - Ok(UnsortedBlock { + Ok(SortBlock { key_encode_offsets: offsets, key_encode_buffer: buffer, block, }) } -} -/// Blocks containing unsorted input and encoded keys. -#[derive(Debug)] -struct UnsortedBlock { - /// Offsets of the first sort key into the key encode buffer. - key_encode_offsets: Vec, - /// Buffer containing all encoded keys. - key_encode_buffer: Vec, - /// Collection hold all arrays for sort input. - block: BatchCollectionBlock, -} + /// Get a buffer slice representing the encoded sort keys for a row. + pub fn get_sort_key_buf(&self, row_idx: usize) -> &[u8] { + let start = self.key_encode_offsets[row_idx]; + let end = self.key_encode_offsets[row_idx + 1]; + &self.key_encode_buffer[start..end] + } -impl UnsortedBlock -where - B: BufferManager, -{ - fn sort(mut self, manager: &B, layout: &SortLayout, sort_indices: &mut [usize]) -> Result> { + pub fn row_count(&self) -> usize { + self.block.row_count() + } + + /// Copy a row from another sort block into this sort block. + pub fn copy_row_from_other(&mut self, dest_row: usize, source: &SortBlock, source_row: usize) -> Result<()> { + unimplemented!() + } + + fn sort(mut self, manager: &B, layout: &SortLayout, sort_indices: &mut [usize]) -> Result> { debug_assert_eq!(sort_indices.len(), self.block.row_count()); // Reset sort indices to 0..rowlen @@ -211,7 +228,7 @@ where // Update batch block by selecting by sort indices. self.block.select(manager, sort_indices)?; - Ok(SortedBlock { + Ok(SortBlock { key_encode_offsets: self.key_encode_offsets, key_encode_buffer: output_buf, block: self.block, @@ -219,22 +236,12 @@ where } } -#[derive(Debug)] -struct SortedBlock { - /// Offsets of the first sort key into the key encode buffer. - key_encode_offsets: Vec, - /// Buffer containing all encoded keys. - key_encode_buffer: Vec, - /// Collection hold all arrays for sort input. - block: BatchCollectionBlock, -} - #[cfg(test)] mod tests { use super::*; use crate::arrays::array::Array; use crate::arrays::buffer::physical_type::PhysicalI32; - use crate::arrays::buffer::Int32Builder; + use crate::arrays::buffer::Int32BufferBuilder; use crate::arrays::buffer_manager::NopBufferManager; use crate::arrays::executor::scalar::unary::UnaryExecutor; use crate::expr::physical::column_expr::PhysicalColumnExpr; @@ -256,7 +263,7 @@ mod tests { let batch1 = Batch::from_arrays( [Array::new_with_buffer( DataType::Int32, - Int32Builder::from_iter([4, 7, 6]).unwrap(), + Int32BufferBuilder::from_iter([4, 7, 6]).unwrap(), )], true, ) @@ -264,7 +271,7 @@ mod tests { let batch2 = Batch::from_arrays( [Array::new_with_buffer( DataType::Int32, - Int32Builder::from_iter([2, 8]).unwrap(), + Int32BufferBuilder::from_iter([2, 8]).unwrap(), )], true, ) diff --git a/crates/rayexec_execution/src/expr/physical/column_expr.rs b/crates/rayexec_execution/src/expr/physical/column_expr.rs index e6ab09965..f5cfb09f3 100644 --- a/crates/rayexec_execution/src/expr/physical/column_expr.rs +++ b/crates/rayexec_execution/src/expr/physical/column_expr.rs @@ -59,9 +59,7 @@ impl DatabaseProtoConv for PhysicalColumnExpr { type ProtoType = rayexec_proto::generated::physical_expr::PhysicalColumnExpr; fn to_proto_ctx(&self, _context: &DatabaseContext) -> Result { - Ok(Self::ProtoType { - idx: self.idx as u32, - }) + Ok(Self::ProtoType { idx: self.idx as u32 }) } fn from_proto_ctx(proto: Self::ProtoType, _context: &DatabaseContext) -> Result { @@ -76,7 +74,7 @@ mod tests { use super::*; use crate::arrays::buffer::addressable::AddressableStorage; use crate::arrays::buffer::physical_type::{PhysicalDictionary, PhysicalI32}; - use crate::arrays::buffer::{Int32Builder, StringViewBufferBuilder}; + use crate::arrays::buffer::{Int32BufferBuilder, StringBufferBuilder}; use crate::arrays::datatype::DataType; use crate::arrays::executor::scalar::unary::UnaryExecutor; @@ -84,14 +82,8 @@ mod tests { fn eval_simple() { let mut batch = Batch::from_arrays( [ - Array::new_with_buffer( - DataType::Int32, - Int32Builder::from_iter([4, 5, 6]).unwrap(), - ), - Array::new_with_buffer( - DataType::Utf8, - StringViewBufferBuilder::from_iter(["a", "b", "c"]).unwrap(), - ), + Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([4, 5, 6]).unwrap()), + Array::new_with_buffer(DataType::Utf8, StringBufferBuilder::from_iter(["a", "b", "c"]).unwrap()), ], true, ) @@ -127,7 +119,7 @@ mod tests { let mut batch = Batch::from_arrays( [Array::new_with_buffer( DataType::Int32, - Int32Builder::from_iter([4, 5, 6]).unwrap(), + Int32BufferBuilder::from_iter([4, 5, 6]).unwrap(), )], true, ) diff --git a/crates/rayexec_execution/src/expr/physical/literal_expr.rs b/crates/rayexec_execution/src/expr/physical/literal_expr.rs index 83b22b15a..2e682fc84 100644 --- a/crates/rayexec_execution/src/expr/physical/literal_expr.rs +++ b/crates/rayexec_execution/src/expr/physical/literal_expr.rs @@ -70,7 +70,7 @@ mod tests { use super::*; use crate::arrays::buffer::physical_type::PhysicalI32; - use crate::arrays::buffer::Int32Builder; + use crate::arrays::buffer::Int32BufferBuilder; use crate::arrays::datatype::DataType; use crate::arrays::executor::scalar::unary::UnaryExecutor; @@ -79,7 +79,7 @@ mod tests { let mut batch = Batch::from_arrays( [Array::new_with_buffer( DataType::Int32, - Int32Builder::from_iter([4, 5, 6]).unwrap(), + Int32BufferBuilder::from_iter([4, 5, 6]).unwrap(), )], true, ) @@ -112,7 +112,7 @@ mod tests { let mut batch = Batch::from_arrays( [Array::new_with_buffer( DataType::Int32, - Int32Builder::from_iter([4, 5, 6]).unwrap(), + Int32BufferBuilder::from_iter([4, 5, 6]).unwrap(), )], true, ) diff --git a/crates/rayexec_execution/src/expr/physical/scalar_function_expr.rs b/crates/rayexec_execution/src/expr/physical/scalar_function_expr.rs index 4e4f658f7..7614f5bd0 100644 --- a/crates/rayexec_execution/src/expr/physical/scalar_function_expr.rs +++ b/crates/rayexec_execution/src/expr/physical/scalar_function_expr.rs @@ -123,7 +123,7 @@ mod tests { use super::*; use crate::arrays::buffer::physical_type::PhysicalI32; - use crate::arrays::buffer::Int32Builder; + use crate::arrays::buffer::Int32BufferBuilder; use crate::arrays::buffer_manager::NopBufferManager; use crate::arrays::datatype::DataType; use crate::expr::column_expr::ColumnExpr; @@ -155,7 +155,7 @@ mod tests { let mut batch = Batch::from_arrays( [Array::new_with_buffer( DataType::Int32, - Int32Builder::from_iter([4, 5, 6]).unwrap(), + Int32BufferBuilder::from_iter([4, 5, 6]).unwrap(), )], true, ) @@ -163,9 +163,7 @@ mod tests { let expr = PhysicalScalarFunctionExpr { function: make_temp_add_2(), - inputs: vec![PhysicalScalarExpression::Column(PhysicalColumnExpr { - idx: 0, - })], + inputs: vec![PhysicalScalarExpression::Column(PhysicalColumnExpr { idx: 0 })], }; let mut state = ExpressionState {