Skip to content

Commit

Permalink
list buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 20, 2024
1 parent e1e7ec6 commit ca5fb3b
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 13 deletions.
11 changes: 10 additions & 1 deletion crates/rayexec_bullet/src/exp/buffer/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,14 @@ pub struct ListItemMetadata {

#[derive(Debug)]
pub struct ListBuffer<R: ReservationTracker> {
child: Array<R>, // TODO
child: Array<R>,
}

impl<R> ListBuffer<R>
where
R: ReservationTracker,
{
pub fn new(child: Array<R>) -> Self {
ListBuffer { child }
}
}
116 changes: 104 additions & 12 deletions crates/rayexec_bullet/src/exp/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@ pub mod string_view;

use std::marker::PhantomData;

use list::ListBuffer;
use physical_type::{PhysicalI32, PhysicalStorage, PhysicalType, PhysicalUtf8};
use rayexec_error::{RayexecError, Result};
use list::{ListBuffer, ListItemMetadata};
use physical_type::{
PhysicalI32,
PhysicalI8,
PhysicalList,
PhysicalStorage,
PhysicalType,
PhysicalUntypedNull,
PhysicalUtf8,
};
use rayexec_error::{not_implemented, RayexecError, Result};
use reservation::{NopReservationTracker, Reservation, ReservationTracker};
use string_view::{
StringViewHeap,
Expand All @@ -17,8 +25,9 @@ use string_view::{
StringViewStorageMut,
};

use super::array::Array;
use crate::compute::util::IntoExactSizedIterator;
use crate::executor::physical_type::PhysicalI8;
use crate::datatype::DataType;

#[derive(Debug)]
pub struct ArrayBuffer<R: ReservationTracker = NopReservationTracker> {
Expand Down Expand Up @@ -74,6 +83,10 @@ where
self.len() == 0
}

pub fn secondary_buffers(&self) -> &SecondaryBuffers<R> {
self.secondary.as_ref()
}

pub fn try_as_slice<S: PhysicalStorage>(&self) -> Result<&[S::PrimaryBufferType]> {
if S::PHYSICAL_TYPE != self.physical_type {
return Err(
Expand Down Expand Up @@ -129,7 +142,7 @@ where
}
}

pub fn resize<S: PhysicalStorage>(&mut self, len: usize) -> Result<()> {
pub fn resize<S: PhysicalStorage>(&mut self, tracker: &R, len: usize) -> Result<()> {
if S::PHYSICAL_TYPE != self.physical_type {
return Err(
RayexecError::new("Attempted to resize buffer using wrong physical type")
Expand All @@ -138,11 +151,15 @@ where
);
}

unsafe { self.primary.resize::<S::PrimaryBufferType>(len) }
unsafe { self.primary.resize::<S::PrimaryBufferType>(tracker, len) }
}

/// Appends data from another buffer into this buffer.
pub fn append_from<S: PhysicalStorage>(&mut self, other: &ArrayBuffer) -> Result<()> {
pub fn append_from<S: PhysicalStorage>(
&mut self,
tracker: &R,
other: &ArrayBuffer,
) -> Result<()> {
if !self.secondary.is_none() {
return Err(RayexecError::new(
"Appending secondary buffers not yet supported",
Expand All @@ -156,7 +173,7 @@ where
let other = other.try_as_slice::<S>()?;

// Resize self to new size.
self.resize::<S>(new_len)?;
self.resize::<S>(tracker, new_len)?;

// Now copy everything over.
let this = self.try_as_slice_mut::<S>()?;
Expand Down Expand Up @@ -255,7 +272,23 @@ impl<R: ReservationTracker> RawBufferParts<R> {
std::slice::from_raw_parts_mut(self.ptr.cast::<T>(), self.len)
}

unsafe fn resize<T: Default + Copy>(&mut self, len: usize) -> Result<()> {
unsafe fn resize<T: Default + Copy>(&mut self, tracker: &R, len: usize) -> Result<()> {
if self.len == 0 {
// Special case when length is zero.
//
// We want to enable the use case where we initialize the buffer to
// nothing (null) and later append to it. However, the `T` that we
// pass in here might have a different alignment which wouldn't be
// safe.
//
// By just creating a new buffer, we can avoid that issue.
let new_self = Self::try_new::<T>(tracker, len)?;
*self = new_self;
return Ok(());
}

debug_assert_eq!(self.ptr as usize % std::mem::size_of::<T>(), 0);

let mut data: Vec<T> = Vec::from_raw_parts(self.ptr.cast(), self.len, self.cap);

// TODO: Reservation stuff.
Expand Down Expand Up @@ -335,11 +368,69 @@ impl StringViewBufferBuilder {
pub struct ListBufferBuilder;

impl ListBufferBuilder {
pub fn from_iter<I>(iter: I) -> Result<ArrayBuffer>
pub fn from_iter<I>(child_datatype: DataType, iter: I) -> Result<ArrayBuffer>
where
I: IntoExactSizedIterator<Item = ArrayBuffer>,
{
unimplemented!()
let mut iter = iter.into_iter();

let mut data =
RawBufferParts::try_new::<ListItemMetadata>(&NopReservationTracker, iter.len())?;

// Init child buffer with first array buffer.
let mut child_buf = match iter.next() {
Some(buf) => buf,
None => {
// We have a list array, but no lists (rows == 0).
return Ok(ArrayBuffer {
physical_type: PhysicalList::PHYSICAL_TYPE,
primary: data,
secondary: Box::new(SecondaryBuffers::None),
});
}
};

// Track first list.
let parent = unsafe { data.as_slice_mut() };
parent[0] = ListItemMetadata {
offset: 0,
len: child_buf.len() as i32,
};

// Now iter all remaining array buffers, append the actual data to the
// child buf and tracking the metadata in the parent buf.
for (idx, child) in iter.enumerate() {
// +1 since we already have the first entry.
parent[idx + 1] = ListItemMetadata {
offset: child_buf.len() as i32,
len: child.len() as i32,
};

// TODO: Move this out.
match child_buf.physical_type {
PhysicalType::UntypedNull => {
child_buf.append_from::<PhysicalUntypedNull>(&NopReservationTracker, &child)?
}
PhysicalType::Int8 => {
child_buf.append_from::<PhysicalI8>(&NopReservationTracker, &child)?
}
PhysicalType::Int32 => {
child_buf.append_from::<PhysicalI32>(&NopReservationTracker, &child)?
}
PhysicalType::Utf8 => {
child_buf.append_from::<PhysicalUtf8>(&NopReservationTracker, &child)?
}
other => not_implemented!("append from {other}"),
}
}

let child_array = Array::new(child_datatype, child_buf);

Ok(ArrayBuffer {
physical_type: PhysicalList::PHYSICAL_TYPE,
primary: data,
secondary: Box::new(SecondaryBuffers::List(ListBuffer::new(child_array))),
})
}
}

Expand Down Expand Up @@ -394,7 +485,8 @@ mod tests {
let mut a = PrimBufferBuilder::<PhysicalI32>::from_iter([4, 5, 6]).unwrap();
let b = PrimBufferBuilder::<PhysicalI32>::from_iter([7, 8]).unwrap();

a.append_from::<PhysicalI32>(&b).unwrap();
a.append_from::<PhysicalI32>(&NopReservationTracker, &b)
.unwrap();

let a_slice = a.try_as_slice::<PhysicalI32>().unwrap();
assert_eq!(&[4, 5, 6, 7, 8], a_slice);
Expand Down
22 changes: 22 additions & 0 deletions crates/rayexec_bullet/src/exp/buffer/physical_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,28 @@ pub trait MutablePhysicalStorage: PhysicalStorage {
R: ReservationTracker;
}

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct UntypedNull;

#[derive(Debug, Clone, Copy)]
pub struct PhysicalUntypedNull;

impl PhysicalStorage for PhysicalUntypedNull {
const PHYSICAL_TYPE: PhysicalType = PhysicalType::UntypedNull;

type PrimaryBufferType = UntypedNull;
type StorageType = UntypedNull;

type Storage<'a> = &'a [Self::StorageType];

fn get_storage<R>(buffer: &ArrayBuffer<R>) -> Result<Self::Storage<'_>>
where
R: ReservationTracker,
{
buffer.try_as_slice::<Self>()
}
}

#[derive(Debug, Clone, Copy)]
pub struct PhysicalI8;

Expand Down

0 comments on commit ca5fb3b

Please sign in to comment.