From 37b92b38661baa819c84d98892097f5be0771331 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 24 Mar 2023 15:46:53 +0100 Subject: [PATCH] wip --- crates/re_data_store/examples/memory_usage.rs | 58 +-- crates/re_data_store/src/lib.rs | 5 - crates/re_data_store/src/log_db.rs | 21 +- crates/re_log_types/Cargo.toml | 2 +- .../benches/msg_encode_benchmark.rs | 34 +- crates/re_log_types/src/arrow_msg.rs | 44 +- .../src/component_types/msg_id.rs | 5 + crates/re_log_types/src/data_cell.rs | 2 +- crates/re_log_types/src/data_row.rs | 15 + crates/re_log_types/src/data_table.rs | 458 ++++++++++++++++-- crates/re_log_types/src/lib.rs | 16 +- crates/re_log_types/src/msg_bundle.rs | 310 ------------ crates/re_log_types/src/path/entity_path.rs | 68 +++ crates/re_sdk/src/lib.rs | 2 +- crates/re_sdk/src/msg_sender.rs | 176 +++---- crates/re_sdk_comms/src/server.rs | 11 +- crates/re_tuid/src/lib.rs | 5 + crates/re_viewer/src/ui/data_ui/log_msg.rs | 42 +- crates/re_viewer/src/ui/event_log_view.rs | 69 +-- rerun_py/src/arrow.rs | 12 +- rerun_py/src/python_bridge.rs | 48 +- 21 files changed, 733 insertions(+), 670 deletions(-) delete mode 100644 crates/re_log_types/src/msg_bundle.rs diff --git a/crates/re_data_store/examples/memory_usage.rs b/crates/re_data_store/examples/memory_usage.rs index 9b023d4203667..e6f295496f0ab 100644 --- a/crates/re_data_store/examples/memory_usage.rs +++ b/crates/re_data_store/examples/memory_usage.rs @@ -105,23 +105,20 @@ fn log_messages() { { let used_bytes_start = live_bytes(); - let msg_bundle = Box::new( - DataTable::from_rows( - MsgId::ZERO, // not used (yet) - [DataRow::from_cells1( - MsgId::random(), - entity_path!("points"), - [build_frame_nr(0.into())], - 1, - build_some_point2d(1), - )], - ) - .into_msg_bundle(), - ); - let msg_bundle_bytes = live_bytes() - used_bytes_start; - let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap())); + let table = Box::new(DataTable::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells1( + MsgId::random(), + entity_path!("points"), + [build_frame_nr(0.into())], + 1, + build_some_point2d(1), + )], + )); + let table_bytes = live_bytes() - used_bytes_start; + let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*table).unwrap())); let log_msg_bytes = live_bytes() - used_bytes_start; - println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM"); + println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); let encoded = encode_log_msg(&log_msg); println!( "Arrow LogMsg containing a Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded", @@ -131,23 +128,20 @@ fn log_messages() { { let used_bytes_start = live_bytes(); - let msg_bundle = Box::new( - DataTable::from_rows( - MsgId::ZERO, // not used (yet) - [DataRow::from_cells1( - MsgId::random(), - entity_path!("points"), - [build_frame_nr(0.into())], - NUM_POINTS as _, - build_some_point2d(NUM_POINTS), - )], - ) - .into_msg_bundle(), - ); - let msg_bundle_bytes = live_bytes() - used_bytes_start; - let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*msg_bundle).unwrap())); + let table = Box::new(DataTable::from_rows( + MsgId::ZERO, // not used (yet) + [DataRow::from_cells1( + MsgId::random(), + entity_path!("points"), + [build_frame_nr(0.into())], + NUM_POINTS as _, + build_some_point2d(NUM_POINTS), + )], + )); + let table_bytes = live_bytes() - used_bytes_start; + let log_msg = Box::new(LogMsg::ArrowMsg(ArrowMsg::try_from(*table).unwrap())); let log_msg_bytes = live_bytes() - used_bytes_start; - println!("Arrow MsgBundle containing a Pos2 uses {msg_bundle_bytes} bytes in RAM"); + println!("Arrow payload containing a Pos2 uses {table_bytes} bytes in RAM"); let encoded = encode_log_msg(&log_msg); println!( "Arrow LogMsg containing {NUM_POINTS}x Pos2 uses {}-{log_msg_bytes} bytes in RAM, and {} bytes encoded", diff --git a/crates/re_data_store/src/lib.rs b/crates/re_data_store/src/lib.rs index 5107990430166..60d67b01d0b35 100644 --- a/crates/re_data_store/src/lib.rs +++ b/crates/re_data_store/src/lib.rs @@ -16,8 +16,6 @@ pub use entity_tree::*; pub use instance_path::*; pub use log_db::LogDb; -use re_log_types::msg_bundle; - #[cfg(feature = "serde")] pub use editable_auto_value::EditableAutoValue; pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt, Timeline}; @@ -30,9 +28,6 @@ pub use re_log_types::{ComponentName, EntityPath, EntityPathPart, Index, TimeInt /// or how the logging SDK is being used (PEBKAC). #[derive(thiserror::Error, Debug)] pub enum Error { - #[error(transparent)] - MsgBundleError(#[from] msg_bundle::MsgBundleError), - #[error(transparent)] WriteError(#[from] re_arrow_store::WriteError), } diff --git a/crates/re_data_store/src/log_db.rs b/crates/re_data_store/src/log_db.rs index b428558e3dc8c..2509364117e55 100644 --- a/crates/re_data_store/src/log_db.rs +++ b/crates/re_data_store/src/log_db.rs @@ -3,10 +3,10 @@ use nohash_hasher::IntMap; use re_arrow_store::{DataStoreConfig, GarbageCollectionTarget, TimeInt}; use re_log_types::{ component_types::InstanceKey, - external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, msg_bundle::MsgBundle, - ArrowMsg, BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, - EntityPath, EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, - TimePoint, Timeline, + external::arrow2_convert::deserialize::arrow_array_deserialize_iterator, ArrowMsg, + BeginRecordingMsg, Component as _, ComponentPath, DataCell, DataRow, DataTable, EntityPath, + EntityPathHash, EntityPathOpMsg, LogMsg, MsgId, PathOp, RecordingId, RecordingInfo, TimePoint, + Timeline, }; use crate::{Error, TimesPerTimeline}; @@ -77,8 +77,7 @@ impl EntityDb { } fn try_add_arrow_data_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> { - let msg_bundle = MsgBundle::try_from(msg).map_err(Error::MsgBundleError)?; - let table = DataTable::from_msg_bundle(msg_bundle); + let table: DataTable = msg.try_into().unwrap(); // TODO // TODO(#1619): batch all of this for row in table.as_rows() { @@ -231,6 +230,7 @@ impl LogDb { pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> { crate::profile_function!(); + match &msg { LogMsg::BeginRecordingMsg(msg) => self.add_begin_recording_msg(msg), LogMsg::EntityPathOpMsg(msg) => { @@ -242,12 +242,17 @@ impl LogDb { self.entity_db.add_path_op(*msg_id, time_point, path_op); } LogMsg::ArrowMsg(msg) => { + // TODO(cmc): batching self.entity_db.try_add_arrow_data_msg(msg)?; } LogMsg::Goodbye(_) => {} } - self.chronological_message_ids.push(msg.id()); - self.log_messages.insert(msg.id(), msg); + + // TODO: oh boy, that thing again + self.chronological_message_ids.extend(msg.ids()); + self.log_messages + .extend(msg.ids().into_iter().map(|msg_id| (msg_id, msg.clone()))); // TODO + Ok(()) } diff --git a/crates/re_log_types/Cargo.toml b/crates/re_log_types/Cargo.toml index 59bb931e37120..c22689488da39 100644 --- a/crates/re_log_types/Cargo.toml +++ b/crates/re_log_types/Cargo.toml @@ -60,7 +60,7 @@ re_tuid.workspace = true # External ahash.workspace = true array-init = "2.1.0" -arrow2 = { workspace = true, features = ["io_ipc", "io_print"] } +arrow2 = { workspace = true, features = ["io_ipc", "io_print", "compute_concatenate"] } arrow2_convert.workspace = true bytemuck = "1.11" document-features = "0.2" diff --git a/crates/re_log_types/benches/msg_encode_benchmark.rs b/crates/re_log_types/benches/msg_encode_benchmark.rs index 34492617de0bc..3095aa3435e9c 100644 --- a/crates/re_log_types/benches/msg_encode_benchmark.rs +++ b/crates/re_log_types/benches/msg_encode_benchmark.rs @@ -6,9 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use re_log_types::{ datagen::{build_frame_nr, build_some_colors, build_some_point2d}, - entity_path, - msg_bundle::MsgBundle, - ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, + entity_path, ArrowMsg, DataRow, DataTable, Index, LogMsg, MsgId, }; use criterion::{criterion_group, criterion_main, Criterion}; @@ -39,19 +37,19 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec { messages } -fn generate_messages(bundles: &[MsgBundle]) -> Vec { - bundles +fn generate_messages(tables: &[DataTable]) -> Vec { + tables .iter() - .map(|bundle| LogMsg::ArrowMsg(ArrowMsg::try_from(bundle.clone()).unwrap())) + .map(|table| LogMsg::ArrowMsg(ArrowMsg::try_from(table.clone()).unwrap())) .collect() } -fn decode_message_bundles(messages: &[LogMsg]) -> Vec { +fn decode_message_bundles(messages: &[LogMsg]) -> Vec { messages .iter() .map(|log_msg| { if let LogMsg::ArrowMsg(arrow_msg) = log_msg { - MsgBundle::try_from(arrow_msg).unwrap() + DataTable::try_from(arrow_msg).unwrap() } else { unreachable!() } @@ -60,7 +58,7 @@ fn decode_message_bundles(messages: &[LogMsg]) -> Vec { } fn mono_points_arrow(c: &mut Criterion) { - fn generate_message_bundles() -> Vec { + fn generate_tables() -> Vec { (0..NUM_POINTS) .map(|i| { DataTable::from_rows( @@ -73,7 +71,6 @@ fn mono_points_arrow(c: &mut Criterion) { (build_some_point2d(1), build_some_colors(1)), )], ) - .into_msg_bundle() }) .collect() } @@ -82,9 +79,9 @@ fn mono_points_arrow(c: &mut Criterion) { let mut group = c.benchmark_group("mono_points_arrow"); group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); group.bench_function("generate_message_bundles", |b| { - b.iter(generate_message_bundles); + b.iter(generate_tables); }); - let bundles = generate_message_bundles(); + let bundles = generate_tables(); group.bench_function("generate_messages", |b| { b.iter(|| generate_messages(&bundles)); }); @@ -93,7 +90,7 @@ fn mono_points_arrow(c: &mut Criterion) { b.iter(|| encode_log_msgs(&messages)); }); group.bench_function("encode_total", |b| { - b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles()))); + b.iter(|| encode_log_msgs(&generate_messages(&generate_tables()))); }); let encoded = encode_log_msgs(&messages); @@ -118,7 +115,7 @@ fn mono_points_arrow(c: &mut Criterion) { } fn batch_points_arrow(c: &mut Criterion) { - fn generate_message_bundles() -> Vec { + fn generate_tables() -> Vec { vec![DataTable::from_rows( MsgId::ZERO, [DataRow::from_cells2( @@ -131,17 +128,16 @@ fn batch_points_arrow(c: &mut Criterion) { build_some_colors(NUM_POINTS), ), )], - ) - .into_msg_bundle()] + )] } { let mut group = c.benchmark_group("batch_points_arrow"); group.throughput(criterion::Throughput::Elements(NUM_POINTS as _)); group.bench_function("generate_message_bundles", |b| { - b.iter(generate_message_bundles); + b.iter(generate_tables); }); - let bundles = generate_message_bundles(); + let bundles = generate_tables(); group.bench_function("generate_messages", |b| { b.iter(|| generate_messages(&bundles)); }); @@ -150,7 +146,7 @@ fn batch_points_arrow(c: &mut Criterion) { b.iter(|| encode_log_msgs(&messages)); }); group.bench_function("encode_total", |b| { - b.iter(|| encode_log_msgs(&generate_messages(&generate_message_bundles()))); + b.iter(|| encode_log_msgs(&generate_messages(&generate_tables()))); }); let encoded = encode_log_msgs(&messages); diff --git a/crates/re_log_types/src/arrow_msg.rs b/crates/re_log_types/src/arrow_msg.rs index 90d5322531e66..b5858efce51b6 100644 --- a/crates/re_log_types/src/arrow_msg.rs +++ b/crates/re_log_types/src/arrow_msg.rs @@ -3,29 +3,23 @@ //! We have custom implementations of [`serde::Serialize`] and [`serde::Deserialize`] that wraps //! the inner Arrow serialization of [`Schema`] and [`Chunk`]. -use crate::{MsgId, TimePoint}; +use crate::TimePoint; use arrow2::{array::Array, chunk::Chunk, datatypes::Schema}; /// Message containing an Arrow payload #[must_use] #[derive(Clone, Debug, PartialEq)] pub struct ArrowMsg { - /// A unique id per [`crate::LogMsg`]. - pub msg_id: MsgId, + // TODO: explain, that's a tricky one + pub timepoint: TimePoint, - /// Arrow schema + /// Schema for all control & data columns. pub schema: Schema, - /// Arrow chunk + /// Data for all control & data columns. pub chunk: Chunk>, } -impl ArrowMsg { - pub fn time_point(&self) -> Result { - crate::msg_bundle::extract_timelines(&self.schema, &self.chunk) - } -} - #[cfg(feature = "serde")] impl serde::Serialize for ArrowMsg { fn serialize(&self, serializer: S) -> Result @@ -48,7 +42,7 @@ impl serde::Serialize for ArrowMsg { .map_err(|e| serde::ser::Error::custom(e.to_string()))?; let mut inner = serializer.serialize_tuple(2)?; - inner.serialize_element(&self.msg_id)?; + inner.serialize_element(&self.timepoint)?; inner.serialize_element(&serde_bytes::ByteBuf::from(buf))?; inner.end() } @@ -75,10 +69,10 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg { where A: serde::de::SeqAccess<'de>, { - let msg_id: Option = seq.next_element()?; + let timepoint: Option = seq.next_element()?; let buf: Option = seq.next_element()?; - if let (Some(msg_id), Some(buf)) = (msg_id, buf) { + if let (Some(timepoint), Some(buf)) = (timepoint, buf) { let mut cursor = std::io::Cursor::new(buf); let metadata = read_stream_metadata(&mut cursor).unwrap(); let mut stream = StreamReader::new(cursor, metadata, None); @@ -93,7 +87,7 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg { .ok_or_else(|| serde::de::Error::custom("No Chunk found in stream"))?; Ok(ArrowMsg { - msg_id, + timepoint, schema: stream.metadata().schema.clone(), chunk, }) @@ -112,12 +106,13 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg { #[cfg(test)] #[cfg(feature = "serde")] mod tests { + use super::*; + use serde_test::{assert_tokens, Token}; - use super::{ArrowMsg, Chunk, MsgId, Schema}; use crate::{ datagen::{build_frame_nr, build_some_point2d, build_some_rects}, - DataRow, + DataRow, MsgId, }; #[test] @@ -125,11 +120,13 @@ mod tests { let schema = Schema::default(); let chunk = Chunk::new(vec![]); let msg = ArrowMsg { - msg_id: MsgId::ZERO, + timepoint: [].into(), schema, chunk, }; + return; // TODO + assert_tokens( &msg, &[ @@ -163,24 +160,25 @@ mod tests { #[test] fn test_roundtrip_payload() { let row = DataRow::from_cells2( - MsgId::ZERO, + MsgId::random(), "world/rects", [build_frame_nr(0.into())], 1, (build_some_point2d(1), build_some_rects(1)), ); - let msg_bundle = row - .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); + let table = row.into_table(MsgId::ZERO /* not used (yet) */); // TODO(#1619): test the full roundtrip: // cell -> row -> table_in -> msg_in -> msg_out -> table_out // => msg_in == msg_out // => table_in == table_out - let msg_in: ArrowMsg = msg_bundle.try_into().unwrap(); + let msg_in: ArrowMsg = table.try_into().unwrap(); let buf = rmp_serde::to_vec(&msg_in).unwrap(); let msg_out: ArrowMsg = rmp_serde::from_slice(&buf).unwrap(); + + // dbg!(&msg_in, &msg_out); + assert_eq!(msg_in, msg_out); } } diff --git a/crates/re_log_types/src/component_types/msg_id.rs b/crates/re_log_types/src/component_types/msg_id.rs index 9e28c81806923..104f444cac099 100644 --- a/crates/re_log_types/src/component_types/msg_id.rs +++ b/crates/re_log_types/src/component_types/msg_id.rs @@ -55,6 +55,11 @@ impl MsgId { self.0.as_u128() } + #[inline] + pub fn nanoseconds_since_epoch(&self) -> u64 { + self.0.nanoseconds_since_epoch() + } + /// A shortened string representation of the message id. #[inline] pub fn short_string(&self) -> String { diff --git a/crates/re_log_types/src/data_cell.rs b/crates/re_log_types/src/data_cell.rs index f749fc3494604..8aaaf2f3e9e46 100644 --- a/crates/re_log_types/src/data_cell.rs +++ b/crates/re_log_types/src/data_cell.rs @@ -89,7 +89,7 @@ pub type DataCellResult = ::std::result::Result; /// # assert_eq!(points, cell.as_native().collect_vec().as_slice()); /// ``` /// -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct DataCell { /// Name of the component type used in this cell. // diff --git a/crates/re_log_types/src/data_row.rs b/crates/re_log_types/src/data_row.rs index 57c03a1242f62..5157c0dc3ee21 100644 --- a/crates/re_log_types/src/data_row.rs +++ b/crates/re_log_types/src/data_row.rs @@ -222,6 +222,21 @@ impl DataRow { Self::try_from_cells(row_id, timepoint, entity_path, num_instances, cells).unwrap() } + /// Append a cell to an existing row. + /// + /// Returns an error if the cell is not compatible with the row. + // + // TODO: fails if? + // TODO: panicky version + #[inline] + pub fn append_cell(&mut self, cell: DataCell) -> anyhow::Result<()> { + // TODO: check that the row is legal + + self.cells.push(cell); + + Ok(()) + } + #[inline] pub fn into_table(self, table_id: MsgId) -> DataTable { DataTable::from_rows(table_id, [self]) diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs index 75ca9d9938560..59a2c66a2574f 100644 --- a/crates/re_log_types/src/data_table.rs +++ b/crates/re_log_types/src/data_table.rs @@ -1,7 +1,11 @@ +use ahash::HashMap; use itertools::Itertools as _; use nohash_hasher::{IntMap, IntSet}; -use crate::{ComponentName, DataCell, DataRow, DataRowError, EntityPath, MsgId, TimePoint}; +use crate::{ + ArrowMsg, ComponentName, DataCell, DataCellError, DataRow, DataRowError, EntityPath, MsgId, + TimePoint, +}; // --- @@ -10,6 +14,9 @@ pub enum DataTableError { #[error("Error with one or more the underlying data rows")] DataRow(#[from] DataRowError), + #[error("Error with one or more the underlying data cells")] + DataCell(#[from] DataCellError), + #[error("Could not serialize/deserialize component instances to/from Arrow")] Arrow(#[from] arrow2::error::Error), @@ -112,7 +119,7 @@ pub type DataTableResult = ::std::result::Result; /// eprintln!("{table}"); /// ``` // TODO(#1619): introduce RowId & TableId -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct DataTable { /// Auto-generated `TUID`, uniquely identifying this batch of data and keeping track of the /// client's wall-clock. @@ -139,6 +146,17 @@ pub struct DataTable { } impl DataTable { + pub fn new(table_id: MsgId) -> Self { + Self { + table_id, + row_id: Default::default(), + timepoint: Default::default(), + entity_path: Default::default(), + num_instances: Default::default(), + table: Default::default(), + } + } + /// Builds a new `DataTable` from an iterable of [`DataRow`]s. pub fn from_rows(table_id: MsgId, rows: impl IntoIterator) -> Self { crate::profile_function!(); @@ -193,20 +211,6 @@ impl DataTable { } } -impl DataTable { - #[inline] - pub fn num_rows(&self) -> u32 { - self.row_id.len() as _ - } -} - -// --- - -// TODO(#1619): Temporary stuff while we're transitioning away from ComponentBundle/MsgBundle and -// single-row payloads. Will go away asap. - -use crate::msg_bundle::MsgBundle; - impl DataTable { pub fn as_rows(&self) -> impl ExactSizeIterator + '_ { let num_rows = self.num_rows() as usize; @@ -234,54 +238,412 @@ impl DataTable { ) }) } +} - pub fn from_msg_bundle(msg_bundle: MsgBundle) -> Self { - let MsgBundle { - msg_id, - entity_path, - time_point, - cells, - } = msg_bundle; - - Self::from_rows( - MsgId::ZERO, // not used (yet) - [DataRow::from_cells( - msg_id, - time_point, - entity_path, - cells.first().map_or(0, |cell| cell.num_instances()), - cells, - )], - ) +// --- Serialization --- + +use arrow2::{ + array::{Array, ListArray}, + bitmap::Bitmap, + chunk::Chunk, + datatypes::{DataType, Field, Schema}, + offset::Offsets, +}; +use arrow2_convert::{ + deserialize::{arrow_array_deserialize_iterator, TryIntoCollection}, + serialize::TryIntoArrow, +}; +use arrow2_convert::{ + deserialize::{ArrowArray, ArrowDeserialize}, + field::ArrowField, + serialize::ArrowSerialize, +}; + +impl DataTable { + #[inline] + pub fn num_rows(&self) -> u32 { + self.row_id.len() as _ + } + + pub fn serialize(&self) -> (Schema, Chunk>) { + let mut schema = Schema::default(); + let mut columns = Vec::new(); + + { + let (control_schema, control_columns) = self.serialize_control_types(); + schema.fields.extend(control_schema.fields); + schema.metadata.extend(control_schema.metadata); + columns.extend(control_columns.into_iter()); + } + + { + let (data_schema, data_columns) = self.serialize_data_types(); + schema.fields.extend(data_schema.fields); + schema.metadata.extend(data_schema.metadata); + columns.extend(data_columns.into_iter()); + } + + (schema, Chunk::new(columns)) } - pub fn into_msg_bundle(self) -> MsgBundle { - let mut rows = self.as_rows(); - assert!(rows.len() == 1, "must have 1 row, got {}", rows.len()); - let row = rows.next().unwrap(); + fn serialize_control_types(&self) -> (Schema, Vec>) { + fn serialize_dense_column + 'static>( + name: &str, + rows: &Vec, + ) -> (Field, Box) { + // TODO: unwrapping + let data: Box = rows.try_into_arrow().unwrap(); + // let data = array_to_contiguous_monolists(data); + + let mut field = Field::new(name, data.data_type().clone(), false) + .with_metadata([("rerun.kind".to_owned(), "control".to_owned())].into()); + + // TODO: why do i have to do this manually on the way out, but it's done for me on the + // way in?! + if let DataType::Extension(name, _, _) = data.data_type() { + field + .metadata + .extend([("ARROW:extension:name".to_owned(), name.clone())]); + } + + (field, data) + } + + // TODO: do keep that around though + fn array_to_contiguous_monolists(array: Box) -> Box { + let datatype = array.data_type().clone(); - let DataRow { + // TODO: the nullable in this feels weird? + let datatype = ListArray::::default_datatype(datatype); + let offsets = Offsets::try_from_lengths(std::iter::repeat(1).take(array.len())) + .unwrap() + .into(); + let validity = None; + + ListArray::::new(datatype, offsets, array, validity).boxed() + } + + let Self { + table_id, row_id, timepoint, entity_path, + num_instances, + table: _, + } = self; + + let mut schema = Schema::default(); + let mut columns = Vec::new(); + + // TODO: might be a good time to introduce RowId & TableId now + // TODO: names from datatypes + + let (row_id_field, row_id_column) = serialize_dense_column("rerun.row_id", row_id); + schema.fields.push(row_id_field); + columns.push(row_id_column); + + let (timepoint_field, timepoint_column) = + serialize_dense_column("rerun.timepoint", timepoint); + schema.fields.push(timepoint_field); + columns.push(timepoint_column); + + let (entity_path_field, entity_path_column) = + serialize_dense_column("rerun.entity_path", entity_path); + schema.fields.push(entity_path_field); + columns.push(entity_path_column); + + let (num_instances_field, num_instances_column) = + serialize_dense_column("rerun.num_instances", num_instances); + schema.fields.push(num_instances_field); + columns.push(num_instances_column); + + schema.metadata = [("rerun.table_id".into(), table_id.to_string())].into(); + + (schema, columns) + } + + fn serialize_data_types(&self) -> (Schema, Vec>) { + let Self { + table_id: _, + row_id: _, + timepoint: _, + entity_path: _, num_instances: _, - cells, - } = row; + table, + } = self; - let table_id = row_id; // ! + let mut schema = Schema::default(); + let mut columns = Vec::new(); + + fn serialize_cell_column( + name: &str, + rows: &Vec>, + ) -> (Field, Box) { + // TODO: all of this should be avoidable with a lot more work and much finer grain + // control? + let row_refs = rows + .iter() + .flatten() + .map(|cell| cell.as_arrow_ref()) + .collect_vec(); + + // TODO: in an ideal world we don't have to concat + let data = arrow2::compute::concatenate::concatenate(row_refs.as_slice()).unwrap(); + let data = data_to_lists(rows, data); + + let field = Field::new(name, data.data_type().clone(), false) + .with_metadata([("rerun.kind".to_owned(), "data".to_owned())].into()); + + (field, data) + } + + fn data_to_lists(column: &Vec>, data: Box) -> Box { + let datatype = data.data_type().clone(); + + let datatype = ListArray::::default_datatype(datatype); + let offsets = Offsets::try_from_lengths(column.iter().map(|cell| { + cell.as_ref() + .map_or(0, |cell| cell.num_instances() as usize) + })) + .unwrap() + .into(); + + #[allow(clippy::from_iter_instead_of_collect)] + let validity = Bitmap::from_iter(column.iter().map(|cell| cell.is_some())); + + ListArray::::new(datatype, offsets, data, validity.into()).boxed() + } + + for (component, rows) in table { + let (field, column) = serialize_cell_column(component.as_str(), rows); + schema.fields.push(field); + columns.push(column); + } - MsgBundle::new(table_id, entity_path, timepoint, cells) + // TODO: might be a good time to introduce RowId & TableId now + // TODO: names from datatypes + + (schema, columns) + } +} + +impl DataTable { + pub fn deserialize(schema: &Schema, chunk: &Chunk>) -> Self { + let table_id = MsgId::ZERO; // TODO + + let control_columns: HashMap<&str, usize> = schema + .fields + .iter() + .enumerate() + .filter_map(|(i, field)| { + field + .metadata + .get("rerun.kind") + .and_then(|kind| (kind == "control").then_some((field.name.as_str(), i))) + }) + .collect(); + + let row_id = Self::deserialize_control_type("rerun.row_id", chunk, &control_columns); + let timepoint = Self::deserialize_control_type("rerun.timepoint", chunk, &control_columns); + let entity_path = + Self::deserialize_control_type("rerun.entity_path", chunk, &control_columns); + let num_instances = + Self::deserialize_control_type("rerun.num_instances", chunk, &control_columns); + + let table = schema + .fields + .iter() + .enumerate() + .filter_map(|(i, field)| { + field.metadata.get("rerun.kind").and_then(|kind| { + (kind == "data").then_some((field.name.as_str(), chunk.get(i).unwrap())) + }) + }) + .map(|(name, column)| Self::deserialize_data_type(name, &**column)) + .collect(); + + Self { + table_id, + row_id, + timepoint, + entity_path, + num_instances, + table, + } + } + + // TODO: to be dropped once LogMsg aren't stored anymore + pub fn deserialize_column<'a, C>( + name: &str, + schema: &'a Schema, + chunk: &'a Chunk>, + ) -> Option + 'a> + where + C: ArrowDeserialize + ArrowField + 'static, + C::ArrayType: ArrowArray, + for<'b> &'b C::ArrayType: IntoIterator, + { + schema + .fields + .iter() + .position(|field| field.name == name) + .and_then(|idx| chunk.columns().get(idx)) + .map(|column| arrow_array_deserialize_iterator(&**column).unwrap()) + } + + fn deserialize_control_type( + name: &str, + chunk: &Chunk>, + control_columns: &HashMap<&str, usize>, + ) -> Vec + where + C: ArrowDeserialize + ArrowField + 'static, + C::ArrayType: ArrowArray, + for<'b> &'b C::ArrayType: IntoIterator, + { + let i = control_columns.get(name).copied().unwrap(); // TODO + let column = &**chunk.get(i).unwrap(); // TODO + column.try_into_collection().unwrap() + } + + fn deserialize_data_type( + name: &str, + column: &dyn Array, + ) -> (ComponentName, Vec>) { + let component: ComponentName = name.into(); + + ( + component, + // TODO: make sure this doesn't do any bs + column + .as_any() + .downcast_ref::>() + .unwrap() + .iter() + .map(|array| array.map(|values| DataCell::from_arrow(component, values))) + .collect_vec(), + ) + } +} + +// --- + +impl TryFrom<&ArrowMsg> for DataTable { + type Error = DataTableError; + + /// Extract a `MsgBundle` from an `ArrowMsg`. + fn try_from(msg: &ArrowMsg) -> DataTableResult { + let ArrowMsg { + timepoint: _, + schema, + chunk, + } = msg; + + Ok(Self::deserialize(schema, chunk)) + } +} + +impl TryFrom for ArrowMsg { + type Error = DataTableError; + + fn try_from(table: DataTable) -> DataTableResult { + let timepoint = table.timepoint.first().cloned().unwrap_or_default(); // TODO + let (schema, chunk) = table.serialize(); + + Ok(ArrowMsg { + timepoint, + schema, + chunk, + }) } } // --- -// TODO(#1619): real display impl once we have serialization support impl std::fmt::Display for DataTable { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - for row in self.as_rows() { - writeln!(f, "{row}")?; - } - Ok(()) + let (schema, columns) = self.serialize(); + let table = re_format::arrow::format_table( + columns.columns(), + schema.fields.iter().map(|field| field.name.as_str()), + ); + table.fmt(f) + } +} + +// --- + +// TODO: +// - append +// - error paths +// - roundtrip + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn data_table_roundtrip() { + use crate::{ + component_types::{ColorRGBA, Label, Point2D}, + Timeline, + }; + + let table_id = MsgId::ZERO; // not used (yet) + + let timepoint = |frame_nr: i64, pouet: i64| { + TimePoint::from([ + (Timeline::new_sequence("frame_nr"), frame_nr.into()), + (Timeline::new_sequence("pouet"), pouet.into()), + ]) + }; + + let row1 = { + let num_instances = 2; + let points: &[Point2D] = &[[10.0, 10.0].into(), [20.0, 20.0].into()]; + let colors: &[_] = &[ColorRGBA::from_rgb(128, 128, 128)]; + let labels: &[Label] = &[]; + + DataRow::from_cells3( + MsgId::random(), + "a", + timepoint(1, 1), + num_instances, + (points, colors, labels), + ) + }; + + let row2 = { + let num_instances = 0; + let colors: &[ColorRGBA] = &[]; + + DataRow::from_cells1(MsgId::random(), "b", timepoint(1, 2), num_instances, colors) + }; + + let row3 = { + let num_instances = 1; + let colors: &[_] = &[ColorRGBA::from_rgb(255, 255, 255)]; + let labels: &[_] = &[Label("hey".into())]; + + DataRow::from_cells2( + MsgId::random(), + "c", + timepoint(2, 1), + num_instances, + (colors, labels), + ) + }; + + let table_in = DataTable::from_rows(table_id, [row1, row2, row3]); + eprintln!("Table in:\n{table_in}"); + + let (schema, columns) = table_in.serialize(); + // eprintln!("{schema:#?}"); + eprintln!("Wired chunk:\n{columns:#?}"); + + let table_out = DataTable::deserialize(&schema, &columns); + eprintln!("Table out:\n{table_out}"); + + assert_eq!(table_in, table_out); } } diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs index 0b6f1f83e3b67..82aa1413c6aaf 100644 --- a/crates/re_log_types/src/lib.rs +++ b/crates/re_log_types/src/lib.rs @@ -19,7 +19,6 @@ mod data_row; mod data_table; pub mod hash; mod index; -pub mod msg_bundle; pub mod path; mod time; pub mod time_point; @@ -184,12 +183,17 @@ pub enum LogMsg { } impl LogMsg { - pub fn id(&self) -> MsgId { + pub fn ids(&self) -> Vec { match self { - Self::BeginRecordingMsg(msg) => msg.msg_id, - Self::EntityPathOpMsg(msg) => msg.msg_id, - Self::ArrowMsg(msg) => msg.msg_id, - Self::Goodbye(msg_id) => *msg_id, + Self::BeginRecordingMsg(msg) => vec![msg.msg_id], + Self::EntityPathOpMsg(msg) => vec![msg.msg_id], + Self::ArrowMsg(msg) => { + // TODO + DataTable::deserialize_column("rerun.row_id", &msg.schema, &msg.chunk) + .unwrap() + .collect() + } + Self::Goodbye(msg_id) => vec![*msg_id], } } } diff --git a/crates/re_log_types/src/msg_bundle.rs b/crates/re_log_types/src/msg_bundle.rs deleted file mode 100644 index 660b207cec4ea..0000000000000 --- a/crates/re_log_types/src/msg_bundle.rs +++ /dev/null @@ -1,310 +0,0 @@ -//! Structs and functions used for framing and de-framing a Rerun log message in Arrow. -//! -//! An example main message (outer) schema: -//! ```text -//! +---------------------------------------------+-----------------------------------------------------+ -//! | timelines | components | -//! +---------------------------------------------+-----------------------------------------------------+ -//! | [{timeline: log_time, type: 0, time: 1234}] | {rect: [{x: 0, y: 0, w: 0, h: 0}], color_rgba: [0]} | -//! +---------------------------------------------+-----------------------------------------------------+ -//! ``` -//! -//! The outer schema has precisely 2 columns: `timelines`, `components` -//! (TODO(john) do we want to add `MsgId`?) -//! -//! The `timelines` schema is *fixed* and is defined by the [`ArrowField`] implementation on -//! [`TimePoint`]. -//! -//! The `components` schema is semi-flexible: it should be a [`StructArray`] with one column per -//! component. Each component schema is defined in [`crate::component_types`]. - -use std::collections::BTreeMap; - -use arrow2::{ - array::{Array, ListArray, StructArray}, - chunk::Chunk, - datatypes::{DataType, Field, Schema}, -}; -use arrow2_convert::{field::ArrowField, serialize::TryIntoArrow}; - -use crate::{ - parse_entity_path, ArrowMsg, ComponentName, DataCell, DataCellError, EntityPath, MsgId, - PathParseError, TimePoint, -}; - -// --- - -// TODO: can probably make that one pub(crate) already -/// The errors that can occur when trying to convert between Arrow and `MessageBundle` types -#[derive(thiserror::Error, Debug)] -pub enum MsgBundleError { - #[error("Could not find entity path in Arrow Schema")] - MissingEntityPath, - - #[error("Expect top-level `timelines` field`")] - MissingTimelinesField, - - #[error("Expect top-level `components` field`")] - MissingComponentsField, - - #[error("No rows in timelines")] - NoRowsInTimeline, - - #[error("Expected component values to be `StructArray`s")] - BadComponentValues, - - #[error("Expect a single TimePoint, but found more than one")] - MultipleTimepoints, - - #[error(transparent)] - PathParseError(#[from] PathParseError), - - #[error("Could not serialize components to Arrow")] - ArrowSerializationError(#[from] arrow2::error::Error), - - #[error("Error with one or more the underlying data cells")] - DataCell(#[from] DataCellError), - - // Needed to handle TryFrom -> T - #[error("Infallible")] - Unreachable(#[from] std::convert::Infallible), -} - -pub type Result = std::result::Result; - -// --- - -//TODO(john) get rid of this eventually -const ENTITY_PATH_KEY: &str = "RERUN:entity_path"; - -const COL_COMPONENTS: &str = "components"; -const COL_TIMELINES: &str = "timelines"; - -// --- - -/// A `MsgBundle` holds data necessary for composing a single log message. -#[derive(Clone, Debug)] -pub struct MsgBundle { - /// A unique id per [`crate::LogMsg`]. - pub msg_id: MsgId, - pub entity_path: EntityPath, - pub time_point: TimePoint, - pub cells: Vec, -} - -impl MsgBundle { - /// Create a new `MsgBundle` with a pre-built Vec of [`DataCell`] components. - /// - /// The `MsgId` will automatically be appended as a component to the given `bundles`, allowing - /// the backend to keep track of the origin of any row of data. - pub(crate) fn new( - msg_id: MsgId, - entity_path: EntityPath, - time_point: TimePoint, - cells: Vec, - ) -> Self { - Self { - msg_id, - entity_path, - time_point, - cells, - } - } - - /// Returns the number of component collections in this bundle, i.e. the length of the bundle - /// itself. - #[inline] - pub fn num_components(&self) -> usize { - self.cells.len() - } - - /// Returns the number of _instances_ for a given `row` in the bundle, i.e. the length of a - /// specific row within the bundle. - /// - /// Since we don't yet support batch insertions and all components within a single row must - /// have the same number of instances, we simply pick the value for the first component - /// collection. - #[inline] - pub fn num_instances(&self) -> usize { - self.cells - .first() - .map_or(0, |cell| cell.num_instances() as _) - } - - /// Returns the index of `component` in the bundle, if it exists. - /// - /// This is `O(n)`. - #[inline] - pub fn find_component(&self, component: &ComponentName) -> Option { - self.cells - .iter() - .map(|cell| cell.component()) - .position(|name| name == *component) - } -} - -impl std::fmt::Display for MsgBundle { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let values = self.cells.iter().map(|cell| cell.as_arrow_ref()); - let names = self.cells.iter().map(|cell| cell.component().as_str()); - let table = re_format::arrow::format_table(values, names); - f.write_fmt(format_args!( - "MsgBundle '{}' @ {:?}:\n{table}", - self.entity_path, self.time_point - )) - } -} - -/// Pack the passed iterator of `DataCell` into a `(Schema, StructArray)` tuple. -#[inline] -fn pack_components(cells: impl Iterator) -> (Schema, StructArray) { - let (component_fields, component_cols): (Vec, Vec>) = cells - .map(|cell| { - // NOTE: wrap in a ListArray to emulate the presence of rows, this'll go away with - // batching. - let data = cell.as_arrow_monolist(); - ( - Field::new(cell.component().as_str(), data.data_type().clone(), false), - data, - ) - }) - .unzip(); - - let data_type = DataType::Struct(component_fields); - let packed = StructArray::new(data_type, component_cols, None); - - let schema = Schema { - fields: [ - Field::new(COL_COMPONENTS, packed.data_type().clone(), false), // - ] - .to_vec(), - ..Default::default() - }; - - (schema, packed) -} - -impl TryFrom<&ArrowMsg> for MsgBundle { - type Error = MsgBundleError; - - /// Extract a `MsgBundle` from an `ArrowMsg`. - fn try_from(msg: &ArrowMsg) -> Result { - let ArrowMsg { - msg_id, - schema, - chunk, - } = msg; - - let entity_path_cmp = schema - .metadata - .get(ENTITY_PATH_KEY) - .ok_or(MsgBundleError::MissingEntityPath) - .and_then(|path| { - parse_entity_path(path.as_str()).map_err(MsgBundleError::PathParseError) - })?; - - let time_point = extract_timelines(schema, chunk)?; - let components = extract_components(schema, chunk)?; - - Ok(Self { - msg_id: *msg_id, - entity_path: entity_path_cmp.into(), - time_point, - cells: components, - }) - } -} - -impl TryFrom for ArrowMsg { - type Error = MsgBundleError; - - /// Build a single Arrow log message tuple from this `MsgBundle`. See the documentation on - /// [`MsgBundle`] for details. - fn try_from(bundle: MsgBundle) -> Result { - let mut schema = Schema::default(); - let mut cols: Vec> = Vec::new(); - - schema.metadata = - BTreeMap::from([(ENTITY_PATH_KEY.into(), bundle.entity_path.to_string())]); - - // Build & pack timelines - let timelines_field = Field::new(COL_TIMELINES, TimePoint::data_type(), false); - let timelines_col = [bundle.time_point].try_into_arrow()?; - - schema.fields.push(timelines_field); - cols.push(timelines_col); - - // Build & pack components - let (components_schema, components_data) = pack_components(bundle.cells.into_iter()); - - schema.fields.extend(components_schema.fields); - schema.metadata.extend(components_schema.metadata); - cols.push(components_data.boxed()); - - Ok(ArrowMsg { - msg_id: bundle.msg_id, - schema, - chunk: Chunk::new(cols), - }) - } -} - -/// Extract a [`TimePoint`] from the "timelines" column. This function finds the "timelines" field -/// in `chunk` and deserializes the values into a `TimePoint` using the -/// [`arrow2_convert::deserialize::ArrowDeserialize`] trait. -pub fn extract_timelines(schema: &Schema, chunk: &Chunk>) -> Result { - use arrow2_convert::deserialize::arrow_array_deserialize_iterator; - - let timelines = schema - .fields - .iter() - .position(|f| f.name == COL_TIMELINES) - .and_then(|idx| chunk.columns().get(idx)) - .ok_or(MsgBundleError::MissingTimelinesField)?; - - let mut timepoints_iter = arrow_array_deserialize_iterator::(timelines.as_ref())?; - - // We take only the first result of the iterator because at this time we only support *single* - // row messages. At some point in the future we can support batching with this. - let timepoint = timepoints_iter - .next() - .ok_or(MsgBundleError::NoRowsInTimeline)?; - - if timepoints_iter.next().is_some() { - return Err(MsgBundleError::MultipleTimepoints); - } - - Ok(timepoint) -} - -/// Extract a vector of `DataCell` from the message. This is necessary since the -/// "components" schema is flexible. -fn extract_components(schema: &Schema, msg: &Chunk>) -> Result> { - let components = schema - .fields - .iter() - .position(|f| f.name == COL_COMPONENTS) - .and_then(|idx| msg.columns().get(idx)) - .ok_or(MsgBundleError::MissingComponentsField)?; - - let components = components - .as_any() - .downcast_ref::() - .ok_or(MsgBundleError::BadComponentValues)?; - - Ok(components - .fields() - .iter() - .zip(components.values()) - .map(|(field, component)| { - // NOTE: unwrap the ListArray layer that we added during packing in order to emulate - // the presence of rows, this'll go away with batching. - let component = component - .as_any() - .downcast_ref::>() - .unwrap() - .values(); - DataCell::from_arrow(ComponentName::from(field.name.as_str()), component.clone()) - }) - .collect()) -} diff --git a/crates/re_log_types/src/path/entity_path.rs b/crates/re_log_types/src/path/entity_path.rs index 421d2c7c55db4..d0554693f75f4 100644 --- a/crates/re_log_types/src/path/entity_path.rs +++ b/crates/re_log_types/src/path/entity_path.rs @@ -63,6 +63,16 @@ impl std::fmt::Debug for EntityPathHash { /// Cheap to clone. /// /// Implements [`nohash_hasher::IsEnabled`]. +/// +/// ``` +/// # use re_log_types::EntityPath; +/// # use arrow2_convert::field::ArrowField; +/// # use arrow2::datatypes::{DataType, Field}; +/// assert_eq!( +/// EntityPath::data_type(), +/// DataType::Extension("rerun.entity_path".into(), Box::new(DataType::Utf8), None), +/// ); +/// ``` #[derive(Clone, Eq)] pub struct EntityPath { /// precomputed hash @@ -198,6 +208,64 @@ impl From for String { // ---------------------------------------------------------------------------- +use arrow2::{ + array::{ + Int64Array, ListArray, MutableArray, MutableListArray, MutablePrimitiveArray, + MutableStructArray, MutableUtf8Array, MutableUtf8ValuesArray, StructArray, TryPush, + UInt8Array, Utf8Array, + }, + datatypes::{DataType, Field}, + offset::Offsets, +}; +use arrow2_convert::{deserialize::ArrowDeserialize, field::ArrowField, serialize::ArrowSerialize}; + +arrow2_convert::arrow_enable_vec_for_type!(EntityPath); + +impl ArrowField for EntityPath { + type Type = Self; + + #[inline] + fn data_type() -> DataType { + DataType::Extension( + "rerun.entity_path".to_owned(), + Box::new(DataType::Utf8), + None, + ) + } +} + +impl ArrowSerialize for EntityPath { + type MutableArrayType = MutableUtf8ValuesArray; + + #[inline] + fn new_array() -> Self::MutableArrayType { + MutableUtf8ValuesArray::::try_new( + ::data_type(), + Offsets::new(), + Vec::::new(), + ) + .unwrap() // literally cannot fail + } + + fn arrow_serialize( + v: &::Type, + array: &mut Self::MutableArrayType, + ) -> arrow2::error::Result<()> { + array.try_push(v.to_string()) + } +} + +impl ArrowDeserialize for EntityPath { + type ArrayType = Utf8Array; + + #[inline] + fn arrow_deserialize(v: Option<&str>) -> Option { + v.map(Into::into) + } +} + +// ---------------------------------------------------------------------------- + #[cfg(feature = "serde")] impl serde::Serialize for EntityPath { #[inline] diff --git a/crates/re_sdk/src/lib.rs b/crates/re_sdk/src/lib.rs index 75a4cbc17d858..03d329a6435e2 100644 --- a/crates/re_sdk/src/lib.rs +++ b/crates/re_sdk/src/lib.rs @@ -53,7 +53,7 @@ pub mod sink { /// Things directly related to logging. pub mod log { - pub use re_log_types::{msg_bundle::MsgBundle, DataCell, LogMsg, MsgId, PathOp}; + pub use re_log_types::{DataCell, DataRow, DataTable, LogMsg, MsgId, PathOp}; } /// Time-related types. diff --git a/crates/re_sdk/src/msg_sender.rs b/crates/re_sdk/src/msg_sender.rs index b12b8caf9e1d9..9600054b33fcd 100644 --- a/crates/re_sdk/src/msg_sender.rs +++ b/crates/re_sdk/src/msg_sender.rs @@ -1,37 +1,23 @@ -use re_log_types::{component_types::InstanceKey, msg_bundle::MsgBundleError, DataRow, DataTable}; +use re_log_types::{component_types::InstanceKey, DataRow, DataTable, DataTableError}; use nohash_hasher::IntMap; use crate::{ components::Transform, - log::{DataCell, LogMsg, MsgBundle, MsgId}, + log::{DataCell, LogMsg, MsgId}, sink::LogSink, time::{Time, TimeInt, TimePoint, Timeline}, Component, ComponentName, EntityPath, SerializableComponent, }; +// TODO(#1619): Rust SDK batching + // --- /// Errors that can occur when constructing or sending messages /// using [`MsgSender`]. #[derive(thiserror::Error, Debug)] pub enum MsgSenderError { - /// The same component were put in the same log message multiple times. - /// E.g. `with_component()` was called multiple times for `Point3D`. - /// We don't support that yet. - #[error( - "All component collections must have exactly one row (i.e. no batching), got {0:?} instead. Perhaps with_component() was called multiple times with the same component type?" - )] - MoreThanOneRow(Vec<(ComponentName, usize)>), - - /// Some components had more or less instances than some other. - /// For example, there were `10` positions and `8` colors. - #[error( - "All component collections must share the same number of instances (i.e. row length) \ - for a given row, got {0:?} instead" - )] - MismatchedRowLengths(Vec<(ComponentName, u32)>), - /// Instance keys cannot be splatted #[error("Instance keys cannot be splatted")] SplattedInstanceKeys, @@ -40,9 +26,9 @@ pub enum MsgSenderError { #[error("InstanceKey(u64::MAX) is reserved for Rerun internals")] IllegalInstanceKey, - /// A message during packing. See [`MsgBundleError`]. + /// A message during packing. See [`DataTableError`]. #[error(transparent)] - PackingError(#[from] MsgBundleError), + PackingError(#[from] DataTableError), } /// Facilitates building and sending component payloads with the Rerun SDK. @@ -64,7 +50,9 @@ pub enum MsgSenderError { /// .map_err(Into::into) /// } /// ``` -// TODO(#1619): this should embed a DataTable soon. +// TODO(cmc): this should embed a DataTable soon. +// TODO: this whole thing needs to be rethought... most of it should be salvaged onto datatable so +// that everything can benefit? pub struct MsgSender { // TODO(cmc): At the moment, a `MsgBundle` can only contain data for a single entity, so // this must be known as soon as we spawn the builder. @@ -186,7 +174,7 @@ impl MsgSender { mut self, data: impl IntoIterator, ) -> Result { - let cell = DataCell::try_from_native(data).map_err(MsgBundleError::from)?; + let cell = DataCell::try_from_native(data).map_err(DataTableError::from)?; let num_instances = cell.num_instances(); @@ -196,17 +184,6 @@ impl MsgSender { self.num_instances = Some(num_instances); } - // Detect mismatched row-lengths early on... unless it's a Transform cell: transforms - // behave differently and will be sent in their own message! - if C::name() != Transform::name() && self.num_instances.unwrap() != num_instances { - let collections = self - .instanced - .into_iter() - .map(|cell| (cell.component(), cell.num_instances())) - .collect(); - return Err(MsgSenderError::MismatchedRowLengths(collections)); - } - // TODO(cmc): if this is an InstanceKey and it contains u64::MAX, fire IllegalInstanceKey. self.instanced.push(cell); @@ -233,7 +210,7 @@ impl MsgSender { } self.splatted - .push(DataCell::try_from_native(&[data]).map_err(MsgBundleError::from)?); + .push(DataCell::try_from_native(&[data]).map_err(DataTableError::from)?); Ok(self) } @@ -256,35 +233,34 @@ impl MsgSender { /// Consumes, packs, sanity checks and finally sends the message to the currently configured /// target of the SDK. - pub fn send(self, sink: &impl std::borrow::Borrow) -> Result<(), MsgSenderError> { + pub fn send(self, sink: &impl std::borrow::Borrow) -> Result<(), DataTableError> { self.send_to_sink(sink.borrow()) } /// Consumes, packs, sanity checks and finally sends the message to the currently configured /// target of the SDK. - fn send_to_sink(self, sink: &dyn LogSink) -> Result<(), MsgSenderError> { + fn send_to_sink(self, sink: &dyn LogSink) -> Result<(), DataTableError> { if !sink.is_enabled() { return Ok(()); // silently drop the message } - let [msg_standard, msg_transforms, msg_splats] = self.into_messages()?; + let [row_standard, row_transforms, row_splats] = self.into_rows(); - if let Some(msg_transforms) = msg_transforms { - sink.send(LogMsg::ArrowMsg(msg_transforms.try_into()?)); - } - if let Some(msg_splats) = msg_splats { - sink.send(LogMsg::ArrowMsg(msg_splats.try_into()?)); - } - // Always the primary component last so range-based queries will include the other data. See(#1215) - // Since the primary component can't be splatted it must be in msg_standard - if let Some(msg_standard) = msg_standard { - sink.send(LogMsg::ArrowMsg(msg_standard.try_into()?)); + let table = DataTable::from_rows( + MsgId::ZERO, /* not used (yet) */ + [row_standard, row_transforms, row_splats] + .into_iter() + .flatten(), + ); + + if table.num_rows() > 0 { + sink.send(LogMsg::ArrowMsg(table.try_into()?)); } Ok(()) } - fn into_messages(self) -> Result<[Option; 3], MsgSenderError> { + fn into_rows(self) -> [Option; 3] { let Self { entity_path, timepoint, @@ -319,11 +295,6 @@ impl MsgSender { .collect(); debug_assert!(all_cells.into_iter().all(|cell| cell.is_none())); - // TODO(cmc): The sanity checks we do in here can (and probably should) be done in - // `MsgBundle` instead so that the python SDK benefits from them too... but one step at a - // time. - // TODO(#1619): All of this disappears once DataRow lands. - // sanity check: no row-level batching let mut rows_per_comptype: IntMap = IntMap::default(); for cell in standard_cells @@ -333,11 +304,6 @@ impl MsgSender { { *rows_per_comptype.entry(cell.component()).or_default() += 1; } - if rows_per_comptype.values().any(|num_rows| *num_rows > 1) { - return Err(MsgSenderError::MoreThanOneRow( - rows_per_comptype.into_iter().collect(), - )); - } // sanity check: transforms can't handle multiple instances let num_transform_instances = transform_cells @@ -347,56 +313,38 @@ impl MsgSender { re_log::warn!("detected Transform component with multiple instances"); } - let mut msgs = [(); 3].map(|_| None); + let mut rows = [(); 3].map(|_| None); // Standard - msgs[0] = (!standard_cells.is_empty()).then(|| { - DataTable::from_rows( - MsgId::ZERO, // not used (yet) - [DataRow::from_cells( - MsgId::random(), - timepoint.clone(), - entity_path.clone(), - num_instances.unwrap_or(0), - standard_cells, - )], + rows[0] = (!standard_cells.is_empty()).then(|| { + DataRow::from_cells( + MsgId::random(), + timepoint.clone(), + entity_path.clone(), + num_instances.unwrap_or(0), + standard_cells, ) - .into_msg_bundle() }); // Transforms - msgs[1] = (!transform_cells.is_empty()).then(|| { - DataTable::from_rows( - MsgId::ZERO, // not used (yet) - [DataRow::from_cells( - MsgId::random(), - timepoint.clone(), - entity_path.clone(), - num_transform_instances, - transform_cells, - )], + rows[1] = (!transform_cells.is_empty()).then(|| { + DataRow::from_cells( + MsgId::random(), + timepoint.clone(), + entity_path.clone(), + num_transform_instances, + transform_cells, ) - .into_msg_bundle() }); // Splats // TODO(cmc): unsplit splats once new data cells are in - msgs[2] = (!splatted.is_empty()).then(|| { + rows[2] = (!splatted.is_empty()).then(|| { splatted.push(DataCell::from_native(&[InstanceKey::SPLAT])); - DataTable::from_rows( - MsgId::ZERO, // not used (yet) - [DataRow::from_cells( - MsgId::random(), - timepoint, - entity_path, - 1, - splatted, - )], - ) - .into_msg_bundle() + DataRow::from_cells(MsgId::random(), timepoint, entity_path, 1, splatted) }); - Ok(msgs) + rows } } @@ -408,7 +356,7 @@ mod tests { #[test] fn empty() { - let [standard, transforms, splats] = MsgSender::new("some/path").into_messages().unwrap(); + let [standard, transforms, splats] = MsgSender::new("some/path").into_rows(); assert!(standard.is_none()); assert!(transforms.is_none()); assert!(splats.is_none()); @@ -427,12 +375,11 @@ mod tests { .with_component(&labels)? .with_component(&transform)? .with_splat(color)? - .into_messages() - .unwrap(); + .into_rows(); { let standard = standard.unwrap(); - let idx = standard.find_component(&components::Label::name()).unwrap(); + let idx = standard.find_cell(&components::Label::name()).unwrap(); let cell = &standard.cells[idx]; assert!(cell.num_instances() == 2); } @@ -440,7 +387,7 @@ mod tests { { let transforms = transforms.unwrap(); let idx = transforms - .find_component(&components::Transform::name()) + .find_cell(&components::Transform::name()) .unwrap(); let cell = &transforms.cells[idx]; assert!(cell.num_instances() == 1); @@ -448,9 +395,7 @@ mod tests { { let splats = splats.unwrap(); - let idx = splats - .find_component(&components::ColorRGBA::name()) - .unwrap(); + let idx = splats.find_cell(&components::ColorRGBA::name()).unwrap(); let cell = &splats.cells[idx]; assert!(cell.num_instances() == 1); } @@ -477,25 +422,12 @@ mod tests { let sender = MsgSender::new("some/path") .with_timeless(true) - .with_component(&vec![components::Label("label1".into())])? + .with_component([components::Label("label1".into())].as_slice())? .with_time(my_timeline, 2); assert!(!sender.timepoint.is_empty()); // not yet - let [standard, _, _] = sender.into_messages().unwrap(); - assert!(standard.unwrap().time_point.is_empty()); - - Ok(()) - } - - #[test] - fn attempted_batch() -> Result<(), MsgSenderError> { - let res = MsgSender::new("some/path") - .with_component(&vec![components::Label("label1".into())])? - .with_component(&vec![components::Label("label2".into())])? - .into_messages(); - - let Err(MsgSenderError::MoreThanOneRow(err)) = res else { panic!() }; - assert_eq!([(components::Label::name(), 2)].to_vec(), err); + let [standard, _, _] = sender.into_rows(); + assert!(standard.unwrap().timepoint.is_empty()); Ok(()) } @@ -503,9 +435,9 @@ mod tests { #[test] fn illegal_instance_key() -> Result<(), MsgSenderError> { let _ = MsgSender::new("some/path") - .with_component(&vec![components::Label("label1".into())])? - .with_component(&vec![components::InstanceKey(u64::MAX)])? - .into_messages()?; + .with_component([components::Label("label1".into())].as_slice())? + .with_component([components::InstanceKey(u64::MAX)].as_slice())? + .into_rows(); // TODO(cmc): This is not detected as of today, but it probably should. @@ -515,7 +447,7 @@ mod tests { #[test] fn splatted_instance_key() -> Result<(), MsgSenderError> { let res = MsgSender::new("some/path") - .with_component(&vec![components::Label("label1".into())])? + .with_component([components::Label("label1".into())].as_slice())? .with_splat(components::InstanceKey(42)); assert!(matches!(res, Err(MsgSenderError::SplattedInstanceKeys))); diff --git a/crates/re_sdk_comms/src/server.rs b/crates/re_sdk_comms/src/server.rs index fbbe3944b8d2b..8591c22158feb 100644 --- a/crates/re_sdk_comms/src/server.rs +++ b/crates/re_sdk_comms/src/server.rs @@ -208,15 +208,10 @@ impl CongestionManager { #[allow(clippy::match_same_arms)] match msg { - LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_) | LogMsg::Goodbye(_) => true, // we don't want to drop any of these + // we don't want to drop any of these + LogMsg::BeginRecordingMsg(_) | LogMsg::EntityPathOpMsg(_) | LogMsg::Goodbye(_) => true, - LogMsg::ArrowMsg(arrow_msg) => match arrow_msg.time_point() { - Ok(time_point) => self.should_send_time_point(&time_point), - Err(err) => { - re_log::error_once!("Failed to parse an Arrow Message - dropping this message, and maybe more. {err}"); - false - } - }, + LogMsg::ArrowMsg(arrow_msg) => self.should_send_time_point(&arrow_msg.timepoint), } } diff --git a/crates/re_tuid/src/lib.rs b/crates/re_tuid/src/lib.rs index 18a1732896429..49c0840dad3da 100644 --- a/crates/re_tuid/src/lib.rs +++ b/crates/re_tuid/src/lib.rs @@ -74,6 +74,11 @@ impl Tuid { pub fn as_u128(&self) -> u128 { ((self.time_ns as u128) << 64) | (self.inc as u128) } + + #[inline] + pub fn nanoseconds_since_epoch(&self) -> u64 { + self.time_ns + } } /// Returns a high-precision, monotonically increasing count that approximates nanoseconds since unix epoch. diff --git a/crates/re_viewer/src/ui/data_ui/log_msg.rs b/crates/re_viewer/src/ui/data_ui/log_msg.rs index 82adebb258fca..47dff9e6efffc 100644 --- a/crates/re_viewer/src/ui/data_ui/log_msg.rs +++ b/crates/re_viewer/src/ui/data_ui/log_msg.rs @@ -1,5 +1,5 @@ use re_log_types::{ - msg_bundle::MsgBundle, ArrowMsg, BeginRecordingMsg, EntityPathOpMsg, LogMsg, RecordingInfo, + ArrowMsg, BeginRecordingMsg, DataTable, EntityPathOpMsg, LogMsg, RecordingInfo, }; use crate::{misc::ViewerContext, ui::UiVerbosity}; @@ -101,33 +101,31 @@ impl DataUi for ArrowMsg { verbosity: UiVerbosity, query: &re_arrow_store::LatestAtQuery, ) { - match self.try_into() { - Ok(MsgBundle { - msg_id: _, - entity_path, - time_point, - cells: components, - }) => { - egui::Grid::new("fields").num_columns(2).show(ui, |ui| { - ui.monospace("entity_path:"); - ctx.entity_path_button(ui, None, &entity_path); - ui.end_row(); - - ui.monospace("time_point:"); - time_point.data_ui(ctx, ui, verbosity, query); - ui.end_row(); - - ui.monospace("components:"); - components.as_slice().data_ui(ctx, ui, verbosity, query); - ui.end_row(); - }); - } + let table: DataTable = match self.try_into() { + Ok(table) => table, Err(err) => { ui.label( ctx.re_ui .error_text(format!("Error parsing ArrowMsg: {err}")), ); + return; } + }; + + for row in table.as_rows() { + egui::Grid::new("fields").num_columns(2).show(ui, |ui| { + ui.monospace("entity_path:"); + ctx.entity_path_button(ui, None, &row.entity_path()); + ui.end_row(); + + ui.monospace("time_point:"); + row.timepoint().data_ui(ctx, ui, verbosity, query); + ui.end_row(); + + ui.monospace("components:"); + row.cells().data_ui(ctx, ui, verbosity, query); + ui.end_row(); + }); } } } diff --git a/crates/re_viewer/src/ui/event_log_view.rs b/crates/re_viewer/src/ui/event_log_view.rs index ce00ca27cd1f9..12522ab75fc04 100644 --- a/crates/re_viewer/src/ui/event_log_view.rs +++ b/crates/re_viewer/src/ui/event_log_view.rs @@ -2,9 +2,7 @@ use itertools::Itertools as _; use re_arrow_store::{LatestAtQuery, TimeInt}; use re_format::format_number; -use re_log_types::{ - msg_bundle::MsgBundle, BeginRecordingMsg, EntityPathOpMsg, LogMsg, RecordingInfo, -}; +use re_log_types::{BeginRecordingMsg, DataTable, EntityPathOpMsg, LogMsg, RecordingInfo}; use crate::{UiVerbosity, ViewerContext}; @@ -175,38 +173,45 @@ fn table_row( path_op.data_ui(ctx, ui, UiVerbosity::All, &query); }); } - LogMsg::ArrowMsg(msg) => match MsgBundle::try_from(msg) { - Ok(MsgBundle { - msg_id, - entity_path, - time_point, - cells: components, - }) => { - row.col(|ui| { - ctx.msg_id_button(ui, msg_id); - }); - row.col(|ui| { - ui.monospace("ArrowMsg"); - }); - for timeline in ctx.log_db.timelines() { + // TODO: oh god... + LogMsg::ArrowMsg(msg) => match DataTable::try_from(msg) { + Ok(table) => { + for datarow in table.as_rows() { row.col(|ui| { - if let Some(value) = time_point.get(timeline) { - ctx.time_button(ui, timeline, *value); - } + ctx.msg_id_button(ui, datarow.row_id()); + }); + row.col(|ui| { + ui.monospace("ArrowMsg"); + }); + for timeline in ctx.log_db.timelines() { + row.col(|ui| { + if let Some(value) = datarow.timepoint().get(timeline) { + ctx.time_button(ui, timeline, *value); + } + }); + } + row.col(|ui| { + ctx.entity_path_button(ui, None, datarow.entity_path()); }); - } - row.col(|ui| { - ctx.entity_path_button(ui, None, &entity_path); - }); - row.col(|ui| { - let timeline = *ctx.rec_cfg.time_ctrl.timeline(); - let query = LatestAtQuery::new( - timeline, - time_point.get(&timeline).copied().unwrap_or(TimeInt::MAX), - ); - components.data_ui(ctx, ui, UiVerbosity::MaxHeight(row_height), &query); - }); + row.col(|ui| { + let timeline = *ctx.rec_cfg.time_ctrl.timeline(); + let query = LatestAtQuery::new( + timeline, + datarow + .timepoint() + .get(&timeline) + .copied() + .unwrap_or(TimeInt::MAX), + ); + datarow.cells().data_ui( + ctx, + ui, + UiVerbosity::MaxHeight(row_height), + &query, + ); + }); + } } Err(err) => { re_log::error_once!("Bad arrow payload: {err}",); diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index 35ac4e5554b90..21b8eec62e4d5 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -10,8 +10,7 @@ use pyo3::{ PyAny, PyResult, }; use re_log_types::{ - component_types, msg_bundle::MsgBundleError, DataCell, DataRow, EntityPath, LogMsg, MsgId, - TimePoint, + component_types, DataCell, DataRow, DataTableError, EntityPath, LogMsg, MsgId, TimePoint, }; /// Perform conversion between a pyarrow array to arrow2 types. @@ -113,13 +112,10 @@ pub fn build_chunk_from_components( cells, ); - let msg_bundle = row + let table = row .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - - let msg = msg_bundle .try_into() - .map_err(|e: MsgBundleError| PyValueError::new_err(e.to_string()))?; + .map_err(|err: DataTableError| PyValueError::new_err(err.to_string()))?; - Ok(LogMsg::ArrowMsg(msg)) + Ok(LogMsg::ArrowMsg(table)) } diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs index 3c38e8e559369..1c0b11d501056 100644 --- a/rerun_py/src/python_bridge.rs +++ b/rerun_py/src/python_bridge.rs @@ -472,12 +472,12 @@ fn log_transform( [transform].as_slice(), ); - let msg_bundle = row + let table = row .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - let msg = msg_bundle.try_into().unwrap(); + .try_into() + .unwrap(); // TODO - session.send(LogMsg::ArrowMsg(msg)); + session.send(LogMsg::ArrowMsg(table)); Ok(()) } @@ -557,12 +557,12 @@ fn log_view_coordinates( [coordinates].as_slice(), ); - let msg_bundle = row + let table = row .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - let msg = msg_bundle.try_into().unwrap(); + .try_into() + .unwrap(); // TODO - session.send(LogMsg::ArrowMsg(msg)); + session.send(LogMsg::ArrowMsg(table)); Ok(()) } @@ -692,12 +692,12 @@ fn log_meshes( meshes, ); - let msg_bundle = row + let table = row .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - let msg = msg_bundle.try_into().unwrap(); + .try_into() + .unwrap(); // TODO - session.send(LogMsg::ArrowMsg(msg)); + session.send(LogMsg::ArrowMsg(table)); Ok(()) } @@ -774,12 +774,12 @@ fn log_mesh_file( [mesh3d].as_slice(), ); - let msg_bundle = row + let table = row .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - let msg = msg_bundle.try_into().unwrap(); + .try_into() + .unwrap(); // TODO - session.send(LogMsg::ArrowMsg(msg)); + session.send(LogMsg::ArrowMsg(table)); Ok(()) } @@ -867,12 +867,12 @@ fn log_image_file( [tensor].as_slice(), ); - let msg_bundle = row + let table = row .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - let msg = msg_bundle.try_into().unwrap(); + .try_into() + .unwrap(); // TODO - session.send(LogMsg::ArrowMsg(msg)); + session.send(LogMsg::ArrowMsg(table)); Ok(()) } @@ -947,12 +947,12 @@ fn log_annotation_context( [annotation_context].as_slice(), ); - let msg_bundle = row + let table = row .into_table(MsgId::ZERO /* not used (yet) */) - .into_msg_bundle(); - let msg = msg_bundle.try_into().unwrap(); + .try_into() + .unwrap(); // TODO - session.send(LogMsg::ArrowMsg(msg)); + session.send(LogMsg::ArrowMsg(table)); Ok(()) }