Skip to content

Commit

Permalink
Port ArrayExcept to functions-array subcrate (#9634)
Browse files Browse the repository at this point in the history
* Issue-9633 - Port ArrayExcept to functions-array subcrate

* Issue-9633 - Address review comment
  • Loading branch information
erenavsarogullari authored Mar 18, 2024
1 parent 7d3747c commit c072abb
Show file tree
Hide file tree
Showing 13 changed files with 175 additions and 131 deletions.
13 changes: 0 additions & 13 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ pub enum BuiltinScalarFunction {
ArrayReplaceN,
/// array_replace_all
ArrayReplaceAll,
/// array_except
ArrayExcept,

// string functions
/// ascii
Expand Down Expand Up @@ -270,7 +268,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Cbrt => Volatility::Immutable,
BuiltinScalarFunction::Cot => Volatility::Immutable,
BuiltinScalarFunction::Trunc => Volatility::Immutable,
BuiltinScalarFunction::ArrayExcept => Volatility::Immutable,
BuiltinScalarFunction::ArrayRemove => Volatility::Immutable,
BuiltinScalarFunction::ArrayRemoveN => Volatility::Immutable,
BuiltinScalarFunction::ArrayRemoveAll => Volatility::Immutable,
Expand Down Expand Up @@ -340,14 +337,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplace => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayReplaceN => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayReplaceAll => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayExcept => {
match (input_expr_types[0].clone(), input_expr_types[1].clone()) {
(DataType::Null, _) | (_, DataType::Null) => {
Ok(input_expr_types[0].clone())
}
(dt, _) => Ok(dt),
}
}
BuiltinScalarFunction::Ascii => Ok(Int32),
BuiltinScalarFunction::BitLength => {
utf8_to_int_type(&input_expr_types[0], "bit_length")
Expand Down Expand Up @@ -500,7 +489,6 @@ impl BuiltinScalarFunction {

// for now, the list is small, as we do not have many built-in functions.
match self {
BuiltinScalarFunction::ArrayExcept => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayRemove => {
Signature::array_and_element(self.volatility())
}
Expand Down Expand Up @@ -812,7 +800,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::FindInSet => &["find_in_set"],

// hashing functions
BuiltinScalarFunction::ArrayExcept => &["array_except", "list_except"],
BuiltinScalarFunction::ArrayRemove => &["array_remove", "list_remove"],
BuiltinScalarFunction::ArrayRemoveN => &["array_remove_n", "list_remove_n"],
BuiltinScalarFunction::ArrayRemoveAll => {
Expand Down
6 changes: 0 additions & 6 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,12 +584,6 @@ scalar_expr!(
scalar_expr!(Uuid, uuid, , "returns uuid v4 as a string value");
scalar_expr!(Log, log, base x, "logarithm of a `x` for a particular `base`");

scalar_expr!(
ArrayExcept,
array_except,
first_array second_array,
"Returns an array of the elements that appear in the first array but not in the second."
);
scalar_expr!(
ArrayRemove,
array_remove,
Expand Down
163 changes: 163 additions & 0 deletions datafusion/functions-array/src/except.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! `ScalarUDFImpl` definition and implementation kernel for the `array_except` function.
use crate::utils::check_datatypes;
use arrow::row::{RowConverter, SortField};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait};
use arrow_buffer::OffsetBuffer;
use arrow_schema::{DataType, FieldRef};
use datafusion_common::{exec_err, internal_err};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::Expr;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;

// Invokes the crate's `make_udf_function!` macro for `ArrayExcept`.
// NOTE(review): the macro definition is not visible here. The crate's `lib.rs`
// re-exports `except::array_except` and registers `except::array_except_udf()`,
// so the macro presumably generates both of those items from the names passed
// below — confirm against the `macros` module.
make_udf_function!(
ArrayExcept,
array_except,
first_array second_array,
"returns an array of the elements that appear in the first array but not in the second.",
array_except_udf
);

/// Scalar UDF state for the `array_except` SQL function.
#[derive(Debug)]
pub(super) struct ArrayExcept {
    /// Argument signature reported through `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Names reported through `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}

impl ArrayExcept {
    /// Creates the UDF with an immutable signature accepting any two arguments
    /// and the names `array_except` and `list_except`.
    pub fn new() -> Self {
        let aliases = ["array_except", "list_except"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        let signature = Signature::any(2, Volatility::Immutable);

        Self { signature, aliases }
    }
}

impl ScalarUDFImpl for ArrayExcept {
    /// Exposes the concrete type for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Primary name of the function.
    fn name(&self) -> &str {
        "array_except"
    }

    /// Signature built in [`ArrayExcept::new`].
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Returns the type of the first argument.
    ///
    /// Both arms resolve to the first argument's type; the `Null` arm is kept
    /// explicit to mirror the null handling in `array_except_inner`.
    /// Indexes `arg_types[0]` and `arg_types[1]`, so it expects two argument types.
    fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
        // Match on borrows of the slice elements; cloning them first only to
        // borrow the temporaries is unnecessary work.
        match (&arg_types[0], &arg_types[1]) {
            (DataType::Null, _) | (_, DataType::Null) => Ok(arg_types[0].clone()),
            (dt, _) => Ok(dt.clone()),
        }
    }

    /// Converts the arguments to arrays, runs `array_except_inner` on them and
    /// wraps the result as an array value.
    fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
        let args = ColumnarValue::values_to_arrays(args)?;
        array_except_inner(&args).map(ColumnarValue::Array)
    }

    /// Names stored in the `aliases` field.
    fn aliases(&self) -> &[String] {
        &self.aliases
    }
}

/// Kernel for the `array_except` SQL function.
///
/// Expects exactly two arrays. If either one has the `Null` data type the first
/// array is returned unchanged. Two `List` or two `LargeList` arrays are
/// validated with `check_datatypes` and passed to `general_except`.
///
/// # Errors
///
/// Returns an execution error when `args` does not hold exactly two arrays, and
/// an internal error for any other combination of data types. Errors from
/// `check_datatypes` and `general_except` are propagated.
pub fn array_except_inner(args: &[ArrayRef]) -> datafusion_common::Result<ArrayRef> {
    let (lhs, rhs) = match args {
        [lhs, rhs] => (lhs, rhs),
        _ => return exec_err!("array_except needs two arguments"),
    };

    let difference = match (lhs.data_type(), rhs.data_type()) {
        // Nothing to subtract from, or nothing to subtract: hand back the left side.
        (DataType::Null, _) | (_, DataType::Null) => return Ok(lhs.to_owned()),
        (DataType::List(field), DataType::List(_)) => {
            check_datatypes("array_except", &[lhs, rhs])?;
            let computed =
                general_except::<i32>(lhs.as_list::<i32>(), rhs.as_list::<i32>(), field)?;
            Arc::new(computed) as ArrayRef
        }
        (DataType::LargeList(field), DataType::LargeList(_)) => {
            check_datatypes("array_except", &[lhs, rhs])?;
            let computed =
                general_except::<i64>(lhs.as_list::<i64>(), rhs.as_list::<i64>(), field)?;
            Arc::new(computed) as ArrayRef
        }
        (dt1, dt2) => {
            return internal_err!("array_except got unexpected types: {dt1:?} and {dt2:?}");
        }
    };

    Ok(difference)
}

fn general_except<OffsetSize: OffsetSizeTrait>(
l: &GenericListArray<OffsetSize>,
r: &GenericListArray<OffsetSize>,
field: &FieldRef,
) -> datafusion_common::Result<GenericListArray<OffsetSize>> {
let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;

let l_values = l.values().to_owned();
let r_values = r.values().to_owned();
let l_values = converter.convert_columns(&[l_values])?;
let r_values = converter.convert_columns(&[r_values])?;

let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
offsets.push(OffsetSize::usize_as(0));

let mut rows = Vec::with_capacity(l_values.num_rows());
let mut dedup = HashSet::new();

for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
for i in r_slice {
let right_row = r_values.row(i);
dedup.insert(right_row);
}
for i in l_slice {
let left_row = l_values.row(i);
if dedup.insert(left_row) {
rows.push(left_row);
}
}

offsets.push(OffsetSize::usize_as(rows.len()));
dedup.clear();
}

if let Some(values) = converter.convert_rows(rows)?.first() {
Ok(GenericListArray::<OffsetSize>::new(
field.to_owned(),
OffsetBuffer::new(offsets.into()),
values.to_owned(),
l.nulls().cloned(),
))
} else {
internal_err!("array_except failed to convert rows")
}
}
3 changes: 3 additions & 0 deletions datafusion/functions-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub mod macros;
mod array_has;
mod concat;
mod core;
mod except;
mod extract;
mod kernels;
mod position;
Expand All @@ -54,6 +55,7 @@ pub mod expr_fn {
pub use super::concat::array_concat;
pub use super::concat::array_prepend;
pub use super::core::make_array;
pub use super::except::array_except;
pub use super::extract::array_element;
pub use super::extract::array_pop_back;
pub use super::extract::array_pop_front;
Expand Down Expand Up @@ -92,6 +94,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
concat::array_append_udf(),
concat::array_prepend_udf(),
concat::array_concat_udf(),
except::array_except_udf(),
extract::array_element_udf(),
extract::array_pop_back_udf(),
extract::array_pop_front_udf(),
Expand Down
95 changes: 1 addition & 94 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@

//! Array expressions
use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::*;
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field};
use arrow::row::{RowConverter, SortField};
use arrow_buffer::NullBuffer;

use arrow_schema::FieldRef;
use datafusion_common::cast::{as_int64_array, as_large_list_array, as_list_array};
use datafusion_common::utils::array_into_list_array;
use datafusion_common::{exec_err, internal_err, plan_err, Result};
use datafusion_common::{exec_err, plan_err, Result};

/// Computes a BooleanArray indicating equality or inequality between elements in a list array and a specified element array.
///
Expand Down Expand Up @@ -132,19 +130,6 @@ fn compare_element_to_list(
Ok(res)
}

/// Checks that every array in `args` has the same data type as `args[0]`;
/// arguments whose type is `DataType::Null` are always accepted.
///
/// `name` is only used in the error message. On a mismatch the error produced by
/// `plan_err!` lists the data types of all arguments. `args[0]` is indexed
/// unconditionally, so `args` must not be empty.
fn check_datatypes(name: &str, args: &[&ArrayRef]) -> Result<()> {
let data_type = args[0].data_type();
if !args.iter().all(|arg| {
arg.data_type().equals_datatype(data_type)
|| arg.data_type().equals_datatype(&DataType::Null)
}) {
let types = args.iter().map(|arg| arg.data_type()).collect::<Vec<_>>();
return plan_err!("{name} received incompatible types: '{types:?}'.");
}

Ok(())
}

/// Convert one or more [`ArrayRef`] of the same type into a
/// `ListArray` or 'LargeListArray' depending on the offset size.
///
Expand Down Expand Up @@ -260,84 +245,6 @@ pub fn make_array(arrays: &[ArrayRef]) -> Result<ArrayRef> {
}
}

/// Computes, row by row, the distinct elements of `l` that do not occur in the
/// matching row of `r`, comparing elements through their `RowConverter` encoding.
///
/// The result uses `field` as its element field and `l`'s null buffer as its
/// validity. Rows are paired with `zip`, so iteration stops at the shorter input.
fn general_except<OffsetSize: OffsetSizeTrait>(
l: &GenericListArray<OffsetSize>,
r: &GenericListArray<OffsetSize>,
field: &FieldRef,
) -> Result<GenericListArray<OffsetSize>> {
let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;

// Encode the flattened child values of both sides as rows.
let l_values = l.values().to_owned();
let r_values = r.values().to_owned();
let l_values = converter.convert_columns(&[l_values])?;
let r_values = converter.convert_columns(&[r_values])?;

let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
offsets.push(OffsetSize::usize_as(0));

let mut rows = Vec::with_capacity(l_values.num_rows());
let mut dedup = HashSet::new();

for (l_w, r_w) in l.offsets().windows(2).zip(r.offsets().windows(2)) {
let l_slice = l_w[0].as_usize()..l_w[1].as_usize();
let r_slice = r_w[0].as_usize()..r_w[1].as_usize();
// Seed the set with the right-hand elements so they are rejected below.
for i in r_slice {
let right_row = r_values.row(i);
dedup.insert(right_row);
}
// Keep a left element only the first time it is seen for this row.
for i in l_slice {
let left_row = l_values.row(i);
if dedup.insert(left_row) {
rows.push(left_row);
}
}

offsets.push(OffsetSize::usize_as(rows.len()));
dedup.clear();
}

// Decode the surviving rows back into a single values column.
if let Some(values) = converter.convert_rows(rows)?.first() {
Ok(GenericListArray::<OffsetSize>::new(
field.to_owned(),
OffsetBuffer::new(offsets.into()),
values.to_owned(),
l.nulls().cloned(),
))
} else {
internal_err!("array_except failed to convert rows")
}
}

/// Kernel for the `array_except` SQL function.
///
/// Expects exactly two arrays. If either has the `Null` data type the first
/// array is returned unchanged; two `List` or two `LargeList` arrays are
/// validated with `check_datatypes` and passed to `general_except`; any other
/// combination of data types yields an internal error.
pub fn array_except(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.len() != 2 {
return exec_err!("array_except needs two arguments");
}

let array1 = &args[0];
let array2 = &args[1];

match (array1.data_type(), array2.data_type()) {
(DataType::Null, _) | (_, DataType::Null) => Ok(array1.to_owned()),
// `List` arrays use 32-bit offsets.
(DataType::List(field), DataType::List(_)) => {
check_datatypes("array_except", &[array1, array2])?;
let list1 = array1.as_list::<i32>();
let list2 = array2.as_list::<i32>();
let result = general_except::<i32>(list1, list2, field)?;
Ok(Arc::new(result))
}
// `LargeList` arrays use 64-bit offsets.
(DataType::LargeList(field), DataType::LargeList(_)) => {
check_datatypes("array_except", &[array1, array2])?;
let list1 = array1.as_list::<i64>();
let list2 = array2.as_list::<i64>();
let result = general_except::<i64>(list1, list2, field)?;
Ok(Arc::new(result))
}
(dt1, dt2) => {
internal_err!("array_except got unexpected types: {dt1:?} and {dt2:?}")
}
}
}

/// For each element of `list_array[i]`, removed up to `arr_n[i]` occurrences
/// of `element_array[i]`.
///
Expand Down
3 changes: 0 additions & 3 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,6 @@ pub fn create_physical_fun(
}

// array functions
BuiltinScalarFunction::ArrayExcept => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_except)(args)
}),
BuiltinScalarFunction::ArrayRemove => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_remove)(args)
}),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ enum ScalarFunction {
// 120 was ArrayUnion
OverLay = 121;
// 122 is Range
ArrayExcept = 123;
// 123 is ArrayExcept
// 124 was ArrayPopFront
Levenshtein = 125;
SubstrIndex = 126;
Expand Down
Loading

0 comments on commit c072abb

Please sign in to comment.