Skip to content

Commit

Permalink
Avoid unecessary copy when reading arrow files (#11840)
Browse files Browse the repository at this point in the history
* avoid copy

* fmt

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
XiangpengHao and alamb authored Aug 8, 2024
1 parent 86030a1 commit 1c9583a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
11 changes: 9 additions & 2 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
};

use arrow::buffer::Buffer;
use arrow_ipc::reader::FileDecoder;
use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -296,7 +297,10 @@ impl FileOpener for ArrowOpener {
for (dict_block, dict_result) in
footer.dictionaries().iter().flatten().zip(dict_results)
{
decoder.read_dictionary(dict_block, &dict_result.into())?;
decoder.read_dictionary(
dict_block,
&Buffer::from_bytes(dict_result.into()),
)?;
}

// filter recordbatches according to range
Expand Down Expand Up @@ -332,7 +336,10 @@ impl FileOpener for ArrowOpener {
.zip(recordbatch_results)
.filter_map(move |(block, data)| {
decoder
.read_record_batch(&block, &data.into())
.read_record_batch(
&block,
&Buffer::from_bytes(data.into()),
)
.transpose()
}),
)
Expand Down
4 changes: 2 additions & 2 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
"Error IPC message while deserializing ScalarValue::List: {e}"
))
})?;
let buffer = Buffer::from(arrow_data);
let buffer = Buffer::from(arrow_data.as_slice());

let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
Error::General(
Expand All @@ -423,7 +423,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
"Error IPC message while deserializing ScalarValue::List dictionary message: {e}"
))
})?;
let buffer = Buffer::from(arrow_data);
let buffer = Buffer::from(arrow_data.as_slice());

let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
Error::General(
Expand Down

0 comments on commit 1c9583a

Please sign in to comment.