Skip to content

Commit

Permalink
Add option to pass in field name to create array to support retaining field name during cast
Browse files Browse the repository at this point in the history
  • Loading branch information
timsaucer committed Nov 18, 2024
1 parent be19afc commit a68ef01
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 41 deletions.
61 changes: 37 additions & 24 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2090,7 +2090,7 @@ impl ScalarValue {
} else {
Self::iter_to_array(values.iter().cloned()).unwrap()
};
Arc::new(array_into_list_array(values, nullable))
Arc::new(array_into_list_array(values, nullable, None))
}

/// Same as [`ScalarValue::new_list`] but with nullable set to true.
Expand Down Expand Up @@ -2146,7 +2146,7 @@ impl ScalarValue {
} else {
Self::iter_to_array(values).unwrap()
};
Arc::new(array_into_list_array(values, nullable))
Arc::new(array_into_list_array(values, nullable, None))
}

/// Converts `Vec<ScalarValue>` where each element has type corresponding to
Expand Down Expand Up @@ -2183,7 +2183,7 @@ impl ScalarValue {
} else {
Self::iter_to_array(values.iter().cloned()).unwrap()
};
Arc::new(array_into_large_list_array(values))
Arc::new(array_into_large_list_array(values, None))
}

/// Converts a scalar value into an array of `size` rows.
Expand Down Expand Up @@ -2663,27 +2663,36 @@ impl ScalarValue {
let list_array = array.as_list::<i32>();
let nested_array = list_array.value(index);
// Produces a single element `ListArray` with the value at `index`.
let arr =
Arc::new(array_into_list_array(nested_array, field.is_nullable()));
let arr = Arc::new(array_into_list_array(
nested_array,
field.is_nullable(),
Some(field.name()),
));

ScalarValue::List(arr)
}
DataType::LargeList(_) => {
DataType::LargeList(field) => {
let list_array = as_large_list_array(array);
let nested_array = list_array.value(index);
// Produces a single element `LargeListArray` with the value at `index`.
let arr = Arc::new(array_into_large_list_array(nested_array));
let arr = Arc::new(array_into_large_list_array(
nested_array,
Some(field.name()),
));

ScalarValue::LargeList(arr)
}
// TODO: There is no test for FixedSizeList now, add it later
DataType::FixedSizeList(_, _) => {
DataType::FixedSizeList(field, _) => {
let list_array = as_fixed_size_list_array(array)?;
let nested_array = list_array.value(index);
// Produces a single element `FixedSizeListArray` with the value at `index`.
let list_size = nested_array.len();
let arr =
Arc::new(array_into_fixed_size_list_array(nested_array, list_size));
let arr = Arc::new(array_into_fixed_size_list_array(
nested_array,
list_size,
Some(field.name()),
));

ScalarValue::FixedSizeList(arr)
}
Expand Down Expand Up @@ -4060,11 +4069,10 @@ mod tests {

let result = ScalarValue::new_list_nullable(scalars.as_slice(), &DataType::Utf8);

let expected = array_into_list_array_nullable(Arc::new(StringArray::from(vec![
"rust",
"arrow",
"data-fusion",
])));
let expected = array_into_list_array_nullable(
Arc::new(StringArray::from(vec!["rust", "arrow", "data-fusion"])),
None,
);
assert_eq!(*result, expected);
}

Expand Down Expand Up @@ -4272,12 +4280,14 @@ mod tests {

#[test]
fn iter_to_array_string_test() {
let arr1 = array_into_list_array_nullable(Arc::new(StringArray::from(vec![
"foo", "bar", "baz",
])));
let arr2 = array_into_list_array_nullable(Arc::new(StringArray::from(vec![
"rust", "world",
])));
let arr1 = array_into_list_array_nullable(
Arc::new(StringArray::from(vec!["foo", "bar", "baz"])),
None,
);
let arr2 = array_into_list_array_nullable(
Arc::new(StringArray::from(vec!["rust", "world"])),
None,
);

let scalars = vec![
ScalarValue::List(Arc::new(arr1)),
Expand Down Expand Up @@ -5734,13 +5744,16 @@ mod tests {
// Define list-of-structs scalars

let nl0_array = ScalarValue::iter_to_array(vec![s0, s1.clone()]).unwrap();
let nl0 = ScalarValue::List(Arc::new(array_into_list_array_nullable(nl0_array)));
let nl0 =
ScalarValue::List(Arc::new(array_into_list_array_nullable(nl0_array, None)));

let nl1_array = ScalarValue::iter_to_array(vec![s2]).unwrap();
let nl1 = ScalarValue::List(Arc::new(array_into_list_array_nullable(nl1_array)));
let nl1 =
ScalarValue::List(Arc::new(array_into_list_array_nullable(nl1_array, None)));

let nl2_array = ScalarValue::iter_to_array(vec![s1]).unwrap();
let nl2 = ScalarValue::List(Arc::new(array_into_list_array_nullable(nl2_array)));
let nl2 =
ScalarValue::List(Arc::new(array_into_list_array_nullable(nl2_array, None)));

// iter_to_array for list-of-struct
let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();
Expand Down
37 changes: 30 additions & 7 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,18 +324,29 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
/// Wrap an array into a single element `ListArray`.
/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`.
/// The field in the list array is nullable and is named `field_name`,
/// falling back to `"item"` when `field_name` is `None`.
pub fn array_into_list_array_nullable(arr: ArrayRef) -> ListArray {
array_into_list_array(arr, true)
pub fn array_into_list_array_nullable(
arr: ArrayRef,
field_name: Option<&str>,
) -> ListArray {
array_into_list_array(arr, true, field_name)
}

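/// Array Utils
/// Wrap an array into a single element `ListArray`.
/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`.
/// `nullable` sets the nullability of the field in the list array; the field
/// is named `field_name`, falling back to `"item"` when `field_name` is `None`.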
pub fn array_into_list_array(arr: ArrayRef, nullable: bool) -> ListArray {
pub fn array_into_list_array(
arr: ArrayRef,
nullable: bool,
field_name: Option<&str>,
) -> ListArray {
let offsets = OffsetBuffer::from_lengths([arr.len()]);
ListArray::new(
Arc::new(Field::new_list_field(arr.data_type().to_owned(), nullable)),
Arc::new(Field::new(
field_name.unwrap_or("item"),
arr.data_type().to_owned(),
nullable,
)),
offsets,
arr,
None,
Expand All @@ -344,10 +355,17 @@ pub fn array_into_list_array(arr: ArrayRef, nullable: bool) -> ListArray {

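/// Wrap an array into a single element `LargeListArray`.
/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`.
/// The field in the list array is nullable and is named `field_name`,
/// falling back to `"item"` when `field_name` is `None`.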
pub fn array_into_large_list_array(arr: ArrayRef) -> LargeListArray {
pub fn array_into_large_list_array(
arr: ArrayRef,
field_name: Option<&str>,
) -> LargeListArray {
let offsets = OffsetBuffer::from_lengths([arr.len()]);
LargeListArray::new(
Arc::new(Field::new_list_field(arr.data_type().to_owned(), true)),
Arc::new(Field::new(
field_name.unwrap_or("item"),
arr.data_type().to_owned(),
true,
)),
offsets,
arr,
None,
Expand All @@ -357,10 +375,15 @@ pub fn array_into_large_list_array(arr: ArrayRef) -> LargeListArray {
pub fn array_into_fixed_size_list_array(
arr: ArrayRef,
list_size: usize,
field_name: Option<&str>,
) -> FixedSizeListArray {
let list_size = list_size as i32;
FixedSizeListArray::new(
Arc::new(Field::new_list_field(arr.data_type().to_owned(), true)),
Arc::new(Field::new(
field_name.unwrap_or("item"),
arr.data_type().to_owned(),
true,
)),
list_size,
arr,
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<O: OffsetSizeTrait> Accumulator for BytesDistinctCountAccumulator<O> {
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let set = self.0.take();
let arr = set.into_state();
let list = Arc::new(array_into_list_array_nullable(arr));
let list = Arc::new(array_into_list_array_nullable(arr, None));
Ok(vec![ScalarValue::List(list)])
}

Expand Down Expand Up @@ -109,7 +109,7 @@ impl Accumulator for BytesViewDistinctCountAccumulator {
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let set = self.0.take();
let arr = set.into_state();
let list = Arc::new(array_into_list_array_nullable(arr));
let list = Arc::new(array_into_list_array_nullable(arr, None));
Ok(vec![ScalarValue::List(list)])
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ where
PrimitiveArray::<T>::from_iter_values(self.values.iter().cloned())
.with_data_type(self.data_type.clone()),
);
let list = Arc::new(array_into_list_array_nullable(arr));
let list = Arc::new(array_into_list_array_nullable(arr, None));
Ok(vec![ScalarValue::List(list)])
}

Expand Down Expand Up @@ -160,7 +160,7 @@ where
let arr = Arc::new(PrimitiveArray::<T>::from_iter_values(
self.values.iter().map(|v| v.0),
)) as ArrayRef;
let list = Arc::new(array_into_list_array_nullable(arr));
let list = Arc::new(array_into_list_array_nullable(arr, None));
Ok(vec![ScalarValue::List(list)])
}

Expand Down
3 changes: 2 additions & 1 deletion datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl Accumulator for ArrayAggAccumulator {
}

let concated_array = arrow::compute::concat(&element_arrays)?;
let list_array = array_into_list_array_nullable(concated_array);
let list_array = array_into_list_array_nullable(concated_array, None);

Ok(ScalarValue::List(Arc::new(list_array)))
}
Expand Down Expand Up @@ -534,6 +534,7 @@ impl OrderSensitiveArrayAggAccumulator {
StructArray::try_new(struct_field, column_wise_ordering_values, None)?;
Ok(ScalarValue::List(Arc::new(array_into_list_array_nullable(
Arc::new(ordering_array),
None,
))))
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/functions-aggregate/src/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ impl NthValueAccumulator {

Ok(ScalarValue::List(Arc::new(array_into_list_array_nullable(
Arc::new(ordering_array),
None,
))))
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-nested/src/make_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
let length = arrays.iter().map(|a| a.len()).sum();
// By default Int64
let array = new_null_array(&DataType::Int64, length);
Ok(Arc::new(array_into_list_array_nullable(array)))
Ok(Arc::new(array_into_list_array_nullable(array, None)))
}
LargeList(..) => array_array::<i64>(arrays, data_type),
_ => array_array::<i32>(arrays, data_type),
Expand Down
11 changes: 7 additions & 4 deletions datafusion/functions-nested/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,12 @@ mod tests {
]));

let array2d_1 = Arc::new(array_into_list_array_nullable(
Arc::clone(&array1d_1) as ArrayRef
Arc::clone(&array1d_1) as ArrayRef,
None,
)) as ArrayRef;
let array2d_2 = Arc::new(array_into_list_array_nullable(
Arc::clone(&array1d_2) as ArrayRef
Arc::clone(&array1d_2) as ArrayRef,
None,
)) as ArrayRef;

let res = align_array_dimensions::<i32>(vec![
Expand All @@ -310,8 +312,9 @@ mod tests {
expected_dim
);

let array3d_1 = Arc::new(array_into_list_array_nullable(array2d_1)) as ArrayRef;
let array3d_2 = array_into_list_array_nullable(array2d_2.to_owned());
let array3d_1 =
Arc::new(array_into_list_array_nullable(array2d_1, None)) as ArrayRef;
let array3d_2 = array_into_list_array_nullable(array2d_2.to_owned(), None);
let res =
align_array_dimensions::<i32>(vec![array1d_1, Arc::new(array3d_2)]).unwrap();

Expand Down

0 comments on commit a68ef01

Please sign in to comment.