Batch-read that can take ownership of consumed items #56

Closed
nullchinchilla opened this issue Jun 23, 2021 · 15 comments · Fixed by #60

Comments

@nullchinchilla

Can a method to take ownership of the items in a ReadChunk be added? I'm not sure what the best API would be (perhaps a consume_into_iter()?), since taking ownership means commit must be called, but such a method would be crucial for efficiently batch-reading non-Copy types.

(Context: I'm using rtrb as a primitive in a scheduler, so it contains non-Clone, non-Copy jobs rather than u8s or anything of that sort.)

@mgeier
Owner

mgeier commented Jun 24, 2021

I guess this should be possible to implement.

The question is: what should happen to un-consumed items?
Should they be dropped or should they stay in the queue?

In the current API this decision is explicit, making it implicit may cause confusion.

What about batch-writing? I guess this should also allow moves for symmetry.
But AFAICT this wouldn't work with an Iterator ...

Did you happen to have a look at #52?
I guess this would also have to be extended to allow taking ownership if we go that route ...

And just out of curiosity: how large are your batches?
They must be quite large for this to make a non-negligible performance difference, right?

Do your jobs implement Default?
If yes, you could probably work around the issue with std::mem::take()?
If you can otherwise cheaply create a dummy job, you could try std::mem::swap().
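A minimal sketch of the mem::take()/mem::swap() workaround, assuming a hypothetical Job type and, crucially, mutable access to the slots that hold the jobs (a plain &mut [Job] here):

```rust
use std::mem;

// Hypothetical job type; `Default` provides a cheap placeholder value.
#[derive(Debug, Default, PartialEq)]
struct Job {
    id: u32,
    payload: Vec<u8>, // makes `Job` non-Copy
}

// Moves every job out of a *mutable* slice, leaving `Job::default()` behind.
fn take_all(slots: &mut [Job]) -> Vec<Job> {
    slots.iter_mut().map(mem::take).collect()
}

// Same idea without `Default`: swap each slot with a caller-supplied dummy.
fn swap_out(slots: &mut [Job], mut make_dummy: impl FnMut() -> Job) -> Vec<Job> {
    slots
        .iter_mut()
        .map(|slot| {
            let mut dummy = make_dummy();
            mem::swap(slot, &mut dummy);
            dummy // now holds the original job
        })
        .collect()
}

fn main() {
    let mut slots = vec![
        Job { id: 1, payload: vec![1] },
        Job { id: 2, payload: vec![2, 2] },
    ];
    let taken = take_all(&mut slots);
    assert_eq!(taken[1].id, 2);
    assert_eq!(slots[0], Job::default()); // placeholder left behind

    let mut more = vec![Job { id: 3, payload: vec![3] }];
    let swapped = swap_out(&mut more, || Job { id: 0, payload: Vec::new() });
    assert_eq!(swapped[0].id, 3);
    assert_eq!(more[0].id, 0);
}
```

Both variants need &mut access to the items; with only shared access, neither compiles.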

@mgeier
Owner

mgeier commented Jun 24, 2021

PS: I'm just starting to realize that the work-around will not work because ReadChunk only provides read-only access ...

@nullchinchilla
Author

My batches are not that big (generally a few dozen items at most), but I am popping very frequently: this is a task queue for an async task executor, so the work items are extremely small and there are many thousands of them every second under load.

@mgeier
Owner

mgeier commented Jun 29, 2021

What about a closure-based API?

Something like this:

    pub fn consume_chunk_with<F>(&mut self, n: usize, f: F) -> Result<(), ChunkError>
    where
        F: Fn(T),
    {
        todo!()
    }

The closure would be called exactly n times.

@nullchinchilla Would this work in your case?

Alternatively, the closure could return something to indicate an early return.

I think something like this could also work on the producer side.
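A minimal sketch of this closure-based idea on a toy single-threaded queue (ToyConsumer and ToyChunkError are hypothetical stand-ins, not rtrb types). It uses FnMut rather than the Fn bound above, on the assumption that the closure will usually need to mutate something, such as pushing jobs into a local Vec:

```rust
use std::collections::VecDeque;

// Hypothetical error type, loosely modeled on a "too few slots" error.
#[derive(Debug, PartialEq)]
enum ToyChunkError {
    TooFewSlots(usize),
}

// Toy single-threaded queue; a real SPSC ring buffer would use atomics.
struct ToyConsumer<T> {
    items: VecDeque<T>,
}

impl<T> ToyConsumer<T> {
    // Calls `f` exactly `n` times, each time moving one item out of the queue,
    // or returns an error without touching the queue if fewer than `n` are available.
    fn consume_chunk_with<F>(&mut self, n: usize, mut f: F) -> Result<(), ToyChunkError>
    where
        F: FnMut(T),
    {
        if self.items.len() < n {
            return Err(ToyChunkError::TooFewSlots(self.items.len()));
        }
        for item in self.items.drain(..n) {
            f(item);
        }
        Ok(())
    }
}

fn main() {
    let mut consumer = ToyConsumer {
        items: VecDeque::from(vec![String::from("a"), String::from("b"), String::from("c")]),
    };
    let mut out = Vec::new();
    consumer.consume_chunk_with(2, |job| out.push(job)).unwrap();
    assert_eq!(out, ["a", "b"]);
    // Only one item is left, so a request for five fails and consumes nothing.
    assert_eq!(
        consumer.consume_chunk_with(5, |_| {}),
        Err(ToyChunkError::TooFewSlots(1))
    );
}
```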

@mgeier
Owner

mgeier commented Jun 30, 2021

Regarding my questions above (#56 (comment)), Vec::drain() could be a model: when the iterator is dropped, the remaining elements are dropped as well, even if not all of them have been iterated.
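For reference, this is the std behavior in question: a Drain that is dropped early still removes (and drops) the whole range from the Vec. The helper name take_first_of_range is made up for this example:

```rust
// Takes only the first item of a drained range; the rest of the range is
// dropped together with the `Drain` iterator, not left in `v`.
fn take_first_of_range(v: &mut Vec<String>, n: usize) -> Option<String> {
    let mut drain = v.drain(..n);
    let first = drain.next();
    drop(drain); // remaining items of the range are dropped here
    first
}

fn main() {
    let mut v: Vec<String> = vec!["a", "b", "c", "d"].into_iter().map(String::from).collect();
    assert_eq!(take_first_of_range(&mut v, 3).as_deref(), Some("a"));
    // "b" and "c" were dropped with the `Drain`; only "d" is left.
    assert_eq!(v, ["d"]);
}
```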

But still, I don't know how this should look on the producer side, since I don't think we can use an Iterator here.

@PieterPenninckx
Copy link

> But still, I don't know how this should look on the producer side, since I don't think we can use an Iterator here.

How about the following (pseudo-Rust)? The crux I guess is in the signature: the iterator is not moved, only borrowed as mutable, but calling next() on it moves an item "out of the iterator".

impl<T> Producer<T> {
    pub fn extend<I>(&mut self, iterator: &mut I)  // Bikeshedding the name for now.
        where I: Iterator<Item = T>
    {
        let writing_slice = self.get_writing_slice(); // Doesn't really exist, but suppose this gives an &mut [MaybeUninit<T>] to the potentially uninitialized data in the ring buffer
        let mut number_of_items_written = 0;
        // Put the slice first in `zip`: once it is full, iteration stops
        // without pulling (and losing) one more item from `iterator`.
        for (output, input) in writing_slice.iter_mut().zip(iterator) {
            // `MaybeUninit::write` moves `input` into the slot; no `mem::forget` is needed.
            output.write(input);
            number_of_items_written += 1;
        }
        self.advance(number_of_items_written); // Or something like that.
    }
}
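A self-contained toy version of this idea, assuming a hypothetical fixed-capacity ToyBuffer in place of the ring buffer's writable region (this is not how rtrb implements it). It also shows why the slice should come first in zip(): once the slots run out, no extra item is pulled from the source iterator and lost.

```rust
use std::mem::MaybeUninit;

// Hypothetical fixed-capacity buffer standing in for the writable part of a ring buffer.
struct ToyBuffer<T, const N: usize> {
    slots: [MaybeUninit<T>; N],
    len: usize, // number of initialized slots at the front
}

impl<T, const N: usize> ToyBuffer<T, N> {
    fn new() -> Self {
        Self { slots: std::array::from_fn(|_| MaybeUninit::uninit()), len: 0 }
    }

    // Moves items from `iterator` into free slots; returns how many were written.
    fn extend_from<I: Iterator<Item = T>>(&mut self, iterator: &mut I) -> usize {
        let free = &mut self.slots[self.len..];
        let mut written = 0;
        // `zip` polls the slice first, so `iterator.next()` is not called once it is full.
        for (output, input) in free.iter_mut().zip(iterator) {
            output.write(input); // moves `input` into the slot
            written += 1;
        }
        self.len += written;
        written
    }

    fn as_slice(&self) -> &[T] {
        // SAFETY: the first `len` slots were initialized by `extend_from`.
        unsafe { std::slice::from_raw_parts(self.slots.as_ptr().cast::<T>(), self.len) }
    }
}

impl<T, const N: usize> Drop for ToyBuffer<T, N> {
    fn drop(&mut self) {
        for slot in &mut self.slots[..self.len] {
            // SAFETY: exactly the first `len` slots are initialized.
            unsafe { slot.assume_init_drop() };
        }
    }
}

fn main() {
    let mut buf: ToyBuffer<String, 3> = ToyBuffer::new();
    let mut source = vec!["a", "b", "c", "d"].into_iter().map(String::from);
    assert_eq!(buf.extend_from(&mut source), 3);
    assert_eq!(buf.as_slice(), ["a", "b", "c"]);
    // "d" was not consumed, because the slice side of `zip` ran out first.
    assert_eq!(source.next().as_deref(), Some("d"));
}
```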

@mgeier mentioned this issue Jul 1, 2021
@mgeier
Owner

mgeier commented Jul 1, 2021

With "I don't think we can use an Iterator here" I meant "we can't provide an iterator (for moving items into it)", but of course we could take an iterator (for moving items out of it).

The main question here is: should we take the size_hint() from the iterator or should the user explicitly specify a chunk size?
Or should we use producer.slots() as chunk size?
The problem with the latter is that it might be more efficient to use a smaller chunk size in some cases (when the atomic variable doesn't have to be read).
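To make the size_hint() option concrete, here is a hypothetical chunk-size policy (upper bound if known, otherwise lower bound, capped at the free slots; free_slots stands in for something like producer.slots()). It also shows that the hint can be loose: a filtered range reports (0, Some(10)) although it yields only 5 items, and an endless iterator reports (usize::MAX, None).

```rust
// Hypothetical policy: prefer the upper bound of the hint, fall back to the
// lower bound, and never exceed the number of free slots.
fn chunk_size<I: Iterator>(iter: &I, free_slots: usize) -> usize {
    let (lower, upper) = iter.size_hint();
    upper.unwrap_or(lower).min(free_slots)
}

fn main() {
    let evens = (0..10).filter(|x| x % 2 == 0);
    assert_eq!(evens.size_hint(), (0, Some(10))); // actually yields 5 items
    assert_eq!(chunk_size(&evens, 8), 8); // overestimates the real count

    let forever = std::iter::repeat(0u8);
    assert_eq!(forever.size_hint(), (usize::MAX, None));
    assert_eq!(chunk_size(&forever, 8), 8);

    let exact = vec![1, 2, 3].into_iter();
    assert_eq!(chunk_size(&exact, 8), 3);
}
```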

I don't really have a use case for this, so I don't know which API would make most sense. What do you think @nullchinchilla?

@PieterPenninckx

> With "I don't think we can use an Iterator here" I meant "we can't provide an iterator (for moving items into it)", but of course we could take an iterator (for moving items out of it).

Ah. I see.

Just my two cents here: I would take the size_hint() from the iterator, since the caller can usually limit the number of items that the iterator yields, so there's no need to specify it twice. E.g. for an iterator that drains a Vec or a VecDeque, the range can be specified when creating the iterator, and in many other cases the caller can do something similar to the following:

let mut iterator = todo!();
producer.extend(iterator.take(5));

The latter only makes sense when the type does not implement Drop however.

Just my two cents. I'm also curious to hear @nullchinchilla 's view.

@nullchinchilla
Author

nullchinchilla commented Jul 3, 2021

For my particular use case I actually would prefer a std::io::Read-like interface, where you pass in a &mut [T], and the method moves n elements into the buffer and returns n, with no iterators etc. This API also wouldn't really have any edge cases at all.
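A minimal sketch of such a std::io::Read-like method on a toy VecDeque (read_into is a hypothetical name, not an rtrb API). Note that buf has to contain valid T values beforehand; each of them is dropped when it is overwritten:

```rust
use std::collections::VecDeque;

// Hypothetical Read-like method: moves up to `buf.len()` items into `buf`
// and returns how many were moved.
fn read_into<T>(queue: &mut VecDeque<T>, buf: &mut [T]) -> usize {
    let n = buf.len().min(queue.len());
    for (slot, item) in buf.iter_mut().zip(queue.drain(..n)) {
        *slot = item; // drops the placeholder that was in `slot`
    }
    n
}

fn main() {
    let mut queue: VecDeque<String> =
        VecDeque::from(vec![String::from("job1"), String::from("job2")]);
    // The caller must first fill the buffer with valid placeholder values.
    let mut buf = vec![String::new(); 4];
    let n = read_into(&mut queue, &mut buf);
    assert_eq!(n, 2);
    assert_eq!(&buf[..n], ["job1", "job2"]);
    assert!(queue.is_empty());
}
```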

@mgeier
Owner

mgeier commented Jul 4, 2021

@nullchinchilla

> where you pass in a &mut [T], and the method moves n elements into the buffer

The problem with this is that the user has to initially provide a slice filled with valid T values, which might be non-trivial to construct, even though they are never used for anything and dropped in the process (which also might be non-trivial).

I think it would be better to provide an API that truly allows moving items without having to deal with any dummy items. I think slices are not the right tool for this job.

I agree that an additional slice-copying API would be useful (see #57 for discussion), but only for T: Copy. This issue is about move-only types.
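For T: Copy, a slice-copying read could look roughly like this sketch (again on a toy VecDeque; read_copy is hypothetical). Because the queue's storage may wrap around, it copies from up to two contiguous parts obtained via VecDeque::as_slices():

```rust
use std::collections::VecDeque;

// Hypothetical copy-based read; only possible because `T: Copy`.
fn read_copy<T: Copy>(queue: &mut VecDeque<T>, buf: &mut [T]) -> usize {
    let n = buf.len().min(queue.len());
    let (front, back) = queue.as_slices();
    let from_front = n.min(front.len());
    buf[..from_front].copy_from_slice(&front[..from_front]);
    buf[from_front..n].copy_from_slice(&back[..n - from_front]);
    queue.drain(..n); // Copy types have no destructor to run
    n
}

fn main() {
    let mut queue: VecDeque<u8> = VecDeque::from(vec![10, 20, 30]);
    let mut buf = [0u8; 2];
    assert_eq!(read_copy(&mut queue, &mut buf), 2);
    assert_eq!(buf, [10, 20]);
    assert_eq!(queue.len(), 1);
}
```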

@PieterPenninckx

> I would take the size_hint() from the iterator

I'm not sure if I like this.

On the one hand, I like to be explicit about the chunk size.
On the other hand, I fear that using something like take(5) would be non-zero overhead (see https://github.com/rust-lang/rust/blob/64ae15ddd3f3cca7036ab2b2f3a6b130b62af4da/library/core/src/iter/adapters/take.rs#L58-L73).


I've tried to implement an experimental iterator API in the branch https://github.com/mgeier-forks/rtrb/tree/iterators.

It provides a WriteChunkUninit::populate() function (name TBD):

    pub fn populate<I>(self, iter: &mut I) -> usize
    where
        I: Iterator<Item = T>;

It can be used like this:

let mut it = /* some iterator */;
if let Ok(mut chunk) = producer.write_chunk_uninit(25) {
    chunk.populate(&mut it);
}

I'm not sure if it would be better to take ownership of the iterator, but I think this way it is more obvious that the iterator might not be completely drained. The iterator might be re-used in a future call.
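A toy model of the borrowed-iterator variant, assuming a hypothetical ToyWriteChunk that appends to a Vec instead of writing into ring buffer slots. It shows how an iterator that is not fully drained by one chunk can be re-used for the next:

```rust
// Hypothetical write chunk with `slots` free places; not rtrb's type.
struct ToyWriteChunk<'a, T> {
    dest: &'a mut Vec<T>,
    slots: usize,
}

impl<T> ToyWriteChunk<'_, T> {
    // Moves at most `slots` items out of `iter`; returns how many were moved.
    // The iterator is only borrowed, so the caller can keep using it afterwards.
    fn populate<I: Iterator<Item = T>>(self, iter: &mut I) -> usize {
        let before = self.dest.len();
        self.dest.extend(iter.take(self.slots));
        self.dest.len() - before
    }
}

fn main() {
    let mut ring = Vec::new();
    let mut it = 1..=7;
    // The first chunk has room for 5 items.
    let first = ToyWriteChunk { dest: &mut ring, slots: 5 }.populate(&mut it);
    // The same iterator is re-used for the next chunk and picks up at 6.
    let second = ToyWriteChunk { dest: &mut ring, slots: 5 }.populate(&mut it);
    assert_eq!((first, second), (5, 2));
    assert_eq!(ring, [1, 2, 3, 4, 5, 6, 7]);
}
```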

On the reading side (which was actually requested in this issue), I've implemented IntoIterator for ReadChunk, which can be used like this:

if let Ok(chunk) = consumer.read_chunk(25) {
    let v: Vec<_> = chunk.into_iter().collect();
}

... or like this:

if let Ok(chunk) = consumer.read_chunk(25) {
    for item in chunk {
        // do something with item
    }
}

Only the actually iterated items are moved out of the ring buffer. If fewer than chunk size items are iterated, the remaining ones stay in the queue (i.e. they are not dropped).
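The "remaining items stay in the queue" behavior can be modeled with a toy chunk over a VecDeque (ToyReadChunk and read_chunk here are hypothetical, not rtrb's types). The chunk pops an item only when next() is called and has no Drop impl, so un-iterated items are left alone, in contrast to Vec::drain():

```rust
use std::collections::VecDeque;

// Hypothetical read chunk covering the first `remaining` items of a queue.
struct ToyReadChunk<'a, T> {
    queue: &'a mut VecDeque<T>,
    remaining: usize,
}

impl<T> Iterator for ToyReadChunk<'_, T> {
    type Item = T;

    // Moves one item out of the queue per call, up to the chunk size.
    fn next(&mut self) -> Option<T> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        self.queue.pop_front()
    }
}

// Returns a chunk of `n` items if that many are available.
fn read_chunk<T>(queue: &mut VecDeque<T>, n: usize) -> Option<ToyReadChunk<'_, T>> {
    if queue.len() >= n {
        Some(ToyReadChunk { queue, remaining: n })
    } else {
        None
    }
}

fn main() {
    let mut q: VecDeque<String> = ["a", "b", "c", "d"].iter().map(|s| s.to_string()).collect();
    if let Some(chunk) = read_chunk(&mut q, 3) {
        for item in chunk {
            if item == "b" {
                break; // stop early: "c" is never moved out
            }
        }
    }
    assert_eq!(q, ["c", "d"]);
}
```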


I've also made an alternative experimental closure-based API in the branch https://github.com/mgeier-forks/rtrb/tree/consume-chunk-with.
I think I like the iterator-based API more, but feel free to try the closure-based one as well.

@mgeier
Owner

mgeier commented Jul 8, 2021

I've created #60 in order to be able to play around with the iterator API.

I've also created #61 to try out the closure-based API, but as mentioned above, I think I prefer the iterator API.

Either one can certainly be refined, and probably there is yet another possible API?

@mgeier
Owner

mgeier commented Jul 11, 2021

@PieterPenninckx

> The crux I guess is in the signature: the iterator is not moved, only borrowed as mutable

I very recently (in the last few days) learned that it is not necessary to require &mut for the iterator parameter, because Iterator is automatically implemented for &mut I whenever I: Iterator (see https://doc.rust-lang.org/std/iter/trait.Iterator.html#impl-Iterator-13).

Therefore, functions can in general take ownership of iterator parameters without worrying about that. This makes the call sites simpler in the cases where the iterator is not needed afterwards (and the caller doesn't have to declare every iterator binding as mut).
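A small std-only illustration: a function that takes its iterator by value also accepts &mut it, and the caller can keep using it afterwards (sum_first_three is a made-up example function):

```rust
// Takes ownership of its iterator argument.
fn sum_first_three<I: Iterator<Item = u32>>(iter: I) -> u32 {
    iter.take(3).sum()
}

fn main() {
    // An owned iterator that is not needed afterwards (no `mut` binding required):
    assert_eq!(sum_first_three(1..=10), 6);

    // Or `&mut it`, because `&mut I` implements `Iterator` whenever `I` does;
    // `it` is still usable and continues where the call stopped.
    let mut it = 1..=10;
    assert_eq!(sum_first_three(&mut it), 6);
    assert_eq!(it.next(), Some(4));
}
```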

But even better, knowing this we can also use IntoIterator, which allows passing many more things, like Vec and very recently even arrays!

For now, I'm using this signature in #60:

impl<T> WriteChunkUninit<'_, T> {
    pub fn populate<I>(self, iter: I) -> usize
    where
        I: IntoIterator<Item = T>,
    {
        let mut iter = iter.into_iter();
        ...
    }
}

This allows passing arrays with chunk.populate([1, 2, 3]), but it also allows passing a mutable reference to an iterator that can continue to be used later: chunk.populate(&mut my_iterator).
Alternatively, chunk.populate(my_iterator.by_ref()) can be used, which I also only learned about very recently (https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.by_ref).
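A toy version of this IntoIterator-based signature, assuming a hypothetical ToyChunk that appends to a Vec (it only mirrors the shape of the populate() signature above, not the implementation in #60):

```rust
// Hypothetical chunk with `slots` free places; not rtrb's WriteChunkUninit.
struct ToyChunk<'a, T> {
    dest: &'a mut Vec<T>,
    slots: usize,
}

impl<T> ToyChunk<'_, T> {
    // Accepts anything that converts into an iterator of `T`.
    fn populate<I>(self, iter: I) -> usize
    where
        I: IntoIterator<Item = T>,
    {
        let mut iter = iter.into_iter();
        let mut written = 0;
        while written < self.slots {
            match iter.next() {
                Some(item) => {
                    self.dest.push(item);
                    written += 1;
                }
                None => break,
            }
        }
        written
    }
}

fn main() {
    let mut ring = Vec::new();
    // Arrays implement IntoIterator by value:
    assert_eq!(ToyChunk { dest: &mut ring, slots: 4 }.populate([1, 2, 3]), 3);
    // A mutable reference keeps the iterator usable afterwards...
    let mut it = 10..20;
    assert_eq!(ToyChunk { dest: &mut ring, slots: 2 }.populate(&mut it), 2);
    // ...and so does by_ref():
    assert_eq!(ToyChunk { dest: &mut ring, slots: 2 }.populate(it.by_ref()), 2);
    assert_eq!(it.next(), Some(14));
    assert_eq!(ring, [1, 2, 3, 10, 11, 12, 13]);
}
```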

What do you think about this API?

Any alternative names for populate()?

@PieterPenninckx

Working with IntoIterator is certainly better :-)

I don't have the energy right now to do a proper review of #60, but I trust that it has been well thought out.

@mgeier
Owner

mgeier commented Jul 19, 2021

Thanks @PieterPenninckx!
No problem, I think I'll merge #60 soon, but I'm still open to changes, especially regarding the name populate().

I'm not in a hurry and I think I will wait some more time before making the next release (which will be a breaking change).

@nullchinchilla, do you have any further suggestions?

@mgeier
Owner

mgeier commented Jul 27, 2021

FYI, I've changed the name populate() in #60 to fill_from_iter(), which I think is more explicit and therefore better.

I will wait for a few more days, but if there are no further comments, I'll merge that.

@mgeier closed this as completed in #60 Aug 7, 2021
3 participants