
Improving efficiency of encoding ProduceRequests #104

Open
eblocha opened this issue Jan 28, 2025 · 5 comments

eblocha commented Jan 28, 2025

Hello!

I'm wondering if there's any way to avoid a double copy I'm currently forced into when encoding produce requests. Right now I create a new BytesMut to pass to RecordBatchEncoder::encode, then move that buffer into PartitionProduceData. At the TCP sink, those bytes then get encoded a second time into the sink's internal buffer.

let mut records = BytesMut::new();

RecordBatchEncoder::encode(
    &mut records,
    prepared_records.iter(), // Vec<Record>
    ...
);

// error handling...

// this gets added to the ProduceRequest
let partition_data = PartitionProduceData::default()
    .with_index(partition)
    .with_records(Some(records.into()));

However, when this request is going to be sent to the IO sink, it gets encoded into a FramedWrite's internal buffer (from tokio_util). After encoding, the BytesMut I made is dropped, and I need to allocate a new one.

I'm wondering if there'd be a way to avoid the initial encode step into the intermediate byte array in PartitionProduceData, and instead run RecordBatchEncoder::encode directly against the FramedWrite buffer, skipping the extra allocation and copy.
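
For context, the write side on my end looks roughly like the sketch below (simplified and partly from memory: the codec is a placeholder name, error conversion is glossed over, and the real codec also writes the frame length and request header):

use bytes::BytesMut;
use kafka_protocol::messages::ProduceRequest;
use kafka_protocol::protocol::Encodable;
use tokio_util::codec::Encoder;

// Placeholder codec for illustration; frame length prefix and request header omitted.
struct ProduceCodec {
    api_version: i16,
}

impl Encoder<ProduceRequest> for ProduceCodec {
    type Error = anyhow::Error;

    fn encode(&mut self, item: ProduceRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // Copy #2: the request, including the records Bytes that
        // RecordBatchEncoder::encode already produced (copy #1), is
        // encoded again into the FramedWrite's internal buffer.
        item.encode(dst, self.api_version)?;
        Ok(())
    }
}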

eblocha commented Jan 28, 2025

Here's a flamegraph of a profile I took that sparked this question:

[flamegraph image]

When building the produce request, I have to copy the bytes into a buffer to put in the request, then copy from the request into the IO sink buffer.

rukai commented Jan 29, 2025

I agree that this is a problem.
I'm not sure how to solve it though.
For my own use case I use kafka-protocol in a Kafka proxy, where I need the records to remain as raw bytes, since parsing them would be expensive and unnecessary.

Maybe we could store the records as something like:

enum RecordBatch {
    RawBytes(Bytes),
    Parsed(RecordBatchParsed),
}
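
Fleshing that out a bit, encoding could then dispatch on the variant, roughly like the sketch below (RecordBatchParsed is hypothetical, and the exact RecordBatchEncoder::encode arguments and compact framing are glossed over):

use bytes::{BufMut, Bytes, BytesMut};
use kafka_protocol::records::{Record, RecordBatchEncoder, RecordEncodeOptions};

// Hypothetical parsed form; not part of the current API.
struct RecordBatchParsed {
    records: Vec<Record>,
    options: RecordEncodeOptions,
}

enum RecordBatch {
    RawBytes(Bytes),
    Parsed(RecordBatchParsed),
}

impl RecordBatch {
    // RawBytes passes straight through (the proxy case); Parsed encodes the
    // records directly into the output buffer (the client case), with no
    // intermediate BytesMut.
    fn encode_into(&self, buf: &mut BytesMut) -> anyhow::Result<()> {
        match self {
            RecordBatch::RawBytes(bytes) => {
                buf.put_slice(bytes);
                Ok(())
            }
            RecordBatch::Parsed(parsed) => {
                Ok(RecordBatchEncoder::encode(buf, parsed.records.iter(), &parsed.options)?)
            }
        }
    }
}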

A while back I had the idea to extend this project to generate 3 separate crates.

  • one optimized for clients
  • one optimized for kafka reimplementations
  • one optimized for proxies

This would bring lots of benefits for performance and ease of use.
But in this particular case, the first two crates could avoid the complexities of the enum solution above and just store RecordBatchParsed directly.
Unfortunately this is a huge undertaking that I can't afford to take on.

eblocha commented Jan 29, 2025

On the decode side, you'd have to pick only one variant to decode into. Perhaps if PartitionProduceData were generic, it could defer its Encodable and Decodable implementations to the type parameter's implementations. Of course, this would be a breaking change.

Avoiding a breaking change would probably mean creating a whole new type.
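
Very roughly, the shape I'm imagining (entirely hypothetical, just to illustrate the deferral):

use bytes::BytesMut;
use kafka_protocol::protocol::Encodable;

// Hypothetical generic form. R supplies the wire representation of the
// records: raw Bytes for a proxy, a parsed batch type for a client or
// broker reimplementation.
struct PartitionProduceData<R> {
    index: i32,
    records: Option<R>,
}

impl<R: Encodable> PartitionProduceData<R> {
    // Encoding defers to R's Encodable impl, so a client-oriented R could
    // encode its records straight into the output buffer with no
    // intermediate allocation. Index, tagged fields, etc. omitted.
    fn encode(&self, buf: &mut BytesMut, version: i16) -> anyhow::Result<()> {
        if let Some(records) = &self.records {
            records.encode(buf, version)?;
        }
        Ok(())
    }
}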

rukai commented Jan 29, 2025

We are currently pretty flexible with breaking changes in this project; a bump to 0.15 is not an issue.

You make a good point about decoding.
The simplest solution is to just take the performance hit for decoding in your use case and decode to raw bytes every time.
I would like to avoid generics if possible because of the complexities they bring.
But if you could do it in a way that is not very disruptive, it could work.

Another alternative is to have an alternate decode method on Decodable that decodes as the parsed form.
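
Roughly, sketching only the shape (names made up):

use bytes::Bytes;

// Hypothetical: keep today's decode (records stay as raw bytes, which is what
// a proxy wants) and add an opt-in variant that parses the record batches
// during decoding (what a client or broker reimplementation wants).
trait DecodableExt: Sized {
    type Error;

    // Current behaviour: record data is kept as raw bytes.
    fn decode(buf: &mut Bytes, version: i16) -> Result<Self, Self::Error>;

    // Alternate behaviour: record data is parsed into records up front.
    fn decode_parsed(buf: &mut Bytes, version: i16) -> Result<Self, Self::Error>;
}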

eblocha commented Jan 29, 2025

A further complication is that the compact byte format uses a variable-length header to record the size of the message, and we don't know the size of the bytes until after we've written them.

I'm not sure if there's a way to reserve enough space for the maximum-size length header, write the records, go back and write the length, then copy_within the records backwards into the correct position. The extra memmove might defeat the whole optimization to begin with; I'm not sure.
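
Something along these lines is what I'm picturing (purely a sketch: the varint encoder is the generic LEB128 form and the exact compact-length semantics are glossed over):

use bytes::{BufMut, BytesMut};

// Largest possible width of an unsigned varint encoding a u32 length (5 bytes).
const MAX_VARINT_LEN: usize = 5;

// Reserve the worst-case prefix, write the payload, then backfill the real
// length and slide the payload left so there is no gap.
fn write_with_varint_length<F>(buf: &mut BytesMut, write_payload: F)
where
    F: FnOnce(&mut BytesMut),
{
    let start = buf.len();
    buf.put_bytes(0, MAX_VARINT_LEN); // placeholder for the length prefix
    let payload_start = buf.len();

    write_payload(buf); // e.g. RecordBatchEncoder::encode straight into buf
    let payload_len = buf.len() - payload_start;

    // Encode the real length into a scratch array.
    let mut prefix = [0u8; MAX_VARINT_LEN];
    let prefix_len = encode_unsigned_varint(payload_len as u32, &mut prefix);

    // Backfill the prefix, then memmove the payload backwards into place.
    // This memmove is the cost that may or may not beat the extra allocation.
    buf[start..start + prefix_len].copy_from_slice(&prefix[..prefix_len]);
    buf.copy_within(payload_start.., start + prefix_len);
    buf.truncate(start + prefix_len + payload_len);
}

// Minimal LEB128-style unsigned varint encoder, included to keep the sketch self-contained.
fn encode_unsigned_varint(mut value: u32, out: &mut [u8; MAX_VARINT_LEN]) -> usize {
    let mut i = 0;
    loop {
        if value < 0x80 {
            out[i] = value as u8;
            return i + 1;
        }
        out[i] = (value as u8 & 0x7f) | 0x80;
        value >>= 7;
        i += 1;
    }
}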
