From c30a10b2a687dcf063ecf2d61b0720588397af98 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Mon, 27 May 2024 16:58:31 +0200 Subject: [PATCH] introduce re_chunk and shuffling routines --- ARCHITECTURE.md | 13 +- Cargo.lock | 30 ++ Cargo.toml | 1 + crates/re_chunk/Cargo.toml | 66 ++++ crates/re_chunk/README.md | 10 + crates/re_chunk/src/chunk.rs | 478 +++++++++++++++++++++++ crates/re_chunk/src/lib.rs | 16 + crates/re_chunk/src/shuffle.rs | 379 ++++++++++++++++++ crates/re_chunk/src/util.rs | 49 +++ examples/rust/objectron/src/objectron.rs | 1 + 10 files changed, 1037 insertions(+), 6 deletions(-) create mode 100644 crates/re_chunk/Cargo.toml create mode 100644 crates/re_chunk/README.md create mode 100644 crates/re_chunk/src/chunk.rs create mode 100644 crates/re_chunk/src/lib.rs create mode 100644 crates/re_chunk/src/shuffle.rs create mode 100644 crates/re_chunk/src/util.rs diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 8c364bda6cf3b..81467a8519a07 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -167,12 +167,13 @@ Update instructions: ### Low-level store -| Crate | Description | -|-----------------|-----------------------------------------------------------------------------| -| re_data_store | An in-memory time series database for Rerun log data, based on Apache Arrow | -| re_log_types | The basic building blocks of the Rerun data types and tables. | -| re_types_core | The core traits and types that power Rerun's data model. | -| re_format_arrow | Formatting of Apache Arrow tables | +| Crate | Description | +|-----------------|-----------------------------------------------------------------------------------------------| +| re_chunk | A chunk of Rerun data, encoded using Arrow. Used for logging, transport, storage and compute. | +| re_data_store | An in-memory time series database for Rerun log data, based on Apache Arrow. | +| re_log_types | The basic building blocks of the Rerun data types and tables. | +| re_types_core | The core traits and types that power Rerun's data model. | +| re_format_arrow | Formatting of Apache Arrow tables. | ### Data flow diff --git a/Cargo.lock b/Cargo.lock index e11b9955cf1c1..0ee091366858b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4212,6 +4212,36 @@ dependencies = [ "walkdir", ] +[[package]] +name = "re_chunk" +version = "0.17.0-alpha.2" +dependencies = [ + "ahash", + "anyhow", + "backtrace", + "criterion", + "crossbeam", + "document-features", + "itertools 0.12.0", + "mimalloc", + "nohash-hasher", + "rand", + "re_arrow2", + "re_build_info", + "re_format", + "re_format_arrow", + "re_log", + "re_log_types", + "re_string_interner", + "re_tracing", + "re_tuid", + "re_types_core", + "similar-asserts", + "smallvec", + "static_assertions", + "thiserror", +] + [[package]] name = "re_context_menu" version = "0.17.0-alpha.2" diff --git a/Cargo.toml b/Cargo.toml index a494bfbcdf25e..08e1a06e2542c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ re_analytics = { path = "crates/re_analytics", version = "=0.17.0-alpha.2", defa re_blueprint_tree = { path = "crates/re_blueprint_tree", version = "=0.17.0-alpha.2", default-features = false } re_build_info = { path = "crates/re_build_info", version = "=0.17.0-alpha.2", default-features = false } re_build_tools = { path = "crates/re_build_tools", version = "=0.17.0-alpha.2", default-features = false } +re_chunk = { path = "crates/re_chunk", version = "=0.17.0-alpha.2", default-features = false } re_crash_handler = { path = "crates/re_crash_handler", version = "=0.17.0-alpha.2", default-features = false } re_context_menu = { path = "crates/re_context_menu", version = "=0.17.0-alpha.2", default-features = false } re_data_loader = { path = "crates/re_data_loader", version = "=0.17.0-alpha.2", default-features = false } diff --git a/crates/re_chunk/Cargo.toml b/crates/re_chunk/Cargo.toml new file mode 100644 index 0000000000000..e9f1c56ac4c99 --- /dev/null +++ b/crates/re_chunk/Cargo.toml @@ -0,0 +1,66 @@ +[package] +name = "re_chunk" +authors.workspace = true +description = "A chunk of Rerun data, encoded using Arrow. Used for logging, transport, storage and compute." +edition.workspace = true +homepage.workspace = true +include.workspace = true +license.workspace = true +publish = true +readme = "README.md" +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[lints] +workspace = true + +[package.metadata.docs.rs] +all-features = true + + +[features] +default = [] + + +[dependencies] + +# Rerun +re_build_info.workspace = true +re_format.workspace = true +re_format_arrow.workspace = true +re_log.workspace = true +re_log_types.workspace = true +re_string_interner.workspace = true +re_tracing.workspace = true +re_tuid.workspace = true +re_types_core.workspace = true + +# External +ahash.workspace = true +anyhow.workspace = true +arrow2 = { workspace = true, features = [ + "io_ipc", + "io_print", + "compute_comparison", + "compute_concatenate", +] } +backtrace.workspace = true +document-features.workspace = true +itertools.workspace = true +nohash-hasher.workspace = true +rand = { workspace = true, features = ["std_rng"] } +similar-asserts.workspace = true +smallvec.workspace = true +static_assertions.workspace = true +thiserror.workspace = true + +# Native dependencies: +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +crossbeam.workspace = true + + +[dev-dependencies] +criterion.workspace = true +mimalloc.workspace = true +similar-asserts.workspace = true diff --git a/crates/re_chunk/README.md b/crates/re_chunk/README.md new file mode 100644 index 0000000000000..fe68e9a755ce6 --- /dev/null +++ b/crates/re_chunk/README.md @@ -0,0 +1,10 @@ +# re_chunk + +Part of the [`rerun`](https://github.com/rerun-io/rerun) family of crates. + +[![Latest version](https://img.shields.io/crates/v/re_chunk.svg)](https://crates.io/crates/re_chunk?speculative-link) +[![Documentation](https://docs.rs/re_chunk/badge.svg)](https://docs.rs/re_chunk?speculative-link) +![MIT](https://img.shields.io/badge/license-MIT-blue.svg) +![Apache](https://img.shields.io/badge/license-Apache-blue.svg) + +A chunk of Rerun data, encoded using Arrow. Used for logging, transport, storage and compute. diff --git a/crates/re_chunk/src/chunk.rs b/crates/re_chunk/src/chunk.rs new file mode 100644 index 0000000000000..e5ef457126ac2 --- /dev/null +++ b/crates/re_chunk/src/chunk.rs @@ -0,0 +1,478 @@ +use std::collections::BTreeMap; + +use arrow2::array::Array as ArrowArray; + +use re_log_types::{EntityPath, ResolvedTimeRange, RowId, TimeInt, TimePoint, Timeline}; +use re_types_core::{ComponentName, SerializationError}; + +// --- + +/// Errors that can occur when creating/manipulating a [`Chunk`]s. +#[derive(thiserror::Error, Debug)] +pub enum ChunkError { + #[error("Detected malformed Chunk: {reason}")] + Malformed { reason: String }, + + #[error(transparent)] + Serialization(#[from] SerializationError), + + #[error("Chunks cannot be empty")] + Empty, +} + +pub type ChunkResult = Result; + +// --- + +/// Unique identifier for a [`Chunk`], using a [`re_tuid::Tuid`]. +pub type ChunkId = re_tuid::Tuid; + +/// Dense arrow-based storage of N rows of multi-component multi-temporal data for a specific entity. +/// +/// This is our core datastructure for logging, storing, querying and transporting data around. +/// +/// The chunk as a whole is always ascendingly sorted by [`RowId`] before it gets manipulated in any way. +/// Its time columns might or might not be ascendingly sorted, depending on how the data was logged. +/// +/// This is the in-memory representation of a chunk, optimized for efficient manipulation of the +/// data within. +#[derive(Debug, Clone)] +pub struct Chunk { + pub(crate) id: ChunkId, + pub(crate) entity_path: EntityPath, + + /// Is the chunk as a whole sorted by [`RowId`]? + pub(crate) is_sorted: bool, + + /// The respective [`RowId`]s for each row of data. + pub(crate) row_ids: Vec, + + /// The time columns. + /// + /// Each column must be the same length as `row_ids`. + /// + /// Empty if this is a static chunk. + pub(crate) timelines: BTreeMap, + + /// A sparse `ListArray` for each component. + /// + /// Each `ListArray` must be the same length as `row_ids`. + /// + /// Sparse so that we can e.g. log a `Position` at one timestamp but not a `Color`. + pub(crate) components: BTreeMap>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChunkTimeline { + /// Every single timestamp for this timeline. + /// + /// * This might or might not be sorted, depending on how the data was logged. + /// * This is guaranteed to always be dense, because chunks are split anytime a timeline is + /// added or removed. + /// * This can never contain `TimeInt::STATIC`, since static data doesn't even have timelines. + // + // TODO(cmc): maybe this would be better as raw i64s so getting time columns in and out of + // chunks is just a blind memcpy… it's probably not worth the hassle for now though. + // We'll see how things evolve as we start putting chunks in the backend. + pub(crate) times: Vec, + + /// Is [`Self::times`] sorted? + /// + /// This is completely independent of [`Chunk::is_sorted`]: a timeline doesn't necessarily + /// follow the global [`RowId`]-based order, although it does in most cases (happy path). + pub(crate) is_sorted: bool, + + /// The time range covered by [`Self::times`]. + /// + /// Not necessarily contiguous! Just the min and max value found in [`Self::times`]. + pub(crate) time_range: ResolvedTimeRange, +} + +impl Default for ChunkTimeline { + #[inline] + fn default() -> Self { + Self { + times: Default::default(), + is_sorted: true, + time_range: ResolvedTimeRange::EMPTY, + } + } +} + +#[cfg(test)] // do not ever use this outside internal testing, it's extremely slow and hackish +impl PartialEq for Chunk { + #[inline] + fn eq(&self, rhs: &Self) -> bool { + let Self { + id: _, // we're comparing the contents + entity_path, + is_sorted, + row_ids, + timelines, + components, + } = self; + + use itertools::Itertools as _; + + *entity_path == rhs.entity_path + && *is_sorted == rhs.is_sorted + && *row_ids == rhs.row_ids + && *timelines == rhs.timelines + && components.keys().collect_vec() == rhs.components.keys().collect_vec() + && components.iter().all(|(component_name, list_array)| { + let Some(rhs_list_array) = rhs + .components + .get(component_name) + .map(|list_array| &**list_array) + else { + return false; + }; + + // `arrow2::compute::comparison` has very limited support for the different arrow + // types, so we just do our best here. + // This is just a testing/debugging tool. + if arrow2::compute::comparison::can_eq(list_array.data_type()) { + arrow2::compute::comparison::eq(&**list_array, rhs_list_array) + .values_iter() + .all(|v| v) + } else { + list_array.data_type() == rhs_list_array.data_type() + && list_array.len() == rhs_list_array.len() + } + }) + } +} + +#[cfg(test)] // do not ever use this outside internal testing, it's extremely slow and hackish +impl Eq for Chunk {} + +impl Chunk { + /// Creates a new [`Chunk`]. + /// + /// This will fail if the passed in data is malformed in any way -- see [`Self::sanity_check`] + /// for details. + /// + /// Iff you know for sure whether the data is already appropriately sorted or not, specify `is_sorted`. + /// When left unspecified (`None`), it will be computed in O(n) time. + pub fn new( + id: ChunkId, + entity_path: EntityPath, + is_sorted: Option, + row_ids: Vec, + timelines: BTreeMap, + components: BTreeMap>, + ) -> ChunkResult { + if row_ids.is_empty() { + return Err(ChunkError::Empty); + } + + let mut chunk = Self { + id, + entity_path, + is_sorted: false, + row_ids, + timelines: timelines + .into_iter() + .filter(|(_, time_chunk)| !time_chunk.times.is_empty()) + .collect(), + components, + }; + + chunk.is_sorted = is_sorted.unwrap_or_else(|| chunk.is_sorted_uncached()); + + chunk.sanity_check()?; + + Ok(chunk) + } + + /// Simple helper for [`Self::new`] for static data. + #[inline] + pub fn new_static( + id: ChunkId, + entity_path: EntityPath, + is_sorted: Option, + row_ids: Vec, + components: BTreeMap>, + ) -> ChunkResult { + Self::new( + id, + entity_path, + is_sorted, + row_ids, + Default::default(), + components, + ) + } +} + +impl ChunkTimeline { + /// Creates a new [`ChunkTimeline`]. + /// + /// Returns `None` if `times` is empty. + /// + /// Iff you know for sure whether the data is already appropriately sorted or not, specify `is_sorted`. + /// When left unspecified (`None`), it will be computed in O(n) time. + pub fn new(is_sorted: Option, times: Vec) -> Option { + re_tracing::profile_function!(format!("{} times", times.len())); + + if times.is_empty() { + return None; + } + + let is_sorted = + is_sorted.unwrap_or_else(|| times.windows(2).all(|times| times[0] <= times[1])); + + let time_range = if is_sorted { + // NOTE: The 'or' in 'unwrap_or' is never hit, but better safe than sorry. + let min_time = times.first().copied().unwrap_or(TimeInt::MIN); + let max_time = times.last().copied().unwrap_or(TimeInt::MAX); + ResolvedTimeRange::new(min_time, max_time) + } else { + // NOTE: Do the iteration multiple times in a cache-friendly way rather than the opposite. + // NOTE: The 'or' in 'unwrap_or' is never hit, but better safe than sorry. + let min_time = times.iter().min().copied().unwrap_or(TimeInt::MIN); + let max_time = times.iter().max().copied().unwrap_or(TimeInt::MAX); + ResolvedTimeRange::new(min_time, max_time) + }; + + Some(Self { + times, + is_sorted, + time_range, + }) + } + + /// Push a single time value at the end of this chunk. + #[inline] + pub fn push(&mut self, time: TimeInt) { + let Self { + times, + is_sorted, + time_range, + } = self; + + *is_sorted &= times.last().copied().unwrap_or(TimeInt::MIN) <= time; + time_range.set_min(TimeInt::min(time_range.min(), time)); + time_range.set_max(TimeInt::max(time_range.max(), time)); + times.push(time); + } +} + +// --- + +impl Chunk { + #[inline] + pub fn id(&self) -> ChunkId { + self.id + } + + #[inline] + pub fn entity_path(&self) -> &EntityPath { + &self.entity_path + } + + /// How many columns in total? Includes control, time, and component columns. + #[inline] + pub fn num_columns(&self) -> usize { + let Self { + id: _, + entity_path: _, // not an actual column + is_sorted: _, + row_ids: _, + timelines, + components, + } = self; + + 1 /* row_ids */ + timelines.len() + components.len() + } + + #[inline] + pub fn num_controls(&self) -> usize { + _ = self; + 1 /* row_ids */ + } + + #[inline] + pub fn num_timelines(&self) -> usize { + self.timelines.len() + } + + #[inline] + pub fn num_components(&self) -> usize { + self.components.len() + } + + #[inline] + pub fn num_rows(&self) -> usize { + self.row_ids.len() + } + + /// Returns the [`RowId`]-range in this [`Chunk`]. + /// + /// This is O(1) if the chunk is sorted, O(n) otherwise. + #[inline] + pub fn row_id_range(&self) -> (RowId, RowId) { + #[allow(clippy::unwrap_used)] // cannot create empty chunks + if self.is_sorted() { + ( + self.row_ids.first().copied().unwrap(), + self.row_ids.last().copied().unwrap(), + ) + } else { + ( + self.row_ids.iter().min().copied().unwrap(), + self.row_ids.iter().max().copied().unwrap(), + ) + } + } + + /// Computes the maximum value for each and every timeline present across this entire chunk, + /// and returns the corresponding [`TimePoint`]. + #[inline] + pub fn timepoint_max(&self) -> TimePoint { + self.timelines + .iter() + .map(|(timeline, info)| (*timeline, info.time_range.max())) + .collect() + } +} + +// TODO(cmc): display impl + +// TODO(cmc): sizebytes impl + sizebytes caching + sizebytes in transport metadata + +// TODO(cmc): methods to merge chunks (compaction). + +// --- Sanity checks --- + +impl Chunk { + /// Returns an error if the Chunk's invariants are not upheld. + /// + /// Costly checks are only run in debug builds. + pub fn sanity_check(&self) -> ChunkResult<()> { + re_tracing::profile_function!(); + + let Self { + id: _, + entity_path: _, + is_sorted, + row_ids, + timelines, + components, + } = self; + + if row_ids.is_empty() || components.is_empty() { + return Err(ChunkError::Empty); + } + + // Row IDs + #[allow(clippy::collapsible_if)] // readability + if cfg!(debug_assertions) { + if *is_sorted != self.is_sorted_uncached() { + return Err(ChunkError::Malformed { + reason: format!( + "Chunk is marked as {}sorted but isn't: {row_ids:?}", + if *is_sorted { "" } else { "un" }, + ), + }); + } + } + + // Timelines + for (timeline, time_chunk) in timelines { + if time_chunk.times.len() != row_ids.len() { + return Err(ChunkError::Malformed { + reason: format!( + "All timelines in a chunk must have the same number of timestamps, matching the number of row IDs.\ + Found {} row IDs but {} timestamps for timeline {:?}", + row_ids.len(), time_chunk.times.len(), timeline.name(), + ), + }); + } + + time_chunk.sanity_check()?; + } + + // Components + for (component_name, list_array) in components { + if !matches!(list_array.data_type(), arrow2::datatypes::DataType::List(_)) { + return Err(ChunkError::Malformed { + reason: format!( + "The outer array in a chunked component batch must be a sparse list, got {:?}", + list_array.data_type(), + ), + }); + } + if let arrow2::datatypes::DataType::List(field) = list_array.data_type() { + if !field.is_nullable { + return Err(ChunkError::Malformed { + reason: format!( + "The outer array in chunked component batch must be a sparse list, got {:?}", + list_array.data_type(), + ), + }); + } + } + if list_array.len() != row_ids.len() { + return Err(ChunkError::Malformed { + reason: format!( + "All component batches in a chunk must have the same number of rows, matching the number of row IDs.\ + Found {} row IDs but {} rows for component batch {component_name}", + row_ids.len(), list_array.len(), + ), + }); + } + } + + Ok(()) + } +} + +impl ChunkTimeline { + /// Returns an error if the Chunk's invariants are not upheld. + /// + /// Costly checks are only run in debug builds. + pub fn sanity_check(&self) -> ChunkResult<()> { + let Self { + times, + is_sorted, + time_range, + } = self; + + #[allow(clippy::collapsible_if)] // readability + if cfg!(debug_assertions) { + if *is_sorted != times.windows(2).all(|times| times[0] <= times[1]) { + return Err(ChunkError::Malformed { + reason: format!( + "Chunk timeline is marked as {}sorted but isn't: {times:?}", + if *is_sorted { "" } else { "un" }, + ), + }); + } + } + + #[allow(clippy::collapsible_if)] // readability + if cfg!(debug_assertions) { + for &time in times { + if time < time_range.min() || time > time_range.max() { + return Err(ChunkError::Malformed { + reason: format!( + "Chunk timeline's cached time range is wrong.\ + Found a time value of {} while its time range is {time_range:?}", + time.as_i64(), + ), + }); + } + + if time.is_static() { + return Err(ChunkError::Malformed { + reason: "A chunk's timeline should never contain a static time value." + .to_owned(), + }); + } + } + } + + Ok(()) + } +} diff --git a/crates/re_chunk/src/lib.rs b/crates/re_chunk/src/lib.rs new file mode 100644 index 0000000000000..1479770e83515 --- /dev/null +++ b/crates/re_chunk/src/lib.rs @@ -0,0 +1,16 @@ +//! A chunk of Rerun data, encoded using Arrow. Used for logging, transport, storage and compute. +//! +//! ## Feature flags +#![doc = document_features::document_features!()] +//! + +mod chunk; +mod shuffle; +mod util; + +pub use self::chunk::{Chunk, ChunkError, ChunkId, ChunkResult, ChunkTimeline}; +pub use self::util::arrays_to_list_array; + +pub mod external { + pub use arrow2; +} diff --git a/crates/re_chunk/src/shuffle.rs b/crates/re_chunk/src/shuffle.rs new file mode 100644 index 0000000000000..fdc5f4e28c0dc --- /dev/null +++ b/crates/re_chunk/src/shuffle.rs @@ -0,0 +1,379 @@ +use arrow2::{ + array::{Array as ArrowArray, ListArray as ArrowListArray}, + offset::Offsets as ArrowOffsets, +}; +use itertools::Itertools as _; + +use crate::{Chunk, ChunkTimeline}; + +// --- + +impl Chunk { + /// Is the chunk currently ascendingly sorted by [`re_log_types::RowId`]? + /// + /// This is O(1) (cached). + /// + /// See also [`Self::is_sorted_uncached`]. + #[inline] + pub fn is_sorted(&self) -> bool { + self.is_sorted + } + + /// Like [`Self::is_sorted`], but actually checks the entire dataset rather than relying on the + /// cached value. + /// + /// O(n). Useful for tests/debugging, or when you just don't know. + /// + /// See also [`Self::is_sorted`]. + #[inline] + pub fn is_sorted_uncached(&self) -> bool { + re_tracing::profile_function!(); + + self.row_ids + .windows(2) + .all(|row_ids| row_ids[0] <= row_ids[1]) + } + + /// Sort the chunk, if needed. + /// + /// If `make_contiguous` is `true`, the underlying arrow data will be copied and shuffled in + /// memory in order to make it contiguous. + /// Otherwise, only the offsets will be shuffled. + /// For most use cases, the small gains in cache locality are not worth the much higher cost + /// of copying the data around. + // + // TODO(cmc): `sort_if_unsorted(false)` followed by `sort_if_unsorted(true)` will not do + // anything, which is really not worth the hassle right now. + #[inline] + pub fn sort_if_unsorted(&mut self, make_contiguous: bool) { + if self.is_sorted() { + return; + } + + re_tracing::profile_function!(); + + let now = std::time::Instant::now(); + + let swaps = { + re_tracing::profile_scope!("swaps"); + let mut swaps = (0..self.row_ids.len()).collect::>(); + swaps.sort_by_key(|&i| self.row_ids[i]); + swaps + }; + + self.shuffle_with(&swaps, make_contiguous); + + re_log::trace!( + entity_path = %self.entity_path, + num_rows = self.row_ids.len(), + elapsed = ?now.elapsed(), + "chunk sorted", + ); + + #[cfg(debug_assertions)] + #[allow(clippy::unwrap_used)] // dev only + self.sanity_check().unwrap(); + } + + /// Randomly shuffles the chunk using the given `seed`. + /// + /// If `make_contiguous` is `true`, the underlying arrow data will be copied and shuffled in + /// memory in order to make it contiguous. + /// Otherwise, only the offsets will be shuffled. + /// For most use cases, the small gains in cache locality are not worth the much higher cost + /// of copying the data around. + #[inline] + pub fn shuffle_random(&mut self, seed: u64, make_contiguous: bool) { + re_tracing::profile_function!(); + + let now = std::time::Instant::now(); + + use rand::{seq::SliceRandom as _, SeedableRng as _}; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + + let swaps = { + re_tracing::profile_scope!("swaps"); + let mut swaps = (0..self.row_ids.len()).collect::>(); + swaps.shuffle(&mut rng); + swaps + }; + + self.shuffle_with(&swaps, make_contiguous); + + re_log::trace!( + entity_path = %self.entity_path, + num_rows = self.row_ids.len(), + elapsed = ?now.elapsed(), + "chunk shuffled", + ); + } + + /// Shuffle the chunk according to the specified `swaps`. + /// + /// `swaps` is a slice that maps an implicit destination index to its explicit source index. + /// E.g. `swap[0] = 3` means that the entry at index `3` in the original chunk should be move to index `0`. + /// + /// If `make_contiguous` is `true`, the underlying arrow data will be copied and shuffled in + /// memory in order to make it contiguous. + /// Otherwise, only the offsets will be shuffled. + /// For most use cases, the small gains in cache locality are not worth the much higher cost + /// of copying the data around. + pub(crate) fn shuffle_with(&mut self, swaps: &[usize], make_contiguous: bool) { + re_tracing::profile_function!(); + + let Self { + id: _, + entity_path: _, + is_sorted: _, + row_ids, + timelines, + components, + } = self; + + // Row IDs + { + re_tracing::profile_scope!("row ids"); + + let original = row_ids.clone(); + for (to, from) in swaps.iter().copied().enumerate() { + row_ids[to] = original[from]; + } + } + + // Timelines + { + re_tracing::profile_scope!("timelines"); + + for info in timelines.values_mut() { + let ChunkTimeline { + times, + is_sorted, + time_range: _, + } = info; + + let original = times.clone(); + for (to, from) in swaps.iter().copied().enumerate() { + times[to] = original[from]; + } + + *is_sorted = times.windows(2).all(|times| times[0] <= times[1]); + } + } + + // Components + // + // Reminder: these are all `ListArray`s. + if !make_contiguous { + re_tracing::profile_scope!("components (offsets only)"); + + for original in components.values_mut() { + #[allow(clippy::unwrap_used)] // a chunk's column is always a list array + let original_list = original + .as_any() + .downcast_ref::>() + .unwrap(); + + let datatype = original.data_type().clone(); + #[allow(clippy::unwrap_used)] // yep, these are in fact lengths + let offsets = ArrowOffsets::try_from_lengths( + swaps.iter().map(|&from| original_list.value(from).len()), + ) + .unwrap(); + let validity = original_list + .validity() + .map(|validity| swaps.iter().map(|&from| validity.get_bit(from)).collect()); + + *original = ArrowListArray::::new( + datatype, + offsets.into(), + original_list.values().clone(), + validity, + ) + .boxed(); + } + } + // Alternative: dont just swap the offsets -- make the data itself contiguous. + // Much higher sorting costs, but potentially slightly better cache locality later. + else { + re_tracing::profile_scope!("components (offsets & data)"); + + for original in components.values_mut() { + #[allow(clippy::unwrap_used)] // a chunk's column is always a list array + let original_list = original + .as_any() + .downcast_ref::>() + .unwrap(); + + let sorted_arrays = swaps + .iter() + .copied() + .map(|from| original_list.value(from)) + .collect_vec(); + let sorted_arrays = sorted_arrays + .iter() + .map(|array| &**array as &dyn ArrowArray) + .collect_vec(); + + let datatype = original.data_type().clone(); + #[allow(clippy::unwrap_used)] // yep, these are in fact lengths + let offsets = + ArrowOffsets::try_from_lengths(sorted_arrays.iter().map(|array| array.len())) + .unwrap(); + #[allow(clippy::unwrap_used)] // these are slices of the same outer array + let values = arrow2::compute::concatenate::concatenate(&sorted_arrays).unwrap(); + let validity = original_list + .validity() + .map(|validity| swaps.iter().map(|&from| validity.get_bit(from)).collect()); + + *original = + ArrowListArray::::new(datatype, offsets.into(), values, validity).boxed(); + } + } + + self.is_sorted = self.is_sorted_uncached(); + } +} + +impl ChunkTimeline { + /// Is the timeline sorted? + /// + /// This is O(1) (cached). + /// + /// See also [`Self::is_sorted_uncached`]. + #[inline] + pub fn is_sorted(&self) -> bool { + self.is_sorted + } + + /// Like [`Self::is_sorted`], but actually checks the entire dataset rather than relying on the + /// cached value. + /// + /// O(n). Useful for tests/debugging, or when you just don't know. + /// + /// See also [`Self::is_sorted`]. + #[inline] + pub fn is_sorted_uncached(&self) -> bool { + re_tracing::profile_function!(); + self.times.windows(2).all(|times| times[0] <= times[1]) + } +} + +#[cfg(test)] +mod tests { + use re_log_types::{ + example_components::{MyColor, MyPoint}, + EntityPath, RowId, TimeInt, Timeline, + }; + use re_types_core::Loggable as _; + + use crate::{arrays_to_list_array, ChunkId}; + + use super::*; + + #[test] + fn sort() -> anyhow::Result<()> { + let entity_path: EntityPath = "a/b/c".into(); + + let timeline1 = Timeline::new_temporal("log_time"); + let timeline2 = Timeline::new_temporal("frame_nr"); + + let points1 = MyPoint::to_arrow([ + MyPoint::new(1.0, 2.0), + MyPoint::new(3.0, 4.0), + MyPoint::new(5.0, 6.0), + ])?; + let points2 = None; + let points3 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0)])?; + let points4 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?; + + let colors1 = MyColor::to_arrow([ + MyColor::from_rgb(1, 2, 3), + MyColor::from_rgb(4, 5, 6), + MyColor::from_rgb(7, 8, 9), + ])?; + let colors2 = MyColor::to_arrow([MyColor::from_rgb(10, 20, 30)])?; + let colors3 = None; + let colors4 = MyColor::to_arrow([ + MyColor::from_rgb(101, 102, 103), + MyColor::from_rgb(104, 105, 106), + ])?; + + let timelines = [ + ( + timeline1, + ChunkTimeline::new( + Some(true), + [1000, 1001, 1002, 1003].map(TimeInt::new_temporal).to_vec(), + ) + .unwrap(), + ), + ( + timeline2, + ChunkTimeline::new( + Some(true), + [42, 43, 44, 45].map(TimeInt::new_temporal).to_vec(), + ) + .unwrap(), + ), + ]; + + let components = [ + ( + MyPoint::name(), + arrays_to_list_array(&[Some(&*points1), points2, Some(&*points3), Some(&*points4)]) + .unwrap(), + ), + ( + MyPoint::name(), + arrays_to_list_array(&[Some(&*colors1), Some(&*colors2), colors3, Some(&*colors4)]) + .unwrap(), + ), + ]; + + let row_ids = vec![RowId::new(), RowId::new(), RowId::new(), RowId::new()]; + + let make_contiguous = [false, true]; + + for make_contiguous in make_contiguous { + let chunk_sorted = Chunk::new( + ChunkId::new(), + entity_path.clone(), + Some(true), + row_ids.clone(), + timelines.clone().into_iter().collect(), + components.clone().into_iter().collect(), + )?; + + // eprintln!("{chunk_sorted}"); + + assert!(chunk_sorted.is_sorted()); + assert!(chunk_sorted.is_sorted_uncached()); + + let chunk_shuffled = { + let mut chunk_shuffled = chunk_sorted.clone(); + chunk_shuffled.shuffle_random(666, make_contiguous); + chunk_shuffled + }; + + // eprintln!("{chunk_shuffled}"); + + assert!(!chunk_shuffled.is_sorted()); + assert!(!chunk_shuffled.is_sorted_uncached()); + assert_ne!(chunk_sorted, chunk_shuffled); + + let chunk_resorted = { + let mut chunk_resorted = chunk_shuffled.clone(); + chunk_resorted.sort_if_unsorted(make_contiguous); + chunk_resorted + }; + + // eprintln!("{chunk_resorted}"); + + assert!(chunk_resorted.is_sorted()); + assert!(chunk_resorted.is_sorted_uncached()); + assert_eq!(chunk_sorted, chunk_resorted); + } + + Ok(()) + } +} diff --git a/crates/re_chunk/src/util.rs b/crates/re_chunk/src/util.rs new file mode 100644 index 0000000000000..1fe1356ffbd29 --- /dev/null +++ b/crates/re_chunk/src/util.rs @@ -0,0 +1,49 @@ +use arrow2::{ + array::{Array as ArrowArray, ListArray as ArrowListArray}, + bitmap::Bitmap as ArrowBitmap, + offset::Offsets as ArrowOffsets, +}; +use itertools::Itertools as _; + +// --- + +/// Create a sparse list-array out of an array of arrays. +/// +/// All arrays must have the same datatype. +/// +/// Returns `None` if `arrays` is empty. +pub fn arrays_to_list_array(arrays: &[Option<&dyn ArrowArray>]) -> Option> { + let arrays_dense = arrays.iter().flatten().copied().collect_vec(); + + if arrays_dense.is_empty() { + return None; + } + + let data = arrow2::compute::concatenate::concatenate(&arrays_dense) + .map_err(|err| { + re_log::warn_once!("failed to concatenate arrays: {err}"); + err + }) + .ok()?; + + let datatype = arrays_dense + .first() + .map(|array| array.data_type().clone())?; + debug_assert!(arrays_dense + .iter() + .all(|array| *array.data_type() == datatype)); + let datatype = ArrowListArray::::default_datatype(datatype); + + #[allow(clippy::unwrap_used)] // yes, there are indeed lengths + let offsets = ArrowOffsets::try_from_lengths( + arrays + .iter() + .map(|array| array.map_or(0, |array| array.len())), + ) + .unwrap(); + + #[allow(clippy::from_iter_instead_of_collect)] + let validity = ArrowBitmap::from_iter(arrays.iter().map(Option::is_some)); + + Some(ArrowListArray::::new(datatype, offsets.into(), data, validity.into()).boxed()) +} diff --git a/examples/rust/objectron/src/objectron.rs b/examples/rust/objectron/src/objectron.rs index 9b1145b73bea1..a7d596df71bce 100644 --- a/examples/rust/objectron/src/objectron.rs +++ b/examples/rust/objectron/src/objectron.rs @@ -1,6 +1,7 @@ // This file was autogenerated by `build.rs`. Do not edit. #![allow(clippy::all, clippy::doc_markdown)] +// This file is @generated by prost-build. /// Info about the camera characteristics used to capture images and depth data. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)]