Skip to content

Commit

Permalink
some string funcs
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr committed Dec 30, 2024
1 parent c6a4110 commit 9178c78
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 156 deletions.
4 changes: 4 additions & 0 deletions crates/rayexec_execution/src/arrays/array/exp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ where
&self.data
}

/// Returns a mutable reference to the data backing this array.
pub fn data_mut(&mut self) -> &mut ArrayData<B> {
&mut self.data
}

/// Returns a shared reference to this array's validity.
pub fn validity(&self) -> &Validity {
&self.validity
}
Expand Down
43 changes: 43 additions & 0 deletions crates/rayexec_execution/src/arrays/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ pub mod string_view;
mod raw;

use buffer_manager::{BufferManager, NopBufferManager};
use fmtutil::IntoDisplayableSlice;
use physical_type::{PhysicalStorage, PhysicalType};
use raw::RawBufferParts;
use rayexec_error::{RayexecError, Result};
use string_view::{
BinaryViewAddressable,
BinaryViewAddressableMut,
StringViewAddressable,
StringViewAddressableMut,
StringViewHeap,
Expand Down Expand Up @@ -110,6 +113,34 @@ where
Ok(StringViewAddressableMut { metadata, heap })
}

/// Borrows this buffer as a read-only binary view.
///
/// Buffers with a physical type of either `Utf8` or `Binary` are accepted,
/// so string data may be read back as raw bytes.
///
/// # Errors
///
/// Returns an error if the physical type is neither `Utf8` nor `Binary`, or
/// if the secondary buffer is not a string view heap.
pub fn try_as_binary_view_addressable(&self) -> Result<BinaryViewAddressable> {
    self.check_type_one_of(&[PhysicalType::Utf8, PhysicalType::Binary])?;

    // SAFETY: NOTE(review) presumably sound because the type check above
    // restricts the primary buffer to view metadata; confirm against the
    // contract of `as_slice`.
    let metadata = unsafe { self.primary.as_slice::<StringViewMetadataUnion>() };

    let SecondaryBuffer::StringViewHeap(heap) = self.secondary.as_ref() else {
        return Err(RayexecError::new("Missing string heap"));
    };

    Ok(BinaryViewAddressable { metadata, heap })
}

/// Borrows this buffer as a writable binary view.
///
/// Only buffers whose physical type is `Binary` are accepted. Reading
/// strings as bytes is harmless, which is why the read-only accessor also
/// admits `Utf8`; writing arbitrary bytes into a logical string array could
/// produce invalid UTF-8, so that is rejected here.
///
/// # Errors
///
/// Returns an error if the physical type is not `Binary`, or if the
/// secondary buffer is not a string view heap.
pub fn try_as_binary_view_addressable_mut(&mut self) -> Result<BinaryViewAddressableMut> {
    self.check_type(PhysicalType::Binary)?;

    // SAFETY: NOTE(review) presumably sound because the type check above
    // restricts the primary buffer to view metadata; confirm against the
    // contract of `as_slice_mut`.
    let metadata = unsafe { self.primary.as_slice_mut::<StringViewMetadataUnion>() };

    let SecondaryBuffer::StringViewHeap(heap) = self.secondary.as_mut() else {
        return Err(RayexecError::new("Missing string heap"));
    };

    Ok(BinaryViewAddressableMut { metadata, heap })
}

fn check_type(&self, want: PhysicalType) -> Result<()> {
if want != self.physical_type {
return Err(RayexecError::new("Physical types don't match")
Expand All @@ -119,6 +150,18 @@ where

Ok(())
}

/// Verifies that this buffer's physical type is one of `oneof`.
///
/// # Errors
///
/// Returns an error carrying the buffer's actual type and the list of
/// accepted types when there is no match.
fn check_type_one_of(&self, oneof: &[PhysicalType]) -> Result<()> {
    if oneof.contains(&self.physical_type) {
        return Ok(());
    }

    Err(
        RayexecError::new("Physical type not one of requested types")
            .with_field("have", self.physical_type)
            .with_field("oneof", oneof.display_as_list().to_string()),
    )
}
}

impl<B: BufferManager> Drop for ArrayBuffer<B> {
Expand Down
28 changes: 28 additions & 0 deletions crates/rayexec_execution/src/arrays/buffer/physical_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use rayexec_error::Result;

use super::buffer_manager::BufferManager;
use super::string_view::{
BinaryViewAddressable,
BinaryViewAddressableMut,
StringViewAddressable,
StringViewAddressableMut,
StringViewMetadataUnion,
Expand Down Expand Up @@ -273,6 +275,32 @@ impl MutablePhysicalStorage for PhysicalUtf8 {
}
}

/// Marker type describing physical storage for binary data.
///
/// The primary buffer holds `StringViewMetadataUnion` entries and individual
/// values are exposed as byte slices.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PhysicalBinary;

impl PhysicalStorage for PhysicalBinary {
const PHYSICAL_TYPE: PhysicalType = PhysicalType::Binary;

type PrimaryBufferType = StringViewMetadataUnion;
type StorageType = [u8];

type Addressable<'a> = BinaryViewAddressable<'a>;

/// Obtains a read-only binary view over `buffer`.
///
/// Delegates to `ArrayBuffer::try_as_binary_view_addressable` and
/// propagates its error.
fn get_addressable<B: BufferManager>(buffer: &ArrayBuffer<B>) -> Result<Self::Addressable<'_>> {
buffer.try_as_binary_view_addressable()
}
}

impl MutablePhysicalStorage for PhysicalBinary {
type AddressableMut<'a> = BinaryViewAddressableMut<'a>;

/// Obtains a writable binary view over `buffer`.
///
/// Delegates to `ArrayBuffer::try_as_binary_view_addressable_mut` and
/// propagates its error.
fn get_addressable_mut<B: BufferManager>(
buffer: &mut ArrayBuffer<B>,
) -> Result<Self::AddressableMut<'_>> {
buffer.try_as_binary_view_addressable_mut()
}
}

/// Dictionary arrays have the selection vector as the primary data buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PhysicalDictionary;
Expand Down
43 changes: 43 additions & 0 deletions crates/rayexec_execution/src/arrays/buffer/string_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,49 @@ impl<'a> AddressableMut for StringViewAddressableMut<'a> {
}
}

/// Read-only view over binary values: one metadata entry per value plus the
/// heap those entries are resolved against.
#[derive(Debug)]
pub struct BinaryViewAddressable<'a> {
    pub(crate) metadata: &'a [StringViewMetadataUnion],
    pub(crate) heap: &'a StringViewHeap,
}

impl<'a> Addressable for BinaryViewAddressable<'a> {
    type T = [u8];

    /// Number of metadata entries in this view.
    fn len(&self) -> usize {
        self.metadata.len()
    }

    /// Looks up the bytes for the value at `idx`.
    ///
    /// Returns `None` when `idx` is out of bounds or when the heap lookup
    /// for that entry yields nothing.
    fn get(&self, idx: usize) -> Option<&Self::T> {
        self.metadata
            .get(idx)
            .and_then(|entry| self.heap.get(entry))
    }
}

/// Writable view over binary values: one metadata entry per value plus the
/// heap those entries are resolved against.
#[derive(Debug)]
pub struct BinaryViewAddressableMut<'a> {
    pub(crate) metadata: &'a mut [StringViewMetadataUnion],
    pub(crate) heap: &'a mut StringViewHeap,
}

impl<'a> AddressableMut for BinaryViewAddressableMut<'a> {
    type T = [u8];

    /// Number of metadata entries in this view.
    fn len(&self) -> usize {
        self.metadata.len()
    }

    /// Looks up the bytes for the value at `idx` for in-place modification.
    ///
    /// Returns `None` when `idx` is out of bounds or when the heap lookup
    /// for that entry yields nothing.
    fn get_mut(&mut self, idx: usize) -> Option<&mut Self::T> {
        match self.metadata.get_mut(idx) {
            Some(entry) => self.heap.get_mut(entry),
            None => None,
        }
    }

    /// Pushes `val` onto the heap and stores the resulting metadata at `idx`.
    ///
    /// Indexing panics if `idx` is out of bounds; the push to the heap
    /// happens before the index is checked.
    fn put(&mut self, idx: usize, val: &Self::T) {
        let entry = self.heap.push_bytes(val);
        self.metadata[idx] = entry;
    }
}

/// Metadata for small (<= 12 bytes) varlen data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(C)]
Expand Down
10 changes: 5 additions & 5 deletions crates/rayexec_execution/src/arrays/executor/scalar/uniform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::arrays::selection;
use crate::arrays::storage::AddressableStorage;

#[derive(Debug, Clone, Copy)]
pub struct UniformExecutor;
pub struct UniformExecutor2;

impl UniformExecutor {
impl UniformExecutor2 {
pub fn execute<'a, S, B, Op>(
arrays: &[&'a Array2],
builder: ArrayBuilder<B>,
Expand Down Expand Up @@ -127,7 +127,7 @@ mod tests {

let mut string_buffer = String::new();

let got = UniformExecutor::execute::<PhysicalUtf8_2, _, _>(
let got = UniformExecutor2::execute::<PhysicalUtf8_2, _, _>(
&[&first, &second, &third],
builder,
|inputs, buf| {
Expand Down Expand Up @@ -162,7 +162,7 @@ mod tests {

let mut string_buffer = String::new();

let got = UniformExecutor::execute::<PhysicalUtf8_2, _, _>(
let got = UniformExecutor2::execute::<PhysicalUtf8_2, _, _>(
&[&first, &second, &third],
builder,
|inputs, buf| {
Expand Down Expand Up @@ -195,7 +195,7 @@ mod tests {

let mut string_buffer = String::new();

let got = UniformExecutor::execute::<PhysicalUtf8_2, _, _>(
let got = UniformExecutor2::execute::<PhysicalUtf8_2, _, _>(
&[&first, &second, &third],
builder,
|inputs, buf| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::arrays::bitmap::Bitmap;
use crate::arrays::datatype::{DataType, DataTypeId};
use crate::arrays::executor::builder::{ArrayBuilder, BooleanBuffer};
use crate::arrays::executor::physical_type::PhysicalBool_2;
use crate::arrays::executor::scalar::{BinaryExecutor2, TernaryExecutor, UniformExecutor};
use crate::arrays::executor::scalar::{BinaryExecutor2, TernaryExecutor, UniformExecutor2};
use crate::arrays::storage::BooleanStorage;
use crate::expr::Expression;
use crate::functions::documentation::{Category, Documentation, Example};
Expand Down Expand Up @@ -111,7 +111,7 @@ impl ScalarFunctionImpl for AndImpl {
}
_ => {
let len = inputs[0].logical_len();
UniformExecutor::execute::<PhysicalBool_2, _, _>(
UniformExecutor2::execute::<PhysicalBool_2, _, _>(
inputs,
ArrayBuilder {
datatype: DataType::Boolean,
Expand Down Expand Up @@ -204,7 +204,7 @@ impl ScalarFunctionImpl for OrImpl {
}
_ => {
let len = inputs[0].logical_len();
UniformExecutor::execute::<PhysicalBool_2, _, _>(
UniformExecutor2::execute::<PhysicalBool_2, _, _>(
inputs,
ArrayBuilder {
datatype: DataType::Boolean,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use rayexec_error::Result;

use crate::arrays::array::Array2;
use crate::arrays::array::exp::Array;
use crate::arrays::batch_exp::Batch;
use crate::arrays::buffer::physical_type::{AddressableMut, PhysicalUtf8};
use crate::arrays::datatype::{DataType, DataTypeId};
use crate::arrays::executor::builder::{ArrayBuilder, GermanVarlenBuffer};
use crate::arrays::executor::physical_type::PhysicalUtf8_2;
use crate::arrays::executor::scalar::{BinaryExecutor2, UniformExecutor};
use crate::arrays::executor_exp::scalar::binary::BinaryExecutor;
use crate::arrays::executor_exp::scalar::unary::UnaryExecutor;
use crate::arrays::executor_exp::scalar::uniform::UniformExecutor;
use crate::arrays::executor_exp::OutBuffer;
use crate::expr::Expression;
use crate::functions::documentation::{Category, Documentation, Example};
use crate::functions::scalar::{PlannedScalarFunction, ScalarFunction, ScalarFunctionImpl};
Expand Down Expand Up @@ -68,55 +71,70 @@ impl ScalarFunction for Concat {
pub struct StringConcatImpl;

impl ScalarFunctionImpl for StringConcatImpl {
fn execute2(&self, inputs: &[&Array2]) -> Result<Array2> {
match inputs.len() {
fn execute(&self, input: &Batch, output: &mut Array) -> Result<()> {
let sel = input.selection();

match input.arrays().len() {
0 => {
let mut array = Array2::from_iter([""]);
array.set_physical_validity(0, false);
Ok(array)
// TODO: Zero args should actually error during planning.
// Currently this just sets everything to an empty string.
let mut addressable = output
.data_mut()
.try_as_mut()?
.try_as_string_view_addressable_mut()?;

for idx in 0..addressable.len() {
addressable.put(idx, "");
}
}
1 => Ok(inputs[0].clone()),
2 => {
let a = inputs[0];
let b = inputs[1];
1 => {
let input = &input.arrays()[0];

let mut string_buf = String::new();
UnaryExecutor::execute::<PhysicalUtf8, PhysicalUtf8, _>(
input,
sel,
OutBuffer::from_array(output)?,
|s, buf| buf.put(s),
)?;
}
2 => {
let a = &input.arrays()[0];
let b = &input.arrays()[0];

// TODO: Compute data capacity.
let mut str_buf = String::new();

BinaryExecutor2::execute::<PhysicalUtf8_2, PhysicalUtf8_2, _, _>(
BinaryExecutor::execute::<PhysicalUtf8, PhysicalUtf8, PhysicalUtf8, _>(
a,
sel,
b,
ArrayBuilder {
datatype: DataType::Utf8,
buffer: GermanVarlenBuffer::with_len(a.logical_len()),
},
|a, b, buf| {
string_buf.clear();
string_buf.push_str(a);
string_buf.push_str(b);
buf.put(string_buf.as_str());
sel,
OutBuffer::from_array(output)?,
|s1, s2, buf| {
str_buf.clear();
str_buf.push_str(s1);
str_buf.push_str(s2);
buf.put(&str_buf);
},
)
)?;
}
_ => {
let mut string_buf = String::new();
let mut str_buf = String::new();

UniformExecutor::execute::<PhysicalUtf8_2, _, _>(
inputs,
ArrayBuilder {
datatype: DataType::Utf8,
buffer: GermanVarlenBuffer::with_len(inputs[0].logical_len()),
},
|strings, buf| {
string_buf.clear();
for s in strings {
string_buf.push_str(s);
UniformExecutor::execute::<PhysicalUtf8, PhysicalUtf8, _>(
input.arrays(),
sel,
OutBuffer::from_array(output)?,
|ss, buf| {
str_buf.clear();
for s in ss {
str_buf.push_str(s);
}
buf.put(string_buf.as_str());
buf.put(&str_buf);
},
)
)?;
}
}

Ok(())
}
}
Loading

0 comments on commit 9178c78

Please sign in to comment.