diff --git a/Cargo.lock b/Cargo.lock index 821ba67e5e1b..ab7c80f9c8bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5668,6 +5668,7 @@ version = "0.22.0-alpha.1+dev" dependencies = [ "ahash", "anyhow", + "arrow", "criterion", "document-features", "indent", @@ -6627,6 +6628,7 @@ dependencies = [ "itertools 0.13.0", "re_chunk_store", "re_dataframe", + "re_error", "re_format", "re_log", "re_log_types", diff --git a/crates/store/re_chunk_store/Cargo.toml b/crates/store/re_chunk_store/Cargo.toml index 1929c5c73989..5feff61a7562 100644 --- a/crates/store/re_chunk_store/Cargo.toml +++ b/crates/store/re_chunk_store/Cargo.toml @@ -41,6 +41,7 @@ re_types_core.workspace = true # External dependencies: ahash.workspace = true anyhow.workspace = true +arrow.workspace = true arrow2 = { workspace = true, features = ["compute_concatenate"] } document-features.workspace = true indent.workspace = true diff --git a/crates/store/re_chunk_store/src/lib.rs b/crates/store/re_chunk_store/src/lib.rs index 645d6c7e9e84..f39aea1e134f 100644 --- a/crates/store/re_chunk_store/src/lib.rs +++ b/crates/store/re_chunk_store/src/lib.rs @@ -56,6 +56,7 @@ pub use re_chunk::{ pub use re_log_types::{ResolvedTimeRange, TimeInt, TimeType, Timeline}; pub mod external { + pub use arrow; pub use arrow2; pub use re_chunk; diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index 76653d05eb70..b67c56b531b8 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -19,7 +19,8 @@ use itertools::Itertools; use nohash_hasher::{IntMap, IntSet}; use re_chunk::{ - Chunk, ComponentName, EntityPath, RangeQuery, RowId, TimeInt, Timeline, UnitChunkShared, + external::arrow::array::ArrayRef, Chunk, ComponentName, EntityPath, RangeQuery, RowId, TimeInt, + Timeline, UnitChunkShared, }; use re_chunk_store::{ ChunkStore, ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor, @@ -794,7 +795,39 @@ impl QueryHandle { /// } /// ``` #[inline] - pub fn next_row(&self) -> Option>> { + pub fn next_row(&self) -> Option> { + self.engine + .with(|store, cache| self._next_row(store, cache)) + .map(|vec| vec.into_iter().map(|a| a.into()).collect()) + } + + /// Returns the next row's worth of data. + /// + /// The returned vector of Arrow arrays strictly follows the schema specified by [`Self::schema`]. + /// Columns that do not yield any data will still be present in the results, filled with null values. + /// + /// Each cell in the result corresponds to the latest _locally_ known value at that particular point in + /// the index, for each respective `ColumnDescriptor`. + /// See [`QueryExpression::sparse_fill_strategy`] to go beyond local resolution. + /// + /// Example: + /// ```ignore + /// while let Some(row) = query_handle.next_row() { + /// // … + /// } + /// ``` + /// + /// ## Pagination + /// + /// Use [`Self::seek_to_row`]: + /// ```ignore + /// query_handle.seek_to_row(42); + /// for row in query_handle.into_iter().take(len) { + /// // … + /// } + /// ``` + #[inline] + fn next_row_arrow2(&self) -> Option>> { self.engine .with(|store, cache| self._next_row(store, cache)) } @@ -1239,7 +1272,7 @@ impl QueryHandle { pub fn next_row_batch(&self) -> Option { Some(RecordBatch { schema: self.schema().clone(), - data: Arrow2Chunk::new(self.next_row()?), + data: Arrow2Chunk::new(self.next_row_arrow2()?), }) } @@ -1266,13 +1299,13 @@ impl QueryHandle { /// Returns an iterator backed by [`Self::next_row`]. #[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work pub fn iter(&self) -> impl Iterator>> + '_ { - std::iter::from_fn(move || self.next_row()) + std::iter::from_fn(move || self.next_row_arrow2()) } /// Returns an iterator backed by [`Self::next_row`]. #[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work pub fn into_iter(self) -> impl Iterator>> { - std::iter::from_fn(move || self.next_row()) + std::iter::from_fn(move || self.next_row_arrow2()) } /// Returns an iterator backed by [`Self::next_row_batch`]. diff --git a/crates/utils/re_memory/src/accounting_allocator.rs b/crates/utils/re_memory/src/accounting_allocator.rs index ff2013293a95..b1edc406e574 100644 --- a/crates/utils/re_memory/src/accounting_allocator.rs +++ b/crates/utils/re_memory/src/accounting_allocator.rs @@ -11,10 +11,10 @@ use crate::{ }; /// Only track allocations of at least this size. -const SMALL_SIZE: usize = 128; // TODO(emilk): make this setable by users +const SMALL_SIZE: usize = 128; // TODO(emilk): make this settable by users /// Allocations smaller than are stochastically sampled. -const MEDIUM_SIZE: usize = 4 * 1024; // TODO(emilk): make this setable by users +const MEDIUM_SIZE: usize = 4 * 1024; // TODO(emilk): make this settable by users // TODO(emilk): yet another tier would maybe make sense, with a different stochastic rate. diff --git a/crates/viewer/re_view_dataframe/Cargo.toml b/crates/viewer/re_view_dataframe/Cargo.toml index 425b6bd42e62..ab6613059ce6 100644 --- a/crates/viewer/re_view_dataframe/Cargo.toml +++ b/crates/viewer/re_view_dataframe/Cargo.toml @@ -21,6 +21,7 @@ all-features = true [dependencies] re_chunk_store.workspace = true re_dataframe.workspace = true +re_error.workspace = true re_format.workspace = true re_log_types.workspace = true re_log.workspace = true diff --git a/crates/viewer/re_view_dataframe/src/dataframe_ui.rs b/crates/viewer/re_view_dataframe/src/dataframe_ui.rs index 056e131d3f03..c8f5833347d8 100644 --- a/crates/viewer/re_view_dataframe/src/dataframe_ui.rs +++ b/crates/viewer/re_view_dataframe/src/dataframe_ui.rs @@ -2,10 +2,10 @@ use std::collections::BTreeMap; use std::ops::Range; use anyhow::Context; +use arrow::array::ArrayRef; use egui::NumExt as _; use itertools::Itertools; -use re_chunk_store::external::re_chunk::Arrow2Array; use re_chunk_store::{ColumnDescriptor, LatestAtQuery}; use re_dataframe::external::re_query::StorageEngineArcReadGuard; use re_dataframe::QueryHandle; @@ -142,7 +142,7 @@ struct RowsDisplayData { impl RowsDisplayData { fn try_new( row_indices: &Range, - row_data: Vec>>, + row_data: Vec>, selected_columns: &[ColumnDescriptor], query_timeline: &Timeline, ) -> Result { @@ -360,7 +360,10 @@ impl egui_table::TableDelegate for DataframeTableDelegate<'_> { let display_data = match &self.display_data { Ok(display_data) => display_data, Err(err) => { - error_ui(ui, format!("Error with display data: {err}")); + error_ui( + ui, + format!("Error with display data: {}", re_error::format(err)), + ); return; } }; diff --git a/crates/viewer/re_view_dataframe/src/display_record_batch.rs b/crates/viewer/re_view_dataframe/src/display_record_batch.rs index 6260f74cb7e0..671326e0a412 100644 --- a/crates/viewer/re_view_dataframe/src/display_record_batch.rs +++ b/crates/viewer/re_view_dataframe/src/display_record_batch.rs @@ -1,19 +1,18 @@ //! Intermediate data structures to make `re_datastore`'s row data more amenable to displaying in a //! table. -use thiserror::Error; - -use re_chunk_store::external::arrow2::{ +use arrow::{ array::{ - Array as Arrow2Array, DictionaryArray as Arrow2DictionaryArray, - ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray, + Array as ArrowArray, ArrayRef as ArrowArrayRef, + Int32DictionaryArray as ArrowInt32DictionaryArray, Int64Array as ArrowInt64Array, + ListArray as ArrowListArray, TimestampNanosecondArray as ArrowTimestampNanosecondArray, }, - datatypes::DataType, - datatypes::DataType as Arrow2DataType, + datatypes::DataType as ArrowDataType, }; +use thiserror::Error; + use re_chunk_store::{ColumnDescriptor, ComponentColumnDescriptor, LatestAtQuery}; use re_log_types::{EntityPath, TimeInt, Timeline}; -use re_types::external::arrow2::datatypes::IntegerType; use re_types_core::ComponentName; use re_ui::UiExt; use re_viewer_context::{UiLayout, ViewerContext}; @@ -21,10 +20,10 @@ use re_viewer_context::{UiLayout, ViewerContext}; #[derive(Error, Debug)] pub(crate) enum DisplayRecordBatchError { #[error("Unexpected column data type for timeline '{0}': {1:?}")] - UnexpectedTimeColumnDataType(String, Arrow2DataType), + UnexpectedTimeColumnDataType(String, ArrowDataType), #[error("Unexpected column data type for component '{0}': {1:?}")] - UnexpectedComponentColumnDataType(String, Arrow2DataType), + UnexpectedComponentColumnDataType(String, ArrowDataType), } /// A single column of component data. @@ -33,38 +32,37 @@ pub(crate) enum DisplayRecordBatchError { #[derive(Debug)] pub(crate) enum ComponentData { Null, - ListArray(Arrow2ListArray), + ListArray(ArrowListArray), DictionaryArray { - dict: Arrow2DictionaryArray, - values: Arrow2ListArray, + dict: ArrowInt32DictionaryArray, + values: ArrowListArray, }, } impl ComponentData { - #[allow(clippy::borrowed_box)] // https://github.com/rust-lang/rust-clippy/issues/11940 fn try_new( descriptor: &ComponentColumnDescriptor, - column_data: &Box, + column_data: &ArrowArrayRef, ) -> Result { match column_data.data_type() { - DataType::Null => Ok(Self::Null), - DataType::List(_) => Ok(Self::ListArray( + ArrowDataType::Null => Ok(Self::Null), + ArrowDataType::List(_) => Ok(Self::ListArray( column_data .as_any() - .downcast_ref::>() + .downcast_ref::() .expect("`data_type` checked, failure is a bug in re_dataframe") .clone(), )), - DataType::Dictionary(IntegerType::Int32, _, _) => { + ArrowDataType::Dictionary(_, _) => { let dict = column_data .as_any() - .downcast_ref::>() + .downcast_ref::() .expect("`data_type` checked, failure is a bug in re_dataframe") .clone(); let values = dict .values() .as_any() - .downcast_ref::>() + .downcast_ref::() .expect("`data_type` checked, failure is a bug in re_dataframe") .clone(); Ok(Self::DictionaryArray { dict, values }) @@ -90,8 +88,8 @@ impl ComponentData { } } Self::DictionaryArray { dict, values } => { - if dict.is_valid(row_index) { - values.value(dict.key_value(row_index)).len() as u64 + if let Some(key) = dict.key(row_index) { + values.value(key).len() as u64 } else { 0 } @@ -131,22 +129,20 @@ impl ComponentData { Self::ListArray(list_array) => list_array .is_valid(row_index) .then(|| list_array.value(row_index)), - Self::DictionaryArray { dict, values } => dict - .is_valid(row_index) - .then(|| values.value(dict.key_value(row_index))), + Self::DictionaryArray { dict, values } => { + dict.key(row_index).map(|key| values.value(key)) + } }; if let Some(data) = data { let data_to_display = if let Some(instance_index) = instance_index { // Panics if the instance index is out of bound. This is checked in // `DisplayColumn::data_ui`. - data.sliced(instance_index as usize, 1) + data.slice(instance_index as usize, 1) } else { data }; - let data_to_display: arrow::array::ArrayRef = data_to_display.into(); - ctx.component_ui_registry.ui_raw( ctx, ui, @@ -169,7 +165,7 @@ impl ComponentData { pub(crate) enum DisplayColumn { Timeline { timeline: Timeline, - time_data: Arrow2PrimitiveArray, + time_data: ArrowInt64Array, }, Component { entity_path: EntityPath, @@ -179,26 +175,47 @@ pub(crate) enum DisplayColumn { } impl DisplayColumn { - #[allow(clippy::borrowed_box)] // https://github.com/rust-lang/rust-clippy/issues/11940 fn try_new( column_descriptor: &ColumnDescriptor, - column_data: &Box, + column_data: &ArrowArrayRef, ) -> Result { + fn int64_from_nanoseconds( + duration_array: &ArrowTimestampNanosecondArray, + ) -> Option { + let data = duration_array.to_data(); + let buffer = data.buffers().first()?.clone(); + arrow::array::ArrayData::builder(arrow::datatypes::DataType::Int64) + .len(duration_array.len()) + .add_buffer(buffer) + .build() + .ok() + .map(ArrowInt64Array::from) + } + match column_descriptor { ColumnDescriptor::Time(desc) => { - let time_data = column_data + let timeline = desc.timeline; + + // Sequence timelines are i64, but time columns are nanoseconds (also as i64) + let time_data_result = column_data .as_any() - .downcast_ref::>() - .ok_or_else(|| { - DisplayRecordBatchError::UnexpectedTimeColumnDataType( - desc.timeline.name().as_str().to_owned(), - column_data.data_type().to_owned(), - ) - })? - .clone(); + .downcast_ref::() + .cloned() + .or_else(|| { + column_data + .as_any() + .downcast_ref::() + .and_then(int64_from_nanoseconds) + }); + let time_data = time_data_result.ok_or_else(|| { + DisplayRecordBatchError::UnexpectedTimeColumnDataType( + timeline.name().as_str().to_owned(), + column_data.data_type().to_owned(), + ) + })?; Ok(Self::Timeline { - timeline: desc.timeline, + timeline, time_data, }) } @@ -307,7 +324,7 @@ impl DisplayRecordBatch { /// The columns in the record batch must match the selected columns. This is guaranteed by /// `re_datastore`. pub(crate) fn try_new( - row_data: &Vec>, + row_data: &Vec, selected_columns: &[ColumnDescriptor], ) -> Result { let num_rows = row_data.first().map(|arr| arr.len()).unwrap_or(0); diff --git a/rerun_py/src/dataframe.rs b/rerun_py/src/dataframe.rs index 563322bba0c5..6b6e51e19f62 100644 --- a/rerun_py/src/dataframe.rs +++ b/rerun_py/src/dataframe.rs @@ -649,6 +649,7 @@ impl PyRecordingView { /// This schema will only contain the columns that are included in the view via /// the view contents. fn schema(&self, py: Python<'_>) -> PyResult { + #![allow(clippy::unnecessary_wraps)] // In case of feature != "remote" match &self.recording { PyRecordingHandle::Local(recording) => { let borrowed: PyRef<'_, PyRecording> = recording.borrow(py);