From 47be719f18e7836020b03ca2ae8a68357e5d87d8 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Tue, 6 Aug 2024 03:46:25 -0400 Subject: [PATCH 1/2] avoid copy --- datafusion/core/src/datasource/physical_plan/arrow_file.rs | 5 +++-- datafusion/proto-common/src/from_proto/mod.rs | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index e720b4efff6f..745392de87f2 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -31,6 +31,7 @@ use crate::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, }; +use arrow::buffer::Buffer; use arrow_ipc::reader::FileDecoder; use arrow_schema::SchemaRef; use datafusion_common::config::ConfigOptions; @@ -296,7 +297,7 @@ impl FileOpener for ArrowOpener { for (dict_block, dict_result) in footer.dictionaries().iter().flatten().zip(dict_results) { - decoder.read_dictionary(dict_block, &dict_result.into())?; + decoder.read_dictionary(dict_block, &Buffer::from_bytes(dict_result.into()))?; } // filter recordbatches according to range @@ -331,7 +332,7 @@ impl FileOpener for ArrowOpener { .into_iter() .zip(recordbatch_results) .filter_map(move |(block, data)| { - match decoder.read_record_batch(&block, &data.into()) { + match decoder.read_record_batch(&block, &Buffer::from_bytes(data.into())) { Ok(Some(record_batch)) => Some(Ok(record_batch)), Ok(None) => None, Err(err) => Some(Err(err)), diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 3487f43ae24e..feb4c11aa809 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -408,7 +408,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { "Error IPC message while deserializing ScalarValue::List: {e}" )) })?; - let buffer = Buffer::from(arrow_data); + let buffer = Buffer::from(arrow_data.as_slice()); let ipc_batch = message.header_as_record_batch().ok_or_else(|| { Error::General( @@ -423,7 +423,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { "Error IPC message while deserializing ScalarValue::List dictionary message: {e}" )) })?; - let buffer = Buffer::from(arrow_data); + let buffer = Buffer::from(arrow_data.as_slice()); let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| { Error::General( From 64eff784d3ccb6a416b55bcbb55a0d9487dd3e52 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Tue, 6 Aug 2024 03:48:07 -0400 Subject: [PATCH 2/2] fmt --- .../core/src/datasource/physical_plan/arrow_file.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 745392de87f2..152d0bf09c5b 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -297,7 +297,10 @@ impl FileOpener for ArrowOpener { for (dict_block, dict_result) in footer.dictionaries().iter().flatten().zip(dict_results) { - decoder.read_dictionary(dict_block, &Buffer::from_bytes(dict_result.into()))?; + decoder.read_dictionary( + dict_block, + &Buffer::from_bytes(dict_result.into()), + )?; } // filter recordbatches according to range @@ -332,7 +335,10 @@ impl FileOpener for ArrowOpener { .into_iter() .zip(recordbatch_results) .filter_map(move |(block, data)| { - match decoder.read_record_batch(&block, &Buffer::from_bytes(data.into())) { + match decoder.read_record_batch( + &block, + &Buffer::from_bytes(data.into()), + ) { Ok(Some(record_batch)) => Some(Ok(record_batch)), Ok(None) => None, Err(err) => Some(Err(err)),