
Add Array::shrink_to_fit(&self) -> ArrayRef #6787

Closed
wants to merge 3 commits into from

Conversation


@emilk emilk commented Nov 25, 2024

Which issue does this PR close?

Rationale for this change

Concatenating many arrow buffers incrementally can lead to situations where the buffers use much more memory than they need: their capacity ends up larger than their length.

Example:

use arrow::{
    array::{Array, ArrayRef, ListArray, PrimitiveArray},
    buffer::OffsetBuffer,
    datatypes::{Field, UInt8Type},
};

fn main() {
    let array0: PrimitiveArray<UInt8Type> = (0..200 * 300)
        .map(|v| (v % 255) as u8)
        .collect::<Vec<_>>()
        .into();
    let array0: ArrayRef = Arc::new(array0);

    let (global, local) = memory_use(|| {
        let concatenated = concatenate(array0.clone());
        dbg!(concatenated.data_type());
        eprintln!("expected: ~{}", how_many_bytes(concatenated.clone()));
        concatenated
    });
    eprintln!("global: {global} bytes");
    eprintln!("local: {local} bytes");

    eprintln!("---");

    let array1 = ListArray::new(
        Field::new_list_field(array0.data_type().clone(), false).into(),
        OffsetBuffer::from_lengths(std::iter::once(array0.len())),
        array0.clone(),
        None,
    );
    let array1: ArrayRef = Arc::new(array1);

    let (global, local) = memory_use(|| {
        let concatenated = concatenate(array1.clone()).shrink_to_fit();
        dbg!(concatenated.data_type());
        eprintln!("expected: ~{}", how_many_bytes(concatenated.clone()));
        concatenated
    });
    eprintln!("global: {global} bytes");
    eprintln!("local: {local} bytes");
}

fn concatenate(array: ArrayRef) -> ArrayRef {
    let mut concatenated = array.clone();

    for _ in 0..1000 {
        concatenated = arrow::compute::kernels::concat::concat(&[&*concatenated, &*array]).unwrap();
    }

    concatenated
}

fn how_many_bytes(array: ArrayRef) -> u64 {
    let mut array = array;
    loop {
        match array.data_type() {
            arrow::datatypes::DataType::UInt8 => break,
            arrow::datatypes::DataType::List(_) => {
                let list = array.as_any().downcast_ref::<ListArray>().unwrap();
                array = list.values().clone();
            }
            _ => unreachable!(),
        }
    }

    array.len() as _
}

// --- Memory tracking ---

use std::sync::{
    atomic::{AtomicUsize, Ordering::Relaxed},
    Arc,
};

static LIVE_BYTES_GLOBAL: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    static LIVE_BYTES_IN_THREAD: AtomicUsize = const { AtomicUsize::new(0) };
}

pub struct TrackingAllocator {
    allocator: std::alloc::System,
}

#[global_allocator]
pub static GLOBAL_ALLOCATOR: TrackingAllocator = TrackingAllocator {
    allocator: std::alloc::System,
};

#[allow(unsafe_code)]
// SAFETY:
// We just do book-keeping and then let another allocator do all the actual work.
unsafe impl std::alloc::GlobalAlloc for TrackingAllocator {
    #[allow(clippy::let_and_return)]
    unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
        LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(), Relaxed));
        LIVE_BYTES_GLOBAL.fetch_add(layout.size(), Relaxed);

        // SAFETY:
        // Just deferring
        unsafe { self.allocator.alloc(layout) }
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
        LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_sub(layout.size(), Relaxed));
        LIVE_BYTES_GLOBAL.fetch_sub(layout.size(), Relaxed);

        // SAFETY:
        // Just deferring
        unsafe { self.allocator.dealloc(ptr, layout) };
    }
}

fn live_bytes_local() -> usize {
    LIVE_BYTES_IN_THREAD.with(|bytes| bytes.load(Relaxed))
}

fn live_bytes_global() -> usize {
    LIVE_BYTES_GLOBAL.load(Relaxed)
}

/// Returns `(num_bytes_allocated, num_bytes_allocated_by_this_thread)`.
fn memory_use<R>(run: impl Fn() -> R) -> (usize, usize) {
    let used_bytes_start_local = live_bytes_local();
    let used_bytes_start_global = live_bytes_global();
    let ret = run();
    let bytes_used_local = live_bytes_local() - used_bytes_start_local;
    let bytes_used_global = live_bytes_global() - used_bytes_start_global;
    drop(ret);
    (bytes_used_global, bytes_used_local)
}

If you run this, you will see that 12 MB is used for 6 MB of data.

What changes are included in this PR?

This PR adds shrink_to_fit to Array and all buffers.

Are there any user-facing changes?

trait Array now has a fn shrink_to_fit(&self) -> ArrayRef.

Problems

I could not implement Array::shrink_to_fit for TypedDictionaryArray or TypedRunArray, since they are types wrapping references. Calling shrink_to_fit on these will result in an unimplemented! panic. Perhaps we should return an error instead.

Due to the above problem, perhaps we should consider an alternative approach?

@github-actions github-actions bot added the arrow Changes to the arrow crate label Nov 25, 2024
@emilk emilk changed the title Add Array::shrink_to_fit Add Array::shrink_to_fit(&self) -> ArrayRef Nov 25, 2024

emilk commented Nov 25, 2024

I think this is a better approach:

@tustvold tustvold closed this Dec 3, 2024
Successfully merging this pull request may close these issues.

Add shrink_to_fit to Array