Skip to content

Commit

Permalink
Use arrow-rs in ArrowBuffer (#8201)
Browse files Browse the repository at this point in the history
* Part of #3741
* Closes #2978
  • Loading branch information
emilk authored Nov 25, 2024
1 parent a060d06 commit 11df7bb
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 42 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5990,6 +5990,7 @@ name = "re_space_view"
version = "0.21.0-alpha.1+dev"
dependencies = [
"ahash",
"arrow",
"bytemuck",
"egui",
"glam",
Expand Down Expand Up @@ -6080,6 +6081,7 @@ version = "0.21.0-alpha.1+dev"
dependencies = [
"ahash",
"anyhow",
"arrow",
"bitflags 2.6.0",
"bytemuck",
"criterion",
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_chunk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ serde = [
]

## Enable conversion to and from arrow-rs types
arrow = ["arrow2/arrow", "dep:arrow"]
arrow = ["arrow2/arrow"]


[dependencies]
Expand All @@ -49,6 +49,7 @@ re_types_core.workspace = true
# External
ahash.workspace = true
anyhow.workspace = true
arrow.workspace = true
arrow2 = { workspace = true, features = [
"compute_concatenate",
"compute_filter",
Expand All @@ -64,7 +65,6 @@ thiserror.workspace = true

# Optional dependencies:
serde = { workspace = true, optional = true, features = ["derive", "rc"] }
arrow = { workspace = true, optional = true }

# Native dependencies:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_chunk/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ impl Chunk {
/// * [`Self::iter_string`].
/// * [`Self::iter_component_arrays`].
/// * [`Self::iter_component`].
pub fn iter_buffer<T: arrow2::types::NativeType>(
pub fn iter_buffer<T: arrow::datatypes::ArrowNativeType + arrow2::types::NativeType>(
&self,
component_name: &ComponentName,
) -> impl Iterator<Item = Vec<ArrowBuffer<T>>> + '_ {
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_types_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ anyhow.workspace = true
arrow.workspace = true
arrow2 = { workspace = true, features = [
"arrow",
"compute_concatenate",
"io_ipc",
"io_print",
"compute_concatenate",
] }
backtrace.workspace = true
bytemuck.workspace = true
Expand Down
94 changes: 58 additions & 36 deletions crates/store/re_types_core/src/arrow_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
/// Convenience-wrapper around an [`arrow2::buffer::Buffer`] that is known to contain a
use arrow::datatypes::ArrowNativeType;

/// Convenience-wrapper around an [`arrow::buffer::ScalarBuffer`] that is known to contain a
/// a primitive type.
///
/// The [`ArrowBuffer`] object is internally reference-counted and can be
/// easily converted back to a `&[T]` referencing the underlying storage.
/// This avoids some of the lifetime complexities that would otherwise
/// arise from returning a `&[T]` directly, but is significantly more
/// performant than doing the full allocation necessary to return a `Vec<T>`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ArrowBuffer<T>(arrow2::buffer::Buffer<T>);
#[derive(Clone, Debug, PartialEq)]
pub struct ArrowBuffer<T: ArrowNativeType>(arrow::buffer::ScalarBuffer<T>);

impl<T: ArrowNativeType> Default for ArrowBuffer<T> {
fn default() -> Self {
Self(arrow::buffer::ScalarBuffer::<T>::from(vec![]))
}
}

impl<T: crate::SizeBytes> crate::SizeBytes for ArrowBuffer<T> {
impl<T: crate::SizeBytes + ArrowNativeType> crate::SizeBytes for ArrowBuffer<T> {
#[inline]
fn heap_size_bytes(&self) -> u64 {
let Self(buf) = self;
std::mem::size_of_val(buf.as_slice()) as _
std::mem::size_of_val(self.as_slice()) as _
}
}

impl<T> ArrowBuffer<T> {
impl<T: ArrowNativeType> ArrowBuffer<T> {
/// The number of instances of T stored in this buffer.
#[inline]
pub fn num_instances(&self) -> usize {
// WARNING: If you are touching this code, make sure you know what len() actually does.
//
// There is ambiguity in how arrow2 and arrow-rs talk about buffer lengths, including
// some incorrect documentation: https://github.com/jorgecarleitao/arrow2/issues/1430
//
// Arrow2 `Buffer<T>` is typed and `len()` is the number of units of `T`, but the documentation
// is currently incorrect.
// Arrow-rs `Buffer` is untyped and len() is in bytes, but `ScalarBuffer`s are in units of T.
self.0.len()
self.as_slice().len()
}

/// The number of bytes stored in this buffer
Expand All @@ -45,7 +44,7 @@ impl<T> ArrowBuffer<T> {

#[inline]
pub fn as_slice(&self) -> &[T] {
self.0.as_slice()
self.0.as_ref()
}

/// Returns a new [`ArrowBuffer`] that is a slice of this buffer starting at `offset`.
Expand All @@ -56,19 +55,19 @@ impl<T> ArrowBuffer<T> {
/// Panics iff `offset + length` is larger than `len`.
#[inline]
pub fn sliced(self, range: std::ops::Range<usize>) -> Self {
Self(self.0.sliced(range.start, range.len()))
Self(self.0.slice(range.start, range.len()))
}
}

impl<T: bytemuck::Pod> ArrowBuffer<T> {
impl<T: bytemuck::Pod + ArrowNativeType> ArrowBuffer<T> {
/// Cast POD (plain-old-data) types to another POD type.
///
/// For instance: cast a buffer of `u8` to a buffer of `f32`.
#[inline]
pub fn cast_pod<Target: bytemuck::Pod>(
pub fn cast_pod<Target: bytemuck::Pod + ArrowNativeType>(
&self,
) -> Result<ArrowBuffer<Target>, bytemuck::PodCastError> {
// TODO(emilk): when we switch from arrow2, see if we can make this function zero-copy
// TODO(#2978): when we switch from arrow2, see if we can make this function zero-copy
re_tracing::profile_function!();
let target_slice: &[Target] = bytemuck::try_cast_slice(self.as_slice())?;
Ok(ArrowBuffer::from(target_slice.to_vec()))
Expand All @@ -84,56 +83,79 @@ impl<T: bytemuck::Pod> ArrowBuffer<T> {
}
}

impl<T: Eq> Eq for ArrowBuffer<T> {}
impl<T: Eq + ArrowNativeType> Eq for ArrowBuffer<T> {}

impl<T: Clone> ArrowBuffer<T> {
impl<T: ArrowNativeType> ArrowBuffer<T> {
#[inline]
pub fn to_vec(&self) -> Vec<T> {
self.0.as_slice().to_vec()
self.as_slice().to_vec()
}
}

impl<T: arrow::datatypes::ArrowNativeType + arrow2::types::NativeType>
From<arrow::buffer::ScalarBuffer<T>> for ArrowBuffer<T>
impl<T: ArrowNativeType + arrow2::types::NativeType> From<arrow::buffer::ScalarBuffer<T>>
for ArrowBuffer<T>
{
#[inline]
fn from(value: arrow::buffer::ScalarBuffer<T>) -> Self {
Self(value.into_inner().into())
}
}

impl<T> From<arrow2::buffer::Buffer<T>> for ArrowBuffer<T> {
impl<T: ArrowNativeType + arrow2::types::NativeType> From<arrow2::buffer::Buffer<T>>
for ArrowBuffer<T>
{
#[inline]
fn from(value: arrow2::buffer::Buffer<T>) -> Self {
Self(value)
fn from(arrow2_buffer: arrow2::buffer::Buffer<T>) -> Self {
let num_elements = arrow2_buffer.len();
let arrow1_buffer = arrow::buffer::Buffer::from(arrow2_buffer);
let scalar_buffer = arrow::buffer::ScalarBuffer::new(arrow1_buffer, 0, num_elements);
Self(scalar_buffer)
}
}

impl<T> From<Vec<T>> for ArrowBuffer<T> {
impl<T: ArrowNativeType> From<Vec<T>> for ArrowBuffer<T> {
#[inline]
fn from(value: Vec<T>) -> Self {
Self(value.into())
}
}

impl<T: Clone> From<&[T]> for ArrowBuffer<T> {
impl<T: ArrowNativeType> From<&[T]> for ArrowBuffer<T> {
#[inline]
fn from(value: &[T]) -> Self {
Self(value.iter().cloned().collect()) // TODO(emilk): avoid extra clones
Self(value.iter().copied().collect())
}
}

impl<T> FromIterator<T> for ArrowBuffer<T> {
impl<T: ArrowNativeType> FromIterator<T> for ArrowBuffer<T> {
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
Self(arrow2::buffer::Buffer::from_iter(iter))
Self(arrow::buffer::ScalarBuffer::from_iter(iter))
}
}

impl<T> std::ops::Deref for ArrowBuffer<T> {
impl<T: ArrowNativeType> std::ops::Deref for ArrowBuffer<T> {
type Target = [T];

#[inline]
fn deref(&self) -> &[T] {
self.0.as_slice()
self.as_slice()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_arrow2_compatibility() {
let arrow2_buffer = arrow2::buffer::Buffer::<f32>::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
assert_eq!(arrow2_buffer.as_slice(), &[1.0, 2.0, 3.0, 4.0, 5.0]);

let sliced_arrow2_buffer = arrow2_buffer.sliced(1, 3);
assert_eq!(sliced_arrow2_buffer.as_slice(), &[2.0, 3.0, 4.0]);

let arrow_buffer = ArrowBuffer::<f32>::from(sliced_arrow2_buffer);
assert_eq!(arrow_buffer.num_instances(), 3);
assert_eq!(arrow_buffer.as_slice(), &[2.0, 3.0, 4.0]);
}
}
1 change: 1 addition & 0 deletions crates/viewer/re_space_view/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ re_viewer_context.workspace = true
re_viewport_blueprint.workspace = true

ahash.workspace = true
arrow.workspace = true
bytemuck.workspace = true
egui.workspace = true
glam.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/viewer/re_space_view/src/results_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ impl<'a> HybridResultsChunkIter<'a> {
/// Iterate as indexed buffers.
///
/// See [`Chunk::iter_buffer`] for more information.
pub fn buffer<T: arrow2::types::NativeType>(
pub fn buffer<T: arrow::datatypes::ArrowNativeType + arrow2::types::NativeType>(
&'a self,
) -> impl Iterator<Item = ((TimeInt, RowId), Vec<re_types_core::ArrowBuffer<T>>)> + 'a {
self.chunks.iter().flat_map(|chunk| {
Expand Down
1 change: 1 addition & 0 deletions crates/viewer/re_space_view_spatial/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ re_video.workspace = true
re_viewer_context.workspace = true
re_viewport_blueprint.workspace = true

arrow.workspace = true
arrow2.workspace = true
ahash.workspace = true
anyhow.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ pub fn iter_string<'a>(
///
/// See [`Chunk::iter_buffer`] for more information.
#[allow(unused)]
pub fn iter_buffer<'a, T: arrow2::types::NativeType>(
pub fn iter_buffer<'a, T: arrow::datatypes::ArrowNativeType + arrow2::types::NativeType>(
chunks: &'a std::borrow::Cow<'a, [Chunk]>,
timeline: Timeline,
component_name: ComponentName,
Expand Down

0 comments on commit 11df7bb

Please sign in to comment.