Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Oct 16, 2024
1 parent 8d097d5 commit 5cb811b
Showing 1 changed file with 41 additions and 12 deletions.
53 changes: 41 additions & 12 deletions native/core/src/execution/datafusion/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,10 @@ impl PartitionBuffer {
}

/// Appends all rows of given batch into active array builders.
///
/// Collects the row indices `0..batch.num_rows()` into a `Vec<usize>` and hands them,
/// together with the batch's columns, to `append_rows`, so every row of `batch` is
/// selected in order.
///
/// * `batch` - record batch whose columns are appended.
/// * `time_metric` - passed through unchanged to `append_rows`.
/// * `partition_id` - passed through unchanged to `append_rows`.
///
/// Returns whatever `append_rows` returns for the full index range.
fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result<isize> {
fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time, partition_id: usize) -> Result<isize> {
let columns = batch.columns();
// One index per row: the whole batch is appended, not a subset.
let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
self.append_rows(columns, &indices, time_metric)
self.append_rows(columns, &indices, time_metric, partition_id)
}

/// Appends rows of specified indices from columns into active array builders.
Expand All @@ -252,6 +252,7 @@ impl PartitionBuffer {
columns: &[ArrayRef],
indices: &[usize],
time_metric: &Time,
partition_id: usize,
) -> Result<isize> {
let mut mem_diff = 0;
let mut start = 0;
Expand All @@ -265,7 +266,7 @@ impl PartitionBuffer {
.iter_mut()
.zip(columns)
.for_each(|(builder, column)| {
append_columns(builder, column, &indices[start..end], column.data_type());
append_columns(builder, column, &indices[start..end], column.data_type(), partition_id);
});
self.num_active_rows += end - start;
if self.num_active_rows >= self.batch_size {
Expand Down Expand Up @@ -349,6 +350,7 @@ fn append_columns(
from: &Arc<dyn Array>,
indices: &[usize],
data_type: &DataType,
partition_id: usize,
) {
/// Append values from `from` to `to` using `indices`.
macro_rules! append {
Expand Down Expand Up @@ -405,6 +407,32 @@ fn append_columns(
}};
}

/// Appends the rows selected by `indices` from a byte-valued dictionary array (`from`)
/// into a byte dictionary builder (`to`), printing per-row diagnostics to stdout.
///
/// * `$kt` - dictionary key type; `from` is downcast to `DictionaryArray<$kt>`.
/// * `$builder` - concrete builder type that `to` is downcast to.
/// * `$dict_array` - concrete type of the dictionary's values array.
///
/// Relies on `to`, `from`, `indices` and `partition_id` being in scope at the
/// expansion site. Panics if either downcast fails, or if a row index, key or value
/// offset is out of bounds.
///
/// NOTE(review): the `println!` calls run once per appended row and look like
/// temporary debugging output; remove them before this lands on a hot path.
macro_rules! append_general_byte_dict {
    ($kt:ty, $builder:ty, $dict_array:ty) => {{
        let t = to.as_any_mut().downcast_mut::<$builder>().unwrap();
        let f = from
            .as_any()
            .downcast_ref::<DictionaryArray<$kt>>()
            .unwrap()
            .downcast_dict::<$dict_array>()
            .unwrap();
        println!("dt: {}", f.data_type());
        // Loop-invariant lookups, hoisted out of the per-row loop.
        let keys = f.keys();
        let offsets = f.values().value_offsets();
        for &i in indices {
            if f.is_valid(i) {
                // Bounds-checked on purpose: these diagnostics inspect keys and
                // offsets, so a bad key must surface as a panic with a message
                // rather than as an unchecked out-of-bounds read.
                let value_idx = keys.value(i).as_usize();
                let start = offsets[value_idx];
                let end = offsets[value_idx + 1];
                println!("partition_id: {}, indices len: {}, i = {}, value_idx = {}, offset = {}", partition_id, indices.len(), i, value_idx, start);
                println!("partition_id: {}, indices len: {}, i = {}, value_idx + 1 = {}, offset = {}", partition_id, indices.len(), i, value_idx + 1, end);
                println!("partition_id: {}, (end - start).to_usize() = {:?}", partition_id, (end - start).to_usize());
                // Resolve the value once and reuse it for both the log line and the append.
                let value = f.value(i);
                println!("partition_id: {}, f.value({}): {:?}", partition_id, i, value);
                t.append_value(value);
            } else {
                t.append_null();
            }
        }
    }};
}

macro_rules! append_dict_helper {
($kt:ident, $ty:ty, $dict_array:ty) => {{
match $kt.as_ref() {
Expand Down Expand Up @@ -481,28 +509,28 @@ fn append_columns(
($kt:ident, $byte_type:ty, $array_type:ty) => {{
match $kt.as_ref() {
DataType::Int8 => {
append_dict!(Int8Type, GenericByteDictionaryBuilder<Int8Type, $byte_type>, $array_type)
append_general_byte_dict!(Int8Type, GenericByteDictionaryBuilder<Int8Type, $byte_type>, $array_type)
}
DataType::Int16 => {
append_dict!(Int16Type, GenericByteDictionaryBuilder<Int16Type, $byte_type>, $array_type)
append_general_byte_dict!(Int16Type, GenericByteDictionaryBuilder<Int16Type, $byte_type>, $array_type)
}
DataType::Int32 => {
append_dict!(Int32Type, GenericByteDictionaryBuilder<Int32Type, $byte_type>, $array_type)
append_general_byte_dict!(Int32Type, GenericByteDictionaryBuilder<Int32Type, $byte_type>, $array_type)
}
DataType::Int64 => {
append_dict!(Int64Type, GenericByteDictionaryBuilder<Int64Type, $byte_type>, $array_type)
append_general_byte_dict!(Int64Type, GenericByteDictionaryBuilder<Int64Type, $byte_type>, $array_type)
}
DataType::UInt8 => {
append_dict!(UInt8Type, GenericByteDictionaryBuilder<UInt8Type, $byte_type>, $array_type)
append_general_byte_dict!(UInt8Type, GenericByteDictionaryBuilder<UInt8Type, $byte_type>, $array_type)
}
DataType::UInt16 => {
append_dict!(UInt16Type, GenericByteDictionaryBuilder<UInt16Type, $byte_type>, $array_type)
append_general_byte_dict!(UInt16Type, GenericByteDictionaryBuilder<UInt16Type, $byte_type>, $array_type)
}
DataType::UInt32 => {
append_dict!(UInt32Type, GenericByteDictionaryBuilder<UInt32Type, $byte_type>, $array_type)
append_general_byte_dict!(UInt32Type, GenericByteDictionaryBuilder<UInt32Type, $byte_type>, $array_type)
}
DataType::UInt64 => {
append_dict!(UInt64Type, GenericByteDictionaryBuilder<UInt64Type, $byte_type>, $array_type)
append_general_byte_dict!(UInt64Type, GenericByteDictionaryBuilder<UInt64Type, $byte_type>, $array_type)
}
_ => unreachable!("Unknown key type for dictionary"),
}
Expand Down Expand Up @@ -774,6 +802,7 @@ impl ShuffleRepartitioner {
input.columns(),
&shuffled_partition_ids[start..end],
time_metric,
partition_id,
)?;
}

Expand Down Expand Up @@ -801,7 +830,7 @@ impl ShuffleRepartitioner {
);

let output = &mut buffered_partitions[0];
output.append_batch(&input, time_metric)?;
output.append_batch(&input, time_metric, 0)?;
}
other => {
// this should be unreachable as long as the validation logic
Expand Down

0 comments on commit 5cb811b

Please sign in to comment.