From 1c9583ab95310fb1afa93fec88432ed6536da749 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Fri, 9 Aug 2024 04:39:44 +0800 Subject: [PATCH] Avoid unecessary copy when reading arrow files (#11840) * avoid copy * fmt --------- Co-authored-by: Andrew Lamb --- .../core/src/datasource/physical_plan/arrow_file.rs | 11 +++++++++-- datafusion/proto-common/src/from_proto/mod.rs | 4 ++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index a1ee6fbe1341..b4edc221c1f8 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -31,6 +31,7 @@ use crate::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, }; +use arrow::buffer::Buffer; use arrow_ipc::reader::FileDecoder; use arrow_schema::SchemaRef; use datafusion_common::config::ConfigOptions; @@ -296,7 +297,10 @@ impl FileOpener for ArrowOpener { for (dict_block, dict_result) in footer.dictionaries().iter().flatten().zip(dict_results) { - decoder.read_dictionary(dict_block, &dict_result.into())?; + decoder.read_dictionary( + dict_block, + &Buffer::from_bytes(dict_result.into()), + )?; } // filter recordbatches according to range @@ -332,7 +336,10 @@ impl FileOpener for ArrowOpener { .zip(recordbatch_results) .filter_map(move |(block, data)| { decoder - .read_record_batch(&block, &data.into()) + .read_record_batch( + &block, + &Buffer::from_bytes(data.into()), + ) .transpose() }), ) diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 3487f43ae24e..feb4c11aa809 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -408,7 +408,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { "Error IPC message while deserializing ScalarValue::List: {e}" )) })?; - let buffer = Buffer::from(arrow_data); + let buffer = Buffer::from(arrow_data.as_slice()); let ipc_batch = message.header_as_record_batch().ok_or_else(|| { Error::General( @@ -423,7 +423,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { "Error IPC message while deserializing ScalarValue::List dictionary message: {e}" )) })?; - let buffer = Buffer::from(arrow_data); + let buffer = Buffer::from(arrow_data.as_slice()); let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| { Error::General(