
dekaf: Temporarily disable record LZ4 compression #1651

Merged
merged 1 commit into master on Sep 23, 2024

Conversation

@jshearer (Contributor) commented Sep 20, 2024

Description:

So, this was crazy. @danthelion reported to me this morning that ClickHouse wasn't able to read from Dekaf. I spent a bunch of time digging into it and found that, long story short, ClickHouse's Kafka consumer was acting as if we were reporting bad data -- issuing a Fetch request, throwing the response away, and re-issuing a Fetch request for the same offset, over and over.

I initially thought it was related to the change I had made to treat Flow acks as Kafka control messages, but backing that out had no effect. I tried a bunch of other things unsuccessfully until I finally took a wild guess and disabled LZ4 compression... which immediately and completely fixed the problem.

This isn't the first time that client/server compression compatibility has broken Dekaf. I ran into a similar issue before which was fixed here, but this time I'm not able to reproduce this issue in any local testing.

I'd like to get this merged ASAP to unblock the Dekaf release (at least for Clickhouse) happening early next week, but ultimately I intend to make compression configurable as part of a Dekaf-type materialization's endpoint config.
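For reference, the shape of the change is just flipping the record batch encoder's compression setting from LZ4 to none. Here is a minimal sketch against the kafka-protocol crate's record encoding API; the function name, version value, and error handling are illustrative assumptions, not the exact Dekaf diff:

```rust
// Illustrative sketch only: encode a record batch uncompressed instead of with LZ4.
// Field values and error handling are assumptions, not Dekaf's actual code.
use bytes::BytesMut;
use kafka_protocol::records::{Compression, Record, RecordBatchEncoder, RecordEncodeOptions};

fn encode_batch_uncompressed(records: &[Record]) -> BytesMut {
    let opts = RecordEncodeOptions {
        version: 2,                     // modern record batch format
        compression: Compression::None, // temporarily ship uncompressed; was Compression::Lz4
    };

    let mut buf = BytesMut::new();
    RecordBatchEncoder::encode(&mut buf, records, &opts)
        .expect("record batch encoding failed");
    buf
}
```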



@jshearer added the change:unplanned label on Sep 20, 2024
@jgraettinger (Member) commented Sep 20, 2024

This will have a marked impact on network egress fees and make Dekaf more expensive to operate.

What's their response about supporting LZ4 compression?

(The choice of LZ4 was very carefully considered, as it gives a 2-4x reduction in data payload size while not being nearly as CPU intensive as GZIP to compress, and being absurdly fast to decompress as well)

@jshearer (Contributor, Author) commented Sep 20, 2024

I sent an email about this earlier today but haven't heard back. I'll follow up with more details, but it's Friday afternoon, so I doubt we'll hear back before next week.

> The choice of LZ4 was very carefully considered, as it gives a 2-4x reduction in data payload size while not being nearly as CPU intensive as GZIP to compress, and being absurdly fast to decompress as well

Totally, LZ4 certainly seems like the right choice on the merits. If I could reproduce the issue locally, I could figure out what's wrong and come up with a patch, like I did before tychedelia/kafka-protocol-rs#59 got merged and released. But it works fine locally against kcat, bytewax, and materialize (all of which use librdkafka). Maybe I need a bleeding-edge release of librdkafka to trigger it?

@jshearer (Contributor, Author) commented

They got back to me and said they use https://github.com/twmb/franz-go instead of librdkafka, which is great -- if we don't figure it out from logs, I can come up with a quick consumer using that library to hopefully reproduce the issue locally and fix it.

@jgraettinger (Member) left a comment


LGTM

@jshearer merged commit e661a00 into master on Sep 23, 2024
3 of 4 checks passed
@jshearer (Contributor, Author) commented

Just for a paper trail: I spent a bunch of time today digging into this and ended up finding a related issue whose fix resolves the problem, but unfortunately no smoking gun.

The actual error (which is unhelpfully swallowed by franz-go) is this:

lz4: invalid block checksum: got a1d59126; expected e48f521c

It appears that somewhere between kafka-protocol (which uses lz4-rs) and franz-go (which uses the Go lz4 library), there's some disagreement about what a correct lz4 chunk looks like. I tried a whole bunch of different things:

  • Updated kafka-protocol to bleeding edge in order to use a custom compressor and directly invoke lz4-rs
  • Manually lz4'd a bunch of different test inputs and fed them directly to Go's LZ4 library
  • Extracted the compressed records from RecordBatchEncoder and decompressed them directly with Go
  • Turned on and off all of the LZ4 flags I could: content checksums, block checksums, tried various compression levels. This is where I found the related issue.

Turns out, lz4-rs forgot to actually propagate its encoder setting block_checksum_flag, leaving block checksums always enabled no matter what you pass to .block_checksum(). Since the default is BlockChecksum::NoBlockChecksum (disabled), and the error is specifically about block checksums, I went and made the change to propagate this flag down to the underlying lz4 library. This fixed the problem, though by treating a symptom (disabling block checksums) rather than the core issue of why the block checksums were being either incorrectly calculated or incorrectly rejected.
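To make that concrete, here's a minimal sketch of the configuration path in question, using the lz4-rs builder call named above. The import path for BlockChecksum and the Vec<u8>-backed encoder are assumptions about lz4-rs's API, not code from kafka-protocol or Dekaf:

```rust
// Sketch of the lz4-rs setting described above. Before 10XGenomics/lz4-rs#52,
// the block_checksum() value was not propagated into the underlying lz4 frame
// preferences, so frames carried block checksums even when NoBlockChecksum
// (the default) was requested.
use std::io::Write;

use lz4::{BlockChecksum, EncoderBuilder}; // import path for BlockChecksum is assumed

fn lz4_compress(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut encoder = EncoderBuilder::new()
        // The intent: no per-block checksums in the LZ4 frame.
        .block_checksum(BlockChecksum::NoBlockChecksum)
        .build(Vec::new())?;

    encoder.write_all(payload)?;
    let (compressed, result) = encoder.finish();
    result?;
    Ok(compressed)
}
```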

Either way, once 10XGenomics/lz4-rs#52 works its way through we should be able to turn lz4 back on with no more problems (though we should test it... 😬).

jshearer added a commit that referenced this pull request Sep 23, 2024
While investigating the cause of LZ4 compression issues related to franz-go (see comments here #1651), I found `lz4_flex` which is a pure-Rust lz4 implementation which appears to be safer and faster than `lz4`/`lz4-sys` that `kafka-protocol` is using. Now that tychedelia/kafka-protocol-rs#81 allows us to use our own compression, and `lz4`'s configuration of block checksums is broken (fix here 10XGenomics/lz4-rs#52), I thought it would be a good time to swap to `lz4_flex`.
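As a rough idea of what the swap looks like on the compression side, here's a minimal lz4_flex frame-format round-trip (Kafka record batches use the LZ4 frame format). How this plugs into the custom-compression hook from tychedelia/kafka-protocol-rs#81 isn't shown, and the function names are illustrative:

```rust
// Illustrative lz4_flex frame-format round-trip; not Dekaf's actual integration.
use std::io::{Read, Write};

use lz4_flex::frame::{FrameDecoder, FrameEncoder};

fn frame_compress(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut encoder = FrameEncoder::new(Vec::new());
    encoder.write_all(payload)?;
    encoder
        .finish()
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}

fn frame_decompress(compressed: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut decoder = FrameDecoder::new(compressed);
    let mut out = Vec::new();
    decoder.read_to_end(&mut out)?;
    Ok(out)
}
```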