Skip to content

Commit

Permalink
Use upstream is_null and is_not_null kernels
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Oct 5, 2024
1 parent 705103e commit 80c24dc
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 72 deletions.
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/expressions/is_not_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl PhysicalExpr for IsNotNullExpr {
let arg = self.arg.evaluate(batch)?;
match arg {
ColumnarValue::Array(array) => {
let is_not_null = super::is_null::compute_is_not_null(array)?;
let is_not_null = arrow::compute::is_not_null(&array)?;
Ok(ColumnarValue::Array(Arc::new(is_not_null)))
}
ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
Expand Down
77 changes: 6 additions & 71 deletions datafusion/physical-expr/src/expressions/is_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@
use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};

use arrow::compute;
use arrow::{
datatypes::{DataType, Schema},
record_batch::RecordBatch,
};
use arrow_array::{Array, ArrayRef, BooleanArray, Int8Array, UnionArray};
use arrow_buffer::{BooleanBuffer, ScalarBuffer};
use arrow_ord::cmp;

use crate::physical_expr::down_cast_any_ref;
use crate::PhysicalExpr;
Expand Down Expand Up @@ -77,9 +73,9 @@ impl PhysicalExpr for IsNullExpr {
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let arg = self.arg.evaluate(batch)?;
match arg {
ColumnarValue::Array(array) => {
Ok(ColumnarValue::Array(Arc::new(compute_is_null(array)?)))
}
ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new(
arrow::compute::is_null(&array)?,
))),
ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
ScalarValue::Boolean(Some(scalar.is_null())),
)),
Expand All @@ -103,65 +99,6 @@ impl PhysicalExpr for IsNullExpr {
}
}

/// workaround <https://github.com/apache/arrow-rs/issues/6017>,
/// this can be replaced with a direct call to `arrow::compute::is_null` once it's fixed.
pub(crate) fn compute_is_null(array: ArrayRef) -> Result<BooleanArray> {
if let Some(union_array) = array.as_any().downcast_ref::<UnionArray>() {
if let Some(offsets) = union_array.offsets() {
dense_union_is_null(union_array, offsets)
} else {
sparse_union_is_null(union_array)
}
} else {
compute::is_null(array.as_ref()).map_err(Into::into)
}
}

/// workaround <https://github.com/apache/arrow-rs/issues/6017>,
/// this can be replaced with a direct call to `arrow::compute::is_not_null` once it's fixed.
pub(crate) fn compute_is_not_null(array: ArrayRef) -> Result<BooleanArray> {
if array.as_any().is::<UnionArray>() {
compute::not(&compute_is_null(array)?).map_err(Into::into)
} else {
compute::is_not_null(array.as_ref()).map_err(Into::into)
}
}

fn dense_union_is_null(
union_array: &UnionArray,
offsets: &ScalarBuffer<i32>,
) -> Result<BooleanArray> {
let child_arrays = (0..union_array.type_names().len())
.map(|type_id| {
compute::is_null(&union_array.child(type_id as i8)).map_err(Into::into)
})
.collect::<Result<Vec<BooleanArray>>>()?;

let buffer: BooleanBuffer = offsets
.iter()
.zip(union_array.type_ids())
.map(|(offset, type_id)| child_arrays[*type_id as usize].value(*offset as usize))
.collect();

Ok(BooleanArray::new(buffer, None))
}

fn sparse_union_is_null(union_array: &UnionArray) -> Result<BooleanArray> {
let type_ids = Int8Array::new(union_array.type_ids().clone(), None);

let mut union_is_null =
BooleanArray::new(BooleanBuffer::new_unset(union_array.len()), None);
for type_id in 0..union_array.type_names().len() {
let type_id = type_id as i8;
let union_is_child = cmp::eq(&type_ids, &Int8Array::new_scalar(type_id))?;
let child = union_array.child(type_id);
let child_array_is_null = compute::is_null(&child)?;
let child_is_null = compute::and(&union_is_child, &child_array_is_null)?;
union_is_null = compute::or(&union_is_null, &child_is_null)?;
}
Ok(union_is_null)
}

impl PartialEq<dyn Any> for IsNullExpr {
fn eq(&self, other: &dyn Any) -> bool {
down_cast_any_ref(other)
Expand All @@ -184,7 +121,7 @@ mod tests {
array::{BooleanArray, StringArray},
datatypes::*,
};
use arrow_array::{Float64Array, Int32Array};
use arrow_array::{Array, Float64Array, Int32Array, UnionArray};
use arrow_buffer::ScalarBuffer;
use datafusion_common::cast::as_boolean_array;

Expand Down Expand Up @@ -243,8 +180,7 @@ mod tests {
let array =
UnionArray::try_new(union_fields(), type_ids, None, children).unwrap();

let array_ref = Arc::new(array) as ArrayRef;
let result = compute_is_null(array_ref).unwrap();
let result = arrow::compute::is_null(&array).unwrap();

let expected =
&BooleanArray::from(vec![false, true, false, false, true, true, false]);
Expand Down Expand Up @@ -272,8 +208,7 @@ mod tests {
UnionArray::try_new(union_fields(), type_ids, Some(offsets), children)
.unwrap();

let array_ref = Arc::new(array) as ArrayRef;
let result = compute_is_null(array_ref).unwrap();
let result = arrow::compute::is_null(&array).unwrap();

let expected = &BooleanArray::from(vec![false, true, false, true, false, true]);
assert_eq!(expected, &result);
Expand Down

0 comments on commit 80c24dc

Please sign in to comment.