diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index 27973a40faa9..bb3ed6e4fac9 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -880,6 +880,7 @@ mod tests { assert_eq!(array.null_count(), 0); assert_eq!(array.values().len(), 1); assert_eq!(array.values().null_count(), 1); + assert_eq!(array.run_ends().len(), 4); assert_eq!(array.run_ends().values(), &[4]); let idx = array.get_physical_indices(&[0, 1, 2, 3]).unwrap(); diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs index 126aefde94f3..e50903f30f9b 100644 --- a/arrow-array/src/array/run_array.rs +++ b/arrow-array/src/array/run_array.rs @@ -17,6 +17,7 @@ use std::any::Any; +use arrow_buffer::buffer::RunEndBuffer; use arrow_buffer::ArrowNativeType; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType, Field}; @@ -62,7 +63,7 @@ use crate::{ pub struct RunArray { data: ArrayData, - run_ends: PrimitiveArray, + run_ends: RunEndBuffer, values: ArrayRef, } @@ -110,11 +111,8 @@ impl RunArray { Ok(array_data.into()) } - /// Returns a reference to run_ends array - /// - /// Note: any slicing of this [`RunArray`] array is not applied to the returned array - /// and must be handled separately - pub fn run_ends(&self) -> &PrimitiveArray { + /// Returns a reference to [`RunEndBuffer`] + pub fn run_ends(&self) -> &RunEndBuffer { &self.run_ends } @@ -128,19 +126,12 @@ impl RunArray { /// Returns the physical index at which the array slice starts. pub fn get_start_physical_index(&self) -> usize { - if self.offset() == 0 { - return 0; - } - self.get_zero_offset_physical_index(self.offset()).unwrap() + self.run_ends.get_start_physical_index() } /// Returns the physical index at which the array slice ends. pub fn get_end_physical_index(&self) -> usize { - if self.offset() + self.len() == Self::logical_len(&self.run_ends) { - return self.run_ends.len() - 1; - } - self.get_zero_offset_physical_index(self.offset() + self.len() - 1) - .unwrap() + self.run_ends.get_end_physical_index() } /// Downcast this [`RunArray`] to a [`TypedRunArray`] @@ -164,47 +155,13 @@ impl RunArray { }) } - /// Returns index to the physical array for the given index to the logical array. - /// The function does not adjust the input logical index based on `ArrayData::offset`. - /// Performs a binary search on the run_ends array for the input index. - #[inline] - pub fn get_zero_offset_physical_index(&self, logical_index: usize) -> Option { - if logical_index >= Self::logical_len(&self.run_ends) { - return None; - } - let mut st: usize = 0; - let mut en: usize = self.run_ends.len(); - while st + 1 < en { - let mid: usize = (st + en) / 2; - if logical_index - < unsafe { - // Safety: - // The value of mid will always be between 1 and len - 1, - // where len is length of run ends array. - // This is based on the fact that `st` starts with 0 and - // `en` starts with len. The condition `st + 1 < en` ensures - // `st` and `en` differs atleast by two. So the value of `mid` - // will never be either `st` or `en` - self.run_ends.value_unchecked(mid - 1).as_usize() - } - { - en = mid - } else { - st = mid - } - } - Some(st) - } - /// Returns index to the physical array for the given index to the logical array. /// This function adjusts the input logical index based on `ArrayData::offset` /// Performs a binary search on the run_ends array for the input index. - #[inline] - pub fn get_physical_index(&self, logical_index: usize) -> Option { - if logical_index >= self.len() { - return None; - } - self.get_zero_offset_physical_index(logical_index + self.offset()) + /// + /// The result is arbitrary if `logical_index >= self.len()` + pub fn get_physical_index(&self, logical_index: usize) -> usize { + self.run_ends.get_physical_index(logical_index) } /// Returns the physical indices of the input logical indices. Returns error if any of the logical @@ -222,6 +179,9 @@ impl RunArray { where I: ArrowNativeType, { + let len = self.run_ends().len(); + let offset = self.run_ends().offset(); + let indices_len = logical_indices.len(); if indices_len == 0 { @@ -243,7 +203,7 @@ impl RunArray { // Return early if all the logical indices cannot be converted to physical indices. let largest_logical_index = logical_indices[*ordered_indices.last().unwrap()].as_usize(); - if largest_logical_index >= self.len() { + if largest_logical_index >= len { return Err(ArrowError::InvalidArgumentError(format!( "Cannot convert all logical indices to physical indices. The logical index cannot be converted is {largest_logical_index}.", ))); @@ -259,7 +219,7 @@ impl RunArray { self.run_ends.values().iter().enumerate().skip(skip_value) { // Get the run end index (relative to offset) of current physical index - let run_end_value = run_end.as_usize() - self.offset(); + let run_end_value = run_end.as_usize() - offset; // All the `logical_indices` that are less than current run end index // belongs to current physical index. @@ -295,7 +255,15 @@ impl From for RunArray { } } - let run_ends = PrimitiveArray::::from(data.child_data()[0].clone()); + // Safety + // ArrayData is valid + let child = &data.child_data()[0]; + assert_eq!(child.data_type(), &R::DATA_TYPE, "Incorrect run ends type"); + let run_ends = unsafe { + let scalar = child.buffers()[0].clone().into(); + RunEndBuffer::new_unchecked(scalar, data.offset(), data.len()) + }; + let values = make_array(data.child_data()[1].clone()); Self { data, @@ -330,7 +298,8 @@ impl std::fmt::Debug for RunArray { writeln!( f, "RunArray {{run_ends: {:?}, values: {:?}}}", - self.run_ends, self.values + self.run_ends.values(), + self.values ) } } @@ -347,7 +316,7 @@ impl std::fmt::Debug for RunArray { /// .map(|&x| if x == "b" { None } else { Some(x) }) /// .collect(); /// assert_eq!( -/// "RunArray {run_ends: PrimitiveArray\n[\n 2,\n 3,\n 5,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n", +/// "RunArray {run_ends: [2, 3, 5], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n", /// format!("{:?}", array) /// ); /// ``` @@ -374,7 +343,7 @@ impl<'a, T: RunEndIndexType> FromIterator> for RunArray { /// let test = vec!["a", "a", "b", "c"]; /// let array: RunArray = test.into_iter().collect(); /// assert_eq!( -/// "RunArray {run_ends: PrimitiveArray\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n", +/// "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n", /// format!("{:?}", array) /// ); /// ``` @@ -401,7 +370,7 @@ impl<'a, T: RunEndIndexType> FromIterator<&'a str> for RunArray { /// /// let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect(); /// let values: Arc = Arc::new(StringArray::from(vec!["a", "b", "c"])); -/// assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5])); +/// assert_eq!(array.run_ends().values(), &[2, 3, 5]); /// assert_eq!(array.values(), &values); /// ``` pub type Int16RunArray = RunArray; @@ -416,7 +385,7 @@ pub type Int16RunArray = RunArray; /// /// let array: Int32RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect(); /// let values: Arc = Arc::new(StringArray::from(vec!["a", "b", "c"])); -/// assert_eq!(array.run_ends(), &Int32Array::from(vec![2, 3, 5])); +/// assert_eq!(array.run_ends().values(), &[2, 3, 5]); /// assert_eq!(array.values(), &values); /// ``` pub type Int32RunArray = RunArray; @@ -431,7 +400,7 @@ pub type Int32RunArray = RunArray; /// /// let array: Int64RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect(); /// let values: Arc = Arc::new(StringArray::from(vec!["a", "b", "c"])); -/// assert_eq!(array.run_ends(), &Int64Array::from(vec![2, 3, 5])); +/// assert_eq!(array.run_ends().values(), &[2, 3, 5]); /// assert_eq!(array.values(), &values); /// ``` pub type Int64RunArray = RunArray; @@ -480,7 +449,7 @@ impl<'a, R: RunEndIndexType, V> std::fmt::Debug for TypedRunArray<'a, R, V> { impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> { /// Returns the run_ends of this [`TypedRunArray`] - pub fn run_ends(&self) -> &'a PrimitiveArray { + pub fn run_ends(&self) -> &'a RunEndBuffer { self.run_array.run_ends() } @@ -531,7 +500,7 @@ where } unsafe fn value_unchecked(&self, logical_index: usize) -> Self::Item { - let physical_index = self.run_array.get_physical_index(logical_index).unwrap(); + let physical_index = self.run_array.get_physical_index(logical_index); self.values().value_unchecked(physical_index) } } @@ -563,7 +532,7 @@ mod tests { use crate::builder::PrimitiveRunBuilder; use crate::cast::as_primitive_array; use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type}; - use crate::{Array, Int16Array, Int32Array, StringArray}; + use crate::{Array, Int32Array, StringArray}; fn build_input_array(size: usize) -> Vec> { // The input array is created by shuffling and repeating @@ -643,9 +612,10 @@ mod tests { ]); // Construct a run_ends array: - let run_ends_data = PrimitiveArray::::from_iter_values([ - 4_i16, 6, 7, 9, 13, 18, 20, 22, - ]); + let run_ends_values = [4_i16, 6, 7, 9, 13, 18, 20, 22]; + let run_ends_data = PrimitiveArray::::from_iter_values( + run_ends_values.iter().copied(), + ); // Construct a run ends encoded array from the above two let ree_array = @@ -659,8 +629,7 @@ mod tests { assert_eq!(&DataType::Int8, values.data_type()); let run_ends = ree_array.run_ends(); - assert_eq!(&run_ends_data.into_data(), run_ends.data()); - assert_eq!(&DataType::Int16, run_ends.data_type()); + assert_eq!(run_ends.values(), &run_ends_values); } #[test] @@ -671,7 +640,7 @@ mod tests { builder.append_value(22345678); let array = builder.finish(); assert_eq!( - "RunArray {run_ends: PrimitiveArray\n[\n 1,\n 2,\n 3,\n], values: PrimitiveArray\n[\n 12345678,\n null,\n 22345678,\n]}\n", + "RunArray {run_ends: [1, 2, 3], values: PrimitiveArray\n[\n 12345678,\n null,\n 22345678,\n]}\n", format!("{array:?}") ); @@ -685,7 +654,7 @@ mod tests { assert_eq!(array.null_count(), 0); assert_eq!( - "RunArray {run_ends: PrimitiveArray\n[\n 20,\n], values: PrimitiveArray\n[\n 1,\n]}\n", + "RunArray {run_ends: [20], values: PrimitiveArray\n[\n 1,\n]}\n", format!("{array:?}") ); } @@ -698,7 +667,7 @@ mod tests { .map(|&x| if x == "b" { None } else { Some(x) }) .collect(); assert_eq!( - "RunArray {run_ends: PrimitiveArray\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n", + "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n", format!("{array:?}") ); @@ -707,7 +676,7 @@ mod tests { let array: RunArray = test.into_iter().collect(); assert_eq!( - "RunArray {run_ends: PrimitiveArray\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n", + "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n", format!("{array:?}") ); } @@ -721,8 +690,6 @@ mod tests { assert_eq!(array.null_count(), 0); let run_ends = array.run_ends(); - assert_eq!(&DataType::Int16, run_ends.data_type()); - assert_eq!(0, run_ends.null_count()); assert_eq!(&[1, 2, 3, 4], run_ends.values()); } @@ -735,9 +702,6 @@ mod tests { assert_eq!(array.null_count(), 0); let run_ends = array.run_ends(); - assert_eq!(&DataType::Int32, run_ends.data_type()); - assert_eq!(0, run_ends.null_count()); - assert_eq!(5, run_ends.len()); assert_eq!(&[1, 2, 3, 5, 6], run_ends.values()); let values_data = array.values(); @@ -754,7 +718,7 @@ mod tests { assert_eq!(array.null_count(), 0); let run_ends = array.run_ends(); - assert_eq!(1, run_ends.len()); + assert_eq!(3, run_ends.len()); assert_eq!(&[3], run_ends.values()); let values_data = array.values(); @@ -770,16 +734,14 @@ mod tests { [Some(1), Some(2), Some(3), Some(4)].into_iter().collect(); let array = RunArray::::try_new(&run_ends, &values).unwrap(); - assert_eq!(array.run_ends().data_type(), &DataType::Int32); assert_eq!(array.values().data_type(), &DataType::Utf8); assert_eq!(array.null_count(), 0); assert_eq!(array.len(), 4); - assert_eq!(array.run_ends.null_count(), 0); assert_eq!(array.values().null_count(), 1); assert_eq!( - "RunArray {run_ends: PrimitiveArray\n[\n 1,\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"foo\",\n \"bar\",\n null,\n \"baz\",\n]}\n", + "RunArray {run_ends: [1, 2, 3, 4], values: StringArray\n[\n \"foo\",\n \"bar\",\n null,\n \"baz\",\n]}\n", format!("{array:?}") ); } @@ -788,7 +750,7 @@ mod tests { fn test_run_array_int16_type_definition() { let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect(); let values: Arc = Arc::new(StringArray::from(vec!["a", "b", "c"])); - assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5])); + assert_eq!(array.run_ends().values(), &[2, 3, 5]); assert_eq!(array.values(), &values); } @@ -796,7 +758,7 @@ mod tests { fn test_run_array_empty_string() { let array: Int16RunArray = vec!["a", "a", "", "", "c"].into_iter().collect(); let values: Arc = Arc::new(StringArray::from(vec!["a", "", "c"])); - assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 4, 5])); + assert_eq!(array.run_ends().values(), &[2, 4, 5]); assert_eq!(array.values(), &values); } @@ -849,9 +811,7 @@ mod tests { } #[test] - #[should_panic( - expected = "PrimitiveArray expected ArrayData with type Int64 got Int32" - )] + #[should_panic(expected = "Incorrect run ends type")] fn test_run_array_run_ends_data_type_mismatch() { let a = RunArray::::from_iter(["32"]); let _ = RunArray::::from(a.into_data()); @@ -874,7 +834,7 @@ mod tests { let actual = typed.value(i); assert_eq!(*val, actual) } else { - let physical_ix = run_array.get_physical_index(i).unwrap(); + let physical_ix = run_array.get_physical_index(i); assert!(typed.values().is_null(physical_ix)); }; } diff --git a/arrow-array/src/builder/generic_byte_run_builder.rs b/arrow-array/src/builder/generic_byte_run_builder.rs index c6dbb82ff6eb..5c15b1544ed3 100644 --- a/arrow-array/src/builder/generic_byte_run_builder.rs +++ b/arrow-array/src/builder/generic_byte_run_builder.rs @@ -49,10 +49,7 @@ use arrow_buffer::ArrowNativeType; /// builder.append_null(); /// let array = builder.finish(); /// -/// assert_eq!( -/// array.run_ends(), -/// &Int16Array::from(vec![Some(2), Some(3), Some(5), Some(6)]) -/// ); +/// assert_eq!(array.run_ends().values(), &[2, 3, 5, 6]); /// /// let av = array.values(); /// @@ -331,10 +328,7 @@ where /// builder.extend([Some("def"), Some("def"), Some("abc")]); /// let array = builder.finish(); /// -/// assert_eq!( -/// array.run_ends(), -/// &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)]) -/// ); +/// assert_eq!(array.run_ends().values(), &[1, 2, 4, 5]); /// /// // Values are polymorphic and so require a downcast. /// let av = array.values(); @@ -370,10 +364,7 @@ pub type LargeStringRunBuilder = GenericByteRunBuilder; /// builder.extend([Some(b"def"), Some(b"def"), Some(b"abc")]); /// let array = builder.finish(); /// -/// assert_eq!( -/// array.run_ends(), -/// &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)]) -/// ); +/// assert_eq!(array.run_ends().values(), &[1, 2, 4, 5]); /// /// // Values are polymorphic and so require a downcast. /// let av = array.values(); @@ -396,11 +387,9 @@ mod tests { use super::*; use crate::array::Array; - use crate::cast::as_primitive_array; use crate::cast::as_string_array; use crate::types::{Int16Type, Int32Type}; use crate::GenericByteArray; - use crate::Int16Array; use crate::Int16RunArray; fn test_bytes_run_buider(values: Vec<&T::Native>) @@ -426,10 +415,7 @@ mod tests { assert_eq!(array.len(), 11); assert_eq!(array.null_count(), 0); - assert_eq!( - array.run_ends(), - &Int16Array::from(vec![Some(3), Some(5), Some(7), Some(11)]) - ); + assert_eq!(array.run_ends().values(), &[3, 5, 7, 11]); // Values are polymorphic and so require a downcast. let av = array.values(); @@ -475,10 +461,7 @@ mod tests { assert_eq!(array.len(), 5); assert_eq!(array.null_count(), 0); - assert_eq!( - array.run_ends(), - &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(5)]) - ); + assert_eq!(array.run_ends().values(), &[1, 2, 4, 5]); // Values are polymorphic and so require a downcast. let av = array.values(); @@ -500,10 +483,7 @@ mod tests { assert_eq!(array.len(), 8); assert_eq!(array.null_count(), 0); - assert_eq!( - array.run_ends(), - &Int16Array::from(vec![Some(1), Some(2), Some(4), Some(7), Some(8),]) - ); + assert_eq!(array.run_ends().values(), &[1, 2, 4, 7, 8]); // Values are polymorphic and so require a downcast. let av2 = array.values(); @@ -536,10 +516,7 @@ mod tests { let array = builder.finish(); assert_eq!(array.len(), 10); - assert_eq!( - as_primitive_array::(array.run_ends()).values(), - &[3, 5, 8, 10] - ); + assert_eq!(array.run_ends().values(), &[3, 5, 8, 10]); let str_array = as_string_array(array.values().as_ref()); assert_eq!(str_array.value(0), "a"); diff --git a/arrow-array/src/builder/primitive_run_builder.rs b/arrow-array/src/builder/primitive_run_builder.rs index 41066228390d..e7c822ee6b19 100644 --- a/arrow-array/src/builder/primitive_run_builder.rs +++ b/arrow-array/src/builder/primitive_run_builder.rs @@ -44,10 +44,7 @@ use arrow_buffer::ArrowNativeType; /// builder.append_value(5678); /// let array = builder.finish(); /// -/// assert_eq!( -/// array.run_ends(), -/// &Int16Array::from(vec![Some(3), Some(4), Some(6)]) -/// ); +/// assert_eq!(array.run_ends().values(), &[3, 4, 6]); /// /// let av = array.values(); /// @@ -270,7 +267,7 @@ mod tests { use crate::builder::PrimitiveRunBuilder; use crate::cast::as_primitive_array; use crate::types::{Int16Type, UInt32Type}; - use crate::{Array, Int16Array, UInt32Array}; + use crate::{Array, UInt32Array}; #[test] fn test_primitive_ree_array_builder() { @@ -287,10 +284,7 @@ mod tests { assert_eq!(array.null_count(), 0); assert_eq!(array.len(), 6); - assert_eq!( - array.run_ends(), - &Int16Array::from(vec![Some(3), Some(4), Some(6)]) - ); + assert_eq!(array.run_ends().values(), &[3, 4, 6]); let av = array.values(); @@ -313,10 +307,7 @@ mod tests { assert_eq!(array.len(), 11); assert_eq!(array.null_count(), 0); - assert_eq!( - as_primitive_array::(array.run_ends()).values(), - &[1, 3, 5, 9, 10, 11] - ); + assert_eq!(array.run_ends().values(), &[1, 3, 5, 9, 10, 11]); assert_eq!( as_primitive_array::(array.values().as_ref()).values(), &[1, 2, 5, 4, 6, 2] diff --git a/arrow-array/src/run_iterator.rs b/arrow-array/src/run_iterator.rs index 44cb59ac7fc4..60022113c3dd 100644 --- a/arrow-array/src/run_iterator.rs +++ b/arrow-array/src/run_iterator.rs @@ -17,9 +17,8 @@ //! Idiomatic iterator for [`RunArray`](crate::Array) -use arrow_buffer::ArrowNativeType; - use crate::{array::ArrayAccessor, types::RunEndIndexType, Array, TypedRunArray}; +use arrow_buffer::ArrowNativeType; /// The [`RunArrayIter`] provides an idiomatic way to iterate over the run array. /// It returns Some(T) if there is a value or None if the value is null. @@ -83,14 +82,11 @@ where if self.current_front_logical == self.current_back_logical { return None; } + // If current logical index is greater than current run end index then increment // the physical index. - if self.current_front_logical - >= self - .array - .run_ends() - .value(self.current_front_physical) - .as_usize() + let run_ends = self.array.run_ends().values(); + if self.current_front_logical >= run_ends[self.current_front_physical].as_usize() { // As the run_ends is expected to be strictly increasing, there // should be at least one logical entry in one physical entry. Because of this @@ -138,13 +134,10 @@ where self.current_back_logical -= 1; + let run_ends = self.array.run_ends().values(); if self.current_back_physical > 0 && self.current_back_logical - < self - .array - .run_ends() - .value(self.current_back_physical - 1) - .as_usize() + < run_ends[self.current_back_physical - 1].as_usize() { // As the run_ends is expected to be strictly increasing, there // should be at least one logical entry in one physical entry. Because of this diff --git a/arrow-buffer/src/buffer/mod.rs b/arrow-buffer/src/buffer/mod.rs index f7e41260d80e..ed53d3361daa 100644 --- a/arrow-buffer/src/buffer/mod.rs +++ b/arrow-buffer/src/buffer/mod.rs @@ -32,3 +32,5 @@ mod boolean; pub use boolean::*; mod null; pub use null::*; +mod run; +pub use run::*; diff --git a/arrow-buffer/src/buffer/run.rs b/arrow-buffer/src/buffer/run.rs new file mode 100644 index 000000000000..a7c39638758c --- /dev/null +++ b/arrow-buffer/src/buffer/run.rs @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::buffer::ScalarBuffer; +use crate::ArrowNativeType; + +/// A slice-able buffer of monotonically increasing, positive integers used to store run-ends +/// +/// # Logical vs Physical +/// +/// A [`RunEndBuffer`] is used to encode runs of the same value, the index of each run is +/// called the physical index. The logical index is then the corresponding index in the logical +/// run-encoded array, i.e. a single run of length `3`, would have the logical indices `0..3`. +/// +/// Each value in [`RunEndBuffer::values`] is the cumulative length of all runs in the +/// logical array, up to that physical index. +/// +/// Consider a [`RunEndBuffer`] containing `[3, 4, 6]`. The maximum physical index is `2`, +/// as there are `3` values, and the maximum logical index is `6`, as the maximum run end +/// is `6`. The physical indices are therefore `[0, 0, 0, 1, 1, 2, 2]` +/// +/// ```text +/// ┌─────────┐ ┌─────────┐ ┌─────────┐ +/// │ 3 │ │ 0 │ ─┬──────▶ │ 0 │ +/// ├─────────┤ ├─────────┤ │ ├─────────┤ +/// │ 4 │ │ 1 │ ─┤ ┌────▶ │ 1 │ +/// ├─────────┤ ├─────────┤ │ │ ├─────────┤ +/// │ 6 │ │ 2 │ ─┘ │ ┌──▶ │ 2 │ +/// └─────────┘ ├─────────┤ │ │ └─────────┘ +/// run ends │ 3 │ ───┤ │ physical indices +/// ├─────────┤ │ │ +/// │ 4 │ ───┘ │ +/// ├─────────┤ │ +/// │ 5 │ ─────┤ +/// ├─────────┤ │ +/// │ 6 │ ─────┘ +/// └─────────┘ +/// logical indices +/// ``` +/// +/// # Slicing +/// +/// In order to provide zero-copy slicing, this container stores a separate offset and length +/// +/// For example, a [`RunEndBuffer`] containing values `[3, 6, 8]` with offset and length `4` would +/// describe the physical indices `1, 1, 2, 2` +/// +/// For example, a [`RunEndBuffer`] containing values `[6, 8, 9]` with offset `2` and length `5` +/// would describe the physical indices `0, 0, 0, 0, 1` +/// +/// [Run-End encoded layout]: https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout +#[derive(Debug, Clone)] +pub struct RunEndBuffer { + run_ends: ScalarBuffer, + len: usize, + offset: usize, +} + +impl RunEndBuffer +where + E: ArrowNativeType, +{ + /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], an `offset` and `len` + /// + /// # Panics + /// + /// - `buffer` does not contain strictly increasing values greater than zero + /// - the last value of `buffer` is less than `offset + len` + pub fn new(run_ends: ScalarBuffer, offset: usize, len: usize) -> Self { + assert!( + run_ends.windows(2).all(|w| w[0] < w[1]), + "run-ends not strictly increasing" + ); + + if len != 0 { + assert!(!run_ends.is_empty(), "non-empty slice but empty run-ends"); + let end = E::from_usize(offset.saturating_add(len)).unwrap(); + assert!( + *run_ends.first().unwrap() >= E::usize_as(0), + "run-ends not greater than 0" + ); + assert!( + *run_ends.last().unwrap() >= end, + "slice beyond bounds of run-ends" + ); + } + + Self { + run_ends, + offset, + len, + } + } + + /// Create a new [`RunEndBuffer`] from an [`ScalarBuffer`], an `offset` and `len` + /// + /// # Safety + /// + /// - `buffer` must contain strictly increasing values greater than zero + /// - The last value of `buffer` must be greater than or equal to `offset + len` + pub unsafe fn new_unchecked( + run_ends: ScalarBuffer, + offset: usize, + len: usize, + ) -> Self { + Self { + run_ends, + offset, + len, + } + } + + /// Returns the logical offset into the run-ends stored by this buffer + #[inline] + pub fn offset(&self) -> usize { + self.offset + } + + /// Returns the logical length of the run-ends stored by this buffer + #[inline] + pub fn len(&self) -> usize { + self.len + } + + /// Returns true if this buffer is empty + #[inline] + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Returns the values of this [`RunEndBuffer`] not including any offset + #[inline] + pub fn values(&self) -> &[E] { + &self.run_ends + } + + /// Returns the maximum run-end encoded in the underlying buffer + #[inline] + pub fn max_value(&self) -> usize { + self.values().last().copied().unwrap_or_default().as_usize() + } + + /// Performs a binary search to find the physical index for the given logical index + /// + /// The result is arbitrary if `logical_index >= self.len()` + pub fn get_physical_index(&self, logical_index: usize) -> usize { + let logical_index = E::usize_as(self.offset + logical_index); + let cmp = |p: &E| p.partial_cmp(&logical_index).unwrap(); + + match self.run_ends.binary_search_by(cmp) { + Ok(idx) => idx + 1, + Err(idx) => idx, + } + } + + /// Returns the physical index at which the logical array starts + pub fn get_start_physical_index(&self) -> usize { + if self.offset == 0 { + return 0; + } + // Fallback to binary search + self.get_physical_index(0) + } + + /// Returns the physical index at which the logical array ends + pub fn get_end_physical_index(&self) -> usize { + if self.max_value() == self.offset + self.len { + return self.values().len() - 1; + } + // Fallback to binary search + self.get_physical_index(self.len - 1) + } + + /// Slices this [`RunEndBuffer`] by the provided `offset` and `length` + pub fn slice(&self, offset: usize, len: usize) -> Self { + assert!( + offset.saturating_add(len) <= self.len, + "the length + offset of the sliced RunEndBuffer cannot exceed the existing length" + ); + Self { + run_ends: self.run_ends.clone(), + offset: self.offset + offset, + len, + } + } +} diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 75c48bebcf63..b57692749878 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -580,42 +580,30 @@ pub(crate) fn unslice_run_array(arr: ArrayData) -> Result fn into_zero_offset_run_array( run_array: RunArray, ) -> Result, ArrowError> { - if run_array.offset() == 0 - && run_array.len() == RunArray::::logical_len(run_array.run_ends()) - { + let run_ends = run_array.run_ends(); + if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() { return Ok(run_array); } + // The physical index of original run_ends array from which the `ArrayData`is sliced. - let start_physical_index = run_array - .get_zero_offset_physical_index(run_array.offset()) - .unwrap(); + let start_physical_index = run_ends.get_start_physical_index(); - // The logical length of original run_ends array until which the `ArrayData` is sliced. - let end_logical_index = run_array.offset() + run_array.len() - 1; // The physical index of original run_ends array until which the `ArrayData`is sliced. - let end_physical_index = run_array - .get_zero_offset_physical_index(end_logical_index) - .unwrap(); + let end_physical_index = run_ends.get_end_physical_index(); let physical_length = end_physical_index - start_physical_index + 1; - // build new run_ends array by subtrating offset from run ends. + // build new run_ends array by subtracting offset from run ends. + let offset = R::Native::usize_as(run_ends.offset()); let mut builder = BufferBuilder::::new(physical_length); - for ix in start_physical_index..end_physical_index { - let run_end_value = unsafe { - // Safety: - // start_physical_index and end_physical_index are within - // run_ends array bounds. - run_array.run_ends().value_unchecked(ix).as_usize() - }; - let run_end_value = run_end_value - run_array.offset(); - builder.append(R::Native::from_usize(run_end_value).unwrap()); + for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] { + builder.append(run_end_value.sub_wrapping(offset)); } builder.append(R::Native::from_usize(run_array.len()).unwrap()); let new_run_ends = unsafe { // Safety: // The function builds a valid run_ends array and hence need not be validated. - ArrayDataBuilder::new(run_array.run_ends().data_type().clone()) + ArrayDataBuilder::new(R::DATA_TYPE) .len(physical_length) .add_buffer(builder.finish()) .build_unchecked() diff --git a/arrow-ord/src/sort.rs b/arrow-ord/src/sort.rs index 230eb9390f2f..0f248ee637b0 100644 --- a/arrow-ord/src/sort.rs +++ b/arrow-ord/src/sort.rs @@ -673,7 +673,7 @@ fn sort_run_downcasted( let new_run_ends = unsafe { // Safety: // The function builds a valid run_ends array and hence need not be validated. - ArrayDataBuilder::new(run_array.run_ends().data_type().clone()) + ArrayDataBuilder::new(R::DATA_TYPE) .len(new_physical_len) .add_buffer(new_run_ends_builder.finish()) .build_unchecked() @@ -746,7 +746,7 @@ where let mut remaining_len = output_len; - let run_ends = run_array.run_ends(); + let run_ends = run_array.run_ends().values(); assert_eq!( 0, @@ -770,22 +770,20 @@ where // and len, both of which are within bounds of run_array if physical_index == start_physical_index { ( - run_ends.value_unchecked(physical_index).as_usize() + run_ends.get_unchecked(physical_index).as_usize() - run_array.offset(), 0, ) } else if physical_index == end_physical_index { - let prev_run_end = - run_ends.value_unchecked(physical_index - 1).as_usize(); + let prev_run_end = run_ends.get_unchecked(physical_index - 1).as_usize(); ( run_array.offset() + run_array.len() - prev_run_end, prev_run_end - run_array.offset(), ) } else { - let prev_run_end = - run_ends.value_unchecked(physical_index - 1).as_usize(); + let prev_run_end = run_ends.get_unchecked(physical_index - 1).as_usize(); ( - run_ends.value_unchecked(physical_index).as_usize() - prev_run_end, + run_ends.get_unchecked(physical_index).as_usize() - prev_run_end, prev_run_end - run_array.offset(), ) } diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs index 771a7eeb5c5a..1c55bc3ce528 100644 --- a/arrow-select/src/take.rs +++ b/arrow-select/src/take.rs @@ -2157,8 +2157,7 @@ mod tests { let take_out = take_run(&run_array, &take_indices).unwrap(); assert_eq!(take_out.len(), 7); - - assert_eq!(take_out.run_ends().len(), 5); + assert_eq!(take_out.run_ends().len(), 7); assert_eq!(take_out.run_ends().values(), &[1_i32, 3, 4, 5, 7]); let take_out_values = as_primitive_array::(take_out.values());