Skip to content

Commit

Permalink
Merge branch 'apache:main' into avro-codec
Browse files Browse the repository at this point in the history
  • Loading branch information
jecsand838 authored Jan 10, 2025
2 parents 81d7bba + 88fb923 commit 03bd5f0
Show file tree
Hide file tree
Showing 52 changed files with 1,292 additions and 505 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ jobs:
ARROW_INTEGRATION_JAVA: ON
ARROW_INTEGRATION_JS: ON
ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS: "rust"
# Disable nanoarrow integration, due to https://github.com/apache/arrow-rs/issues/5052
ARCHERY_INTEGRATION_WITH_NANOARROW: "0"
ARCHERY_INTEGRATION_WITH_NANOARROW: "1"
# https://github.com/apache/arrow/pull/38403/files#r1371281630
ARCHERY_INTEGRATION_WITH_RUST: "1"
# These are necessary because the github runner overrides $HOME
Expand Down
28 changes: 7 additions & 21 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,32 +123,18 @@ jobs:
uses: ./.github/actions/setup-builder
- name: Install cargo-msrv
run: cargo install cargo-msrv
- name: Downgrade arrow dependencies
run: cargo update -p ahash --precise 0.8.7
- name: Check arrow
working-directory: arrow
run: |
# run `cd arrow; cargo msrv verify` to see problematic dependencies
cargo msrv verify --output-format=json
- name: Check parquet
working-directory: parquet
run: |
# run `cd parquet; cargo msrv verify` to see problematic dependencies
cargo msrv verify --output-format=json
- name: Check arrow-flight
working-directory: arrow-flight
run: |
# run `cd arrow-flight; cargo msrv verify` to see problematic dependencies
cargo msrv verify --output-format=json
- name: Downgrade object_store dependencies
working-directory: object_store
# Necessary because tokio 1.30.0 updates MSRV to 1.63
# and url 2.5.1, updates to 1.67
run: |
cargo update -p tokio --precise 1.29.1
cargo update -p url --precise 2.5.0
- name: Check object_store
working-directory: object_store
- name: Check all packages
run: |
# run `cd object_store; cargo msrv verify` to see problematic dependencies
cargo msrv verify --output-format=json
# run `cargo msrv verify --manifest-path "path/to/Cargo.toml"` to see problematic dependencies
find . -mindepth 2 -name Cargo.toml | while read -r dir
do
echo "Checking package '$dir'"
cargo msrv verify --manifest-path "$dir" --output-format=json || exit 1
done
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ include = [
"Cargo.toml",
]
edition = "2021"
rust-version = "1.62"
rust-version = "1.70"

[workspace.dependencies]
arrow = { version = "54.0.0", path = "./arrow", default-features = false }
Expand Down
118 changes: 91 additions & 27 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,43 @@ use crate::{bit_util, bytes::Bytes, native::ArrowNativeType};
use super::ops::bitwise_unary_op_helper;
use super::{MutableBuffer, ScalarBuffer};

/// Buffer represents a contiguous memory region that can be shared with other buffers and across
/// thread boundaries.
/// A contiguous memory region that can be shared with other buffers and across
/// thread boundaries that stores Arrow data.
///
/// `Buffer`s can be sliced and cloned without copying the underlying data and can
/// be created from memory allocated by non-Rust sources such as C/C++.
///
/// # Example: Create a `Buffer` from a `Vec` (without copying)
/// ```
/// # use arrow_buffer::Buffer;
/// let vec: Vec<u32> = vec![1, 2, 3];
/// let buffer = Buffer::from(vec);
/// ```
///
/// # Example: Convert a `Buffer` to a `Vec` (without copying)
///
/// Use [`Self::into_vec`] to convert a `Buffer` back into a `Vec` if there are
/// no other references and the types are aligned correctly.
/// ```
/// # use arrow_buffer::Buffer;
/// # let vec: Vec<u32> = vec![1, 2, 3];
/// # let buffer = Buffer::from(vec);
/// // convert the buffer back into a Vec of u32
/// // note this will fail if the buffer is shared or not aligned correctly
/// let vec: Vec<u32> = buffer.into_vec().unwrap();
/// ```
///
/// # Example: Create a `Buffer` from a [`bytes::Bytes`] (without copying)
///
/// [`bytes::Bytes`] is a common type in the Rust ecosystem for shared memory
/// regions. You can create a buffer from a `Bytes` instance using the `From`
/// implementation, also without copying.
///
/// ```
/// # use arrow_buffer::Buffer;
/// let bytes = bytes::Bytes::from("hello");
/// let buffer = Buffer::from(bytes);
///```
#[derive(Clone, Debug)]
pub struct Buffer {
/// the internal byte buffer.
Expand Down Expand Up @@ -59,24 +94,15 @@ unsafe impl Send for Buffer where Bytes: Send {}
unsafe impl Sync for Buffer where Bytes: Sync {}

impl Buffer {
/// Auxiliary method to create a new Buffer
/// Create a new Buffer from a (internal) `Bytes`
///
/// This can be used with a [`bytes::Bytes`] via `into()`:
/// NOTE despite the same name, `Bytes` is an internal struct in arrow-rs
/// and is different than [`bytes::Bytes`].
///
/// ```
/// # use arrow_buffer::Buffer;
/// let bytes = bytes::Bytes::from_static(b"foo");
/// let buffer = Buffer::from_bytes(bytes.into());
/// ```
#[inline]
/// See examples on [`Buffer`] for ways to create a buffer from a [`bytes::Bytes`].
#[deprecated(since = "54.1.0", note = "Use Buffer::from instead")]
pub fn from_bytes(bytes: Bytes) -> Self {
let length = bytes.len();
let ptr = bytes.as_ptr();
Buffer {
data: Arc::new(bytes),
ptr,
length,
}
Self::from(bytes)
}

/// Returns the offset, in bytes, of `Self::ptr` to `Self::data`
Expand Down Expand Up @@ -107,8 +133,11 @@ impl Buffer {
buffer.into()
}

/// Creates a buffer from an existing memory region. Ownership of the memory is tracked via reference counting
/// and the memory will be freed using the `drop` method of [crate::alloc::Allocation] when the reference count reaches zero.
/// Creates a buffer from an existing memory region.
///
/// Ownership of the memory is tracked via reference counting
/// and the memory will be freed using the `drop` method of
/// [crate::alloc::Allocation] when the reference count reaches zero.
///
/// # Arguments
///
Expand Down Expand Up @@ -155,7 +184,7 @@ impl Buffer {
self.data.capacity()
}

/// Tried to shrink the capacity of the buffer as much as possible, freeing unused memory.
/// Tries to shrink the capacity of the buffer as much as possible, freeing unused memory.
///
/// If the buffer is shared, this is a no-op.
///
Expand Down Expand Up @@ -190,7 +219,7 @@ impl Buffer {
}
}

/// Returns whether the buffer is empty.
/// Returns true if the buffer is empty.
#[inline]
pub fn is_empty(&self) -> bool {
self.length == 0
Expand All @@ -206,7 +235,9 @@ impl Buffer {
}

/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`.
/// Doing so allows the same memory region to be shared between buffers.
///
/// This function is `O(1)` and does not copy any data, allowing the
/// same memory region to be shared between buffers.
///
/// # Panics
///
Expand Down Expand Up @@ -240,7 +271,10 @@ impl Buffer {

/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`,
/// with `length` bytes.
/// Doing so allows the same memory region to be shared between buffers.
///
/// This function is `O(1)` and does not copy any data, allowing the same
/// memory region to be shared between buffers.
///
/// # Panics
/// Panics iff `(offset + length)` is larger than the existing length.
pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
Expand Down Expand Up @@ -328,10 +362,16 @@ impl Buffer {
})
}

/// Returns `Vec` for mutating the buffer
/// Converts self into a `Vec`, if possible.
///
/// This can be used to reuse / mutate the underlying data.
///
/// Returns `Err(self)` if this buffer does not have the same [`Layout`] as
/// the destination Vec or contains a non-zero offset
/// # Errors
///
/// Returns `Err(self)` if
/// 1. this buffer does not have the same [`Layout`] as the destination Vec
/// 2. contains a non-zero offset
/// 3. The buffer is shared
pub fn into_vec<T: ArrowNativeType>(self) -> Result<Vec<T>, Self> {
let layout = match self.data.deallocation() {
Deallocation::Standard(l) => l,
Expand Down Expand Up @@ -414,7 +454,29 @@ impl<T: ArrowNativeType> From<ScalarBuffer<T>> for Buffer {
}
}

/// Creating a `Buffer` instance by storing the boolean values into the buffer
/// Convert from internal `Bytes` (not [`bytes::Bytes`]) to `Buffer`
impl From<Bytes> for Buffer {
#[inline]
fn from(bytes: Bytes) -> Self {
let length = bytes.len();
let ptr = bytes.as_ptr();
Self {
data: Arc::new(bytes),
ptr,
length,
}
}
}

/// Convert from [`bytes::Bytes`], not internal `Bytes` to `Buffer`
impl From<bytes::Bytes> for Buffer {
fn from(bytes: bytes::Bytes) -> Self {
let bytes: Bytes = bytes.into();
Self::from(bytes)
}
}

/// Create a `Buffer` instance by storing the boolean values into the buffer
impl FromIterator<bool> for Buffer {
fn from_iter<I>(iter: I) -> Self
where
Expand Down Expand Up @@ -447,7 +509,9 @@ impl<T: ArrowNativeType> From<BufferBuilder<T>> for Buffer {

impl Buffer {
/// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length.
///
/// Prefer this to `collect` whenever possible, as it is ~60% faster.
///
/// # Example
/// ```
/// # use arrow_buffer::buffer::Buffer;
Expand Down
2 changes: 1 addition & 1 deletion arrow-buffer/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl MutableBuffer {
pub(super) fn into_buffer(self) -> Buffer {
let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
std::mem::forget(self);
Buffer::from_bytes(bytes)
Buffer::from(bytes)
}

/// View this buffer as a mutable slice of a specific type.
Expand Down
8 changes: 6 additions & 2 deletions arrow-buffer/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@ use crate::buffer::dangling_ptr;

/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
///
/// This structs' API is inspired by the `bytes::Bytes`, but it is not limited to using rust's
/// global allocator nor u8 alignment.
/// Note that this structure is an internal implementation detail of the
/// arrow-rs crate. While it has the same name and similar API as
/// [`bytes::Bytes`] it is not limited to rust's global allocator nor u8
/// alignment. It is possible to create a `Bytes` from `bytes::Bytes` using the
/// `From` implementation.
///
/// In the most common case, this buffer is allocated using [`alloc`](std::alloc::alloc)
/// with an alignment of [`ALIGNMENT`](crate::alloc::ALIGNMENT)
///
/// When the region is allocated by a different allocator, [Deallocation::Custom], this calls the
/// custom deallocator to deallocate the region when it is no longer needed.
///
pub struct Bytes {
/// The raw pointer to be beginning of the region
ptr: NonNull<u8>,
Expand Down
2 changes: 1 addition & 1 deletion arrow-flight/gen/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ name = "gen"
description = "Code generation for arrow-flight"
version = "0.1.0"
edition = { workspace = true }
rust-version = { workspace = true }
rust-version = "1.71.1"
authors = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion arrow-flight/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl FlightDataDecoder {
));
};

let buffer = Buffer::from_bytes(data.data_body.into());
let buffer = Buffer::from(data.data_body);
let dictionary_batch = message.header_as_dictionary_batch().ok_or_else(|| {
FlightError::protocol(
"Could not get dictionary batch from DictionaryBatch message",
Expand Down
14 changes: 7 additions & 7 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,7 @@ mod tests {
])
.unwrap();

verify_encoded_split(batch, 112).await;
verify_encoded_split(batch, 120).await;
}

#[tokio::test]
Expand All @@ -1719,7 +1719,7 @@ mod tests {

// overage is much higher than ideal
// https://github.com/apache/arrow-rs/issues/3478
verify_encoded_split(batch, 4304).await;
verify_encoded_split(batch, 4312).await;
}

#[tokio::test]
Expand Down Expand Up @@ -1755,7 +1755,7 @@ mod tests {
// 5k over limit (which is 2x larger than limit of 5k)
// overage is much higher than ideal
// https://github.com/apache/arrow-rs/issues/3478
verify_encoded_split(batch, 5800).await;
verify_encoded_split(batch, 5808).await;
}

#[tokio::test]
Expand All @@ -1771,7 +1771,7 @@ mod tests {

let batch = RecordBatch::try_from_iter(vec![("a1", Arc::new(array) as _)]).unwrap();

verify_encoded_split(batch, 48).await;
verify_encoded_split(batch, 56).await;
}

#[tokio::test]
Expand All @@ -1785,7 +1785,7 @@ mod tests {

// overage is much higher than ideal
// https://github.com/apache/arrow-rs/issues/3478
verify_encoded_split(batch, 3328).await;
verify_encoded_split(batch, 3336).await;
}

#[tokio::test]
Expand All @@ -1799,7 +1799,7 @@ mod tests {

// overage is much higher than ideal
// https://github.com/apache/arrow-rs/issues/3478
verify_encoded_split(batch, 5280).await;
verify_encoded_split(batch, 5288).await;
}

#[tokio::test]
Expand All @@ -1824,7 +1824,7 @@ mod tests {

// overage is much higher than ideal
// https://github.com/apache/arrow-rs/issues/3478
verify_encoded_split(batch, 4128).await;
verify_encoded_split(batch, 4136).await;
}

/// Return size, in memory of flight data
Expand Down
2 changes: 1 addition & 1 deletion arrow-flight/src/sql/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ pub fn arrow_data_from_flight_data(

let dictionaries_by_field = HashMap::new();
let record_batch = read_record_batch(
&Buffer::from_bytes(flight_data.data_body.into()),
&Buffer::from(flight_data.data_body),
ipc_record_batch,
arrow_schema_ref.clone(),
&dictionaries_by_field,
Expand Down
2 changes: 1 addition & 1 deletion arrow-integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ authors = { workspace = true }
license = { workspace = true }
edition = { workspace = true }
publish = false
rust-version = { workspace = true }
rust-version = "1.75.0"

[lib]
crate-type = ["lib", "cdylib"]
Expand Down
2 changes: 1 addition & 1 deletion arrow-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
flatbuffers = { version = "24.3.25", default-features = false }
flatbuffers = { version = "24.12.23", default-features = false }
lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true }
zstd = { version = "0.13.0", default-features = false, optional = true }

Expand Down
Loading

0 comments on commit 03bd5f0

Please sign in to comment.