From 2e7fd6266ed115b930b6ddc5134c82b37c635366 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Sat, 7 Oct 2023 02:41:54 +0800 Subject: [PATCH] rewrite concat_internal (#7748) Signed-off-by: jayzhan211 --- .../physical-expr/src/array_expressions.rs | 82 ++++++++++--------- 1 file changed, 45 insertions(+), 37 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 34fbfc3c0269..790c0481e188 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -810,6 +810,7 @@ fn align_array_dimensions(args: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> { aligned_args } +// Concatenate arrays on the same row. fn concat_internal(args: &[ArrayRef]) -> Result<ArrayRef> { let args = align_array_dimensions(args.to_vec())?; @@ -818,49 +819,56 @@ fn concat_internal(args: &[ArrayRef]) -> Result<ArrayRef> { // Assume number of rows is the same for all arrays let row_count = list_arrays[0].len(); - let capacity = Capacities::Array(list_arrays.iter().map(|a| a.len()).sum()); - let array_data: Vec<_> = list_arrays.iter().map(|a| a.to_data()).collect::<Vec<_>>(); - let array_data: Vec<&ArrayData> = array_data.iter().collect(); - let mut mutable = MutableArrayData::with_capacities(array_data, true, capacity); - - let mut array_lens = vec![0; row_count]; - let mut null_bit_map: Vec<bool> = vec![true; row_count]; + let mut array_lengths = vec![]; + let mut arrays = vec![]; + let mut valid = BooleanBufferBuilder::new(row_count); + for i in 0..row_count { + let nulls = list_arrays + .iter() + .map(|arr| arr.is_null(i)) + .collect::<Vec<_>>(); + + // If all the arrays are null, the concatenated array is null + let is_null = nulls.iter().all(|&x| x); + if is_null { + array_lengths.push(0); + valid.append(false); + } else { + // Get all the arrays on i-th row + let values = list_arrays + .iter() + .map(|arr| arr.value(i)) + .collect::<Vec<_>>(); - for (i, array_len) in array_lens.iter_mut().enumerate().take(row_count) { - let null_count = 
mutable.null_count(); - for (j, a) in list_arrays.iter().enumerate() { - mutable.extend(j, i, i + 1); - *array_len += a.value_length(i); - } + let elements = values + .iter() + .map(|a| a.as_ref()) + .collect::<Vec<&dyn Array>>(); - - // This means all arrays are null - if mutable.null_count() == null_count + list_arrays.len() { - null_bit_map[i] = false; + // Concatenated array on i-th row + let concated_array = arrow::compute::concat(elements.as_slice())?; + array_lengths.push(concated_array.len()); + arrays.push(concated_array); + valid.append(true); } } + // Assume all arrays have the same data type + let data_type = list_arrays[0].value_type().clone(); + let buffer = valid.finish(); - let mut buffer = BooleanBufferBuilder::new(row_count); - buffer.append_slice(null_bit_map.as_slice()); - let nulls = Some(NullBuffer::from(buffer.finish())); - - let offsets: Vec<i32> = std::iter::once(0) - .chain(array_lens.iter().scan(0, |state, &x| { - *state += x; - Some(*state) - })) - .collect(); - - let builder = mutable.into_builder(); - - let list = builder - .len(row_count) - .buffers(vec![Buffer::from_vec(offsets)]) - .nulls(nulls) - .build()?; - - let list = arrow::array::make_array(list); - Ok(Arc::new(list)) + let elements = arrays + .iter() + .map(|a| a.as_ref()) + .collect::<Vec<&dyn Array>>(); + + let list_arr = ListArray::new( + Arc::new(Field::new("item", data_type, true)), + OffsetBuffer::from_lengths(array_lengths), + Arc::new(arrow::compute::concat(elements.as_slice())?), + Some(NullBuffer::new(buffer)), + ); + Ok(Arc::new(list_arr)) } /// Array_concat/Array_cat SQL function