Skip to content

Commit

Permalink
Prefix more things with Arrow2
Browse files · Browse the repository at this point in the history
  • Loading branch information
emilk committed Nov 21, 2024
1 parent cc869e8 commit 502de1b
Show file tree
Hide file tree
Showing 22 changed files with 129 additions and 126 deletions.
2 changes: 1 addition & 1 deletion crates/store/re_chunk/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ impl PendingRow {
re_tracing::profile_scope!("iterate per timeline set");

// Then we split the micro batches even further -- one sub-batch per unique set of datatypes.
let mut per_datatype_set: IntMap<u64 /* ArrowDatatype set */, Vec<Self>> =
let mut per_datatype_set: IntMap<u64 /* Arrow2Datatype set */, Vec<Self>> =
Default::default();
{
re_tracing::profile_scope!("compute datatype sets");
Expand Down
22 changes: 11 additions & 11 deletions crates/store/re_chunk/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use arrow2::{
},
chunk::Chunk as Arrow2Chunk,
datatypes::{
DataType as ArrowDatatype, Field as ArrowField, Metadata as ArrowMetadata,
DataType as Arrow2Datatype, Field as ArrowField, Metadata as Arrow2Metadata,
Schema as Arrow2Schema, TimeUnit as ArrowTimeUnit,
},
};
Expand Down Expand Up @@ -100,7 +100,7 @@ impl TransportChunk {

/// Returns the appropriate chunk-level [`Arrow2Schema`] metadata for a Rerun [`ChunkId`].
#[inline]
pub fn chunk_metadata_id(id: ChunkId) -> ArrowMetadata {
pub fn chunk_metadata_id(id: ChunkId) -> Arrow2Metadata {
[
(
Self::CHUNK_METADATA_KEY_ID.to_owned(),
Expand All @@ -112,7 +112,7 @@ impl TransportChunk {

/// Returns the appropriate chunk-level [`Arrow2Schema`] metadata for the in-memory size in bytes.
#[inline]
pub fn chunk_metadata_heap_size_bytes(heap_size_bytes: u64) -> ArrowMetadata {
pub fn chunk_metadata_heap_size_bytes(heap_size_bytes: u64) -> Arrow2Metadata {
[
(
Self::CHUNK_METADATA_KEY_HEAP_SIZE_BYTES.to_owned(),
Expand All @@ -124,7 +124,7 @@ impl TransportChunk {

/// Returns the appropriate chunk-level [`Arrow2Schema`] metadata for a Rerun [`EntityPath`].
#[inline]
pub fn chunk_metadata_entity_path(entity_path: &EntityPath) -> ArrowMetadata {
pub fn chunk_metadata_entity_path(entity_path: &EntityPath) -> Arrow2Metadata {
[
(
Self::CHUNK_METADATA_KEY_ENTITY_PATH.to_owned(),
Expand All @@ -136,7 +136,7 @@ impl TransportChunk {

/// Returns the appropriate chunk-level [`Arrow2Schema`] metadata for an `IS_SORTED` marker.
#[inline]
pub fn chunk_metadata_is_sorted() -> ArrowMetadata {
pub fn chunk_metadata_is_sorted() -> Arrow2Metadata {
[
(
Self::CHUNK_METADATA_MARKER_IS_SORTED_BY_ROW_ID.to_owned(),
Expand All @@ -148,7 +148,7 @@ impl TransportChunk {

/// Returns the appropriate field-level [`Arrow2Schema`] metadata for a Rerun time column.
#[inline]
pub fn field_metadata_time_column() -> ArrowMetadata {
pub fn field_metadata_time_column() -> Arrow2Metadata {
[
(
Self::FIELD_METADATA_KEY_KIND.to_owned(),
Expand All @@ -160,7 +160,7 @@ impl TransportChunk {

/// Returns the appropriate field-level [`Arrow2Schema`] metadata for a Rerun control column.
#[inline]
pub fn field_metadata_control_column() -> ArrowMetadata {
pub fn field_metadata_control_column() -> Arrow2Metadata {
[
(
Self::FIELD_METADATA_KEY_KIND.to_owned(),
Expand All @@ -172,7 +172,7 @@ impl TransportChunk {

/// Returns the appropriate field-level [`Arrow2Schema`] metadata for a Rerun data column.
#[inline]
pub fn field_metadata_data_column() -> ArrowMetadata {
pub fn field_metadata_data_column() -> Arrow2Metadata {
[
(
Self::FIELD_METADATA_KEY_KIND.to_owned(),
Expand All @@ -184,7 +184,7 @@ impl TransportChunk {

/// Returns the appropriate field-level [`Arrow2Schema`] metadata for an `IS_SORTED` marker.
#[inline]
pub fn field_metadata_is_sorted() -> ArrowMetadata {
pub fn field_metadata_is_sorted() -> Arrow2Metadata {
[
(
Self::FIELD_METADATA_MARKER_IS_SORTED_BY_TIME.to_owned(),
Expand Down Expand Up @@ -492,8 +492,8 @@ impl Chunk {
for (field, column) in transport.timelines() {
// See also [`Timeline::datatype`]
let timeline = match column.data_type().to_logical_type() {
ArrowDatatype::Int64 => Timeline::new_sequence(field.name.as_str()),
ArrowDatatype::Timestamp(ArrowTimeUnit::Nanosecond, None) => {
Arrow2Datatype::Int64 => Timeline::new_sequence(field.name.as_str()),
Arrow2Datatype::Timestamp(ArrowTimeUnit::Nanosecond, None) => {
Timeline::new_temporal(field.name.as_str())
}
_ => {
Expand Down
29 changes: 16 additions & 13 deletions crates/store/re_chunk/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use arrow2::{
array::{
Array as Arrow2Array, BooleanArray as ArrowBooleanArray,
Array as Arrow2Array, BooleanArray as Arrow2BooleanArray,
DictionaryArray as ArrowDictionaryArray, ListArray as ArrowListArray,
PrimitiveArray as ArrowPrimitiveArray,
PrimitiveArray as Arrow2PrimitiveArray,
},
bitmap::Bitmap as ArrowBitmap,
datatypes::DataType as ArrowDatatype,
datatypes::DataType as Arrow2Datatype,
offset::Offsets as ArrowOffsets,
};
use itertools::Itertools;
Expand Down Expand Up @@ -59,7 +59,7 @@ pub fn arrays_to_list_array_opt(
///
/// Returns an empty list if `arrays` is empty.
pub fn arrays_to_list_array(
array_datatype: ArrowDatatype,
array_datatype: Arrow2Datatype,
arrays: &[Option<&dyn Arrow2Array>],
) -> Option<ArrowListArray<i32>> {
let arrays_dense = arrays.iter().flatten().copied().collect_vec();
Expand Down Expand Up @@ -109,7 +109,7 @@ pub fn arrays_to_list_array(
// TODO(cmc): A possible improvement would be to pick the smallest key datatype possible based
// on the cardinality of the input arrays.
pub fn arrays_to_dictionary<Idx: Copy + Eq>(
array_datatype: &ArrowDatatype,
array_datatype: &Arrow2Datatype,
arrays: &[Option<(Idx, &dyn Arrow2Array)>],
) -> Option<ArrowDictionaryArray<i32>> {
// Dedupe the input arrays based on the given primary key.
Expand Down Expand Up @@ -162,7 +162,7 @@ pub fn arrays_to_dictionary<Idx: Copy + Eq>(
ArrowListArray::<i32>::new(array_datatype.clone(), offsets.into(), values, None).to_boxed()
};

let datatype = ArrowDatatype::Dictionary(
let datatype = Arrow2Datatype::Dictionary(
arrow2::datatypes::IntegerType::Int32,
std::sync::Arc::new(array_datatype.clone()),
true, // is_sorted
Expand All @@ -172,7 +172,7 @@ pub fn arrays_to_dictionary<Idx: Copy + Eq>(
// unique values.
ArrowDictionaryArray::try_new(
datatype,
ArrowPrimitiveArray::<i32>::from(keys),
Arrow2PrimitiveArray::<i32>::from(keys),
data.to_boxed(),
)
.ok()
Expand Down Expand Up @@ -310,7 +310,10 @@ pub fn pad_list_array_front(
/// Returns a new [`ArrowListArray`] with len `entries`.
///
/// Each entry will be an empty array of the given `child_datatype`.
pub fn new_list_array_of_empties(child_datatype: ArrowDatatype, len: usize) -> ArrowListArray<i32> {
pub fn new_list_array_of_empties(
child_datatype: Arrow2Datatype,
len: usize,
) -> ArrowListArray<i32> {
let empty_array = arrow2::array::new_empty_array(child_datatype);

#[allow(clippy::unwrap_used)] // yes, these are indeed lengths
Expand Down Expand Up @@ -352,7 +355,7 @@ pub fn concat_arrays(arrays: &[&dyn Arrow2Array]) -> arrow2::error::Result<Box<d
/// Takes care of up- and down-casting the data back and forth on behalf of the caller.
///
/// [filter]: arrow2::compute::filter::filter
pub fn filter_array<A: Arrow2Array + Clone>(array: &A, filter: &ArrowBooleanArray) -> A {
pub fn filter_array<A: Arrow2Array + Clone>(array: &A, filter: &Arrow2BooleanArray) -> A {
assert_eq!(
array.len(), filter.len(),
"the length of the filter must match the length of the array (the underlying kernel will panic otherwise)",
Expand Down Expand Up @@ -392,7 +395,7 @@ pub fn filter_array<A: Arrow2Array + Clone>(array: &A, filter: &ArrowBooleanArra
// For internal stuff, we could perhaps provide a custom implementation that returns a `DictionaryArray` instead?
pub fn take_array<A: Arrow2Array + Clone, O: arrow2::types::Index>(
array: &A,
indices: &ArrowPrimitiveArray<O>,
indices: &Arrow2PrimitiveArray<O>,
) -> A {
debug_assert!(
indices.validity().is_none(),
Expand Down Expand Up @@ -435,15 +438,15 @@ pub fn take_array<A: Arrow2Array + Clone, O: arrow2::types::Index>(

// ---

use arrow2::{chunk::Chunk as ArrowChunk, datatypes::Schema as ArrowSchema};
use arrow2::{chunk::Chunk as Arrow2Chunk, datatypes::Schema as Arrow2Schema};

/// Concatenate multiple [`TransportChunk`]s into one.
///
/// This is a temporary method that we use while waiting to migrate towards `arrow-rs`.
/// * `arrow2` doesn't have a `RecordBatch` type, therefore we emulate that using our `TransportChunk`s.
/// * `arrow-rs` does have one, and it natively supports concatenation.
pub fn concatenate_record_batches(
schema: ArrowSchema,
schema: Arrow2Schema,
batches: &[TransportChunk],
) -> anyhow::Result<TransportChunk> {
assert!(batches.iter().map(|batch| &batch.schema).all_equal());
Expand All @@ -464,6 +467,6 @@ pub fn concatenate_record_batches(

Ok(TransportChunk {
schema,
data: ArrowChunk::new(arrays),
data: Arrow2Chunk::new(arrays),
})
}
4 changes: 2 additions & 2 deletions crates/store/re_chunk/tests/latest_at.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow2::datatypes::DataType as ArrowDatatype;
use arrow2::datatypes::DataType as Arrow2Datatype;
use nohash_hasher::IntMap;

use re_chunk::{Chunk, ComponentName, LatestAtQuery, RowId, TimePoint, Timeline};
Expand All @@ -9,7 +9,7 @@ use re_types_core::{Component, Loggable};

const ENTITY_PATH: &str = "my/entity";

fn datatypes() -> IntMap<ComponentName, ArrowDatatype> {
fn datatypes() -> IntMap<ComponentName, Arrow2Datatype> {
[
(MyPoint::name(), MyPoint::arrow2_datatype()),
(MyColor::name(), MyColor::arrow2_datatype()),
Expand Down
Loading

0 comments on commit 502de1b

Please sign in to comment.