Skip to content

Commit

Permalink
rerun_py.dataframe: add support for .filter_index_values
Browse files Browse the repository at this point in the history
  • Loading branch information
jleibs committed Oct 9, 2024
1 parent a2705b7 commit b24b5f3
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 15 deletions.
47 changes: 38 additions & 9 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3588,6 +3588,19 @@ version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308d96db8debc727c3fd9744aac51751243420e46edf401010908da7f8d5e57c"

[[package]]
name = "ndarray"
version = "0.15.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adb12d4e967ec485a5f71c6311fe28158e9d6f4bc4a447b474184d0f91a8fa32"
dependencies = [
"matrixmultiply",
"num-complex",
"num-integer",
"num-traits",
"rawpointer",
]

[[package]]
name = "ndarray"
version = "0.16.1"
Expand All @@ -3609,7 +3622,7 @@ version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f093b3db6fd194718dcdeea6bd8c829417deae904e3fcc7732dabcd4416d25d8"
dependencies = [
"ndarray",
"ndarray 0.16.1",
"rand",
"rand_distr",
]
Expand Down Expand Up @@ -3846,6 +3859,21 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"

[[package]]
name = "numpy"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec170733ca37175f5d75a5bea5911d6ff45d2cd52849ce98b685394e4f2f37f4"
dependencies = [
"libc",
"ndarray 0.15.6",
"num-complex",
"num-integer",
"num-traits",
"pyo3",
"rustc-hash",
]

[[package]]
name = "objc"
version = "0.2.7"
Expand Down Expand Up @@ -5457,7 +5485,7 @@ dependencies = [
"document-features",
"itertools 0.13.0",
"libc",
"ndarray",
"ndarray 0.16.1",
"ndarray-rand",
"once_cell",
"parking_lot",
Expand Down Expand Up @@ -5647,7 +5675,7 @@ dependencies = [
"bytemuck",
"egui",
"half 2.3.1",
"ndarray",
"ndarray 0.16.1",
"re_chunk_store",
"re_data_ui",
"re_log_types",
Expand Down Expand Up @@ -5798,7 +5826,7 @@ dependencies = [
"linked-hash-map",
"mime_guess2",
"mint",
"ndarray",
"ndarray 0.16.1",
"nohash-hasher",
"once_cell",
"ply-rs",
Expand Down Expand Up @@ -6015,7 +6043,7 @@ dependencies = [
"indexmap 2.1.0",
"itertools 0.13.0",
"linked-hash-map",
"ndarray",
"ndarray 0.16.1",
"nohash-hasher",
"once_cell",
"parking_lot",
Expand Down Expand Up @@ -6288,6 +6316,7 @@ dependencies = [
"infer",
"itertools 0.13.0",
"mimalloc",
"numpy",
"once_cell",
"parking_lot",
"pyo3",
Expand Down Expand Up @@ -6460,7 +6489,7 @@ dependencies = [
"clap",
"half 2.3.1",
"image",
"ndarray",
"ndarray 0.16.1",
"re_log",
"rerun",
]
Expand Down Expand Up @@ -6532,7 +6561,7 @@ version = "0.19.0-alpha.1+dev"
dependencies = [
"anyhow",
"clap",
"ndarray",
"ndarray 0.16.1",
"re_log",
"rerun",
]
Expand Down Expand Up @@ -7015,7 +7044,7 @@ name = "snippets"
version = "0.19.0-alpha.1+dev"
dependencies = [
"itertools 0.13.0",
"ndarray",
"ndarray 0.16.1",
"rand",
"rand_distr",
"re_build_tools",
Expand Down Expand Up @@ -7187,7 +7216,7 @@ dependencies = [
"clap",
"glam",
"itertools 0.13.0",
"ndarray",
"ndarray 0.16.1",
"ndarray-rand",
"rand",
"re_log",
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ nohash-hasher = "0.2"
notify = { version = "6.1.1", features = ["macos_kqueue"] }
num-derive = "0.4"
num-traits = "0.2"
numpy = "0.21"
once_cell = "1.17" # No lazy_static - use `std::sync::OnceLock` or `once_cell` instead
ordered-float = "4.2"
parking_lot = "0.12"
Expand Down
1 change: 1 addition & 0 deletions rerun_py/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ infer.workspace = true
# TODO(#5875): `mimalloc` starts leaking OS pages starting with `0.1.38`.
# When the bug is fixed, change this back to `mimalloc = { workspace = true, …`.
mimalloc = { version = "=0.1.37", features = ["local_dynamic_tls"] }
numpy.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
pyo3 = { workspace = true, features = ["abi3-py38"] }
Expand Down
12 changes: 11 additions & 1 deletion rerun_py/rerun_bindings/rerun_bindings.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ from typing import Optional, Sequence

import pyarrow as pa

from .types import AnyColumn, ComponentLike, ViewContentsLike
from .types import AnyColumn, ComponentLike, IndexLike, ViewContentsLike

class IndexColumnDescriptor:
"""A column containing the index values for when the component data was updated."""
Expand Down Expand Up @@ -57,6 +57,16 @@ class RecordingView:
"""Filter the view to only include data between the given index time values."""
...

def filter_index_values(self, index_values: IndexLike) -> RecordingView:
"""
Filter the view to only include data at the given index values.
This requires index values to be a precise match. Index values in Rerun are
represented as i64 sequence counts or nanoseconds. This API does not expose an interface
in floating point seconds, as the numerical conversion would risk false mismatches.
"""
...

def select(self, *args: AnyColumn, columns: Optional[Sequence[AnyColumn]] = None) -> pa.RecordBatchReader: ...

class Recording:
Expand Down
14 changes: 10 additions & 4 deletions rerun_py/rerun_bindings/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@

from typing import TYPE_CHECKING, Sequence, TypeAlias, Union

import numpy as np
import numpy.typing as npt
import pyarrow as pa

if TYPE_CHECKING:
from rerun._baseclasses import ComponentMixin

from .rerun_bindings import (
ComponentColumnDescriptor as ComponentColumnDescriptor,
ComponentColumnSelector as ComponentColumnSelector,
TimeColumnDescriptor as TimeColumnDescriptor,
TimeColumnSelector as TimeColumnSelector,
IndexColumnSelector as IndexColumnDescriptor,
IndexColumnSelector as IndexColumnSelector,
)

ComponentLike: TypeAlias = Union[str, type["ComponentMixin"]]

AnyColumn: TypeAlias = Union[
"TimeColumnDescriptor",
"ComponentColumnDescriptor",
"TimeColumnSelector",
"ComponentColumnSelector",
"IndexColumnDescriptor",
"IndexColumnSelector",
]

AnyComponentColumn: TypeAlias = Union[
Expand All @@ -30,3 +34,5 @@
str,
dict[str, Union[AnyColumn, Sequence[ComponentLike]]],
]

IndexLike: TypeAlias = Union[npt.NDArray[np.int_], pa.Int64Array]
108 changes: 107 additions & 1 deletion rerun_py/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
use std::collections::{BTreeMap, BTreeSet};

use arrow::{
array::{RecordBatchIterator, RecordBatchReader},
array::{make_array, Array, ArrayData, Int64Array, RecordBatchIterator, RecordBatchReader},
pyarrow::PyArrowType,
};
use numpy::PyArrayMethods as _;
use pyo3::{
exceptions::{PyRuntimeError, PyTypeError, PyValueError},
prelude::*,
Expand Down Expand Up @@ -195,6 +196,99 @@ impl AnyComponentColumn {
}
}

#[derive(FromPyObject)]
enum IndexLike<'py> {
PyArrow(PyArrowType<ArrayData>),
NumPy(numpy::PyArrayLike1<'py, i64>),

// Catch all to support ChunkedArray and other types
#[pyo3(transparent)]
CatchAll(Bound<'py, PyAny>),
}

impl<'py> IndexLike<'py> {
fn to_index_values(&self) -> PyResult<BTreeSet<re_chunk_store::TimeInt>> {
match self {
Self::PyArrow(array) => {
let array = make_array(array.0.clone());

let int_array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
PyTypeError::new_err("Expected an array of integers for index values.")
})?;

let values: BTreeSet<re_chunk_store::TimeInt> = int_array
.iter()
.map(|v| {
v.map_or_else(
|| re_chunk_store::TimeInt::STATIC,
re_chunk_store::TimeInt::new_temporal,
)
})
.collect();

if values.len() != int_array.len() {
return Err(PyValueError::new_err("Index values must be unique."));
}

Ok(values)
}
Self::NumPy(array) => {
let values: BTreeSet<re_chunk_store::TimeInt> = array
.readonly()
.as_array()
.iter()
.map(|v| re_chunk_store::TimeInt::new_temporal(*v))
.collect();

if values.len() != array.len()? {
return Err(PyValueError::new_err("Index values must be unique."));
}

Ok(values)
}
Self::CatchAll(any) => {
// If any has the `.chunks` attribute, we can try to try each chunk as pyarrow array
if let Ok(chunks) = any.getattr("chunks") {
let mut values = BTreeSet::new();
for chunk in chunks.iter()? {
let chunk = chunk?.extract::<PyArrowType<ArrayData>>()?;
let array = make_array(chunk.0.clone());

let int_array =
array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
PyTypeError::new_err(
"Expected an array of integers for index values.",
)
})?;

values.extend(
int_array
.iter()
.map(|v| {
v.map_or_else(
|| re_chunk_store::TimeInt::STATIC,
re_chunk_store::TimeInt::new_temporal,
)
})
.collect::<BTreeSet<_>>(),
);
}

if values.len() != any.len()? {
return Err(PyValueError::new_err("Index values must be unique."));
}

Ok(values)
} else {
Err(PyTypeError::new_err(
"IndexLike must be a pyarrow.Array, pyarrow.ChunkedArray, or numpy.ndarray",
))
}
}
}
}
}

struct ComponentLike(re_sdk::ComponentName);

impl FromPyObject<'_> for ComponentLike {
Expand Down Expand Up @@ -438,6 +532,18 @@ impl PyRecordingView {
query_expression,
})
}

fn filter_index_values(&self, values: IndexLike<'_>) -> PyResult<Self> {
let values = values.to_index_values()?;

let mut query_expression = self.query_expression.clone();
query_expression.filtered_index_values = Some(values);

Ok(Self {
recording: self.recording.clone(),
query_expression,
})
}
}

impl PyRecording {
Expand Down

0 comments on commit b24b5f3

Please sign in to comment.