Skip to content

Commit

Permalink
Allow ColumnarValue to array conversion with less copying
Browse files Browse the repository at this point in the history
`ColumnarValue::into_array` takes ownership of the columnar value and so
requires copying data if the call site doesn't own the value.  This does
not matter in many cases, since `into_array` itself copies even more
data internally. It does matter, however, in the cases where the
conversion is called with a desired array length of 1, which may happen
when a UDF implementation attempts to normalize arguments into arrays
without expanding them. This pattern can be seen in the regexp functions,
but can be useful in downstream projects too.
  • Loading branch information
findepi committed Dec 4, 2024
1 parent c62ab39 commit 448f140
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 10 deletions.
20 changes: 19 additions & 1 deletion datafusion/expr-common/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,24 @@ impl ColumnarValue {
})
}

/// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
/// number of rows. [`Self::Scalar`] is converted by repeating the same
/// scalar multiple times which is not as efficient as handling the scalar
/// directly.
///
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
/// arrays of the same length.
///
/// # Errors
///
/// Errors if `self` is a Scalar that fails to be converted into an array of size
pub fn to_array(&self, num_rows: usize) -> Result<ArrayRef> {
Ok(match self {
ColumnarValue::Array(array) => Arc::clone(array),
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows)?,
})
}

/// Null columnar values are implemented as a null array in order to pass batch
/// num_rows
pub fn create_null_array(num_rows: usize) -> Self {
Expand Down Expand Up @@ -176,7 +194,7 @@ impl ColumnarValue {

let args = args
.iter()
.map(|arg| arg.clone().into_array(inferred_length))
.map(|arg| arg.to_array(inferred_length))
.collect::<Result<Vec<_>>>()?;

Ok(args)
Expand Down
4 changes: 2 additions & 2 deletions datafusion/functions-nested/src/array_has.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl ScalarUDFImpl for ArrayHas {
match &args[1] {
ColumnarValue::Array(array_needle) => {
// the needle is already an array, convert the haystack to an array of the same length
let haystack = args[0].to_owned().into_array(array_needle.len())?;
let haystack = args[0].to_array(array_needle.len())?;
let array = array_has_inner_for_array(&haystack, array_needle)?;
Ok(ColumnarValue::Array(array))
}
Expand All @@ -118,7 +118,7 @@ impl ScalarUDFImpl for ArrayHas {
}

// since the needle is a scalar, convert it to an array of size 1
let haystack = args[0].to_owned().into_array(1)?;
let haystack = args[0].to_array(1)?;
let needle = scalar_needle.to_array_of_size(1)?;
let needle = Scalar::new(needle);
let array = array_has_inner_for_scalar(&haystack, &needle)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/regex/regexpcount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl ScalarUDFImpl for RegexpCountFunc {
let inferred_length = len.unwrap_or(1);
let args = args
.iter()
.map(|arg| arg.clone().into_array(inferred_length))
.map(|arg| arg.to_array(inferred_length))
.collect::<Result<Vec<_>>>()?;

let result = regexp_count_func(&args);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/regex/regexplike.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl ScalarUDFImpl for RegexpLikeFunc {
let inferred_length = len.unwrap_or(1);
let args = args
.iter()
.map(|arg| arg.clone().into_array(inferred_length))
.map(|arg| arg.to_array(inferred_length))
.collect::<Result<Vec<_>>>()?;

let result = regexp_like(&args);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/regex/regexpmatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl ScalarUDFImpl for RegexpMatchFunc {
let inferred_length = len.unwrap_or(1);
let args = args
.iter()
.map(|arg| arg.clone().into_array(inferred_length))
.map(|arg| arg.to_array(inferred_length))
.collect::<Result<Vec<_>>>()?;

let result = regexp_match_func(&args);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/functions/src/regex/regexpreplace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ pub fn specialize_regexp_replace<T: OffsetSizeTrait>(
Hint::AcceptsSingular => 1,
Hint::Pad => inferred_length,
};
arg.clone().into_array(expansion_len)
arg.to_array(expansion_len)
})
.collect::<Result<Vec<_>>>()?;
_regexp_replace_static_pattern_replace::<T>(&args)
Expand All @@ -586,7 +586,7 @@ pub fn specialize_regexp_replace<T: OffsetSizeTrait>(
(_, _, _, _) => {
let args = args
.iter()
.map(|arg| arg.clone().into_array(inferred_length))
.map(|arg| arg.to_array(inferred_length))
.collect::<Result<Vec<_>>>()?;

match args[0].data_type() {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/functions/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ where
Hint::AcceptsSingular => 1,
Hint::Pad => inferred_length,
};
arg.clone().into_array(expansion_len)
arg.to_array(expansion_len)
})
.collect::<Result<Vec<_>>>()?;

Expand Down Expand Up @@ -152,7 +152,7 @@ pub mod test {
let result = func.invoke_with_args(datafusion_expr::ScalarFunctionArgs{args: $ARGS, number_rows: cardinality, return_type: &return_type});
assert_eq!(result.is_ok(), true, "function returned an error: {}", result.unwrap_err());

let result = result.unwrap().clone().into_array(cardinality).expect("Failed to convert to array");
let result = result.unwrap().to_array(cardinality).expect("Failed to convert to array");
let result = result.as_any().downcast_ref::<$ARRAY_TYPE>().expect("Failed to convert to type");

// value is correct
Expand Down

0 comments on commit 448f140

Please sign in to comment.