Skip to content

Commit

Permalink
chore: Reserve memory for native shuffle writer per partition
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Sep 30, 2024
1 parent 22561c4 commit d063f15
Showing 1 changed file with 80 additions and 30 deletions.
110 changes: 80 additions & 30 deletions native/core/src/execution/datafusion/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,58 +206,93 @@ struct PartitionBuffer {
/// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`,
/// the active array builders will be frozen and appended to frozen buffer `frozen`.
batch_size: usize,
/// Memory reservation for this partition buffer.
reservation: MemoryReservation,
}

impl PartitionBuffer {
fn new(schema: SchemaRef, batch_size: usize) -> Self {
fn new(
schema: SchemaRef,
batch_size: usize,
partition_id: usize,
runtime: &Arc<RuntimeEnv>,
) -> Self {
let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id))
.with_can_spill(true)
.register(&runtime.memory_pool);

Self {
schema,
frozen: vec![],
active: vec![],
active_slots_mem_size: 0,
num_active_rows: 0,
batch_size,
reservation,
}
}

/// Initializes active builders if necessary.
fn init_active_if_necessary(&mut self) -> Result<isize> {
async fn init_active_if_necessary(
&mut self,
repartitioner: &ShuffleRepartitioner,
) -> Result<isize> {
let mut mem_diff = 0;

if self.active.is_empty() {
self.active = new_array_builders(&self.schema, self.batch_size);
// Estimate the memory size of active builders
if self.active_slots_mem_size == 0 {
self.active_slots_mem_size = self
.active
.schema
.fields()
.iter()
.zip(self.schema.fields())
.map(|(_ab, field)| slot_size(self.batch_size, field.data_type()))
.map(|field| slot_size(self.batch_size, field.data_type()))
.sum::<usize>();
}

if self
.reservation
.try_grow(self.active_slots_mem_size)
.is_err()
{
repartitioner.spill().await?;
self.reservation.free();
self.reservation.try_grow(self.active_slots_mem_size)?;
}

self.active = new_array_builders(&self.schema, self.batch_size);

mem_diff += self.active_slots_mem_size as isize;
}
Ok(mem_diff)
}

/// Appends all rows of given batch into active array builders.
fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result<isize> {
async fn append_batch(
&mut self,
batch: &RecordBatch,
time_metric: &Time,
repartitioner: &ShuffleRepartitioner,
) -> Result<isize> {
let columns = batch.columns();
let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
self.append_rows(columns, &indices, time_metric)
self.append_rows(columns, &indices, time_metric, repartitioner)
.await
}

/// Appends rows of specified indices from columns into active array builders.
fn append_rows(
async fn append_rows(
&mut self,
columns: &[ArrayRef],
indices: &[usize],
time_metric: &Time,
repartitioner: &ShuffleRepartitioner,
) -> Result<isize> {
let mut mem_diff = 0;
let mut start = 0;

// lazy init because some partition may be empty
mem_diff += self.init_active_if_necessary()?;
mem_diff += self.init_active_if_necessary(repartitioner).await?;

while start < indices.len() {
let end = (start + self.batch_size).min(indices.len());
Expand All @@ -273,7 +308,7 @@ impl PartitionBuffer {
mem_diff += self.flush()?;
timer.stop();

mem_diff += self.init_active_if_necessary()?;
mem_diff += self.init_active_if_necessary(repartitioner).await?;
}
start = end;
}
Expand Down Expand Up @@ -650,7 +685,14 @@ impl ShuffleRepartitioner {
schema: Arc::clone(&schema),
buffered_partitions: Mutex::new(
(0..num_output_partitions)
.map(|_| PartitionBuffer::new(Arc::clone(&schema), batch_size))
.map(|partition_id| {
PartitionBuffer::new(
Arc::clone(&schema),
batch_size,
partition_id,
&runtime,
)
})
.collect::<Vec<_>>(),
),
spills: Mutex::new(vec![]),
Expand Down Expand Up @@ -770,26 +812,34 @@ impl ShuffleRepartitioner {

// If the range of indices is not big enough, just append the rows into the
// active array builders instead of directly adding them as a record batch.
mem_diff += output.append_rows(
input.columns(),
&shuffled_partition_ids[start..end],
time_metric,
)?;
}
mem_diff += output
.append_rows(
input.columns(),
&shuffled_partition_ids[start..end],
time_metric,
self,
)
.await?;

if mem_diff > 0 {
let mem_increase = mem_diff as usize;
if self.reservation.try_grow(mem_increase).is_err() {
self.spill().await?;
self.reservation.free();
self.reservation.try_grow(mem_increase)?;

mem_diff = 0;
}
}

if mem_diff < 0 {
let mem_used = self.reservation.size();
let mem_decrease = mem_used.min(-mem_diff as usize);
self.reservation.shrink(mem_decrease);

if mem_diff > 0 {
let mem_increase = mem_diff as usize;
if self.reservation.try_grow(mem_increase).is_err() {
self.spill().await?;
self.reservation.free();
self.reservation.try_grow(mem_increase)?;
mem_diff += mem_decrease as isize;
}
}
if mem_diff < 0 {
let mem_used = self.reservation.size();
let mem_decrease = mem_used.min(-mem_diff as usize);
self.reservation.shrink(mem_decrease);
}
}
Partitioning::UnknownPartitioning(n) if *n == 1 => {
let mut buffered_partitions = self.buffered_partitions.lock().await;
Expand All @@ -801,7 +851,7 @@ impl ShuffleRepartitioner {
);

let output = &mut buffered_partitions[0];
output.append_batch(&input, time_metric)?;
output.append_batch(&input, time_metric, self).await?;
}
other => {
// this should be unreachable as long as the validation logic
Expand Down

0 comments on commit d063f15

Please sign in to comment.