From a47d9967be152b246e9fde1fe12ee512bcd5a856 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 8 Jan 2025 09:28:05 -0500 Subject: [PATCH] [Parquet] Reuse buffer in `ByteViewArrayDecoderPlain` (#6930) * reuse buffer in view array * Update parquet/src/arrow/array_reader/byte_view_array.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> * use From instead --------- Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- .../src/arrow/array_reader/byte_view_array.rs | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 92a8b0592d0..0e16642940d 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -290,7 +290,7 @@ impl ByteViewArrayDecoder { /// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`] pub struct ByteViewArrayDecoderPlain { - buf: Bytes, + buf: Buffer, offset: usize, validate_utf8: bool, @@ -308,7 +308,7 @@ impl ByteViewArrayDecoderPlain { validate_utf8: bool, ) -> Self { Self { - buf, + buf: Buffer::from(buf), offset: 0, max_remaining_values: num_values.unwrap_or(num_levels), validate_utf8, @@ -316,9 +316,15 @@ impl ByteViewArrayDecoderPlain { } pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result { - // Zero copy convert `bytes::Bytes` into `arrow_buffer::Buffer` - let buf = arrow_buffer::Buffer::from(self.buf.clone()); - let block_id = output.append_block(buf); + // avoid creating a new buffer if the last buffer is the same as the current buffer + // This is especially useful when row-level filtering is applied, where we call lots of small `read` over the same buffer. + let block_id = { + if output.buffers.last().is_some_and(|x| x.ptr_eq(&self.buf)) { + output.buffers.len() as u32 - 1 + } else { + output.append_block(self.buf.clone()) + } + }; let to_read = len.min(self.max_remaining_values); @@ -690,12 +696,13 @@ mod tests { use crate::{ arrow::{ - array_reader::test_util::{byte_array_all_encodings, utf8_column}, + array_reader::test_util::{byte_array_all_encodings, encode_byte_array, utf8_column}, buffer::view_buffer::ViewBuffer, record_reader::buffer::ValuesBuffer, }, basic::Encoding, column::reader::decoder::ColumnValueDecoder, + data_type::ByteArray, }; use super::*; @@ -746,4 +753,23 @@ mod tests { ); } } + + #[test] + fn test_byte_view_array_plain_decoder_reuse_buffer() { + let byte_array = vec!["hello", "world", "large payload over 12 bytes", "b"]; + let byte_array: Vec = byte_array.into_iter().map(|x| x.into()).collect(); + let pages = encode_byte_array(Encoding::PLAIN, &byte_array); + + let column_desc = utf8_column(); + let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc); + + let mut view_buffer = ViewBuffer::default(); + decoder.set_data(Encoding::PLAIN, pages, 4, None).unwrap(); + decoder.read(&mut view_buffer, 1).unwrap(); + decoder.read(&mut view_buffer, 1).unwrap(); + assert_eq!(view_buffer.buffers.len(), 1); + + decoder.read(&mut view_buffer, 1).unwrap(); + assert_eq!(view_buffer.buffers.len(), 1); + } }