Skip to content

Commit

Permalink
list values
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 31, 2024
1 parent 3a0de15 commit 04d6d38
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 78 deletions.
4 changes: 4 additions & 0 deletions crates/rayexec_execution/src/arrays/array/exp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ where
&self.validity
}

pub fn validity_mut(&mut self) -> &mut Validity {
&mut self.validity
}

pub fn put_validity(&mut self, validity: Validity) -> Result<()> {
if validity.len() != self.data().capacity() {
return Err(RayexecError::new("Invalid validity length")
Expand Down
87 changes: 87 additions & 0 deletions crates/rayexec_execution/src/arrays/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ use super::array::array_data::ArrayData;
use super::array::exp::Array;
use super::array::validity::Validity;

/// Buffer for arrays.
///
/// Buffers are able to hold a fixed number of elements in the primary buffer.
/// Some types make use of secondary buffers for additional data. In such cases,
/// the primary buffer may hold things like metadata or offsets depending on the
/// type.
#[derive(Debug)]
pub struct ArrayBuffer<B: BufferManager = NopBufferManager> {
/// Physical type of the buffer.
Expand Down Expand Up @@ -148,6 +154,38 @@ where
Ok(BinaryViewAddressableMut { metadata, heap })
}

/// Resize the primary buffer to be able to hold `capacity` elements.
pub fn resize_primary<S: PhysicalStorage>(
&mut self,
manager: &B,
capacity: usize,
) -> Result<()> {
self.check_type(S::PHYSICAL_TYPE)?;

unsafe {
self.primary
.resize::<S::PrimaryBufferType>(manager, capacity)
}
}

/// Ensure the primary buffer can hold `capacity` elements.
///
/// Does nothing if the primary buffer already has enough capacity.
pub fn reserve_primary<S: PhysicalStorage>(
&mut self,
manager: &B,
capacity: usize,
) -> Result<()> {
self.check_type(S::PHYSICAL_TYPE)?;

if self.capacity() >= capacity {
return Ok(());
}

self.resize_primary::<S>(manager, capacity)
}

/// Checks that the physical type of this buffer matches `want`.
fn check_type(&self, want: PhysicalType) -> Result<()> {
if want != self.physical_type {
return Err(RayexecError::new("Physical types don't match")
Expand Down Expand Up @@ -228,3 +266,52 @@ where
ListBuffer { child }
}
}

#[cfg(test)]
mod tests {
use physical_type::PhysicalI32;

use super::*;

#[test]
fn resize_primitive_increase_size() {
let mut buffer =
ArrayBuffer::with_primary_capacity::<PhysicalI32>(&NopBufferManager, 4).unwrap();

let s = buffer.try_as_slice::<PhysicalI32>().unwrap();
assert_eq!(4, s.len());

buffer
.resize_primary::<PhysicalI32>(&NopBufferManager, 8)
.unwrap();

let s = buffer.try_as_slice_mut::<PhysicalI32>().unwrap();
assert_eq!(8, s.len());

// Sanity check, make sure we can write to it.
s.iter_mut().for_each(|v| *v = 12);

assert_eq!(vec![12; 8].as_slice(), s);
}

#[test]
fn resize_primitive_decrease_size() {
let mut buffer =
ArrayBuffer::with_primary_capacity::<PhysicalI32>(&NopBufferManager, 4).unwrap();

let s = buffer.try_as_slice::<PhysicalI32>().unwrap();
assert_eq!(4, s.len());

buffer
.resize_primary::<PhysicalI32>(&NopBufferManager, 2)
.unwrap();

let s = buffer.try_as_slice_mut::<PhysicalI32>().unwrap();
assert_eq!(2, s.len());

// Sanity check, make sure we can write to it.
s.iter_mut().for_each(|v| *v = 12);

assert_eq!(vec![12; 2].as_slice(), s);
}
}
10 changes: 10 additions & 0 deletions crates/rayexec_execution/src/arrays/buffer/physical_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,16 @@ impl PhysicalStorage for PhysicalUntypedNull {
}
}

impl MutablePhysicalStorage for PhysicalUntypedNull {
type AddressableMut<'a> = &'a mut [UntypedNull];

fn get_addressable_mut<B: BufferManager>(
buffer: &mut ArrayBuffer<B>,
) -> Result<Self::AddressableMut<'_>> {
buffer.try_as_slice_mut::<Self>()
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PhysicalUtf8;

Expand Down
160 changes: 117 additions & 43 deletions crates/rayexec_execution/src/arrays/testutil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use std::collections::BTreeMap;
use std::fmt::Debug;

use iterutil::IntoExactSizeIterator;

use super::array::exp::Array;
use super::batch_exp::Batch;
use crate::arrays::array::flat::FlatArrayView;
Expand All @@ -21,6 +23,7 @@ use crate::arrays::buffer::physical_type::{
PhysicalI32,
PhysicalI64,
PhysicalI8,
PhysicalList,
PhysicalStorage,
PhysicalType,
PhysicalU128,
Expand All @@ -30,6 +33,7 @@ use crate::arrays::buffer::physical_type::{
PhysicalU8,
PhysicalUtf8,
};
use crate::arrays::buffer::SecondaryBuffer;
use crate::arrays::executor_exp::scalar::unary::UnaryExecutor;

/// Assert two arrays are logically equal.
Expand All @@ -42,64 +46,133 @@ pub fn assert_arrays_eq(array1: &Array, array2: &Array) {
array2.capacity(),
"array capacities differ"
);
assert_arrays_eq_count(array1, array2, array1.capacity())

let sel = 0..array1.capacity();

assert_arrays_eq_sel(array1, sel.clone(), array2, sel)
}

/// Asserts that two arrays are logically equal for the first `count` rows.
///
/// This will check valid and invalid values. Assertion error messages will
/// print out Some/None to represent valid/invalid.
#[track_caller]
pub fn assert_arrays_eq_count(array1: &Array, array2: &Array, count: usize) {
pub fn assert_arrays_eq_sel(
array1: &Array,
sel1: impl IntoExactSizeIterator<Item = usize>,
array2: &Array,
sel2: impl IntoExactSizeIterator<Item = usize>,
) {
assert_eq!(array1.datatype, array2.datatype);

let flat1 = array1.flat_view().unwrap();
let flat2 = array2.flat_view().unwrap();

fn assert_eq_inner<S>(flat1: FlatArrayView, flat2: FlatArrayView, count: usize)
where
S: PhysicalStorage,
S::StorageType: ToOwned<Owned: Debug + PartialEq>,
{
let mut out = BTreeMap::new();
let sel = 0..count;

UnaryExecutor::for_each_flat::<S, _>(flat1, sel.clone(), |idx, v| {
out.insert(idx, v.map(|v| v.to_owned()));
})
.unwrap();
match array1.datatype.physical_type() {
PhysicalType::Boolean => {
assert_arrays_eq_sel_inner::<PhysicalBool>(flat1, sel1, flat2, sel2)
}
PhysicalType::Int8 => assert_arrays_eq_sel_inner::<PhysicalI8>(flat1, sel1, flat2, sel2),
PhysicalType::Int16 => assert_arrays_eq_sel_inner::<PhysicalI16>(flat1, sel1, flat2, sel2),
PhysicalType::Int32 => assert_arrays_eq_sel_inner::<PhysicalI32>(flat1, sel1, flat2, sel2),
PhysicalType::Int64 => assert_arrays_eq_sel_inner::<PhysicalI64>(flat1, sel1, flat2, sel2),
PhysicalType::Int128 => {
assert_arrays_eq_sel_inner::<PhysicalI128>(flat1, sel1, flat2, sel2)
}
PhysicalType::UInt8 => assert_arrays_eq_sel_inner::<PhysicalU8>(flat1, sel1, flat2, sel2),
PhysicalType::UInt16 => assert_arrays_eq_sel_inner::<PhysicalU16>(flat1, sel1, flat2, sel2),
PhysicalType::UInt32 => assert_arrays_eq_sel_inner::<PhysicalU32>(flat1, sel1, flat2, sel2),
PhysicalType::UInt64 => assert_arrays_eq_sel_inner::<PhysicalU64>(flat1, sel1, flat2, sel2),
PhysicalType::UInt128 => {
assert_arrays_eq_sel_inner::<PhysicalU128>(flat1, sel1, flat2, sel2)
}
PhysicalType::Float16 => {
assert_arrays_eq_sel_inner::<PhysicalF16>(flat1, sel1, flat2, sel2)
}
PhysicalType::Float32 => {
assert_arrays_eq_sel_inner::<PhysicalF32>(flat1, sel1, flat2, sel2)
}
PhysicalType::Float64 => {
assert_arrays_eq_sel_inner::<PhysicalF64>(flat1, sel1, flat2, sel2)
}
PhysicalType::Utf8 => assert_arrays_eq_sel_inner::<PhysicalUtf8>(flat1, sel1, flat2, sel2),
PhysicalType::List => {
assert_arrays_eq_sel_list_inner(flat1, sel1, flat2, sel2);
}
other => unimplemented!("{other:?}"),
}
}

UnaryExecutor::for_each_flat::<S, _>(flat2, sel, |idx, v| match out.remove(&idx) {
Some(existing) => {
let v = v.map(|v| v.to_owned());
assert_eq!(existing, v, "values differ at index {idx}");
}
None => panic!("missing value for index in array 1 {idx}"),
})
.unwrap();
fn assert_arrays_eq_sel_list_inner(
flat1: FlatArrayView,
sel1: impl IntoExactSizeIterator<Item = usize>,
flat2: FlatArrayView,
sel2: impl IntoExactSizeIterator<Item = usize>,
) {
let inner1 = match flat1.array_buffer.get_secondary() {
SecondaryBuffer::List(list) => &list.child,
_ => panic!("Missing child for array 1"),
};

if !out.is_empty() {
panic!("extra entries in array 1: {:?}", out);
}
let inner2 = match flat2.array_buffer.get_secondary() {
SecondaryBuffer::List(list) => &list.child,
_ => panic!("Missing child for array 2"),
};

let metas1 = PhysicalList::get_addressable(&flat1.array_buffer).unwrap();
let metas2 = PhysicalList::get_addressable(&flat2.array_buffer).unwrap();

let sel1 = sel1.into_iter();
let sel2 = sel2.into_iter();
assert_eq!(sel1.len(), sel2.len());

for (row_idx, (idx1, idx2)) in sel1.zip(sel2).enumerate() {
let idx1 = flat1.selection.get(idx1).unwrap();
let idx2 = flat1.selection.get(idx2).unwrap();

assert_eq!(
flat1.validity.is_valid(idx1),
flat2.validity.is_valid(idx2),
"validity mismatch for row {row_idx}"
);

let m1 = metas1.get(idx1).unwrap();
let m2 = metas2.get(idx2).unwrap();

let sel1 = (m1.offset as usize)..((m1.offset + m1.len) as usize);
let sel2 = (m2.offset as usize)..((m2.offset + m2.len) as usize);

assert_arrays_eq_sel(inner1, sel1, inner2, sel2);
}
}

match array1.datatype.physical_type() {
PhysicalType::Boolean => assert_eq_inner::<PhysicalBool>(flat1, flat2, count),
PhysicalType::Int8 => assert_eq_inner::<PhysicalI8>(flat1, flat2, count),
PhysicalType::Int16 => assert_eq_inner::<PhysicalI16>(flat1, flat2, count),
PhysicalType::Int32 => assert_eq_inner::<PhysicalI32>(flat1, flat2, count),
PhysicalType::Int64 => assert_eq_inner::<PhysicalI64>(flat1, flat2, count),
PhysicalType::Int128 => assert_eq_inner::<PhysicalI128>(flat1, flat2, count),
PhysicalType::UInt8 => assert_eq_inner::<PhysicalU8>(flat1, flat2, count),
PhysicalType::UInt16 => assert_eq_inner::<PhysicalU16>(flat1, flat2, count),
PhysicalType::UInt32 => assert_eq_inner::<PhysicalU32>(flat1, flat2, count),
PhysicalType::UInt64 => assert_eq_inner::<PhysicalU64>(flat1, flat2, count),
PhysicalType::UInt128 => assert_eq_inner::<PhysicalU128>(flat1, flat2, count),
PhysicalType::Float16 => assert_eq_inner::<PhysicalF16>(flat1, flat2, count),
PhysicalType::Float32 => assert_eq_inner::<PhysicalF32>(flat1, flat2, count),
PhysicalType::Float64 => assert_eq_inner::<PhysicalF64>(flat1, flat2, count),
PhysicalType::Utf8 => assert_eq_inner::<PhysicalUtf8>(flat1, flat2, count),
other => unimplemented!("{other:?}"),
fn assert_arrays_eq_sel_inner<S>(
flat1: FlatArrayView,
sel1: impl IntoExactSizeIterator<Item = usize>,
flat2: FlatArrayView,
sel2: impl IntoExactSizeIterator<Item = usize>,
) where
S: PhysicalStorage,
S::StorageType: ToOwned<Owned: Debug + PartialEq>,
{
let mut out = BTreeMap::new();

UnaryExecutor::for_each_flat::<S, _>(flat1, sel1, |idx, v| {
out.insert(idx, v.map(|v| v.to_owned()));
})
.unwrap();

UnaryExecutor::for_each_flat::<S, _>(flat2, sel2, |idx, v| match out.remove(&idx) {
Some(existing) => {
let v = v.map(|v| v.to_owned());
assert_eq!(existing, v, "values differ at index {idx}");
}
None => panic!("missing value for index in array 1 {idx}"),
})
.unwrap();

if !out.is_empty() {
panic!("extra entries in array 1: {:?}", out);
}
}

Expand All @@ -121,7 +194,8 @@ pub fn assert_batches_eq(batch1: &Batch, batch2: &Batch) {
);

for (array1, array2) in arrays1.iter().zip(arrays2) {
assert_arrays_eq_count(array1, array2, batch1.num_rows());
let sel = 0..batch1.num_rows();
assert_arrays_eq_sel(array1, sel.clone(), array2, sel);
}
}

Expand Down
Loading

0 comments on commit 04d6d38

Please sign in to comment.