diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index d38e3fce7df9..f9bbcffd04eb 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -181,9 +181,10 @@ Update instructions: | --------------- | --------------------------------------------------------------------------------------------- | | re_chunk | A chunk of Rerun data, encoded using Arrow. Used for logging, transport, storage and compute. | | re_chunk_store | An in-memory time series database for Rerun log data, based on Apache Arrow. | +| re_format_arrow | Formatting of Apache Arrow tables. | | re_log_types | The basic building blocks of the Rerun data types and tables. | +| re_sorbet | Rerun arrow metadata and record batch definitions. | | re_types_core | The core traits and types that power Rerun's data model. | -| re_format_arrow | Formatting of Apache Arrow tables. | ### Data flow diff --git a/Cargo.lock b/Cargo.lock index 53ebfcc9378d..1dad0b098aa9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5758,6 +5758,7 @@ dependencies = [ "re_log_encoding", "re_log_types", "re_protos", + "re_sorbet", "re_tracing", "re_types", "re_types_core", @@ -6469,6 +6470,17 @@ dependencies = [ "web-time", ] +[[package]] +name = "re_sorbet" +version = "0.22.0-alpha.1+dev" +dependencies = [ + "arrow", + "re_log", + "re_log_types", + "re_types_core", + "thiserror 1.0.65", +] + [[package]] name = "re_string_interner" version = "0.22.0-alpha.1+dev" diff --git a/Cargo.toml b/Cargo.toml index cd9451fa1177..e68596999711 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ re_log_encoding = { path = "crates/store/re_log_encoding", version = "=0.22.0-al re_log_types = { path = "crates/store/re_log_types", version = "=0.22.0-alpha.1", default-features = false } re_query = { path = "crates/store/re_query", version = "=0.22.0-alpha.1", default-features = false } re_sdk_comms = { path = "crates/store/re_sdk_comms", version = "=0.22.0-alpha.1", default-features = false } +re_sorbet = { path = "crates/store/re_sorbet", version = "=0.22.0-alpha.1", default-features = false } re_types = { path = "crates/store/re_types", version = "=0.22.0-alpha.1", default-features = false } re_types_core = { path = "crates/store/re_types_core", version = "=0.22.0-alpha.1", default-features = false } re_ws_comms = { path = "crates/store/re_ws_comms", version = "=0.22.0-alpha.1", default-features = false } diff --git a/crates/store/re_chunk_store/Cargo.toml b/crates/store/re_chunk_store/Cargo.toml index f0fbfbfadb19..f57eda12dbaf 100644 --- a/crates/store/re_chunk_store/Cargo.toml +++ b/crates/store/re_chunk_store/Cargo.toml @@ -41,6 +41,7 @@ re_log = { workspace = true, features = ["setup"] } re_log_encoding = { workspace = true, features = ["decoder"] } re_log_types.workspace = true re_protos.workspace = true +re_sorbet.workspace = true re_tracing.workspace = true re_types_core.workspace = true diff --git a/crates/store/re_chunk_store/src/dataframe.rs b/crates/store/re_chunk_store/src/dataframe.rs index 894eab9f6606..61c4fb96650b 100644 --- a/crates/store/re_chunk_store/src/dataframe.rs +++ b/crates/store/re_chunk_store/src/dataframe.rs @@ -12,8 +12,9 @@ use arrow::{ use itertools::Itertools; use re_chunk::TimelineName; -use re_log_types::{ComponentPath, EntityPath, ResolvedTimeRange, TimeInt, Timeline}; -use re_types_core::{ArchetypeFieldName, ArchetypeName, ComponentDescriptor, ComponentName}; +use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, Timeline}; +use re_sorbet::{ComponentColumnDescriptor, TimeColumnDescriptor}; +use re_types_core::ComponentName; use crate::{ChunkStore, ColumnMetadata}; @@ -76,299 +77,6 @@ impl ColumnDescriptor { } } -/// Describes a time column, such as `log_time`. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct TimeColumnDescriptor { - /// The timeline this column is associated with. - pub timeline: Timeline, - - /// The Arrow datatype of the column. - pub datatype: ArrowDatatype, -} - -impl PartialOrd for TimeColumnDescriptor { - #[inline] - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for TimeColumnDescriptor { - #[inline] - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - let Self { - timeline, - datatype: _, - } = self; - timeline.cmp(&other.timeline) - } -} - -impl TimeColumnDescriptor { - /// Used when returning a null column, i.e. when a lookup failed. - #[inline] - pub fn new_null(name: TimelineName) -> Self { - Self { - // TODO(cmc): I picked a sequence here because I have to pick something. - // It doesn't matter, only the name will remain in the Arrow schema anyhow. - timeline: Timeline::new_sequence(name), - datatype: ArrowDatatype::Null, - } - } - - #[inline] - pub fn timeline(&self) -> Timeline { - self.timeline - } - - #[inline] - pub fn name(&self) -> &TimelineName { - self.timeline.name() - } - - #[inline] - pub fn typ(&self) -> re_log_types::TimeType { - self.timeline.typ() - } - - #[inline] - pub fn datatype(&self) -> &ArrowDatatype { - &self.datatype - } - - #[inline] - pub fn to_arrow_field(&self) -> ArrowField { - let Self { timeline, datatype } = self; - - let nullable = true; // Time column must be nullable since static data doesn't have a time. - - let metadata = std::iter::once(Some(( - "sorbet.index_name".to_owned(), - timeline.name().to_string(), - ))) - .flatten() - .collect(); - - ArrowField::new(timeline.name().to_string(), datatype.clone(), nullable) - .with_metadata(metadata) - } -} - -/// Describes a data/component column, such as `Position3D`. -// -// TODO(#6889): Fully sorbetize this thing? `ArchetypeName` and such don't make sense in that -// context. And whatever `archetype_field_name` ends up being, it needs interning. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ComponentColumnDescriptor { - /// The path of the entity. - pub entity_path: EntityPath, - - /// Optional name of the `Archetype` associated with this data. - /// - /// `None` if the data wasn't logged through an archetype. - /// - /// Example: `rerun.archetypes.Points3D`. - pub archetype_name: Option, - - /// Optional name of the field within `Archetype` associated with this data. - /// - /// `None` if the data wasn't logged through an archetype. - /// - /// Example: `positions`. - pub archetype_field_name: Option, - - /// Semantic name associated with this data. - /// - /// This is fully implied by `archetype_name` and `archetype_field`, but - /// included for semantic convenience. - /// - /// Example: `rerun.components.Position3D`. - pub component_name: ComponentName, - - /// The Arrow datatype of the stored column. - /// - /// This is the log-time datatype corresponding to how this data is encoded - /// in a chunk. Currently this will always be an [`arrow::array::ListArray`], but as - /// we introduce mono-type optimization, this might be a native type instead. - pub store_datatype: ArrowDatatype, - - /// Whether this column represents static data. - pub is_static: bool, - - /// Whether this column represents an indicator component. - pub is_indicator: bool, - - /// Whether this column represents a [`Clear`]-related components. - /// - /// [`Clear`]: re_types_core::archetypes::Clear - pub is_tombstone: bool, - - /// Whether this column contains either no data or only contains null and/or empty values (`[]`). - pub is_semantically_empty: bool, -} - -impl PartialOrd for ComponentColumnDescriptor { - #[inline] - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for ComponentColumnDescriptor { - #[inline] - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - let Self { - entity_path, - archetype_name, - archetype_field_name, - component_name, - store_datatype: _, - is_static: _, - is_indicator: _, - is_tombstone: _, - is_semantically_empty: _, - } = self; - - entity_path - .cmp(&other.entity_path) - .then_with(|| component_name.cmp(&other.component_name)) - .then_with(|| archetype_name.cmp(&other.archetype_name)) - .then_with(|| archetype_field_name.cmp(&other.archetype_field_name)) - } -} - -impl std::fmt::Display for ComponentColumnDescriptor { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let Self { - entity_path, - archetype_name, - archetype_field_name, - component_name, - store_datatype: _, - is_static, - is_indicator: _, - is_tombstone: _, - is_semantically_empty: _, - } = self; - - let descriptor = ComponentDescriptor { - archetype_name: *archetype_name, - archetype_field_name: *archetype_field_name, - component_name: *component_name, - }; - - let s = format!("{entity_path}@{}", descriptor.short_name()); - - if *is_static { - f.write_fmt(format_args!("|{s}|")) - } else { - f.write_str(&s) - } - } -} - -impl From for re_types_core::ComponentDescriptor { - #[inline] - fn from(descr: ComponentColumnDescriptor) -> Self { - Self { - archetype_name: descr.archetype_name, - archetype_field_name: descr.archetype_field_name, - component_name: descr.component_name, - } - } -} - -impl From<&ComponentColumnDescriptor> for re_types_core::ComponentDescriptor { - #[inline] - fn from(descr: &ComponentColumnDescriptor) -> Self { - Self { - archetype_name: descr.archetype_name, - archetype_field_name: descr.archetype_field_name, - component_name: descr.component_name, - } - } -} - -impl ComponentColumnDescriptor { - pub fn component_path(&self) -> ComponentPath { - ComponentPath { - entity_path: self.entity_path.clone(), - component_name: self.component_name, - } - } - - #[inline] - pub fn matches(&self, entity_path: &EntityPath, component_name: &str) -> bool { - &self.entity_path == entity_path && self.component_name.matches(component_name) - } - - fn metadata(&self) -> std::collections::HashMap { - let Self { - entity_path, - archetype_name, - archetype_field_name, - component_name, - store_datatype: _, - is_static, - is_indicator, - is_tombstone, - is_semantically_empty, - } = self; - - [ - (*is_static).then_some(("sorbet.is_static".to_owned(), "yes".to_owned())), - (*is_indicator).then_some(("sorbet.is_indicator".to_owned(), "yes".to_owned())), - (*is_tombstone).then_some(("sorbet.is_tombstone".to_owned(), "yes".to_owned())), - (*is_semantically_empty) - .then_some(("sorbet.is_semantically_empty".to_owned(), "yes".to_owned())), - Some(("sorbet.path".to_owned(), entity_path.to_string())), - Some(( - "sorbet.semantic_type".to_owned(), - component_name.short_name().to_owned(), - )), - archetype_name.map(|name| { - ( - "sorbet.semantic_family".to_owned(), - name.short_name().to_owned(), - ) - }), - archetype_field_name - .as_ref() - .map(|name| ("sorbet.logical_type".to_owned(), name.to_string())), - ] - .into_iter() - .flatten() - .collect() - } - - #[inline] - pub fn returned_datatype(&self) -> ArrowDatatype { - self.store_datatype.clone() - } - - #[inline] - pub fn to_arrow_field(&self) -> ArrowField { - let entity_path = &self.entity_path; - let descriptor = ComponentDescriptor { - archetype_name: self.archetype_name, - archetype_field_name: self.archetype_field_name, - component_name: self.component_name, - }; - - ArrowField::new( - // NOTE: Uncomment this to expose fully-qualified names in the Dataframe APIs! - // I'm not doing that right now, to avoid breaking changes (and we need to talk about - // what the syntax for these fully-qualified paths need to look like first). - format!("{}:{}", entity_path, descriptor.component_name.short_name()), - // format!("{entity_path}@{}", descriptor.short_name()), - self.returned_datatype(), - true, /* nullable */ - ) - // TODO(#6889): This needs some proper sorbetization -- I just threw these names randomly. - .with_metadata(self.metadata()) - } -} - // --- Selectors --- /// Describes a column selection to return as part of a query. @@ -728,12 +436,10 @@ impl ChunkStore { pub fn schema(&self) -> Vec { re_tracing::profile_function!(); - let timelines = self.all_timelines_sorted().into_iter().map(|timeline| { - ColumnDescriptor::Time(TimeColumnDescriptor { - timeline, - datatype: timeline.datatype(), - }) - }); + let timelines = self + .all_timelines_sorted() + .into_iter() + .map(|timeline| ColumnDescriptor::Time(TimeColumnDescriptor::from(timeline))); let mut components = self .per_column_metadata @@ -759,16 +465,17 @@ impl ChunkStore { } = metadata; ComponentColumnDescriptor { - entity_path: entity_path.clone(), - archetype_name: component_descr.archetype_name, - archetype_field_name: component_descr.archetype_field_name, - component_name: component_descr.component_name, // NOTE: The data is always a at least a list, whether it's latest-at or range. // It might be wrapped further in e.g. a dict, but at the very least // it's a list. store_datatype: ArrowListArray::DATA_TYPE_CONSTRUCTOR( ArrowField::new("item", datatype.clone(), true).into(), ), + + entity_path: entity_path.clone(), + archetype_name: component_descr.archetype_name, + archetype_field_name: component_descr.archetype_field_name, + component_name: component_descr.component_name, is_static, is_indicator, is_tombstone, @@ -777,18 +484,7 @@ impl ChunkStore { }) .collect_vec(); - components.sort_by(|descr1, descr2| { - descr1 - .entity_path - .cmp(&descr2.entity_path) - .then(descr1.archetype_name.cmp(&descr2.archetype_name)) - .then( - descr1 - .archetype_field_name - .cmp(&descr2.archetype_field_name), - ) - .then(descr1.component_name.cmp(&descr2.component_name)) - }); + components.sort(); timelines .chain(components.into_iter().map(ColumnDescriptor::Component)) @@ -805,10 +501,7 @@ impl ChunkStore { .copied() .unwrap_or_else(|| Timeline::new_temporal(selector.timeline)); - TimeColumnDescriptor { - timeline, - datatype: timeline.datatype(), - } + TimeColumnDescriptor::from(timeline) } /// Given a [`ComponentColumnSelector`], returns the corresponding [`ComponentColumnDescriptor`]. diff --git a/crates/store/re_chunk_store/src/lib.rs b/crates/store/re_chunk_store/src/lib.rs index 6c64a98e6a6f..3bb2dcb85f7a 100644 --- a/crates/store/re_chunk_store/src/lib.rs +++ b/crates/store/re_chunk_store/src/lib.rs @@ -26,22 +26,18 @@ mod writes; mod protobuf_conversions; -pub use self::dataframe::{ - ColumnDescriptor, ColumnSelector, ComponentColumnDescriptor, ComponentColumnSelector, Index, - IndexRange, IndexValue, QueryExpression, SparseFillStrategy, TimeColumnDescriptor, - TimeColumnSelector, ViewContentsSelector, -}; -pub use self::events::{ - ChunkCompactionReport, ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent, -}; -pub use self::gc::{GarbageCollectionOptions, GarbageCollectionTarget}; -pub use self::stats::{ChunkStoreChunkStats, ChunkStoreStats}; -pub use self::store::{ - ChunkStore, ChunkStoreConfig, ChunkStoreGeneration, ChunkStoreHandle, ColumnMetadata, -}; -pub use self::subscribers::{ - ChunkStoreSubscriber, ChunkStoreSubscriberHandle, PerStoreChunkSubscriber, +pub use self::{ + dataframe::{ + ColumnDescriptor, ColumnSelector, ComponentColumnSelector, Index, IndexRange, IndexValue, + QueryExpression, SparseFillStrategy, TimeColumnSelector, ViewContentsSelector, + }, + events::{ChunkCompactionReport, ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent}, + gc::{GarbageCollectionOptions, GarbageCollectionTarget}, + stats::{ChunkStoreChunkStats, ChunkStoreStats}, + store::{ChunkStore, ChunkStoreConfig, ChunkStoreGeneration, ChunkStoreHandle, ColumnMetadata}, + subscribers::{ChunkStoreSubscriber, ChunkStoreSubscriberHandle, PerStoreChunkSubscriber}, }; +pub use re_sorbet::{ComponentColumnDescriptor, TimeColumnDescriptor}; pub(crate) use self::store::ColumnMetadataState; diff --git a/crates/store/re_log_types/src/time_point/mod.rs b/crates/store/re_log_types/src/time_point/mod.rs index 2626fb0b4a88..e8977ddfaab0 100644 --- a/crates/store/re_log_types/src/time_point/mod.rs +++ b/crates/store/re_log_types/src/time_point/mod.rs @@ -3,19 +3,22 @@ use std::{ sync::Arc, }; -mod non_min_i64; -mod time_int; -mod timeline; +use arrow::datatypes::DataType as ArrowDataType; use crate::{ time::{Time, TimeZone}, ResolvedTimeRange, }; -// Re-exports -pub use non_min_i64::{NonMinI64, TryFromIntError}; -pub use time_int::TimeInt; -pub use timeline::{Timeline, TimelineName}; +mod non_min_i64; +mod time_int; +mod timeline; + +pub use self::{ + non_min_i64::{NonMinI64, TryFromIntError}, + time_int::TimeInt, + timeline::{Timeline, TimelineName}, +}; /// A point in time on any number of [`Timeline`]s. /// @@ -188,12 +191,18 @@ impl TimeType { /// Returns the appropriate arrow datatype to represent this timeline. #[inline] - pub fn datatype(self) -> arrow::datatypes::DataType { + pub fn datatype(self) -> ArrowDataType { match self { - Self::Time => { - arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None) - } - Self::Sequence => arrow::datatypes::DataType::Int64, + Self::Time => ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Nanosecond, None), + Self::Sequence => ArrowDataType::Int64, + } + } + + pub fn from_arrow_datatype(datatype: &ArrowDataType) -> Option { + match datatype { + ArrowDataType::Timestamp(_, _) => Some(Self::Time), + ArrowDataType::Int64 => Some(Self::Sequence), + _ => None, } } diff --git a/crates/store/re_sorbet/Cargo.toml b/crates/store/re_sorbet/Cargo.toml new file mode 100644 index 000000000000..9cbedebbf68e --- /dev/null +++ b/crates/store/re_sorbet/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "re_sorbet" +authors.workspace = true +description = "Rerun arrow metadata definitions" +edition.workspace = true +homepage.workspace = true +include.workspace = true +license.workspace = true +publish = true +readme = "README.md" +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[lints] +workspace = true + +[package.metadata.docs.rs] +all-features = true + + +[dependencies] +re_log_types.workspace = true +re_log.workspace = true +re_types_core.workspace = true + +arrow.workspace = true +thiserror.workspace = true + +# Keep this crate simple, with few dependencies. diff --git a/crates/store/re_sorbet/README.md b/crates/store/re_sorbet/README.md new file mode 100644 index 000000000000..5864e0eeaf46 --- /dev/null +++ b/crates/store/re_sorbet/README.md @@ -0,0 +1,12 @@ +# re_sorbet + +Part of the [`rerun`](https://github.com/rerun-io/rerun) family of crates. + +[![Latest version](https://img.shields.io/crates/v/re_sorbet.svg)](https://crates.io/crates/store/re_sorbet?speculative-link) +[![Documentation](https://docs.rs/re_sorbet/badge.svg)](https://docs.rs/re_sorbet?speculative-link) +![MIT](https://img.shields.io/badge/license-MIT-blue.svg) +![Apache](https://img.shields.io/badge/license-Apache-blue.svg) + +Rerun arrow metadata and record batch definitions. + +Handles the structure of arrow record batches and their meta data for different use cases for Rerun. diff --git a/crates/store/re_sorbet/src/data_column_schema.rs b/crates/store/re_sorbet/src/data_column_schema.rs new file mode 100644 index 000000000000..4c8ef9811dc8 --- /dev/null +++ b/crates/store/re_sorbet/src/data_column_schema.rs @@ -0,0 +1,241 @@ +use arrow::datatypes::{DataType as ArrowDatatype, Field as ArrowField}; + +use re_log_types::{ComponentPath, EntityPath}; +use re_types_core::{ArchetypeFieldName, ArchetypeName, ComponentDescriptor, ComponentName}; + +use crate::{ArrowFieldMetadata, MetadataExt as _, MissingFieldMetadata}; + +/// Describes a data/component column, such as `Position3D`, in a dataframe. +/// +/// This is an [`ArrowField`] that contains specific meta-data. +// +// TODO(#6889): Fully sorbetize this thing? `ArchetypeName` and such don't make sense in that +// context. And whatever `archetype_field_name` ends up being, it needs interning. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ComponentColumnDescriptor { + /// The Arrow datatype of the stored column. + /// + /// This is the log-time datatype corresponding to how this data is encoded + /// in a chunk. Currently this will always be an [`arrow::array::ListArray`], but as + /// we introduce mono-type optimization, this might be a native type instead. + pub store_datatype: ArrowDatatype, + + /// The path of the entity. + pub entity_path: EntityPath, + + /// Optional name of the `Archetype` associated with this data. + /// + /// `None` if the data wasn't logged through an archetype. + /// + /// Example: `rerun.archetypes.Points3D`. + pub archetype_name: Option, + + /// Optional name of the field within `Archetype` associated with this data. + /// + /// `None` if the data wasn't logged through an archetype. + /// + /// Example: `positions`. + pub archetype_field_name: Option, + + /// Semantic name associated with this data. + /// + /// This is fully implied by `archetype_name` and `archetype_field`, but + /// included for semantic convenience. + /// + /// Example: `rerun.components.Position3D`. + pub component_name: ComponentName, + + /// Whether this column represents static data. + pub is_static: bool, + + /// Whether this column represents an indicator component. + pub is_indicator: bool, + + /// Whether this column represents a [`Clear`]-related components. + /// + /// [`Clear`]: re_types_core::archetypes::Clear + pub is_tombstone: bool, + + /// Whether this column contains either no data or only contains null and/or empty values (`[]`). + pub is_semantically_empty: bool, +} + +impl PartialOrd for ComponentColumnDescriptor { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ComponentColumnDescriptor { + #[inline] + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + let Self { + entity_path, + archetype_name, + archetype_field_name, + component_name, + store_datatype: _, + is_static: _, + is_indicator: _, + is_tombstone: _, + is_semantically_empty: _, + } = self; + + entity_path + .cmp(&other.entity_path) + .then_with(|| archetype_name.cmp(&other.archetype_name)) + .then_with(|| archetype_field_name.cmp(&other.archetype_field_name)) + .then_with(|| component_name.cmp(&other.component_name)) + } +} + +impl std::fmt::Display for ComponentColumnDescriptor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + entity_path, + archetype_name, + archetype_field_name, + component_name, + store_datatype: _, + is_static, + is_indicator: _, + is_tombstone: _, + is_semantically_empty: _, + } = self; + + let descriptor = ComponentDescriptor { + archetype_name: *archetype_name, + archetype_field_name: *archetype_field_name, + component_name: *component_name, + }; + + let s = format!("{entity_path}@{}", descriptor.short_name()); + + if *is_static { + f.write_fmt(format_args!("|{s}|")) + } else { + f.write_str(&s) + } + } +} + +impl From for re_types_core::ComponentDescriptor { + #[inline] + fn from(descr: ComponentColumnDescriptor) -> Self { + Self { + archetype_name: descr.archetype_name, + archetype_field_name: descr.archetype_field_name, + component_name: descr.component_name, + } + } +} + +impl From<&ComponentColumnDescriptor> for re_types_core::ComponentDescriptor { + #[inline] + fn from(descr: &ComponentColumnDescriptor) -> Self { + Self { + archetype_name: descr.archetype_name, + archetype_field_name: descr.archetype_field_name, + component_name: descr.component_name, + } + } +} + +impl ComponentColumnDescriptor { + pub fn component_path(&self) -> ComponentPath { + ComponentPath { + entity_path: self.entity_path.clone(), + component_name: self.component_name, + } + } + + #[inline] + pub fn matches(&self, entity_path: &EntityPath, component_name: &str) -> bool { + &self.entity_path == entity_path && self.component_name.matches(component_name) + } + + fn metadata(&self) -> ArrowFieldMetadata { + let Self { + entity_path, + archetype_name, + archetype_field_name, + component_name, + store_datatype: _, + is_static, + is_indicator, + is_tombstone, + is_semantically_empty, + } = self; + + // TODO(#6889): This needs some proper sorbetization -- I just threw these names randomly. + [ + (*is_static).then_some(("sorbet.is_static".to_owned(), "yes".to_owned())), + (*is_indicator).then_some(("sorbet.is_indicator".to_owned(), "yes".to_owned())), + (*is_tombstone).then_some(("sorbet.is_tombstone".to_owned(), "yes".to_owned())), + (*is_semantically_empty) + .then_some(("sorbet.is_semantically_empty".to_owned(), "yes".to_owned())), + Some(("sorbet.path".to_owned(), entity_path.to_string())), + Some(( + "sorbet.semantic_type".to_owned(), + component_name.short_name().to_owned(), + )), + archetype_name.map(|name| { + ( + "sorbet.semantic_family".to_owned(), + name.short_name().to_owned(), + ) + }), + archetype_field_name + .as_ref() + .map(|name| ("sorbet.logical_type".to_owned(), name.to_string())), + ] + .into_iter() + .flatten() + .collect() + } + + #[inline] + pub fn returned_datatype(&self) -> ArrowDatatype { + self.store_datatype.clone() + } + + #[inline] + pub fn to_arrow_field(&self) -> ArrowField { + let entity_path = &self.entity_path; + let descriptor = ComponentDescriptor { + archetype_name: self.archetype_name, + archetype_field_name: self.archetype_field_name, + component_name: self.component_name, + }; + + ArrowField::new( + // NOTE: Uncomment this to expose fully-qualified names in the Dataframe APIs! + // I'm not doing that right now, to avoid breaking changes (and we need to talk about + // what the syntax for these fully-qualified paths need to look like first). + format!("{}:{}", entity_path, descriptor.component_name.short_name()), + // format!("{entity_path}@{}", descriptor.short_name()), + self.returned_datatype(), + true, /* nullable */ + ) + .with_metadata(self.metadata()) + } +} + +impl TryFrom<&ArrowField> for ComponentColumnDescriptor { + type Error = MissingFieldMetadata; + + fn try_from(field: &ArrowField) -> Result { + Ok(Self { + store_datatype: field.data_type().clone(), + entity_path: EntityPath::parse_forgiving(field.get_or_err("sorbet.path")?), + archetype_name: field.get_opt("sorbet.semantic_family").map(|x| x.into()), + archetype_field_name: field.get_opt("sorbet.logical_type").map(|x| x.into()), + component_name: field.get_or_err("sorbet.semantic_type")?.into(), + is_static: field.get_bool("sorbet.is_static"), + is_indicator: field.get_bool("sorbet.is_indicator"), + is_tombstone: field.get_bool("sorbet.is_tombstone"), + is_semantically_empty: field.get_bool("sorbet.is_semantically_empty"), + }) + } +} diff --git a/crates/store/re_sorbet/src/index_column_schema.rs b/crates/store/re_sorbet/src/index_column_schema.rs new file mode 100644 index 000000000000..255ea8857865 --- /dev/null +++ b/crates/store/re_sorbet/src/index_column_schema.rs @@ -0,0 +1,118 @@ +use arrow::datatypes::{DataType as ArrowDatatype, Field as ArrowField}; +use re_log_types::{Timeline, TimelineName}; + +#[derive(thiserror::Error, Debug)] +#[error("Unsupported time type: {datatype:?}")] +pub struct UnsupportedTimeType { + pub datatype: ArrowDatatype, +} + +/// Describes a time column, such as `log_time`. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct TimeColumnDescriptor { + /// The timeline this column is associated with. + pub timeline: Timeline, + + /// The Arrow datatype of the column. + pub datatype: ArrowDatatype, +} + +impl PartialOrd for TimeColumnDescriptor { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TimeColumnDescriptor { + #[inline] + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + let Self { + timeline, + datatype: _, + } = self; + timeline.cmp(&other.timeline) + } +} + +impl TimeColumnDescriptor { + /// Used when returning a null column, i.e. when a lookup failed. + #[inline] + pub fn new_null(name: TimelineName) -> Self { + Self { + // TODO(cmc): I picked a sequence here because I have to pick something. + // It doesn't matter, only the name will remain in the Arrow schema anyhow. + timeline: Timeline::new_sequence(name), + datatype: ArrowDatatype::Null, + } + } + + #[inline] + pub fn timeline(&self) -> Timeline { + self.timeline + } + + #[inline] + pub fn name(&self) -> &TimelineName { + self.timeline.name() + } + + #[inline] + pub fn typ(&self) -> re_log_types::TimeType { + self.timeline.typ() + } + + #[inline] + pub fn datatype(&self) -> &ArrowDatatype { + &self.datatype + } + + #[inline] + pub fn to_arrow_field(&self) -> ArrowField { + let Self { timeline, datatype } = self; + + let nullable = true; // Time column must be nullable since static data doesn't have a time. + + let metadata = std::iter::once(Some(( + "sorbet.index_name".to_owned(), + timeline.name().to_string(), + ))) + .flatten() + .collect(); + + ArrowField::new(timeline.name().to_string(), datatype.clone(), nullable) + .with_metadata(metadata) + } +} + +impl From for TimeColumnDescriptor { + fn from(timeline: Timeline) -> Self { + Self { + timeline, + datatype: timeline.datatype(), + } + } +} + +impl TryFrom<&ArrowField> for TimeColumnDescriptor { + type Error = UnsupportedTimeType; + + fn try_from(field: &ArrowField) -> Result { + let name = if let Some(name) = field.metadata().get("sorbet.index_name") { + name.to_owned() + } else { + re_log::warn_once!("Timeline '{}' is missing 'sorbet.index_name' metadata. Falling back on field/column name", field.name()); + field.name().to_owned() + }; + + let datatype = field.data_type().clone(); + + let Some(time_type) = re_log_types::TimeType::from_arrow_datatype(&datatype) else { + return Err(UnsupportedTimeType { datatype }); + }; + + let timeline = Timeline::new(name, time_type); + + Ok(Self { timeline, datatype }) + } +} diff --git a/crates/store/re_sorbet/src/lib.rs b/crates/store/re_sorbet/src/lib.rs new file mode 100644 index 000000000000..4d2495dbb515 --- /dev/null +++ b/crates/store/re_sorbet/src/lib.rs @@ -0,0 +1,16 @@ +//! Rerun arrow metadata and record batch definitions. +//! +//! Handles the structure of arrow record batches and their meta data for different use cases for Rerun. + +mod data_column_schema; +mod index_column_schema; +mod metadata; + +pub use self::{ + data_column_schema::ComponentColumnDescriptor, + index_column_schema::TimeColumnDescriptor, + metadata::{ + ArrowBatchMetadata, ArrowFieldMetadata, MetadataExt, MissingFieldMetadata, + MissingMetadataKey, + }, +}; diff --git a/crates/store/re_sorbet/src/metadata.rs b/crates/store/re_sorbet/src/metadata.rs new file mode 100644 index 000000000000..fe21a0df7ea6 --- /dev/null +++ b/crates/store/re_sorbet/src/metadata.rs @@ -0,0 +1,69 @@ +use std::collections::HashMap; + +use arrow::datatypes::Field as ArrowField; + +/// Arrow metadata for an arrow record batch. +pub type ArrowBatchMetadata = HashMap; + +/// Arrow metadata for a column/field. +pub type ArrowFieldMetadata = HashMap; + +#[derive(thiserror::Error, Debug)] +#[error("Missing metadata {key:?}")] +pub struct MissingMetadataKey { + pub key: String, +} + +#[derive(thiserror::Error, Debug)] +#[error("Field {field_name:?} is missing metadata {metadata_key:?}")] +pub struct MissingFieldMetadata { + pub field_name: String, + pub metadata_key: String, +} + +/// Make it more ergonomic to work with arrow metadata. +pub trait MetadataExt { + type Error; + + fn missing_key_error(&self, key: &str) -> Self::Error; + fn get_opt(&self, key: &str) -> Option<&str>; + + fn get_or_err(&self, key: &str) -> Result<&str, Self::Error> { + self.get_opt(key).ok_or_else(|| self.missing_key_error(key)) + } + + fn get_bool(&self, key: &str) -> bool { + self.get_opt(key) + .map(|value| !matches!(value.to_lowercase().as_str(), "false" | "no")) + .unwrap_or(false) + } +} + +impl MetadataExt for HashMap { + type Error = MissingMetadataKey; + + fn missing_key_error(&self, key: &str) -> Self::Error { + MissingMetadataKey { + key: key.to_owned(), + } + } + + fn get_opt(&self, key: &str) -> Option<&str> { + self.get(key).map(|value| value.as_str()) + } +} + +impl MetadataExt for ArrowField { + type Error = MissingFieldMetadata; + + fn missing_key_error(&self, key: &str) -> Self::Error { + MissingFieldMetadata { + field_name: self.name().clone(), + metadata_key: key.to_owned(), + } + } + + fn get_opt(&self, key: &str) -> Option<&str> { + self.metadata().get(key).map(|v| v.as_str()) + } +} diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs index 9ffaad4c54d9..271beda23cec 100644 --- a/rerun_py/src/arrow.rs +++ b/rerun_py/src/arrow.rs @@ -107,15 +107,11 @@ pub fn build_chunk_from_components( .into_iter() .zip(timeline_names) .map(|(array, timeline_name)| { - let timeline = match array.data_type() { - arrow::datatypes::DataType::Int64 => Ok(Timeline::new_sequence(timeline_name)), - arrow::datatypes::DataType::Timestamp(_, _) => { - Ok(Timeline::new_temporal(timeline_name)) - } - _ => Err(ChunkError::Malformed { + let time_type = re_log_types::TimeType::from_arrow_datatype(array.data_type()) + .ok_or_else(|| ChunkError::Malformed { reason: format!("Invalid data_type for timeline: {timeline_name}"), - }), - }?; + })?; + let timeline = Timeline::new(timeline_name, time_type); let timeline_data = TimeColumn::read_array(&ArrowArrayRef::from(array)).map_err(|err| { ChunkError::Malformed {