From b4b7ec4ba334836be66a0b8f04ade72e9f09c86e Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 31 May 2024 10:42:54 +0200 Subject: [PATCH] Client-side chunks 2: introduce `TransportChunk` (#6439) A `TransportChunk` is a `Chunk` that is ready for transport and/or storage. It is very cheap to go from `Chunk` to a `TransportChunk` and vice-versa. A `TransportChunk` maps 1:1 to a native Arrow `RecordBatch`. It has a stable ABI, and can be cheaply send across process boundaries. `arrow2` has no `RecordBatch` type; we will get one once we migrate to `arrow-rs`. A `TransportChunk` is self-describing: it contains all the data _and_ metadata needed to index it into storage. We rely heavily on chunk-level and field-level metadata to communicate Rerun-specific semantics over the wire, e.g. whether some columns are already properly sorted. The Arrow metadata system is fairly limited -- it's all untyped strings --, but for now that seems good enough. It will be trivial to switch to something else later, if need be. - Fixes https://github.com/rerun-io/rerun/issues/1760 - Fixes https://github.com/rerun-io/rerun/issues/1692 - Fixes #3360 - Fixes https://github.com/rerun-io/rerun/issues/1696 --- Part of a PR series to implement our new chunk-based data model on the client-side (SDKs): - #6437 - #6438 - #6439 - #6440 - #6441 --- crates/re_chunk/src/chunk.rs | 13 +- crates/re_chunk/src/lib.rs | 2 + crates/re_chunk/src/shuffle.rs | 6 +- crates/re_chunk/src/transport.rs | 699 ++++++++++++++++++ crates/re_log_types/src/example_components.rs | 108 +++ crates/re_tuid/src/lib.rs | 12 + 6 files changed, 835 insertions(+), 5 deletions(-) create mode 100644 crates/re_chunk/src/transport.rs diff --git a/crates/re_chunk/src/chunk.rs b/crates/re_chunk/src/chunk.rs index 8de6ca212f0e..39612502f721 100644 --- a/crates/re_chunk/src/chunk.rs +++ b/crates/re_chunk/src/chunk.rs @@ -35,7 +35,7 @@ pub type ChunkId = re_tuid::Tuid; /// Its time columns might or might not be ascendingly sorted, depending on how the data was logged. /// /// This is the in-memory representation of a chunk, optimized for efficient manipulation of the -/// data within. +/// data within. For transport, see [`crate::TransportChunk`] instead. #[derive(Debug, Clone)] pub struct Chunk { pub(crate) id: ChunkId, @@ -337,7 +337,16 @@ impl Chunk { } } -// TODO(cmc): display impl +impl std::fmt::Display for Chunk { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let chunk = self.to_transport().map_err(|err| { + re_log::error_once!("couldn't display Chunk: {err}"); + std::fmt::Error + })?; + chunk.fmt(f) + } +} // TODO(cmc): sizebytes impl + sizebytes caching + sizebytes in transport metadata diff --git a/crates/re_chunk/src/lib.rs b/crates/re_chunk/src/lib.rs index 1479770e8351..d832fd309073 100644 --- a/crates/re_chunk/src/lib.rs +++ b/crates/re_chunk/src/lib.rs @@ -6,9 +6,11 @@ mod chunk; mod shuffle; +mod transport; mod util; pub use self::chunk::{Chunk, ChunkError, ChunkId, ChunkResult, ChunkTimeline}; +pub use self::transport::TransportChunk; pub use self::util::arrays_to_list_array; pub mod external { diff --git a/crates/re_chunk/src/shuffle.rs b/crates/re_chunk/src/shuffle.rs index b27b0cc94c25..7683fabf398a 100644 --- a/crates/re_chunk/src/shuffle.rs +++ b/crates/re_chunk/src/shuffle.rs @@ -297,7 +297,7 @@ mod tests { components.clone().into_iter().collect(), )?; - // eprintln!("{chunk_sorted}"); + eprintln!("{chunk_sorted}"); assert!(chunk_sorted.is_sorted()); assert!(chunk_sorted.is_sorted_uncached()); @@ -308,7 +308,7 @@ mod tests { chunk_shuffled }; - // eprintln!("{chunk_shuffled}"); + eprintln!("{chunk_shuffled}"); assert!(!chunk_shuffled.is_sorted()); assert!(!chunk_shuffled.is_sorted_uncached()); @@ -320,7 +320,7 @@ mod tests { chunk_resorted }; - // eprintln!("{chunk_resorted}"); + eprintln!("{chunk_resorted}"); assert!(chunk_resorted.is_sorted()); assert!(chunk_resorted.is_sorted_uncached()); diff --git a/crates/re_chunk/src/transport.rs b/crates/re_chunk/src/transport.rs new file mode 100644 index 000000000000..977076a4081b --- /dev/null +++ b/crates/re_chunk/src/transport.rs @@ -0,0 +1,699 @@ +use std::collections::BTreeMap; + +use arrow2::{ + array::{Array as ArrowArray, PrimitiveArray as ArrowPrimitiveArray}, + chunk::Chunk as ArrowChunk, + datatypes::{ + DataType as ArrowDatatype, Field as ArrowField, Metadata as ArrowMetadata, + Schema as ArrowSchema, TimeUnit as ArrowTimeUnit, + }, +}; + +use re_log_types::{EntityPath, RowId, TimeInt, Timeline}; +use re_types_core::Loggable as _; + +use crate::{Chunk, ChunkError, ChunkId, ChunkResult, ChunkTimeline}; + +// --- + +/// A [`Chunk`] that is ready for transport. Obtained by calling [`Chunk::to_transport`]. +/// +/// Implemented as an Arrow dataframe: a schema and a batch. +/// +/// Use the `Display` implementation to dump the chunk as a nicely formatted table. +/// +/// This has a stable ABI! The entire point of this type is to allow users to send raw arrow data +/// into Rerun. +/// This means we have to be very careful when checking the validity of the data: slipping corrupt +/// data into the store could silently break all the index search logic (e.g. think of a chunk +/// claiming to be sorted while it is in fact not). +// +// TODO(#4184): Provide APIs in all SDKs to log these all at once (temporal batches). +#[derive(Debug)] +pub struct TransportChunk { + /// The schema of the dataframe, and all chunk-level and field-level metadata. + /// + /// Take a look at the `TransportChunk::CHUNK_METADATA_*` and `TransportChunk::FIELD_METADATA_*` + /// constants for more information about available metadata. + pub schema: ArrowSchema, + + /// All the control, time and component data. + pub data: ArrowChunk>, +} + +impl std::fmt::Display for TransportChunk { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + re_format_arrow::format_dataframe( + &self.schema.metadata, + &self.schema.fields, + self.data.iter().map(|list_array| &**list_array), + ) + .fmt(f) + } +} + +impl TransportChunk { + /// The key used to identify a Rerun [`ChunkId`] in chunk-level [`ArrowSchema`] metadata. + pub const CHUNK_METADATA_KEY_ID: &'static str = "rerun.id"; + + /// The key used to identify a Rerun [`EntityPath`] in chunk-level [`ArrowSchema`] metadata. + pub const CHUNK_METADATA_KEY_ENTITY_PATH: &'static str = "rerun.entity_path"; + + /// The marker used to identify whether a chunk is sorted in chunk-level [`ArrowSchema`] metadata. + /// + /// The associated value is irrelevant -- if this marker is present, then it is true. + /// + /// Chunks are ascendingly sorted by their `RowId` column. + pub const CHUNK_METADATA_MARKER_IS_SORTED_BY_ROW_ID: &'static str = "rerun.is_sorted"; + + /// The key used to identify the kind of a Rerun column in field-level [`ArrowSchema`] metadata. + /// + /// That is: control columns (e.g. `row_id`), time columns or component columns. + pub const FIELD_METADATA_KEY_KIND: &'static str = "rerun.kind"; + + /// The value used to identify a Rerun time column in field-level [`ArrowSchema`] metadata. + pub const FIELD_METADATA_VALUE_KIND_TIME: &'static str = "time"; + + /// The value used to identify a Rerun control column in field-level [`ArrowSchema`] metadata. + pub const FIELD_METADATA_VALUE_KIND_CONTROL: &'static str = "control"; + + /// The value used to identify a Rerun data column in field-level [`ArrowSchema`] metadata. + pub const FIELD_METADATA_VALUE_KIND_DATA: &'static str = "data"; + + /// The marker used to identify whether a column is sorted in field-level [`ArrowSchema`] metadata. + /// + /// The associated value is irrelevant -- if this marker is present, then it is true. + /// + /// Chunks are ascendingly sorted by their `RowId` column but, depending on whether the data + /// was logged out of order or not for a given time column, that column might follow the global + /// `RowId` while still being unsorted relative to its own time order. + pub const FIELD_METADATA_MARKER_IS_SORTED_BY_TIME: &'static str = + Self::CHUNK_METADATA_MARKER_IS_SORTED_BY_ROW_ID; + + /// Returns the appropriate chunk-level [`ArrowSchema`] metadata for a Rerun [`ChunkId`]. + #[inline] + pub fn chunk_metadata_id(id: ChunkId) -> ArrowMetadata { + [ + ( + Self::CHUNK_METADATA_KEY_ID.to_owned(), + format!("{:X}", id.as_u128()), + ), // + ] + .into() + } + + /// Returns the appropriate chunk-level [`ArrowSchema`] metadata for a Rerun [`EntityPath`]. + #[inline] + pub fn chunk_metadata_entity_path(entity_path: &EntityPath) -> ArrowMetadata { + [ + ( + Self::CHUNK_METADATA_KEY_ENTITY_PATH.to_owned(), + entity_path.to_string(), + ), // + ] + .into() + } + + /// Returns the appropriate chunk-level [`ArrowSchema`] metadata for an `IS_SORTED` marker. + #[inline] + pub fn chunk_metadata_is_sorted() -> ArrowMetadata { + [ + ( + Self::CHUNK_METADATA_MARKER_IS_SORTED_BY_ROW_ID.to_owned(), + String::new(), + ), // + ] + .into() + } + + /// Returns the appropriate field-level [`ArrowSchema`] metadata for a Rerun time column. + #[inline] + pub fn field_metadata_time_column() -> ArrowMetadata { + [ + ( + Self::FIELD_METADATA_KEY_KIND.to_owned(), + Self::FIELD_METADATA_VALUE_KIND_TIME.to_owned(), + ), // + ] + .into() + } + + /// Returns the appropriate field-level [`ArrowSchema`] metadata for a Rerun control column. + #[inline] + pub fn field_metadata_control_column() -> ArrowMetadata { + [ + ( + Self::FIELD_METADATA_KEY_KIND.to_owned(), + Self::FIELD_METADATA_VALUE_KIND_CONTROL.to_owned(), + ), // + ] + .into() + } + + /// Returns the appropriate field-level [`ArrowSchema`] metadata for a Rerun data column. + #[inline] + pub fn field_metadata_data_column() -> ArrowMetadata { + [ + ( + Self::FIELD_METADATA_KEY_KIND.to_owned(), + Self::FIELD_METADATA_VALUE_KIND_DATA.to_owned(), + ), // + ] + .into() + } + + /// Returns the appropriate field-level [`ArrowSchema`] metadata for an `IS_SORTED` marker. + #[inline] + pub fn field_metadata_is_sorted() -> ArrowMetadata { + [ + ( + Self::FIELD_METADATA_MARKER_IS_SORTED_BY_TIME.to_owned(), + String::new(), + ), // + ] + .into() + } +} + +impl TransportChunk { + #[inline] + pub fn id(&self) -> ChunkResult { + match self.schema.metadata.get(Self::CHUNK_METADATA_KEY_ID) { + Some(id) => { + let id = u128::from_str_radix(id, 16).map_err(|err| ChunkError::Malformed { + reason: format!("cannot deserialize chunk id: {err}"), + })?; + Ok(ChunkId::from_u128(id)) + } + None => Err(crate::ChunkError::Malformed { + reason: format!( + "chunk id missing from metadata ({:?})", + self.schema.metadata + ), + }), + } + } + + #[inline] + pub fn entity_path(&self) -> ChunkResult { + match self + .schema + .metadata + .get(Self::CHUNK_METADATA_KEY_ENTITY_PATH) + { + Some(entity_path) => Ok(EntityPath::parse_forgiving(entity_path)), + None => Err(crate::ChunkError::Malformed { + reason: format!( + "entity path missing from metadata ({:?})", + self.schema.metadata + ), + }), + } + } + + /// Looks in the chunk metadata for the `IS_SORTED` marker. + /// + /// It is possible that a chunk is sorted but didn't set that marker. + /// This is fine, although wasteful. + #[inline] + pub fn is_sorted(&self) -> bool { + self.schema + .metadata + .get(Self::CHUNK_METADATA_MARKER_IS_SORTED_BY_ROW_ID) + .is_some() + } + + /// Iterates all columns of the specified `kind`. + /// + /// See: + /// * [`Self::FIELD_METADATA_VALUE_KIND_TIME`] + /// * [`Self::FIELD_METADATA_VALUE_KIND_CONTROL`] + /// * [`Self::FIELD_METADATA_VALUE_KIND_DATA`] + #[inline] + pub fn columns<'a>( + &'a self, + kind: &'a str, + ) -> impl Iterator)> + 'a { + self.schema + .fields + .iter() + .enumerate() + .filter_map(|(i, field)| { + let actual_kind = field.metadata.get(Self::FIELD_METADATA_KEY_KIND); + (actual_kind.map(|s| s.as_str()) == Some(kind)) + .then(|| self.data.columns().get(i).map(|column| (field, column))) + .flatten() + }) + } + + /// Iterates all control columns present in this chunk. + #[inline] + pub fn controls(&self) -> impl Iterator)> { + self.columns(Self::FIELD_METADATA_VALUE_KIND_CONTROL) + } + + /// Iterates all data columns present in this chunk. + #[inline] + pub fn components(&self) -> impl Iterator)> { + self.columns(Self::FIELD_METADATA_VALUE_KIND_DATA) + } + + /// Iterates all timeline columns present in this chunk. + #[inline] + pub fn timelines(&self) -> impl Iterator)> { + self.columns(Self::FIELD_METADATA_VALUE_KIND_TIME) + } + + /// How many columns in total? Includes control, time, and component columns. + #[inline] + pub fn num_columns(&self) -> usize { + self.data.columns().len() + } + + #[inline] + pub fn num_controls(&self) -> usize { + self.controls().count() + } + + #[inline] + pub fn num_timelines(&self) -> usize { + self.timelines().count() + } + + #[inline] + pub fn num_components(&self) -> usize { + self.components().count() + } + + #[inline] + pub fn num_rows(&self) -> usize { + self.data.len() + } +} + +impl Chunk { + /// Prepare the [`Chunk`] for transport. + /// + /// It is probably a good idea to sort the chunk first. + pub fn to_transport(&self) -> ChunkResult { + self.sanity_check()?; + + re_tracing::profile_function!(format!( + "num_columns={} num_rows={}", + self.num_columns(), + self.num_rows() + )); + + let Self { + id, + entity_path, + is_sorted, + row_ids, + timelines, + components, + } = self; + + let mut schema = ArrowSchema::default(); + let mut columns = Vec::with_capacity(1 /* row_ids */ + timelines.len() + components.len()); + + // Chunk-level metadata + { + re_tracing::profile_scope!("metadata"); + + schema + .metadata + .extend(TransportChunk::chunk_metadata_id(*id)); + + schema + .metadata + .extend(TransportChunk::chunk_metadata_entity_path(entity_path)); + + if *is_sorted { + schema + .metadata + .extend(TransportChunk::chunk_metadata_is_sorted()); + } + } + + // Row IDs + { + re_tracing::profile_scope!("row ids"); + + let row_ids = RowId::to_arrow(row_ids)?; + schema.fields.push( + ArrowField::new( + RowId::name().to_string(), + row_ids.data_type().clone(), + false, + ) + .with_metadata(TransportChunk::field_metadata_control_column()), + ); + columns.push(row_ids); + } + + // Timelines + { + re_tracing::profile_scope!("timelines"); + + for (timeline, info) in timelines { + let ChunkTimeline { + times, + is_sorted, + time_range: _, + } = info; + + let times = { + let values = times.iter().map(|time| time.as_i64()).collect(); + ArrowPrimitiveArray::new( + arrow2::types::PrimitiveType::Int64.into(), + values, + None, + ) + .to(timeline.datatype()) + }; + + let field = ArrowField::new( + timeline.name().to_string(), + times.data_type().clone(), + false, // timelines within a single chunk are always dense + ) + .with_metadata({ + let mut metadata = TransportChunk::field_metadata_time_column(); + if *is_sorted { + metadata.extend(TransportChunk::field_metadata_is_sorted()); + } + metadata + }); + + schema.fields.push(field); + columns.push(Box::new(times)); + } + } + + // Components + { + re_tracing::profile_scope!("components"); + + for (component_name, data) in components { + schema.fields.push( + ArrowField::new(component_name.to_string(), data.data_type().clone(), true) + .with_metadata(TransportChunk::field_metadata_data_column()), + ); + columns.push(data.clone() /* refcounted (dyn Clone) */); + } + } + + Ok(TransportChunk { + schema, + data: ArrowChunk::new(columns), + }) + } + + pub fn from_transport(chunk: &TransportChunk) -> ChunkResult { + re_tracing::profile_function!(format!( + "num_columns={} num_rows={}", + chunk.num_columns(), + chunk.num_rows() + )); + + // Metadata + let (id, entity_path, is_sorted) = { + re_tracing::profile_scope!("metadata"); + (chunk.id()?, chunk.entity_path()?, chunk.is_sorted()) + }; + + // Row IDs + let row_ids = { + re_tracing::profile_scope!("row ids"); + + let Some(column) = chunk.controls().find_map(|(field, column)| { + (field.name == RowId::name().as_str()).then_some(column) + }) else { + return Err(ChunkError::Malformed { + reason: format!("missing row_id column ({:?})", chunk.schema), + }); + }; + + RowId::from_arrow(&**column).map_err(|err| ChunkError::Malformed { + reason: format!("row_id column is not deserializable: {err}"), + })? + }; + + // Timelines + let timelines = { + re_tracing::profile_scope!("timelines"); + + let mut timelines = BTreeMap::default(); + + for (field, column) in chunk.timelines() { + // See also [`Timeline::datatype`] + let timeline = match column.data_type().to_logical_type() { + ArrowDatatype::Int64 => Timeline::new_sequence(field.name.as_str()), + ArrowDatatype::Timestamp(ArrowTimeUnit::Nanosecond, None) => { + Timeline::new_temporal(field.name.as_str()) + } + _ => { + return Err(ChunkError::Malformed { + reason: format!( + "time column '{}' is not deserializable ({:?})", + field.name, + column.data_type() + ), + }); + } + }; + + let times = column + .as_any() + .downcast_ref::>() + .ok_or_else(|| ChunkError::Malformed { + reason: format!( + "time column '{}' is not deserializable ({:?})", + field.name, + column.data_type() + ), + })?; + + if times.validity().is_some() { + return Err(ChunkError::Malformed { + reason: format!( + "time column '{}' must be dense ({:?})", + field.name, + column.data_type() + ), + }); + } + + let is_sorted = field + .metadata + .get(TransportChunk::FIELD_METADATA_MARKER_IS_SORTED_BY_TIME) + .is_some(); + + let time_chunk = ChunkTimeline::new( + is_sorted.then_some(true), + times + .values_iter() + .copied() + .map(TimeInt::new_temporal) + .collect(), + ); + + if let Some(time_chunk) = time_chunk { + if timelines.insert(timeline, time_chunk).is_some() { + return Err(ChunkError::Malformed { + reason: format!( + "time column '{}' was specified more than once", + field.name, + ), + }); + } + } + } + + timelines + }; + + // Components + let components = { + let mut components = BTreeMap::default(); + + for (field, column) in chunk.components() { + if !matches!(column.data_type(), ArrowDatatype::List(_)) { + return Err(ChunkError::Malformed { + reason: format!( + "component column '{}' is not deserializable ({:?})", + field.name, + column.data_type() + ), + }); + } + + if components + .insert( + field.name.clone().into(), + column.clone(), /* refcount */ + ) + .is_some() + { + return Err(ChunkError::Malformed { + reason: format!( + "component column '{}' was specified more than once", + field.name, + ), + }); + } + } + + components + }; + + Self::new( + id, + entity_path, + is_sorted.then_some(true), + row_ids, + timelines, + components, + ) + } +} + +#[cfg(test)] +mod tests { + use re_log_types::{ + example_components::{MyColor, MyPoint}, + TimeInt, Timeline, + }; + + use crate::arrays_to_list_array; + + use super::*; + + #[test] + fn roundtrip() -> anyhow::Result<()> { + let entity_path = EntityPath::parse_forgiving("a/b/c"); + + let timeline1 = Timeline::new_temporal("log_time"); + let timelines1 = std::iter::once(( + timeline1, + ChunkTimeline::new( + Some(true), + [42, 43, 44, 45].map(TimeInt::new_temporal).to_vec(), + ) + .unwrap(), + )) + .collect(); + + let timelines2 = BTreeMap::default(); // static + + let points1 = MyPoint::to_arrow([ + MyPoint::new(1.0, 2.0), + MyPoint::new(3.0, 4.0), + MyPoint::new(5.0, 6.0), + ])?; + let points2 = None; + let points3 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0)])?; + let points4 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + + let colors1 = MyColor::to_arrow([ + MyColor::from_rgb(1, 2, 3), + MyColor::from_rgb(4, 5, 6), + MyColor::from_rgb(7, 8, 9), + ])?; + let colors2 = MyColor::to_arrow([MyColor::from_rgb(10, 20, 30)])?; + let colors3 = None; + let colors4 = None; + + let components = [ + (MyPoint::name(), { + let list_array = arrays_to_list_array(&[ + Some(&*points1), + points2, + Some(&*points3), + Some(&*points4), + ]) + .unwrap(); + assert_eq!(4, list_array.len()); + list_array + }), + (MyPoint::name(), { + let list_array = + arrays_to_list_array(&[Some(&*colors1), Some(&*colors2), colors3, colors4]) + .unwrap(); + assert_eq!(4, list_array.len()); + list_array + }), + ]; + + let row_ids = vec![RowId::new(), RowId::new(), RowId::new(), RowId::new()]; + + for timelines in [timelines1, timelines2] { + let chunk_original = Chunk::new( + ChunkId::new(), + entity_path.clone(), + None, + row_ids.clone(), + timelines.clone(), + components.clone().into_iter().collect(), + )?; + let mut chunk_before = chunk_original.clone(); + + for _ in 0..3 { + let chunk_in_transport = chunk_before.to_transport()?; + let chunk_after = Chunk::from_transport(&chunk_in_transport)?; + + assert_eq!( + chunk_in_transport.entity_path()?, + *chunk_original.entity_path() + ); + assert_eq!( + chunk_in_transport.entity_path()?, + *chunk_after.entity_path() + ); + assert_eq!( + chunk_in_transport.num_columns(), + chunk_original.num_columns() + ); + assert_eq!(chunk_in_transport.num_columns(), chunk_after.num_columns()); + assert_eq!(chunk_in_transport.num_rows(), chunk_original.num_rows()); + assert_eq!(chunk_in_transport.num_rows(), chunk_after.num_rows()); + + assert_eq!( + chunk_in_transport.num_controls(), + chunk_original.num_controls() + ); + assert_eq!( + chunk_in_transport.num_controls(), + chunk_after.num_controls() + ); + assert_eq!( + chunk_in_transport.num_timelines(), + chunk_original.num_timelines() + ); + assert_eq!( + chunk_in_transport.num_timelines(), + chunk_after.num_timelines() + ); + assert_eq!( + chunk_in_transport.num_components(), + chunk_original.num_components() + ); + assert_eq!( + chunk_in_transport.num_components(), + chunk_after.num_components() + ); + + // eprintln!("{chunk_before}"); + eprintln!("{chunk_in_transport}"); + // eprintln!("{chunk_after}"); + + assert_eq!(chunk_before, chunk_after); + + chunk_before = chunk_after; + } + } + + Ok(()) + } +} diff --git a/crates/re_log_types/src/example_components.rs b/crates/re_log_types/src/example_components.rs index 1a9615bc7803..8fa4613b6825 100644 --- a/crates/re_log_types/src/example_components.rs +++ b/crates/re_log_types/src/example_components.rs @@ -148,6 +148,114 @@ impl Loggable for MyPoint { // ---------------------------------------------------------------------------- +#[derive(Clone, Copy, Debug, Default, PartialEq)] +pub struct MyPoint64 { + pub x: f64, + pub y: f64, +} + +impl MyPoint64 { + #[allow(clippy::should_implement_trait)] + #[inline] + pub fn from_iter(it: impl IntoIterator) -> Vec { + it.into_iter() + .map(|i| Self::new(i as f64, i as f64)) + .collect() + } +} + +impl MyPoint64 { + #[inline] + pub fn new(x: f64, y: f64) -> Self { + Self { x, y } + } +} + +re_types_core::macros::impl_into_cow!(MyPoint64); + +impl SizeBytes for MyPoint64 { + #[inline] + fn heap_size_bytes(&self) -> u64 { + let Self { x: _, y: _ } = self; + 0 + } +} + +impl Loggable for MyPoint64 { + type Name = re_types_core::ComponentName; + + fn name() -> Self::Name { + "example.MyPoint64".into() + } + + fn arrow_datatype() -> arrow2::datatypes::DataType { + use arrow2::datatypes::DataType::Float64; + arrow2::datatypes::DataType::Struct(Arc::new(vec![ + arrow2::datatypes::Field::new("x", Float64, false), + arrow2::datatypes::Field::new("y", Float64, false), + ])) + } + + fn to_arrow_opt<'a>( + data: impl IntoIterator>>>, + ) -> re_types_core::SerializationResult> + where + Self: 'a, + { + let (xs, ys): (Vec<_>, Vec<_>) = data + .into_iter() + .map(Option::unwrap) + .map(Into::into) + .map(|p| (p.x, p.y)) + .unzip(); + + let x_array = arrow2::array::Float64Array::from_vec(xs).boxed(); + let y_array = arrow2::array::Float64Array::from_vec(ys).boxed(); + + Ok( + arrow2::array::StructArray::new(Self::arrow_datatype(), vec![x_array, y_array], None) + .boxed(), + ) + } + + fn from_arrow_opt( + data: &dyn arrow2::array::Array, + ) -> re_types_core::DeserializationResult>> { + let array = data + .as_any() + .downcast_ref::() + .ok_or(DeserializationError::downcast_error::< + arrow2::array::StructArray, + >())?; + + let x_array = array.values()[0].as_ref(); + let y_array = array.values()[1].as_ref(); + + let xs = x_array + .as_any() + .downcast_ref::() + .ok_or(DeserializationError::downcast_error::< + arrow2::array::Float64Array, + >())?; + let ys = y_array + .as_any() + .downcast_ref::() + .ok_or(DeserializationError::downcast_error::< + arrow2::array::Float64Array, + >())?; + + Ok(xs + .values_iter() + .copied() + .zip(ys.values_iter().copied()) + .map(|(x, y)| Self { x, y }) + .map(Some) + .collect()) + } +} + +// ---------------------------------------------------------------------------- + #[derive(Clone, Copy, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] #[repr(transparent)] diff --git a/crates/re_tuid/src/lib.rs b/crates/re_tuid/src/lib.rs index f1ac0774b041..1fd133417d48 100644 --- a/crates/re_tuid/src/lib.rs +++ b/crates/re_tuid/src/lib.rs @@ -106,6 +106,14 @@ impl Tuid { Self { time_ns, inc } } + #[inline] + pub fn from_u128(id: u128) -> Self { + Self { + time_ns: (id >> 64) as u64, + inc: (id & (!0 >> 64)) as u64, + } + } + #[inline] pub fn as_u128(&self) -> u128 { ((self.time_ns as u128) << 64) | (self.inc as u128) @@ -225,4 +233,8 @@ fn test_tuid() { assert!(is_sorted(&ids)); assert_eq!(ids.iter().copied().collect::>().len(), num); assert_eq!(ids.iter().copied().collect::>().len(), num); + + for id in ids { + assert_eq!(id, Tuid::from_u128(id.as_u128())); + } }