Skip to content

Commit

Permalink
[Parquet] Reuse buffer in ByteViewArrayDecoderPlain (#6930)
Browse files Browse the repository at this point in the history
* reuse buffer in view array

* Update parquet/src/arrow/array_reader/byte_view_array.rs

Co-authored-by: Raphael Taylor-Davies <[email protected]>

* use From<Bytes> instead

---------

Co-authored-by: Raphael Taylor-Davies <[email protected]>
  • Loading branch information
XiangpengHao and tustvold authored Jan 8, 2025
1 parent 74499c0 commit a47d996
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl ByteViewArrayDecoder {

/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`]
pub struct ByteViewArrayDecoderPlain {
buf: Bytes,
buf: Buffer,
offset: usize,

validate_utf8: bool,
Expand All @@ -308,17 +308,23 @@ impl ByteViewArrayDecoderPlain {
validate_utf8: bool,
) -> Self {
Self {
buf,
buf: Buffer::from(buf),
offset: 0,
max_remaining_values: num_values.unwrap_or(num_levels),
validate_utf8,
}
}

pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
// Zero copy convert `bytes::Bytes` into `arrow_buffer::Buffer`
let buf = arrow_buffer::Buffer::from(self.buf.clone());
let block_id = output.append_block(buf);
// avoid creating a new buffer if the last buffer is the same as the current buffer
// This is especially useful when row-level filtering is applied, where we call lots of small `read` over the same buffer.
let block_id = {
if output.buffers.last().is_some_and(|x| x.ptr_eq(&self.buf)) {
output.buffers.len() as u32 - 1
} else {
output.append_block(self.buf.clone())
}
};

let to_read = len.min(self.max_remaining_values);

Expand Down Expand Up @@ -690,12 +696,13 @@ mod tests {

use crate::{
arrow::{
array_reader::test_util::{byte_array_all_encodings, utf8_column},
array_reader::test_util::{byte_array_all_encodings, encode_byte_array, utf8_column},
buffer::view_buffer::ViewBuffer,
record_reader::buffer::ValuesBuffer,
},
basic::Encoding,
column::reader::decoder::ColumnValueDecoder,
data_type::ByteArray,
};

use super::*;
Expand Down Expand Up @@ -746,4 +753,23 @@ mod tests {
);
}
}

#[test]
fn test_byte_view_array_plain_decoder_reuse_buffer() {
let byte_array = vec!["hello", "world", "large payload over 12 bytes", "b"];
let byte_array: Vec<ByteArray> = byte_array.into_iter().map(|x| x.into()).collect();
let pages = encode_byte_array(Encoding::PLAIN, &byte_array);

let column_desc = utf8_column();
let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);

let mut view_buffer = ViewBuffer::default();
decoder.set_data(Encoding::PLAIN, pages, 4, None).unwrap();
decoder.read(&mut view_buffer, 1).unwrap();
decoder.read(&mut view_buffer, 1).unwrap();
assert_eq!(view_buffer.buffers.len(), 1);

decoder.read(&mut view_buffer, 1).unwrap();
assert_eq!(view_buffer.buffers.len(), 1);
}
}

0 comments on commit a47d996

Please sign in to comment.