Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Arrow IPC to encode the column schema #8821

Merged
merged 13 commits into from
Jan 28, 2025
4 changes: 1 addition & 3 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9579b9d8bce47aa41389fe344f2c6758279983b7c0ebb4013e283e3e91bb450e"
dependencies = [
"bitflags 2.8.0",
"serde",
]

[[package]]
Expand Down Expand Up @@ -5739,7 +5738,6 @@ dependencies = [
"ahash",
"anyhow",
"arrow",
"arrow-schema",
"criterion",
"document-features",
"indent",
Expand All @@ -5762,7 +5760,6 @@ dependencies = [
"re_tracing",
"re_types",
"re_types_core",
"serde_json",
"similar-asserts",
"thiserror 1.0.65",
"tinyvec",
Expand Down Expand Up @@ -7459,6 +7456,7 @@ dependencies = [
"re_memory",
"re_protos",
"re_sdk",
"re_sorbet",
"re_video",
"re_web_viewer_server",
"re_ws_comms",
Expand Down
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ anyhow = { version = "1.0", default-features = false }
argh = "0.1.12"
array-init = "2.1"
arrow = { version = "53.4", default-features = false }
arrow-schema = { version = "53.4", default-features = false }
arrow2 = { package = "re_arrow2", version = "0.18.2", features = ["arrow"] }
async-executor = "1.0"
async-stream = "0.3"
Expand Down
8 changes: 0 additions & 8 deletions crates/store/re_chunk_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ workspace = true
[package.metadata.docs.rs]
all-features = true

[package.metadata.cargo-machete]
ignored = [
# Needed to enable the serde feature of arrow `Datatype`
"arrow-schema",
]


[features]
default = []
Expand All @@ -50,14 +44,12 @@ re_types_core.workspace = true
ahash.workspace = true
anyhow.workspace = true
arrow.workspace = true
arrow-schema = { workspace = true, features = ["serde"] }
document-features.workspace = true
indent.workspace = true
itertools.workspace = true
nohash-hasher.workspace = true
once_cell.workspace = true
parking_lot = { workspace = true, features = ["arc_lock"] }
serde_json.workspace = true
thiserror.workspace = true
web-time.workspace = true

Expand Down
61 changes: 1 addition & 60 deletions crates/store/re_chunk_store/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,70 +13,11 @@ use itertools::Itertools;

use re_chunk::TimelineName;
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, Timeline};
use re_sorbet::{ComponentColumnDescriptor, TimeColumnDescriptor};
use re_sorbet::{ColumnDescriptor, ComponentColumnDescriptor, TimeColumnDescriptor};
use re_types_core::ComponentName;

use crate::{ChunkStore, ColumnMetadata};

// --- Descriptors ---

// TODO(#6889): At some point all these descriptors need to be interned and have handles or
// something. And of course they need to be codegen'd. But we'll get there once we're back to
// natively tagged components.

/// Describes any kind of column.
///
/// See:
/// * [`TimeColumnDescriptor`]
/// * [`ComponentColumnDescriptor`]
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ColumnDescriptor {
/// A time column, described by a [`TimeColumnDescriptor`].
Time(TimeColumnDescriptor),
/// A component column, described by a [`ComponentColumnDescriptor`].
Component(ComponentColumnDescriptor),
}

impl ColumnDescriptor {
    /// The entity path this column is attached to.
    ///
    /// Only component columns carry an entity path; time columns yield `None`.
    #[inline]
    pub fn entity_path(&self) -> Option<&EntityPath> {
        match self {
            Self::Component(component) => Some(&component.entity_path),
            Self::Time(_) => None,
        }
    }

    /// The Arrow datatype of this column.
    ///
    /// For a time column this is a clone of its `datatype` field; for a component column it is
    /// whatever [`ComponentColumnDescriptor::returned_datatype`] reports.
    #[inline]
    pub fn arrow_datatype(&self) -> ArrowDatatype {
        match self {
            Self::Component(component) => component.returned_datatype(),
            Self::Time(time) => time.datatype.clone(),
        }
    }

    /// Builds the Arrow field for this column by delegating to the wrapped descriptor.
    #[inline]
    pub fn to_arrow_field(&self) -> ArrowField {
        match self {
            Self::Component(component) => component.to_arrow_field(),
            Self::Time(time) => time.to_arrow_field(),
        }
    }

    /// A short name for this column.
    ///
    /// Time columns use the name of their timeline; component columns use the short name of
    /// their component.
    #[inline]
    pub fn short_name(&self) -> String {
        match self {
            Self::Component(component) => component.component_name.short_name().to_owned(),
            Self::Time(time) => time.timeline.name().to_string(),
        }
    }

    /// Whether this column is static.
    ///
    /// Time columns are never static; component columns report their `is_static` flag.
    #[inline]
    pub fn is_static(&self) -> bool {
        match self {
            Self::Component(component) => component.is_static,
            Self::Time(_) => false,
        }
    }
}

// --- Selectors ---

/// Describes a column selection to return as part of a query.
Expand Down
6 changes: 3 additions & 3 deletions crates/store/re_chunk_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ mod protobuf_conversions;

pub use self::{
dataframe::{
ColumnDescriptor, ColumnSelector, ComponentColumnSelector, Index, IndexRange, IndexValue,
QueryExpression, SparseFillStrategy, TimeColumnSelector, ViewContentsSelector,
ColumnSelector, ComponentColumnSelector, Index, IndexRange, IndexValue, QueryExpression,
SparseFillStrategy, TimeColumnSelector, ViewContentsSelector,
},
events::{ChunkCompactionReport, ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent},
gc::{GarbageCollectionOptions, GarbageCollectionTarget},
stats::{ChunkStoreChunkStats, ChunkStoreStats},
store::{ChunkStore, ChunkStoreConfig, ChunkStoreGeneration, ChunkStoreHandle, ColumnMetadata},
subscribers::{ChunkStoreSubscriber, ChunkStoreSubscriberHandle, PerStoreChunkSubscriber},
};
pub use re_sorbet::{ComponentColumnDescriptor, TimeColumnDescriptor};
pub use re_sorbet::{ColumnDescriptor, ComponentColumnDescriptor, TimeColumnDescriptor};

pub(crate) use self::store::ColumnMetadataState;

Expand Down
163 changes: 3 additions & 160 deletions crates/store/re_chunk_store/src/protobuf_conversions.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use re_protos::invalid_field;
use re_protos::missing_field;
use re_protos::TypeConversionError;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::{BTreeMap, BTreeSet};

use re_protos::{missing_field, TypeConversionError};

impl TryFrom<re_protos::common::v0::ComponentColumnSelector> for crate::ComponentColumnSelector {
type Error = TypeConversionError;
Expand Down Expand Up @@ -258,112 +256,6 @@ impl From<crate::QueryExpression> for re_protos::common::v0::Query {
}
}

impl TryFrom<crate::ColumnDescriptor> for re_protos::common::v0::ColumnDescriptor {
    type Error = TypeConversionError;

    /// Converts a store-side column descriptor into its protobuf representation.
    ///
    /// The Arrow datatype of the column is written into the `datatype` field as the string
    /// produced by `serde_json::to_string`.
    ///
    /// # Errors
    ///
    /// Returns an invalid-field error if serializing the datatype fails.
    fn try_from(value: crate::ColumnDescriptor) -> Result<Self, Self::Error> {
        use re_protos::common::v0 as proto;
        use re_protos::common::v0::column_descriptor::DescriptorType;

        let descriptor_type = match value {
            crate::ColumnDescriptor::Time(time) => {
                DescriptorType::TimeColumn(proto::TimeColumnDescriptor {
                    timeline: Some(proto::Timeline {
                        name: time.timeline.name().to_string(),
                    }),
                    datatype: serde_json::to_string(&time.datatype)
                        .map_err(|err| invalid_field!(Self, "time column descriptor", err))?,
                })
            }

            crate::ColumnDescriptor::Component(component) => {
                DescriptorType::ComponentColumn(proto::ComponentColumnDescriptor {
                    entity_path: Some(component.entity_path.into()),
                    archetype_name: component.archetype_name.map(|name| name.to_string()),
                    archetype_field_name: component
                        .archetype_field_name
                        .map(|field| field.to_string()),
                    component_name: component.component_name.to_string(),
                    datatype: serde_json::to_string(&component.store_datatype).map_err(
                        |err| invalid_field!(Self, "component column descriptor", err),
                    )?,
                    is_static: component.is_static,
                    is_tombstone: component.is_tombstone,
                    is_semantically_empty: component.is_semantically_empty,
                    is_indicator: component.is_indicator,
                })
            }
        };

        Ok(Self {
            descriptor_type: Some(descriptor_type),
        })
    }
}

impl TryFrom<re_protos::common::v0::ColumnDescriptor> for crate::ColumnDescriptor {
    type Error = TypeConversionError;

    /// Converts a protobuf column descriptor back into the store-side representation.
    ///
    /// The `datatype` string of the message is parsed with `serde_json::from_str`.
    ///
    /// # Errors
    ///
    /// Returns a missing-field error if `descriptor_type`, the time column's `timeline`, or the
    /// component column's `entity_path` is absent, an invalid-field error if the datatype string
    /// cannot be parsed, and propagates any error from converting the entity path.
    fn try_from(value: re_protos::common::v0::ColumnDescriptor) -> Result<Self, Self::Error> {
        use re_protos::common::v0 as proto;
        use re_protos::common::v0::column_descriptor::DescriptorType;

        let descriptor_type = value
            .descriptor_type
            .ok_or(missing_field!(proto::ColumnDescriptor, "descriptor_type"))?;

        match descriptor_type {
            DescriptorType::TimeColumn(time) => Ok(Self::Time(crate::TimeColumnDescriptor {
                timeline: time
                    .timeline
                    .ok_or(missing_field!(proto::TimeColumnDescriptor, "timeline"))?
                    .into(),
                datatype: serde_json::from_str(&time.datatype).map_err(|err| {
                    invalid_field!(proto::ColumnDescriptor, "time column descriptor", err)
                })?,
            })),

            DescriptorType::ComponentColumn(component) => {
                Ok(Self::Component(crate::ComponentColumnDescriptor {
                    entity_path: component
                        .entity_path
                        .ok_or(missing_field!(
                            proto::ComponentColumnDescriptor,
                            "entity_path",
                        ))?
                        .try_into()?,
                    archetype_name: component.archetype_name.map(Into::into),
                    archetype_field_name: component.archetype_field_name.map(Into::into),
                    component_name: component.component_name.into(),
                    store_datatype: serde_json::from_str(&component.datatype).map_err(|err| {
                        invalid_field!(
                            proto::ColumnDescriptor,
                            "component column descriptor",
                            err
                        )
                    })?,
                    is_static: component.is_static,
                    is_tombstone: component.is_tombstone,
                    is_semantically_empty: component.is_semantically_empty,
                    is_indicator: component.is_indicator,
                }))
            }
        }
    }
}

#[cfg(test)]
mod tests {
use re_protos::common::v0::{
Expand Down Expand Up @@ -442,53 +334,4 @@ mod tests {

assert_eq!(grpc_query_before, grpc_query_after);
}

#[test]
fn test_time_column_descriptor_conversion() {
    // A log-time column with a nanosecond timestamp datatype and no timezone.
    let expected = crate::TimeColumnDescriptor {
        timeline: crate::Timeline::log_time(),
        datatype: arrow::datatypes::DataType::Timestamp(
            arrow::datatypes::TimeUnit::Nanosecond,
            None,
        ),
    };

    // Round-trip: store descriptor -> protobuf -> store descriptor.
    let as_proto: re_protos::common::v0::ColumnDescriptor =
        crate::ColumnDescriptor::Time(expected.clone())
            .try_into()
            .unwrap();
    let round_tripped: crate::ColumnDescriptor = as_proto.try_into().unwrap();

    // The variant must survive the round-trip, and so must every field.
    match round_tripped {
        crate::ColumnDescriptor::Time(actual) => assert_eq!(expected, actual),
        crate::ColumnDescriptor::Component(_) => panic!("Expected TimeColumnDescriptor"),
    }
}

#[test]
fn test_component_column_descriptor_conversion() {
    // A component column with every optional field populated and mixed flag values.
    let expected = crate::ComponentColumnDescriptor {
        entity_path: re_log_types::EntityPath::from("/some/path"),
        archetype_name: Some("archetype".to_owned().into()),
        archetype_field_name: Some("field".to_owned().into()),
        component_name: re_chunk::ComponentName::new("component"),
        store_datatype: arrow::datatypes::DataType::Int64,
        is_static: true,
        is_tombstone: false,
        is_semantically_empty: false,
        is_indicator: true,
    };

    // Round-trip: store descriptor -> protobuf -> store descriptor.
    let as_proto: re_protos::common::v0::ColumnDescriptor =
        crate::ColumnDescriptor::Component(expected.clone())
            .try_into()
            .unwrap();
    let round_tripped: crate::ColumnDescriptor = as_proto.try_into().unwrap();

    // The variant must survive the round-trip, and so must every field.
    match round_tripped {
        crate::ColumnDescriptor::Component(actual) => assert_eq!(expected, actual),
        crate::ColumnDescriptor::Time(_) => panic!("Expected ComponentColumnDescriptor"),
    }
}
}
Loading
Loading