diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs index 9668359fc0..1802d3eb06 100644 --- a/native/core/src/execution/datafusion/shuffle_writer.rs +++ b/native/core/src/execution/datafusion/shuffle_writer.rs @@ -240,10 +240,10 @@ impl PartitionBuffer { } /// Appends all rows of given batch into active array builders. - fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result { + fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time, partition_id: usize) -> Result { let columns = batch.columns(); let indices = (0..batch.num_rows()).collect::>(); - self.append_rows(columns, &indices, time_metric) + self.append_rows(columns, &indices, time_metric, partition_id) } /// Appends rows of specified indices from columns into active array builders. @@ -252,6 +252,7 @@ impl PartitionBuffer { columns: &[ArrayRef], indices: &[usize], time_metric: &Time, + partition_id: usize, ) -> Result { let mut mem_diff = 0; let mut start = 0; @@ -265,7 +266,7 @@ impl PartitionBuffer { .iter_mut() .zip(columns) .for_each(|(builder, column)| { - append_columns(builder, column, &indices[start..end], column.data_type()); + append_columns(builder, column, &indices[start..end], column.data_type(), partition_id); }); self.num_active_rows += end - start; if self.num_active_rows >= self.batch_size { @@ -349,6 +350,7 @@ fn append_columns( from: &Arc, indices: &[usize], data_type: &DataType, + partition_id: usize, ) { /// Append values from `from` to `to` using `indices`. macro_rules! append { @@ -405,6 +407,32 @@ fn append_columns( }}; } + macro_rules! 
append_general_byte_dict {
+        ($kt:ty, $builder:ty, $dict_array:ty) => {{
+            let t = to.as_any_mut().downcast_mut::<$builder>().unwrap();
+            let f = from
+                .as_any()
+                .downcast_ref::<DictionaryArray<$kt>>()
+                .unwrap()
+                .downcast_dict::<$dict_array>()
+                .unwrap();
+            // Copies each selected row's dictionary value (or a null) from `f` into builder `t`.
+            // Debug builds only: name the shuffle partition if a row index is out of range.
+            debug_assert!(
+                indices.iter().all(|&i| i < f.len()),
+                "partition_id: {}, row index out of bounds for column of {} rows",
+                partition_id, f.len()
+            );
+            for &i in indices {
+                if f.is_valid(i) {
+                    t.append_value(f.value(i));
+                } else {
+                    t.append_null();
+                }
+            }
+        }};
+    }
+
     macro_rules! 
append_dict_helper { ($kt:ident, $ty:ty, $dict_array:ty) => {{ match $kt.as_ref() { @@ -481,28 +509,28 @@ fn append_columns( ($kt:ident, $byte_type:ty, $array_type:ty) => {{ match $kt.as_ref() { DataType::Int8 => { - append_dict!(Int8Type, GenericByteDictionaryBuilder, $array_type) + append_general_byte_dict!(Int8Type, GenericByteDictionaryBuilder, $array_type) } DataType::Int16 => { - append_dict!(Int16Type, GenericByteDictionaryBuilder, $array_type) + append_general_byte_dict!(Int16Type, GenericByteDictionaryBuilder, $array_type) } DataType::Int32 => { - append_dict!(Int32Type, GenericByteDictionaryBuilder, $array_type) + append_general_byte_dict!(Int32Type, GenericByteDictionaryBuilder, $array_type) } DataType::Int64 => { - append_dict!(Int64Type, GenericByteDictionaryBuilder, $array_type) + append_general_byte_dict!(Int64Type, GenericByteDictionaryBuilder, $array_type) } DataType::UInt8 => { - append_dict!(UInt8Type, GenericByteDictionaryBuilder, $array_type) + append_general_byte_dict!(UInt8Type, GenericByteDictionaryBuilder, $array_type) } DataType::UInt16 => { - append_dict!(UInt16Type, GenericByteDictionaryBuilder, $array_type) + append_general_byte_dict!(UInt16Type, GenericByteDictionaryBuilder, $array_type) } DataType::UInt32 => { - append_dict!(UInt32Type, GenericByteDictionaryBuilder, $array_type) + append_general_byte_dict!(UInt32Type, GenericByteDictionaryBuilder, $array_type) } DataType::UInt64 => { - append_dict!(UInt64Type, GenericByteDictionaryBuilder, $array_type) + append_general_byte_dict!(UInt64Type, GenericByteDictionaryBuilder, $array_type) } _ => unreachable!("Unknown key type for dictionary"), } @@ -774,6 +802,7 @@ impl ShuffleRepartitioner { input.columns(), &shuffled_partition_ids[start..end], time_metric, + partition_id, )?; } @@ -801,7 +830,7 @@ impl ShuffleRepartitioner { ); let output = &mut buffered_partitions[0]; - output.append_batch(&input, time_metric)?; + output.append_batch(&input, time_metric, 0)?; } other => { // this should 
be unreachable as long as the validation logic