Skip to content

Commit

Permalink
chore: Add next field for array internals refactor (#3393)
Browse files Browse the repository at this point in the history
Split out from #3382
  • Loading branch information
scsmithr authored Jan 9, 2025
1 parent 9608bd5 commit 7c54ac2
Show file tree
Hide file tree
Showing 110 changed files with 2,375 additions and 1,115 deletions.
4 changes: 2 additions & 2 deletions crates/docgen/src/markdown_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub fn write_markdown_table<'a>(

for batch in batches {
for row in 0..batch.num_rows() {
for (idx, column) in batch.columns().iter().enumerate() {
for (idx, column) in batch.arrays().iter().enumerate() {
if idx == 0 {
write!(output, "|")?;
}
Expand All @@ -62,7 +62,7 @@ mod tests {

#[test]
fn simple() {
let batch = Batch::try_new([
let batch = Batch::try_from_arrays([
Array::from_iter([1, 2, 3]),
Array::from_iter(["cat", "dog", "mouse"]),
])
Expand Down
6 changes: 3 additions & 3 deletions crates/rayexec_csv/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use rayexec_error::{RayexecError, Result};
use rayexec_execution::arrays::array::{Array, ArrayData};
use rayexec_execution::arrays::array::{Array, ArrayData2};
use rayexec_execution::arrays::batch::Batch;
use rayexec_execution::arrays::bitmap::Bitmap;
use rayexec_execution::arrays::compute::cast::parse::{
Expand Down Expand Up @@ -483,7 +483,7 @@ impl AsyncCsvStream {
arrs.push(arr);
}

Batch::try_new(arrs)
Batch::try_from_arrays(arrs)
}

fn build_boolean(
Expand Down Expand Up @@ -524,7 +524,7 @@ impl AsyncCsvStream {
where
T: Default,
P: Parser<Type = T>,
PrimitiveStorage<T>: Into<ArrayData>,
PrimitiveStorage<T>: Into<ArrayData2>,
{
let mut values = Vec::with_capacity(completed.num_completed());
let mut validity = Bitmap::with_capacity(completed.num_completed());
Expand Down
2 changes: 1 addition & 1 deletion crates/rayexec_csv/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl CsvEncoder {
for row in 0..batch.num_rows() {
self.record.clear();

for col in batch.columns() {
for col in batch.arrays() {
let scalar = FORMATTER
.format_array_value(col, row)
.expect("row to exist");
Expand Down
54 changes: 32 additions & 22 deletions crates/rayexec_execution/src/arrays/array/array_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::marker::PhantomData;
use std::sync::Arc;

use fmtutil::IntoDisplayableSlice;
use rayexec_error::{RayexecError, Result};

use super::array_data::ArrayData;
use super::buffer_manager::BufferManager;
use super::physical_type::{PhysicalStorage, PhysicalType};
use super::raw::RawBuffer;
Expand All @@ -15,6 +15,8 @@ use super::string_view::{
StringViewHeap,
StringViewMetadataUnion,
};
use super::validity::Validity;
use super::Array;

/// Buffer for arrays.
///
Expand Down Expand Up @@ -43,7 +45,6 @@ where
/// buffer.
///
/// The secondary buffer will be initialized to None.
#[allow(dead_code)]
pub(crate) fn with_primary_capacity<S: PhysicalStorage>(
manager: &Arc<B>,
capacity: usize,
Expand All @@ -62,6 +63,14 @@ where
self.secondary = Box::new(secondary)
}

/// Returns the physical type recorded on this buffer.
pub const fn physical_type(&self) -> PhysicalType {
self.physical_type
}

/// Returns the number of elements the primary buffer has room for.
///
/// Derived by dividing the size of the primary buffer's reservation by the
/// value the physical type reports from `primary_buffer_mem_size()`.
pub fn primary_capacity(&self) -> usize {
    let reserved = self.primary.reservation.size();
    let per_element = self.physical_type.primary_buffer_mem_size();
    reserved / per_element
}

/// Returns a shared reference to this buffer's secondary buffer.
pub fn get_secondary(&self) -> &SecondaryBuffer<B> {
&self.secondary
}
Expand Down Expand Up @@ -209,19 +218,18 @@ where

#[derive(Debug)]
pub struct DictionaryBuffer<B: BufferManager> {
// pub(crate) validity: Validity,
// pub(crate) buffer: ArrayData<B>,
_b: PhantomData<B>,
pub(crate) validity: Validity,
pub(crate) buffer: ArrayData<B>,
}

impl<B> DictionaryBuffer<B>
where
B: BufferManager,
{
// pub fn new(buffer: ArrayData<B>, validity: Validity) -> Self {
// debug_assert_eq!(buffer.capacity(), validity.len());
// DictionaryBuffer { buffer, validity }
// }
pub fn new(buffer: ArrayData<B>, validity: Validity) -> Self {
debug_assert_eq!(buffer.primary_capacity(), validity.len());
DictionaryBuffer { buffer, validity }
}
}

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
Expand All @@ -232,25 +240,25 @@ pub struct ListItemMetadata {

#[derive(Debug)]
pub struct ListBuffer<B: BufferManager> {
// /// Number of "filled" entries in the child array.
// ///
// /// This differs from the child's capacity as we need to be able
// /// incrementally push back values.
// ///
// /// This is only looked at when writing values to the child array. Reads can
// /// ignore this as all required info is in the entry metadata.
// pub(crate) entries: usize,
// pub(crate) child: Array<B>,
_b: PhantomData<B>,
/// Number of "filled" entries in the child array.
///
/// This differs from the child's capacity as we need to be able to
/// incrementally push back values.
///
/// This is only looked at when writing values to the child array. Reads can
/// ignore this as all required info is in the entry metadata.
#[allow(dead_code)]
pub(crate) entries: usize,
pub(crate) child: Array<B>,
}

impl<B> ListBuffer<B>
where
B: BufferManager,
{
// pub fn new(child: Array<B>) -> Self {
// ListBuffer { entries: 0, child }
// }
pub fn new(child: Array<B>) -> Self {
ListBuffer { entries: 0, child }
}
}

#[cfg(test)]
Expand All @@ -270,11 +278,13 @@ mod tests {
let mut buffer =
ArrayBuffer::with_primary_capacity::<PhysicalI32>(&Arc::new(NopBufferManager), 4)
.unwrap();
assert_eq!(4, buffer.primary_capacity());

let s = buffer.try_as_slice::<PhysicalI32>().unwrap();
assert_eq!(4, s.len());

buffer.reserve_primary::<PhysicalI32>(8).unwrap();
assert_eq!(8, buffer.primary_capacity());

let s = buffer.try_as_slice_mut::<PhysicalI32>().unwrap();
assert_eq!(8, s.len());
Expand Down
6 changes: 3 additions & 3 deletions crates/rayexec_execution/src/arrays/array/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ where
})
}

pub fn manager(&self) -> &Arc<B> {
pub const fn manager(&self) -> &Arc<B> {
&self.manager
}

pub fn size(&self) -> usize {
pub const fn size(&self) -> usize {
self.size
}

pub fn align(&self) -> usize {
pub const fn align(&self) -> usize {
self.align
}

Expand Down
56 changes: 56 additions & 0 deletions crates/rayexec_execution/src/arrays/array/flat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use rayexec_error::{RayexecError, Result};

use super::array_buffer::{ArrayBuffer, SecondaryBuffer};
use super::buffer_manager::{BufferManager, NopBufferManager};
use super::physical_type::PhysicalDictionary;
use super::selection::Selection;
use super::validity::Validity;
use super::Array;

/// A view on top of normal arrays flattening some parts of the nested
/// structure.
///
/// For dictionary arrays the view points directly at the dictionary's values
/// and validity, with the array's own primary buffer acting as the selection
/// into them. For all other arrays the view points at the array's own data
/// with a linear selection.
#[derive(Debug)]
pub struct FlatArrayView<'a, B: BufferManager = NopBufferManager> {
    /// Validity to consult for the values in `array_buffer`: the dictionary
    /// buffer's validity for dictionary arrays, otherwise the array's own.
    pub(crate) validity: &'a Validity,
    /// Buffer holding the values: the dictionary buffer's data for dictionary
    /// arrays, otherwise the array's own data.
    pub(crate) array_buffer: &'a ArrayBuffer<B>,
    /// Selection applied on top of `array_buffer`: the dictionary indices for
    /// dictionary arrays, otherwise a linear selection over the array's
    /// capacity.
    pub(crate) selection: Selection<'a>,
}

impl<'a, B> FlatArrayView<'a, B>
where
    B: BufferManager,
{
    /// Builds a flattened view over `array`.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - the array's `next` field is not populated,
    /// - the array is a dictionary but its primary buffer cannot be read as
    ///   `PhysicalDictionary`, or
    /// - the array is a dictionary but its secondary buffer is not a
    ///   dictionary buffer.
    pub fn from_array(array: &'a Array<B>) -> Result<Self> {
        // Look up `next` once and surface a missing value as an error rather
        // than panicking; this function already reports failures via `Result`.
        let next = array
            .next
            .as_ref()
            .ok_or_else(|| RayexecError::new("Array is missing its 'next' field"))?;
        let data = &next.data;

        if array.is_dictionary() {
            // The primary buffer of a dictionary array holds the indices into
            // the dictionary stored in the secondary buffer.
            let selection = data.try_as_slice::<PhysicalDictionary>()?;

            let dict_buffer = match data.get_secondary() {
                SecondaryBuffer::Dictionary(dict) => dict,
                _ => {
                    return Err(RayexecError::new(
                        "Secondary buffer not a dictionary buffer",
                    ))
                }
            };

            Ok(FlatArrayView {
                validity: &dict_buffer.validity,
                array_buffer: &dict_buffer.buffer,
                selection: Selection::slice(selection),
            })
        } else {
            Ok(FlatArrayView {
                validity: &next.validity,
                array_buffer: data,
                selection: Selection::linear(array.capacity()),
            })
        }
    }

    /// Returns the length of the view's selection.
    pub fn logical_len(&self) -> usize {
        self.selection.len()
    }
}
Loading

0 comments on commit 7c54ac2

Please sign in to comment.