Skip to content

Commit

Permalink
chore: Add raw buffer (#3389)
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr authored Jan 8, 2025
1 parent f1d3428 commit 600a7e7
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 0 deletions.
115 changes: 115 additions & 0 deletions crates/rayexec_execution/src/arrays/array/buffer_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use core::alloc;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::Arc;

use rayexec_error::{Result, ResultExt};

pub trait BufferManager: Debug + Sync + Send + Sized {
// TODO: T => Spillable or something.
type CowPtr<T>: CowPtr<T>
where
T: Debug;

fn reserve_external(
self: &Arc<Self>,
size_bytes: usize,
align: usize,
) -> Result<Reservation<Self>>;

fn reserve_from_layout(self: &Arc<Self>, layout: alloc::Layout) -> Result<Reservation<Self>> {
self.reserve_external(layout.size(), layout.align())
}

fn make_cow<T: Debug>(&self, item: T) -> Result<Self::CowPtr<T>, T>;

/// Drops a memory reservation.
fn drop_reservation(&self, reservation: &Reservation<Self>);
}

#[derive(Debug)]
pub struct Reservation<B: BufferManager> {
manager: Arc<B>,
/// Size in bytes of the memory reservation.
size: usize,
/// Alignment of the memory reservation.
///
/// Used during deallocation of raw buffers.
align: usize,
}

impl<B> Reservation<B>
where
B: BufferManager,
{
fn try_new(manager: Arc<B>, size: usize, align: usize) -> Result<Self> {
alloc::Layout::from_size_align(size, align)
.context("unable to create layout for reservation")?;

Ok(Reservation {
manager,
size,
align,
})
}

pub fn manager(&self) -> &Arc<B> {
&self.manager
}

pub fn size(&self) -> usize {
self.size
}

pub fn align(&self) -> usize {
self.align
}

/// Returns an equivalent allocation layout for the reservation.
///
/// Reservations must always produce a valid allocation layout.
pub fn layout(&self) -> alloc::Layout {
alloc::Layout::from_size_align(self.size, self.align).unwrap()
}
}

// TODO: Probably rename, I don't think we want the 'cow' logic on this. Instead
// that'll probably be on ArrayData.
pub trait CowPtr<T>: Debug + Clone + AsRef<T> + Deref<Target = T> {
// TODO: Clone on write.
//
// Will need to be able to get the underlying reservation in order to track
// appropriately.
//
// Also might need to recurse to make sure everything is writable, not sure
// yet.
}

impl<T> CowPtr<T> for Arc<T> where T: Debug {}

/// Placeholder buffer manager.
#[derive(Debug, Clone)]
pub struct NopBufferManager;

impl BufferManager for NopBufferManager {
type CowPtr<T>
= Arc<T>
where
T: Debug;

fn reserve_external(
self: &Arc<Self>,
size_bytes: usize,
align: usize,
) -> Result<Reservation<Self>> {
Reservation::try_new(self.clone(), size_bytes, align)
}

fn make_cow<T: Debug>(&self, item: T) -> Result<Self::CowPtr<T>, T> {
Ok(Arc::new(item))
}

fn drop_reservation(&self, _reservation: &Reservation<Self>) {
// Ok
}
}
3 changes: 3 additions & 0 deletions crates/rayexec_execution/src/arrays/array/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod buffer_manager;

mod raw;
mod shared_or_owned;

use std::fmt::Debug;
Expand Down
160 changes: 160 additions & 0 deletions crates/rayexec_execution/src/arrays/array/raw.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#![allow(dead_code)]

use std::alloc::{self, Layout};
use std::ptr::NonNull;
use std::sync::Arc;

use rayexec_error::{Result, ResultExt};

use super::buffer_manager::{BufferManager, Reservation};

#[derive(Debug)]
pub struct RawBuffer<B: BufferManager> {
/// Memory reservation for this buffer.
pub(crate) reservation: Reservation<B>,
/// Raw pointer to start of vec.
///
/// This stores the pointer as a u8 pointer and it'll be casted to the right
/// type during array operations.
pub(crate) ptr: NonNull<u8>,
}

unsafe impl<B: BufferManager> Send for RawBuffer<B> {}
unsafe impl<B: BufferManager> Sync for RawBuffer<B> {}

impl<B> RawBuffer<B>
where
B: BufferManager,
{
/// Try to create a new buffer with a given capacity for type `T`.
pub fn try_with_capacity<T>(manager: &Arc<B>, cap: usize) -> Result<Self> {
assert!(std::mem::size_of::<T>() > 0);

if cap == 0 {
return Ok(RawBuffer {
reservation: manager.reserve_external(0, 0)?,
ptr: NonNull::dangling(),
});
}

let layout = Layout::array::<T>(cap).context("failed to create layout")?;
let ptr = unsafe { alloc::alloc(layout) };
let ptr = match NonNull::new(ptr) {
Some(ptr) => ptr,
None => alloc::handle_alloc_error(layout),
};

let reservation = manager.reserve_from_layout(layout)?;

Ok(RawBuffer { reservation, ptr })
}

pub unsafe fn as_slice<T>(&self) -> &[T] {
debug_assert_eq!(std::mem::align_of::<T>(), self.reservation.align());
debug_assert_eq!(0, self.reservation.size() % std::mem::size_of::<T>());

let cap = self.reservation.size() / std::mem::size_of::<T>();
std::slice::from_raw_parts(self.ptr.as_ptr().cast::<T>().cast_const(), cap)
}

pub unsafe fn as_slice_mut<T>(&mut self) -> &mut [T] {
debug_assert_eq!(std::mem::align_of::<T>(), self.reservation.align());
debug_assert_eq!(0, self.reservation.size() % std::mem::size_of::<T>());

let cap = self.reservation.size() / std::mem::size_of::<T>();
std::slice::from_raw_parts_mut(self.ptr.as_ptr().cast::<T>(), cap)
}

pub unsafe fn reserve<T>(&mut self, manager: &Arc<B>, additional: usize) -> Result<()> {
debug_assert_eq!(std::mem::align_of::<T>(), self.reservation.align());
debug_assert_eq!(0, self.reservation.size() % std::mem::size_of::<T>());

let cap = self.reservation.size() / std::mem::size_of::<T>();
let layout = Layout::array::<T>(cap + additional).context("failed to create layout")?;

let new_ptr = if cap == 0 {
unsafe { alloc::alloc(layout) }
} else {
let old_ptr = self.ptr.as_ptr();
let old_layout = self.reservation.layout();
unsafe { alloc::realloc(old_ptr, old_layout, layout.size()) }
};

self.ptr = match NonNull::new(new_ptr) {
Some(p) => p,
None => alloc::handle_alloc_error(layout),
};

self.reservation = manager.reserve_from_layout(layout)?;

Ok(())
}
}

impl<B> Drop for RawBuffer<B>
where
B: BufferManager,
{
fn drop(&mut self) {
if self.reservation.size() != 0 {
let layout = self.reservation.layout();
unsafe {
alloc::dealloc(self.ptr.as_ptr(), layout);
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::arrays::array::buffer_manager::NopBufferManager;

#[test]
fn new_drop() {
let b = RawBuffer::try_with_capacity::<i64>(&Arc::new(NopBufferManager), 4).unwrap();

assert_eq!(32, b.reservation.size());

std::mem::drop(b);
}

#[test]
fn as_slice_mut() {
let mut b = RawBuffer::try_with_capacity::<i64>(&Arc::new(NopBufferManager), 4).unwrap();
let s = unsafe { b.as_slice_mut::<i64>() };
assert_eq!(4, s.len());

for i in 0..4 {
s[i] = i as i64;
}

let s = unsafe { b.as_slice::<i64>() };
assert_eq!(4, s.len());
assert_eq!(&[0, 1, 2, 3], s);
}

#[test]
fn reserve() {
let mut b = RawBuffer::try_with_capacity::<i64>(&Arc::new(NopBufferManager), 4).unwrap();
let s = unsafe { b.as_slice_mut::<i64>() };
assert_eq!(4, s.len());

for i in 0..4 {
s[i] = i as i64;
}

unsafe { b.reserve::<i64>(&Arc::new(NopBufferManager), 4).unwrap() };
assert_eq!(64, b.reservation.size());

let s = unsafe { b.as_slice_mut::<i64>() };
assert_eq!(8, s.len());

for i in 0..4 {
s[i + 4] = s[i] * 2;
}

let s = unsafe { b.as_slice::<i64>() };
assert_eq!(&[0, 1, 2, 3, 0, 2, 4, 6], s);
}
}

0 comments on commit 600a7e7

Please sign in to comment.