From a47729cef90f1160e0a4d66f61ec84975b576712 Mon Sep 17 00:00:00 2001 From: Namgung Chan <33323415+getChan@users.noreply.github.com> Date: Sun, 29 Dec 2024 22:11:47 +0900 Subject: [PATCH] fix RecordBatch size in hash join (#13916) --- datafusion/physical-plan/src/joins/hash_join.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index a0fe0bd116ee..4e0b0bf820f2 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -71,6 +71,7 @@ use datafusion_physical_expr::equivalence::{ }; use datafusion_physical_expr::PhysicalExprRef; +use crate::spill::get_record_batch_memory_size; use ahash::RandomState; use datafusion_expr::Operator; use datafusion_physical_expr_common::datum::compare_op_for_nested; @@ -921,7 +922,7 @@ async fn collect_left_input( let initial = (Vec::new(), 0, metrics, reservation); let (batches, num_rows, metrics, mut reservation) = stream .try_fold(initial, |mut acc, batch| async { - let batch_size = batch.get_array_memory_size(); + let batch_size = get_record_batch_memory_size(&batch); // Reserve memory for incoming batch acc.3.try_grow(batch_size)?; // Update metrics @@ -3982,6 +3983,11 @@ mod tests { err.to_string(), "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput" ); + + assert_contains!( + err.to_string(), + "Failed to allocate additional 120 bytes for HashJoinInput" + ); } Ok(()) @@ -4063,6 +4069,11 @@ mod tests { "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]" ); + + assert_contains!( + err.to_string(), + "Failed to allocate additional 120 bytes for HashJoinInput[1]" + ); } Ok(())