Skip to content

Commit

Permalink
Add array_reverse function to datafusion-function-* crate (#9630)
Browse files Browse the repository at this point in the history
* Add array_reverse function to datafusion-function-* crate

* fix ci
  • Loading branch information
Weijun-H authored Mar 16, 2024
1 parent e83efce commit 9eb7087
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 121 deletions.
6 changes: 0 additions & 6 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ pub enum BuiltinScalarFunction {
ArrayReplaceN,
/// array_replace_all
ArrayReplaceAll,
/// array_reverse
ArrayReverse,
/// array_intersect
ArrayIntersect,
/// array_union
Expand Down Expand Up @@ -289,7 +287,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplace => Volatility::Immutable,
BuiltinScalarFunction::ArrayReplaceN => Volatility::Immutable,
BuiltinScalarFunction::ArrayReplaceAll => Volatility::Immutable,
BuiltinScalarFunction::ArrayReverse => Volatility::Immutable,
BuiltinScalarFunction::ArrayIntersect => Volatility::Immutable,
BuiltinScalarFunction::ArrayUnion => Volatility::Immutable,
BuiltinScalarFunction::Ascii => Volatility::Immutable,
Expand Down Expand Up @@ -359,7 +356,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplace => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayReplaceN => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayReplaceAll => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayReverse => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayIntersect => {
match (input_expr_types[0].clone(), input_expr_types[1].clone()) {
(DataType::Null, DataType::Null) | (DataType::Null, _) => {
Expand Down Expand Up @@ -557,7 +553,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplaceAll => {
Signature::any(3, self.volatility())
}
BuiltinScalarFunction::ArrayReverse => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayIntersect => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayUnion => Signature::any(2, self.volatility()),

Expand Down Expand Up @@ -882,7 +877,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayReplaceAll => {
&["array_replace_all", "list_replace_all"]
}
BuiltinScalarFunction::ArrayReverse => &["array_reverse", "list_reverse"],
BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"],
BuiltinScalarFunction::ArrayIntersect => {
&["array_intersect", "list_intersect"]
Expand Down
6 changes: 0 additions & 6 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,12 +638,6 @@ scalar_expr!(
array from to,
"replaces all occurrences of the specified element with another specified element."
);
// Expression-builder declaration for `BuiltinScalarFunction::ArrayReverse`:
// the arguments are the enum variant, the helper function name, the single
// argument name, and the documentation string.
// NOTE(review): the `scalar_expr!` macro body is not visible here — presumably
// it expands to a `pub fn array_reverse(array: Expr) -> Expr` helper; confirm.
scalar_expr!(
ArrayReverse,
array_reverse,
array,
"reverses the order of elements in the array."
);
// Same declaration shape for `array_union`, taking two array arguments.
scalar_expr!(ArrayUnion, array_union, array1 array2, "returns an array of the elements in the union of array1 and array2 without duplicates.");

scalar_expr!(
Expand Down
69 changes: 69 additions & 0 deletions datafusion/functions-array/src/kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,3 +1202,72 @@ pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
None,
)?))
}

/// array_reverse SQL function
pub fn array_reverse(arg: &[ArrayRef]) -> Result<ArrayRef> {
if arg.len() != 1 {
return exec_err!("array_reverse needs one argument");
}

match &arg[0].data_type() {
DataType::List(field) => {
let array = as_list_array(&arg[0])?;
general_array_reverse::<i32>(array, field)
}
DataType::LargeList(field) => {
let array = as_large_list_array(&arg[0])?;
general_array_reverse::<i64>(array, field)
}
DataType::Null => Ok(arg[0].clone()),
array_type => exec_err!("array_reverse does not support type '{array_type:?}'."),
}
}

/// Reverses the elements inside every row of a list array.
///
/// `array` is the input list (`i32` or `i64` offsets, selected through `O`)
/// and `field` is the element field used for the resulting list array.
///
/// Null rows remain null in the output and are written as zero-length slots:
/// nothing is copied from the child values for them. Copying a placeholder
/// element instead would read child index 0, which is out of bounds when the
/// child values array is empty (for example a column that contains only null
/// lists), and would store a meaningless value under the null slot.
///
/// # Errors
///
/// Propagates the error from `GenericListArray::try_new` if the rebuilt
/// offsets, values and null buffer are rejected.
fn general_array_reverse<O: OffsetSizeTrait>(
    array: &GenericListArray<O>,
    field: &FieldRef,
) -> Result<ArrayRef>
where
    O: TryFrom<i64>,
{
    let values = array.values();
    let original_data = values.to_data();
    let capacity = Capacities::Array(original_data.len());

    // One offset per row plus the leading zero; one validity flag per row.
    let mut offsets = Vec::with_capacity(array.len() + 1);
    offsets.push(O::usize_as(0));
    let mut nulls = Vec::with_capacity(array.len());
    let mut mutable =
        MutableArrayData::with_capacities(vec![&original_data], false, capacity);

    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
        let previous_end = offsets[row_index];

        // A null row contributes no child elements: repeat the previous offset.
        if array.is_null(row_index) {
            nulls.push(false);
            offsets.push(previous_end);
            continue;
        }
        nulls.push(true);

        let start = offset_window[0].as_usize();
        let end = offset_window[1].as_usize();

        // Copy the row's elements one at a time, last to first. An empty row
        // (start == end) yields an empty range, so nothing is copied.
        for index in (start..end).rev() {
            mutable.extend(0, index, index + 1);
        }
        offsets.push(previous_end + O::usize_as(end - start));
    }

    let data = mutable.freeze();
    Ok(Arc::new(GenericListArray::<O>::try_new(
        field.clone(),
        OffsetBuffer::<O>::new(offsets.into()),
        arrow_array::make_array(data),
        Some(nulls.into()),
    )?))
}
2 changes: 2 additions & 0 deletions datafusion/functions-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub mod expr_fn {
pub use super::udf::array_ndims;
pub use super::udf::array_repeat;
pub use super::udf::array_resize;
pub use super::udf::array_reverse;
pub use super::udf::array_sort;
pub use super::udf::array_to_string;
pub use super::udf::cardinality;
Expand Down Expand Up @@ -100,6 +101,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
udf::array_distinct_udf(),
udf::array_repeat_udf(),
udf::array_resize_udf(),
udf::array_reverse_udf(),
];
functions.into_iter().try_for_each(|udf| {
let existing_udf = registry.register_udf(udf)?;
Expand Down
49 changes: 49 additions & 0 deletions datafusion/functions-array/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -882,3 +882,52 @@ impl ScalarUDFImpl for crate::udf::ArrayDistinct {
&self.aliases
}
}

// Wires up the `ArrayReverse` UDF: the arguments are the implementing struct,
// the expression-helper name, its argument name, the documentation string and
// the name of the UDF accessor function.
// NOTE(review): the macro body is not visible here — presumably it generates
// the `array_reverse` helper and the `array_reverse_udf()` accessor that the
// crate re-exports and registers; confirm against the macro definition.
make_udf_function!(
ArrayReverse,
array_reverse,
array,
"reverses the order of elements in the array.",
array_reverse_udf
);

/// Scalar UDF implementation of the `array_reverse` SQL function.
#[derive(Debug)]
pub(super) struct ArrayReverse {
// Accepted argument signature, returned by `ScalarUDFImpl::signature`.
signature: Signature,
// Names returned by `ScalarUDFImpl::aliases`.
aliases: Vec<String>,
}

impl crate::udf::ArrayReverse {
    /// Creates the UDF with a one-argument, any-type, immutable signature and
    /// the `array_reverse` / `list_reverse` aliases.
    pub fn new() -> Self {
        let aliases = ["array_reverse", "list_reverse"]
            .iter()
            .map(|alias| alias.to_string())
            .collect();
        let signature = Signature::any(1, Volatility::Immutable);

        Self { signature, aliases }
    }
}

impl ScalarUDFImpl for crate::udf::ArrayReverse {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Canonical name of the function.
    ///
    /// This previously returned the misspelled `"array_reserse"`, so the
    /// correct spelling was only available through the alias list.
    fn name(&self) -> &str {
        "array_reverse"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Reversing keeps the input type, so the result type is the type of the
    /// single argument.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        Ok(arg_types[0].clone())
    }

    /// Converts the arguments to arrays and runs the `array_reverse` kernel,
    /// wrapping its result as a `ColumnarValue::Array`.
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        let args = ColumnarValue::values_to_arrays(args)?;
        crate::kernels::array_reverse(&args).map(ColumnarValue::Array)
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }
}
69 changes: 0 additions & 69 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -990,72 +990,3 @@ pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
None,
)?))
}

/// array_reverse SQL function
pub fn array_reverse(arg: &[ArrayRef]) -> Result<ArrayRef> {
if arg.len() != 1 {
return exec_err!("array_reverse needs one argument");
}

match &arg[0].data_type() {
DataType::List(field) => {
let array = as_list_array(&arg[0])?;
general_array_reverse::<i32>(array, field)
}
DataType::LargeList(field) => {
let array = as_large_list_array(&arg[0])?;
general_array_reverse::<i64>(array, field)
}
DataType::Null => Ok(arg[0].clone()),
array_type => exec_err!("array_reverse does not support type '{array_type:?}'."),
}
}

/// Reverses the elements inside every row of a list array.
///
/// `array` is the input list (`i32` or `i64` offsets, selected through `O`)
/// and `field` is the element field used for the resulting list array.
///
/// Null rows remain null in the output and are written as zero-length slots:
/// nothing is copied from the child values for them. Copying a placeholder
/// element instead would read child index 0, which is out of bounds when the
/// child values array is empty (for example a column that contains only null
/// lists), and would store a meaningless value under the null slot.
///
/// # Errors
///
/// Propagates the error from `GenericListArray::try_new` if the rebuilt
/// offsets, values and null buffer are rejected.
fn general_array_reverse<O: OffsetSizeTrait>(
    array: &GenericListArray<O>,
    field: &FieldRef,
) -> Result<ArrayRef>
where
    O: TryFrom<i64>,
{
    let values = array.values();
    let original_data = values.to_data();
    let capacity = Capacities::Array(original_data.len());

    // One offset per row plus the leading zero; one validity flag per row.
    let mut offsets = Vec::with_capacity(array.len() + 1);
    offsets.push(O::usize_as(0));
    let mut nulls = Vec::with_capacity(array.len());
    let mut mutable =
        MutableArrayData::with_capacities(vec![&original_data], false, capacity);

    for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
        let previous_end = offsets[row_index];

        // A null row contributes no child elements: repeat the previous offset.
        if array.is_null(row_index) {
            nulls.push(false);
            offsets.push(previous_end);
            continue;
        }
        nulls.push(true);

        let start = offset_window[0].as_usize();
        let end = offset_window[1].as_usize();

        // Copy the row's elements one at a time, last to first. An empty row
        // (start == end) yields an empty range, so nothing is copied.
        for index in (start..end).rev() {
            mutable.extend(0, index, index + 1);
        }
        offsets.push(previous_end + O::usize_as(end - start));
    }

    let data = mutable.freeze();
    Ok(Arc::new(GenericListArray::<O>::try_new(
        field.clone(),
        OffsetBuffer::<O>::new(offsets.into()),
        arrow_array::make_array(data),
        Some(nulls.into()),
    )?))
}
3 changes: 0 additions & 3 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,6 @@ pub fn create_physical_fun(
BuiltinScalarFunction::ArrayReplaceAll => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_replace_all)(args)
}),
BuiltinScalarFunction::ArrayReverse => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_reverse)(args)
}),
BuiltinScalarFunction::ArrayIntersect => Arc::new(|args| {
make_scalar_function_inner(array_expressions::array_intersect)(args)
}),
Expand Down
22 changes: 11 additions & 11 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -661,23 +661,23 @@ enum ScalarFunction {
ArrayIntersect = 119;
ArrayUnion = 120;
OverLay = 121;
/// 122 is Range
// 122 is Range
ArrayExcept = 123;
// 124 was ArrayPopFront
Levenshtein = 125;
SubstrIndex = 126;
FindInSet = 127;
/// 128 was ArraySort
/// 129 was ArrayDistinct
/// 130 was ArrayResize
// 128 was ArraySort
// 129 was ArrayDistinct
// 130 was ArrayResize
EndsWith = 131;
/// 132 was InStr
/// 133 was MakeDate
ArrayReverse = 134;
/// 135 is RegexpLike
/// 136 was ToChar
/// 137 was ToDate
/// 138 was ToUnixtime
// 132 was InStr
// 133 was MakeDate
// 134 was ArrayReverse
// 135 is RegexpLike
// 136 was ToChar
// 137 was ToDate
// 138 was ToUnixtime
}

message ScalarFunctionNode {
Expand Down
3 changes: 0 additions & 3 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 12 additions & 14 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9eb7087

Please sign in to comment.