Skip to content

Commit

Permalink
Expose BitSliceIterator and BitIndexIterator (apache#1864)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jun 13, 2022
1 parent fb697ce commit 4af7ae1
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 102 deletions.
113 changes: 13 additions & 100 deletions arrow/src/compute/kernels/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::buffer::{buffer_bin_and, Buffer, MutableBuffer};
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
use crate::util::bit_chunk_iterator::{UnalignedBitChunk, UnalignedBitChunkIterator};
use crate::util::bit_iterator::{BitIndexIterator, BitSliceIterator};
use crate::util::bit_util;

/// If the filter selects more than this fraction of rows, use
Expand Down Expand Up @@ -72,91 +72,23 @@ macro_rules! downcast_dict_filter {
///
/// 2. Only performant for filters that copy across long contiguous runs
#[derive(Debug)]
pub struct SlicesIterator<'a> {
iter: UnalignedBitChunkIterator<'a>,
len: usize,
current_offset: i64,
current_chunk: u64,
}
pub struct SlicesIterator<'a>(BitSliceIterator<'a>);

impl<'a> SlicesIterator<'a> {
pub fn new(filter: &'a BooleanArray) -> Self {
let values = &filter.data_ref().buffers()[0];
let len = filter.len();
let chunk = UnalignedBitChunk::new(values.as_slice(), filter.offset(), len);
let mut iter = chunk.iter();

let current_offset = -(chunk.lead_padding() as i64);
let current_chunk = iter.next().unwrap_or(0);

Self {
iter,
len,
current_offset,
current_chunk,
}
}

/// Returns `Some((chunk_offset, bit_offset))` for the next chunk that has at
/// least one bit set, or None if there is no such chunk.
///
/// Where `chunk_offset` is the bit offset to the current `u64` chunk
/// and `bit_offset` is the offset of the first `1` bit in that chunk
fn advance_to_set_bit(&mut self) -> Option<(i64, u32)> {
loop {
if self.current_chunk != 0 {
// Find the index of the first 1
let bit_pos = self.current_chunk.trailing_zeros();
return Some((self.current_offset, bit_pos));
}
let offset = filter.offset();

self.current_chunk = self.iter.next()?;
self.current_offset += 64;
}
Self(BitSliceIterator::new(values, offset, len))
}
}

impl<'a> Iterator for SlicesIterator<'a> {
type Item = (usize, usize);

fn next(&mut self) -> Option<Self::Item> {
// Used as termination condition
if self.len == 0 {
return None;
}

let (start_chunk, start_bit) = self.advance_to_set_bit()?;

// Set bits up to start
self.current_chunk |= (1 << start_bit) - 1;

loop {
if self.current_chunk != u64::MAX {
// Find the index of the first 0
let end_bit = self.current_chunk.trailing_ones();

// Zero out up to end_bit
self.current_chunk &= !((1 << end_bit) - 1);

return Some((
(start_chunk + start_bit as i64) as usize,
(self.current_offset + end_bit as i64) as usize,
));
}

match self.iter.next() {
Some(next) => {
self.current_chunk = next;
self.current_offset += 64;
}
None => {
return Some((
(start_chunk + start_bit as i64) as usize,
std::mem::replace(&mut self.len, 0),
));
}
}
}
self.0.next()
}
}

Expand All @@ -165,47 +97,28 @@ impl<'a> Iterator for SlicesIterator<'a> {
/// This provides the best performance on most predicates, apart from those which keep
/// large runs and therefore favour [`SlicesIterator`]
struct IndexIterator<'a> {
current_chunk: u64,
chunk_offset: i64,
remaining: usize,
iter: UnalignedBitChunkIterator<'a>,
iter: BitIndexIterator<'a>,
}

impl<'a> IndexIterator<'a> {
fn new(filter: &'a BooleanArray, len: usize) -> Self {
fn new(filter: &'a BooleanArray, remaining: usize) -> Self {
assert_eq!(filter.null_count(), 0);
let data = filter.data();
let chunks =
UnalignedBitChunk::new(&data.buffers()[0], data.offset(), data.len());
let mut iter = chunks.iter();

let current_chunk = iter.next().unwrap_or(0);
let chunk_offset = -(chunks.lead_padding() as i64);

Self {
current_chunk,
chunk_offset,
remaining: len,
iter,
}
let iter = BitIndexIterator::new(&data.buffers()[0], data.offset(), data.len());
Self { remaining, iter }
}
}

impl<'a> Iterator for IndexIterator<'a> {
type Item = usize;

fn next(&mut self) -> Option<Self::Item> {
while self.remaining != 0 {
if self.current_chunk != 0 {
let bit_pos = self.current_chunk.trailing_zeros();
self.current_chunk ^= 1 << bit_pos;
self.remaining -= 1;
return Some((self.chunk_offset + bit_pos as i64) as usize);
}

if self.remaining != 0 {
let next = self.iter.next().expect("IndexIterator exhausted early");
self.remaining -= 1;
// Must panic if exhausted early as trusted length iterator
self.current_chunk = self.iter.next().expect("IndexIterator exhausted early");
self.chunk_offset += 64;
return Some(next);
}
None
}
Expand Down
6 changes: 4 additions & 2 deletions arrow/src/util/bit_chunk_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::fmt::Debug;

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
Expand All @@ -16,7 +14,11 @@ use std::fmt::Debug;
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Types for iterating over bitmasks in 64-bit chunks
use crate::util::bit_util::ceil;
use std::fmt::Debug;

/// Iterates over an arbitrarily aligned byte buffer
///
Expand Down
160 changes: 160 additions & 0 deletions arrow/src/util/bit_iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::util::bit_chunk_iterator::{UnalignedBitChunk, UnalignedBitChunkIterator};

/// Iterator of contiguous ranges of set bits within a provided packed bitmask
///
/// Returns `(usize, usize)` each representing an interval where the corresponding
/// bits in the provides mask are set
///
#[derive(Debug)]
pub struct BitSliceIterator<'a> {
iter: UnalignedBitChunkIterator<'a>,
len: usize,
current_offset: i64,
current_chunk: u64,
}

impl<'a> BitSliceIterator<'a> {
/// Create a new [`BitSliceIterator`] from the provide `buffer`,
/// and `offset` and `len` in bits
pub fn new(buffer: &'a [u8], offset: usize, len: usize) -> Self {
let chunk = UnalignedBitChunk::new(buffer, offset, len);
let mut iter = chunk.iter();

let current_offset = -(chunk.lead_padding() as i64);
let current_chunk = iter.next().unwrap_or(0);

Self {
iter,
len,
current_offset,
current_chunk,
}
}

/// Returns `Some((chunk_offset, bit_offset))` for the next chunk that has at
/// least one bit set, or None if there is no such chunk.
///
/// Where `chunk_offset` is the bit offset to the current `u64` chunk
/// and `bit_offset` is the offset of the first `1` bit in that chunk
fn advance_to_set_bit(&mut self) -> Option<(i64, u32)> {
loop {
if self.current_chunk != 0 {
// Find the index of the first 1
let bit_pos = self.current_chunk.trailing_zeros();
return Some((self.current_offset, bit_pos));
}

self.current_chunk = self.iter.next()?;
self.current_offset += 64;
}
}
}

impl<'a> Iterator for BitSliceIterator<'a> {
type Item = (usize, usize);

fn next(&mut self) -> Option<Self::Item> {
// Used as termination condition
if self.len == 0 {
return None;
}

let (start_chunk, start_bit) = self.advance_to_set_bit()?;

// Set bits up to start
self.current_chunk |= (1 << start_bit) - 1;

loop {
if self.current_chunk != u64::MAX {
// Find the index of the first 0
let end_bit = self.current_chunk.trailing_ones();

// Zero out up to end_bit
self.current_chunk &= !((1 << end_bit) - 1);

return Some((
(start_chunk + start_bit as i64) as usize,
(self.current_offset + end_bit as i64) as usize,
));
}

match self.iter.next() {
Some(next) => {
self.current_chunk = next;
self.current_offset += 64;
}
None => {
return Some((
(start_chunk + start_bit as i64) as usize,
std::mem::replace(&mut self.len, 0),
));
}
}
}
}
}

/// An iterator of `usize` whose index in a provided bitmask is true
///
/// This provides the best performance on most masks, apart from those which contain
/// large runs and therefore favour [`BitSliceIterator`]
#[derive(Debug)]
pub struct BitIndexIterator<'a> {
current_chunk: u64,
chunk_offset: i64,
iter: UnalignedBitChunkIterator<'a>,
}

impl<'a> BitIndexIterator<'a> {
/// Create a new [`BitIndexIterator`] from the provide `buffer`,
/// and `offset` and `len` in bits
pub fn new(buffer: &'a [u8], offset: usize, len: usize) -> Self {
let chunks = UnalignedBitChunk::new(buffer, offset, len);
let mut iter = chunks.iter();

let current_chunk = iter.next().unwrap_or(0);
let chunk_offset = -(chunks.lead_padding() as i64);

Self {
current_chunk,
chunk_offset,
iter,
}
}
}

impl<'a> Iterator for BitIndexIterator<'a> {
type Item = usize;

fn next(&mut self) -> Option<Self::Item> {
loop {
if self.current_chunk != 0 {
let bit_pos = self.current_chunk.trailing_zeros();
self.current_chunk ^= 1 << bit_pos;
return Some((self.chunk_offset + bit_pos as i64) as usize);
}

self.current_chunk = self.iter.next()?;
self.chunk_offset += 64;
}
}
}

// Note: tests located in filter module
1 change: 1 addition & 0 deletions arrow/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#[cfg(feature = "test_utils")]
pub mod bench_util;
pub mod bit_chunk_iterator;
pub mod bit_iterator;
pub(crate) mod bit_mask;
pub mod bit_util;
#[cfg(feature = "test_utils")]
Expand Down

0 comments on commit 4af7ae1

Please sign in to comment.