Skip to content

Commit

Permalink
sort merger
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 23, 2024
1 parent d2c1bb3 commit 405cf40
Show file tree
Hide file tree
Showing 13 changed files with 330 additions and 247 deletions.
12 changes: 12 additions & 0 deletions crates/rayexec_execution/src/arrays/array_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use iterutil::exact_size::IntoExactSizeIterator;
use rayexec_error::Result;

use super::array::Array;
use super::buffer::physical_type::PhysicalStorage;
use super::buffer_manager::BufferManager;
use super::datatype::DataType;

#[derive(Debug)]
pub struct Int8Builder<'a, B: BufferManager> {
pub manager: &'a B,
}
74 changes: 23 additions & 51 deletions crates/rayexec_execution/src/arrays/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@ use physical_type::{
PhysicalUtf8,
};
use rayexec_error::{not_implemented, RayexecError, Result};
use string_view::{
StringViewHeap,
StringViewMetadataUnion,
StringViewStorage,
StringViewStorageMut,
};
use string_view::{StringViewHeap, StringViewMetadataUnion, StringViewStorage, StringViewStorageMut};
use struct_buffer::{StructBuffer, StructItemMetadata};

use super::array::Array;
Expand Down Expand Up @@ -98,11 +93,9 @@ where

pub fn try_as_slice<S: PhysicalStorage>(&self) -> Result<&[S::PrimaryBufferType]> {
if S::PHYSICAL_TYPE != self.physical_type {
return Err(
RayexecError::new("Attempted to cast buffer to wrong physical type")
.with_field("expected_type", self.physical_type)
.with_field("requested_type", S::PHYSICAL_TYPE),
);
return Err(RayexecError::new("Attempted to cast buffer to wrong physical type")
.with_field("expected_type", self.physical_type)
.with_field("requested_type", S::PHYSICAL_TYPE));
}

let data = unsafe { self.primary.as_slice::<S::PrimaryBufferType>() };
Expand All @@ -112,11 +105,9 @@ where

pub fn try_as_slice_mut<S: PhysicalStorage>(&mut self) -> Result<&mut [S::PrimaryBufferType]> {
if S::PHYSICAL_TYPE != self.physical_type {
return Err(
RayexecError::new("Attempted to cast buffer to wrong physical type")
.with_field("expected_type", self.physical_type)
.with_field("requested_type", S::PHYSICAL_TYPE),
);
return Err(RayexecError::new("Attempted to cast buffer to wrong physical type")
.with_field("expected_type", self.physical_type)
.with_field("requested_type", S::PHYSICAL_TYPE));
}

let data = unsafe { self.primary.as_slice_mut::<S::PrimaryBufferType>() };
Expand All @@ -136,11 +127,9 @@ where
pub fn try_as_string_view_storage_mut(&mut self) -> Result<StringViewStorageMut<'_>> {
// TODO: Duplicated, but let's us take each field mutably.
if PhysicalUtf8::PHYSICAL_TYPE != self.physical_type {
return Err(
RayexecError::new("Attempted to cast buffer to wrong physical type")
.with_field("expected_type", self.physical_type)
.with_field("requested_type", PhysicalUtf8::PHYSICAL_TYPE),
);
return Err(RayexecError::new("Attempted to cast buffer to wrong physical type")
.with_field("expected_type", self.physical_type)
.with_field("requested_type", PhysicalUtf8::PHYSICAL_TYPE));
}

let metadata = unsafe { self.primary.as_slice_mut::<StringViewMetadataUnion>() };
Expand All @@ -164,15 +153,9 @@ where
}

/// Appends data from another buffer into this buffer.
pub fn append_from<S: PhysicalStorage>(
&mut self,
manager: &B,
other: &ArrayBuffer,
) -> Result<()> {
pub fn append_from<S: PhysicalStorage>(&mut self, manager: &B, other: &ArrayBuffer) -> Result<()> {
if !self.secondary.is_none() {
return Err(RayexecError::new(
"Appending secondary buffers not yet supported",
));
return Err(RayexecError::new("Appending secondary buffers not yet supported"));
}

let orig_len = self.capacity();
Expand Down Expand Up @@ -349,8 +332,8 @@ impl<B: BufferManager> RawBufferParts<B> {
}
}

pub type Int8Builder = PrimBufferBuilder<PhysicalI8>;
pub type Int32Builder = PrimBufferBuilder<PhysicalI32>;
pub type Int8BufferBuilder = PrimBufferBuilder<PhysicalI8>;
pub type Int32BufferBuilder = PrimBufferBuilder<PhysicalI32>;

#[derive(Debug)]
pub struct PrimBufferBuilder<S: PhysicalStorage> {
Expand All @@ -363,8 +346,7 @@ impl<S: PhysicalStorage> PrimBufferBuilder<S> {
I: IntoExactSizeIterator<Item = S::PrimaryBufferType>,
{
let iter = iter.into_iter();
let mut data =
RawBufferParts::try_new::<S::PrimaryBufferType>(&NopBufferManager, iter.len())?;
let mut data = RawBufferParts::try_new::<S::PrimaryBufferType>(&NopBufferManager, iter.len())?;

let data_slice = unsafe { data.as_slice_mut() };
for (idx, val) in iter.enumerate() {
Expand All @@ -380,17 +362,16 @@ impl<S: PhysicalStorage> PrimBufferBuilder<S> {
}

#[derive(Debug)]
pub struct StringViewBufferBuilder;
pub struct StringBufferBuilder;

impl StringViewBufferBuilder {
impl StringBufferBuilder {
pub fn from_iter<A, I>(iter: I) -> Result<ArrayBuffer>
where
A: AsRef<str>,
I: IntoExactSizeIterator<Item = A>,
{
let iter = iter.into_iter();
let mut data =
RawBufferParts::try_new::<StringViewMetadataUnion>(&NopBufferManager, iter.len())?;
let mut data = RawBufferParts::try_new::<StringViewMetadataUnion>(&NopBufferManager, iter.len())?;

let mut heap = StringViewHeap::new();

Expand Down Expand Up @@ -451,18 +432,10 @@ impl ListBufferBuilder {

// TODO: Move this out.
match child_buf.physical_type {
PhysicalType::UntypedNull => {
child_buf.append_from::<PhysicalUntypedNull>(&NopBufferManager, &child)?
}
PhysicalType::Int8 => {
child_buf.append_from::<PhysicalI8>(&NopBufferManager, &child)?
}
PhysicalType::Int32 => {
child_buf.append_from::<PhysicalI32>(&NopBufferManager, &child)?
}
PhysicalType::Utf8 => {
child_buf.append_from::<PhysicalUtf8>(&NopBufferManager, &child)?
}
PhysicalType::UntypedNull => child_buf.append_from::<PhysicalUntypedNull>(&NopBufferManager, &child)?,
PhysicalType::Int8 => child_buf.append_from::<PhysicalI8>(&NopBufferManager, &child)?,
PhysicalType::Int32 => child_buf.append_from::<PhysicalI32>(&NopBufferManager, &child)?,
PhysicalType::Utf8 => child_buf.append_from::<PhysicalUtf8>(&NopBufferManager, &child)?,
other => not_implemented!("append from {other}"),
}
}
Expand Down Expand Up @@ -542,8 +515,7 @@ mod tests {

#[test]
fn new_from_strings_iter() {
let buf =
StringViewBufferBuilder::from_iter(["a", "bb", "ccc", "ddddddddddddddd"]).unwrap();
let buf = StringBufferBuilder::from_iter(["a", "bb", "ccc", "ddddddddddddddd"]).unwrap();
let view_buf = buf.try_as_string_view_storage().unwrap();

assert_eq!("a", view_buf.get(0).unwrap());
Expand Down
40 changes: 8 additions & 32 deletions crates/rayexec_execution/src/arrays/executor/aggregate/unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mod tests {
use super::*;
use crate::arrays::buffer::addressable::MutableAddressableStorage;
use crate::arrays::buffer::physical_type::{PhysicalI32, PhysicalUtf8};
use crate::arrays::buffer::{Int32Builder, StringViewBufferBuilder};
use crate::arrays::buffer::{Int32BufferBuilder, StringBufferBuilder};
use crate::arrays::datatype::DataType;
use crate::arrays::executor::PutBuffer;
use crate::arrays::validity::Validity;
Expand Down Expand Up @@ -86,18 +86,9 @@ mod tests {
#[test]
fn unary_primitive_single_state() {
let mut states = [TestSumState::default()];
let array = Array::new_with_buffer(
DataType::Int32,
Int32Builder::from_iter([1, 2, 3, 4, 5]).unwrap(),
);
let array = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([1, 2, 3, 4, 5]).unwrap());

UnaryNonNullUpdater::update::<PhysicalI32, _, _>(
&array,
[0, 1, 2, 4],
[0, 0, 0, 0],
&mut states,
)
.unwrap();
UnaryNonNullUpdater::update::<PhysicalI32, _, _>(&array, [0, 1, 2, 4], [0, 0, 0, 0], &mut states).unwrap();

assert_eq!(11, states[0].val);
}
Expand All @@ -109,29 +100,20 @@ mod tests {
validity.set_invalid(0);
let array = Array::new_with_validity(
DataType::Int32,
Int32Builder::from_iter([1, 2, 3, 4, 5]).unwrap(),
Int32BufferBuilder::from_iter([1, 2, 3, 4, 5]).unwrap(),
validity,
)
.unwrap();

UnaryNonNullUpdater::update::<PhysicalI32, _, _>(
&array,
[0, 1, 2, 4],
[0, 0, 0, 0],
&mut states,
)
.unwrap();
UnaryNonNullUpdater::update::<PhysicalI32, _, _>(&array, [0, 1, 2, 4], [0, 0, 0, 0], &mut states).unwrap();

assert_eq!(10, states[0].val);
}

#[test]
fn unary_primitive_multiple_states() {
let mut states = [TestSumState::default(), TestSumState::default()];
let array = Array::new_with_buffer(
DataType::Int32,
Int32Builder::from_iter([1, 2, 3, 4, 5]).unwrap(),
);
let array = Array::new_with_buffer(DataType::Int32, Int32BufferBuilder::from_iter([1, 2, 3, 4, 5]).unwrap());

UnaryNonNullUpdater::update::<PhysicalI32, _, _>(
&array,
Expand Down Expand Up @@ -176,16 +158,10 @@ mod tests {
let mut states = [TestStringAgg::default()];
let array = Array::new_with_buffer(
DataType::Utf8,
StringViewBufferBuilder::from_iter(["aa", "bbb", "cccc"]).unwrap(),
StringBufferBuilder::from_iter(["aa", "bbb", "cccc"]).unwrap(),
);

UnaryNonNullUpdater::update::<PhysicalUtf8, _, _>(
&array,
[0, 1, 2],
[0, 0, 0],
&mut states,
)
.unwrap();
UnaryNonNullUpdater::update::<PhysicalUtf8, _, _>(&array, [0, 1, 2], [0, 0, 0], &mut states).unwrap();

assert_eq!("aabbbcccc", &states[0].val);
}
Expand Down
Loading

0 comments on commit 405cf40

Please sign in to comment.