Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(query): 636 fast ifnull #17047

Open
wants to merge 4 commits into
base: release/v1.2.636-rc7
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::arrow::error::Result;
#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_list<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
variadic_buffer_counts: &mut VecDeque<usize>,
data_type: DataType,
ipc_field: &IpcField,
buffers: &mut VecDeque<IpcBuffer>,
Expand Down Expand Up @@ -70,7 +69,6 @@ pub fn read_fixed_size_list<R: Read + Seek>(

let values = read(
field_nodes,
variadic_buffer_counts,
field,
&ipc_field.fields[0],
buffers,
Expand Down
2 changes: 0 additions & 2 deletions src/common/arrow/src/arrow/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use crate::arrow::offset::Offset;
#[allow(clippy::too_many_arguments)]
pub fn read_list<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
variadic_buffer_counts: &mut VecDeque<usize>,
data_type: DataType,
ipc_field: &IpcField,
buffers: &mut VecDeque<IpcBuffer>,
Expand Down Expand Up @@ -95,7 +94,6 @@ where

let values = read(
field_nodes,
variadic_buffer_counts,
field,
&ipc_field.fields[0],
buffers,
Expand Down
2 changes: 0 additions & 2 deletions src/common/arrow/src/arrow/io/ipc/read/array/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::arrow::error::Result;
#[allow(clippy::too_many_arguments)]
pub fn read_map<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
variadic_buffer_counts: &mut VecDeque<usize>,
data_type: DataType,
ipc_field: &IpcField,
buffers: &mut VecDeque<IpcBuffer>,
Expand Down Expand Up @@ -90,7 +89,6 @@ pub fn read_map<R: Read + Seek>(

let field = read(
field_nodes,
variadic_buffer_counts,
field,
&ipc_field.fields[0],
buffers,
Expand Down
3 changes: 0 additions & 3 deletions src/common/arrow/src/arrow/io/ipc/read/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,5 @@ mod dictionary;
pub use dictionary::*;
mod union;
pub use union::*;
mod binview;
pub use binview::*;
mod map;

pub use map::*;
2 changes: 0 additions & 2 deletions src/common/arrow/src/arrow/io/ipc/read/array/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::arrow::error::Result;
#[allow(clippy::too_many_arguments)]
pub fn read_struct<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
variadic_buffer_counts: &mut VecDeque<usize>,
data_type: DataType,
ipc_field: &IpcField,
buffers: &mut VecDeque<IpcBuffer>,
Expand Down Expand Up @@ -72,7 +71,6 @@ pub fn read_struct<R: Read + Seek>(
.map(|(field, ipc_field)| {
read(
field_nodes,
variadic_buffer_counts,
field,
ipc_field,
buffers,
Expand Down
2 changes: 0 additions & 2 deletions src/common/arrow/src/arrow/io/ipc/read/array/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::arrow::error::Result;
#[allow(clippy::too_many_arguments)]
pub fn read_union<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
variadic_buffer_counts: &mut VecDeque<usize>,
data_type: DataType,
ipc_field: &IpcField,
buffers: &mut VecDeque<IpcBuffer>,
Expand Down Expand Up @@ -103,7 +102,6 @@ pub fn read_union<R: Read + Seek>(
.map(|(field, ipc_field)| {
read(
field_nodes,
variadic_buffer_counts,
field,
ipc_field,
buffers,
Expand Down
8 changes: 0 additions & 8 deletions src/common/arrow/src/arrow/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl<'a, A, I: Iterator<Item = A>> ProjectionIter<'a, A, I> {
/// # Panics
/// iff `projection` is empty
pub fn new(projection: &'a [usize], iter: I) -> Self {
assert!(!projection.is_empty(), "projection cannot be empty");
Self {
projection: &projection[1..],
iter,
Expand Down Expand Up @@ -114,11 +113,6 @@ pub fn read_record_batch<R: Read + Seek>(
.buffers()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageBuffers))?;
let mut variadic_buffer_counts = batch
.variadic_buffer_counts()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?
.map(|v| v.iter().map(|v| v as usize).collect::<VecDeque<usize>>())
.unwrap_or_else(VecDeque::new);
let mut buffers: VecDeque<arrow_format::ipc::BufferRef> = buffers.iter().collect();

// check that the sum of the sizes of all buffers is <= than the size of the file
Expand Down Expand Up @@ -153,7 +147,6 @@ pub fn read_record_batch<R: Read + Seek>(
.map(|maybe_field| match maybe_field {
ProjectionResult::Selected((field, ipc_field)) => Ok(Some(read(
&mut field_nodes,
&mut variadic_buffer_counts,
field,
ipc_field,
&mut buffers,
Expand Down Expand Up @@ -182,7 +175,6 @@ pub fn read_record_batch<R: Read + Seek>(
.map(|(field, ipc_field)| {
read(
&mut field_nodes,
&mut variadic_buffer_counts,
field,
ipc_field,
&mut buffers,
Expand Down
34 changes: 2 additions & 32 deletions src/common/arrow/src/arrow/io/ipc/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::arrow::io::ipc::IpcField;
#[allow(clippy::too_many_arguments)]
pub fn read<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
variadic_buffer_counts: &mut VecDeque<usize>,
field: &Field,
ipc_field: &IpcField,
buffers: &mut VecDeque<IpcBuffer>,
Expand Down Expand Up @@ -140,7 +139,6 @@ pub fn read<R: Read + Seek>(
.map(|x| x.boxed()),
List => read_list::<i32, _>(
field_nodes,
variadic_buffer_counts,
data_type,
ipc_field,
buffers,
Expand All @@ -156,7 +154,6 @@ pub fn read<R: Read + Seek>(
.map(|x| x.boxed()),
LargeList => read_list::<i64, _>(
field_nodes,
variadic_buffer_counts,
data_type,
ipc_field,
buffers,
Expand All @@ -172,7 +169,6 @@ pub fn read<R: Read + Seek>(
.map(|x| x.boxed()),
FixedSizeList => read_fixed_size_list(
field_nodes,
variadic_buffer_counts,
data_type,
ipc_field,
buffers,
Expand All @@ -188,7 +184,6 @@ pub fn read<R: Read + Seek>(
.map(|x| x.boxed()),
Struct => read_struct(
field_nodes,
variadic_buffer_counts,
data_type,
ipc_field,
buffers,
Expand Down Expand Up @@ -222,7 +217,6 @@ pub fn read<R: Read + Seek>(
}
Union => read_union(
field_nodes,
variadic_buffer_counts,
data_type,
ipc_field,
buffers,
Expand All @@ -238,7 +232,6 @@ pub fn read<R: Read + Seek>(
.map(|x| x.boxed()),
Map => read_map(
field_nodes,
variadic_buffer_counts,
data_type,
ipc_field,
buffers,
Expand All @@ -252,30 +245,7 @@ pub fn read<R: Read + Seek>(
scratch,
)
.map(|x| x.boxed()),
Utf8View => read_binview::<str, _>(
field_nodes,
variadic_buffer_counts,
data_type,
buffers,
reader,
block_offset,
is_little_endian,
compression,
limit,
scratch,
),
BinaryView => read_binview::<[u8], _>(
field_nodes,
variadic_buffer_counts,
data_type,
buffers,
reader,
block_offset,
is_little_endian,
compression,
limit,
scratch,
),
BinaryView | Utf8View => unimplemented!("BinaryView and Utf8View are not supported"),
}
}

Expand All @@ -299,6 +269,6 @@ pub fn skip(
Dictionary(_) => skip_dictionary(field_nodes, buffers),
Union => skip_union(field_nodes, data_type, buffers),
Map => skip_map(field_nodes, data_type, buffers),
BinaryView | Utf8View => todo!(),
BinaryView | Utf8View => unimplemented!("BinaryView and Utf8View are not supported"),
}
}
76 changes: 0 additions & 76 deletions src/common/arrow/src/arrow/io/ipc/read/read_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,6 @@ fn read_swapped<T: NativeType, R: Read + Seek>(
Ok(())
}

/// Reads up to `buffer_length` raw bytes from the current position of `reader`.
///
/// At most `buffer_length` bytes are consumed; if the reader reaches its end
/// earlier, the returned vector is shorter than `buffer_length`.
///
/// # Errors
/// Propagates any I/O error raised while reading, instead of panicking.
///
/// # Panics
/// Panics via `unreachable!()` when `is_little_endian` does not match the
/// value reported by `is_native_little_endian()`.
fn read_uncompressed_bytes<R: Read + Seek>(
    reader: &mut R,
    buffer_length: usize,
    is_little_endian: bool,
) -> Result<Vec<u8>> {
    if is_native_little_endian() != is_little_endian {
        unreachable!()
    }

    let mut buffer = Vec::with_capacity(buffer_length);
    // `take` caps the read at `buffer_length` bytes; a failed read is reported
    // to the caller through `?` rather than aborting the process.
    reader.take(buffer_length as u64).read_to_end(&mut buffer)?;
    Ok(buffer)
}

fn read_uncompressed_buffer<T: NativeType, R: Read + Seek>(
reader: &mut R,
buffer_length: usize,
Expand Down Expand Up @@ -167,65 +150,6 @@ fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
Ok(buffer)
}

/// Reads a compressed buffer from `reader` and returns its contents as bytes.
///
/// Delegates to `read_compressed_buffer` instantiated for `u8`, using
/// `scratch` as the reusable scratch buffer for that call.
fn read_compressed_bytes<R: Read + Seek>(
    reader: &mut R,
    buffer_length: usize,
    is_little_endian: bool,
    compression: Compression,
    scratch: &mut Vec<u8>,
) -> Result<Vec<u8>> {
    // NOTE(review): `buffer_length` is passed for both length arguments;
    // presumably these are the byte length and the element count, which
    // coincide for `u8` — confirm against `read_compressed_buffer`.
    let byte_len = buffer_length;
    read_compressed_buffer::<u8, R>(
        reader,
        byte_len,
        byte_len,
        is_little_endian,
        compression,
        scratch,
    )
}

/// Reads the next buffer described in `buf` from `reader` as raw bytes.
///
/// Pops the front [`IpcBuffer`] descriptor off `buf`, seeks `reader` to
/// `block_offset` plus the descriptor's offset, and reads the descriptor's
/// length in bytes. When `compression` is `Some`, the bytes are read through
/// `read_compressed_bytes` (using `scratch`); otherwise through
/// `read_uncompressed_bytes`.
///
/// # Errors
/// Returns an out-of-spec error when `buf` is empty, when the descriptor's
/// offset or length cannot be converted to `u64` / `usize`, or when the
/// absolute seek position would overflow `u64`. Errors from seeking and from
/// the underlying read helpers are propagated.
pub fn read_bytes<R: Read + Seek>(
    buf: &mut VecDeque<IpcBuffer>,
    reader: &mut R,
    block_offset: u64,
    is_little_endian: bool,
    compression: Option<Compression>,
    scratch: &mut Vec<u8>,
) -> Result<Buffer<u8>> {
    let buf = buf
        .pop_front()
        .ok_or_else(|| Error::oos(format!("out-of-spec: {:?}", OutOfSpecKind::ExpectedBuffer)))?;

    // Shared error for an offset/length that does not fit the target integer.
    let invalid_integer = || {
        Error::oos(format!(
            "out-of-spec: {:?}",
            OutOfSpecKind::NegativeFooterLength
        ))
    };

    let offset: u64 = buf.offset().try_into().map_err(|_| invalid_integer())?;
    let buffer_length: usize = buf.length().try_into().map_err(|_| invalid_integer())?;

    // Both operands come from the file being read, so guard the addition:
    // an unchecked `+` would panic in debug builds and silently wrap to a
    // bogus position in release builds.
    let position = block_offset.checked_add(offset).ok_or_else(|| {
        Error::oos(format!(
            "out-of-spec: buffer offset {} added to block offset {} overflows u64",
            offset, block_offset
        ))
    })?;
    reader.seek(SeekFrom::Start(position))?;

    let bytes = match compression {
        Some(compression) => read_compressed_bytes(
            reader,
            buffer_length,
            is_little_endian,
            compression,
            scratch,
        )?,
        None => read_uncompressed_bytes(reader, buffer_length, is_little_endian)?,
    };
    Ok(bytes.into())
}

pub fn read_buffer<T: NativeType, R: Read + Seek>(
buf: &mut VecDeque<IpcBuffer>,
length: usize, // in slots
Expand Down
4 changes: 1 addition & 3 deletions src/common/arrow/src/arrow/io/ipc/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,6 @@ fn get_data_type(
LargeBinary(_) => (DataType::LargeBinary, IpcField::default()),
Utf8(_) => (DataType::Utf8, IpcField::default()),
LargeUtf8(_) => (DataType::LargeUtf8, IpcField::default()),
BinaryView(_) => (DataType::BinaryView, IpcField::default()),
Utf8View(_) => (DataType::Utf8View, IpcField::default()),
FixedSizeBinary(fixed) => (
DataType::FixedSizeBinary(
fixed
Expand Down Expand Up @@ -366,7 +364,7 @@ fn get_data_type(
Struct(_) => deserialize_struct(field)?,
Union(union_) => deserialize_union(union_, field)?,
Map(map) => deserialize_map(map, field)?,
RunEndEncoded(_) | LargeListView(_) | ListView(_) => unimplemented!(),
_other => unimplemented!("BinaryView and Utf8View are not supported"),
})
}

Expand Down
Loading
Loading