Support rebalance-safe-commits with external commits #1425

Merged
merged 12 commits into master from transaction-rebalance-safe-commits
Jan 13, 2025

Conversation

svroonland
Collaborator

@svroonland svroonland commented Dec 29, 2024

Allow rebalance-safe-commits to be used in combination with external commits. External commits are commits done to some other system than the kafka broker, e.g. a relational database.
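To make "external commit" concrete, here is a minimal sketch in plain Scala that makes no zio-kafka calls. `Record`, `Store`, and `process` are hypothetical names invented for this illustration, and the in-memory `Store` only stands in for a relational database that writes a result and the next offset to read in a single transaction.

```scala
object ExternalCommitSketch {
  // Hypothetical record type: just enough fields to show offset bookkeeping.
  final case class Record(partition: Int, offset: Long, value: String)

  // Stand-in for a database; `synchronized` plays the role of a transaction.
  final class Store {
    private var rows: Vector[String]    = Vector.empty
    private var offsets: Map[Int, Long] = Map.empty

    // Store the result and the next offset to read together, or not at all.
    def saveAtomically(r: Record): Unit = synchronized {
      rows = rows :+ r.value
      offsets = offsets.updated(r.partition, r.offset + 1)
    }

    def nextOffset(partition: Int): Long =
      synchronized(offsets.getOrElse(partition, 0L))

    def values: Vector[String] = synchronized(rows)
  }

  // Skip records whose offset was already committed to the external store.
  def process(store: Store, records: List[Record]): Unit =
    records.foreach { r =>
      if (r.offset >= store.nextOffset(r.partition)) store.saveAtomically(r)
    }
}
```

Because the offset lives next to the data, the Kafka broker never sees these commits, which is presumably why the consumer needs to be told about them before rebalance-safe-commits can take them into account.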

This new capability also supports the improved TransactionalProducer that will be introduced in zio-kafka 3.0.0.

--
Note: this PR initially also included the improved TransactionalProducer. This has been moved to #1434.

@svroonland svroonland marked this pull request as ready for review January 2, 2025 10:03
@erikvanoosten
Collaborator

This is nice. Some questions:

  • Can we somehow guarantee that rebalanceSafeCommits is enabled when we use this new transactional producer?
  • If we simply roll this out, will applications with the existing transactional producers break? Maybe not, but if that is so, I propose we either make this backward compatible, or we move this to zio-kafka 3.

@erikvanoosten
Collaborator

erikvanoosten commented Jan 4, 2025

The updated TransactionProducer is not binary backward compatible; it requires a Consumer in the environment, so the ZIO type is different. It will be source compatible for many applications because many will have a Consumer available anyway.

However, this is a strange set-up. The Consumer and the TransactionProducer are tied together; they must connect to the same Kafka broker cluster. Therefore it makes more sense to produce them together, or at least let one refer to the other.

I propose that we change the construction of TransactionProducers such that they require a Consumer.
This has some advantages:

  • Method TransactionProducer.createTransaction stays backward compatible because its type doesn't change.
  • We could even make it fully backward compatible: if a Consumer is provided we do the new thing pioneered in this PR; if it's not provided, nothing changes. When a Consumer is provided, we could also force rebalanceSafeCommits to true.
  • We may even be able to check that the provided Consumer connects to the same broker as the TransactionProducer that is being constructed.
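The proposal above could look roughly like the following sketch. It is plain Scala and does not use the real zio-kafka types: `ConsumerSettings`, `ProducerSettings`, `Consumer`, `TransactionalProducer`, and `make` are simplified, hypothetical stand-ins. The sketch also validates `rebalanceSafeCommits` instead of forcing it to true, and it compares bootstrap server lists as a crude approximation of "same broker"; two different lists can still point at the same cluster.

```scala
object TransactionalProducerSketch {
  // Hypothetical, simplified settings; not the zio-kafka classes.
  final case class ConsumerSettings(bootstrapServers: List[String], rebalanceSafeCommits: Boolean)
  final case class ProducerSettings(bootstrapServers: List[String])

  final class Consumer(val settings: ConsumerSettings)

  // Private constructor: instances can only be obtained through `make`.
  final class TransactionalProducer private (
    val settings: ProducerSettings,
    val consumer: Option[Consumer]
  )

  object TransactionalProducer {
    // No consumer: old behaviour. With a consumer: check it before use.
    def make(
      settings: ProducerSettings,
      consumer: Option[Consumer]
    ): Either[String, TransactionalProducer] =
      consumer match {
        case None =>
          Right(new TransactionalProducer(settings, None))
        case Some(c) if !c.settings.rebalanceSafeCommits =>
          Left("consumer must have rebalanceSafeCommits enabled")
        case Some(c) if c.settings.bootstrapServers.toSet != settings.bootstrapServers.toSet =>
          Left("consumer and producer use different bootstrap servers")
        case Some(c) =>
          Right(new TransactionalProducer(settings, Some(c)))
      }
  }
}
```

Returning `Either` keeps the sketch dependency-free; in a ZIO code base the same checks would more naturally fail the effect that constructs the producer.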

Re. method Consumer.registerOffsetsCommittedInTransaction, this is an internal method and should not be exposed too easily. We could do this by creating a separate trait (e.g. something like ConsumerInternal) and let ConsumerLive extend both traits. This is not the best way, we can probably think of something better.
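A minimal sketch of the separate-trait idea, in plain Scala. The trait and class names follow the comment above, but every signature here is an assumption made for illustration (the real methods return ZIO effects and take Kafka types), and `make` is a hypothetical constructor.

```scala
object InternalTraitSketch {
  // Public surface that applications program against.
  trait Consumer {
    def subscription: Set[String]
  }

  // Library-internal surface; it would be package-private in a real code base.
  trait ConsumerInternal {
    def registerOffsetsCommittedInTransaction(offsets: Map[String, Long]): Unit
  }

  // The live implementation extends both traits.
  final class ConsumerLive(topics: Set[String]) extends Consumer with ConsumerInternal {
    private var committed: Map[String, Long] = Map.empty

    def subscription: Set[String] = topics

    def registerOffsetsCommittedInTransaction(offsets: Map[String, Long]): Unit =
      committed = committed ++ offsets

    def committedOffsets: Map[String, Long] = committed
  }

  // Callers only ever see the public trait.
  def make(topics: Set[String]): Consumer = new ConsumerLive(topics)
}
```

Code that holds a `Consumer` cannot call the internal method without an explicit downcast, while library code that knows about `ConsumerInternal` can. As the comment says, this hides the method rather than truly protecting it.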

@erikvanoosten
Collaborator

What is Consumer.markCommittedInTransaction actually used for? I don't see where it is used.

@svroonland
Collaborator Author

Agreed, providing the Consumer upon TransactionalProducer creation is better. I think it's fine to disregard backwards compatibility here: the old method was very complicated and not well supported, and this is a big improvement in usability.

If we want to check the settings, we could have a Consumer.settings method to retrieve the settings with which the Consumer was created. Is that what you have in mind as well?

There is no Consumer.markCommittedInTransaction, only Committer.markCommittedInTransaction. I'm not sure how we can hide this method, but agree that this is not ideal. In an earlier commit this was done with an internal method on the OffsetBatch, I guess we could revert to that.

@svroonland svroonland added this to the 3.0.0 milestone Jan 5, 2025
@erikvanoosten
Collaborator

erikvanoosten commented Jan 5, 2025

> I think it's fine to disregard backwards compatibility here...

Yeah, that is a good point. We'd better make very good release notes then :)

> we could have a Consumer.settings method ... Is that what you have in mind as well?

Not necessarily, but I guess that is what is needed indeed.

> There is no Consumer.markCommittedInTransaction, only Committer.markCommittedInTransaction.

Check, I get it now. Well done.

Collaborator

@erikvanoosten erikvanoosten left a comment

Great work! Some nitpicks in the comments.

BTW, enforcing rebalanceSafeCommits in the consumer would still be nice.

zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala (outdated, resolved)
zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala (outdated, resolved)
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200"
),
rebalanceListener = transactionalRebalanceListener(streamCompleteOnRebalanceRef)
ZIO.scoped {
Collaborator

Would this extra scoped fix the spurious test failures?

@erikvanoosten erikvanoosten self-requested a review January 9, 2025 12:22
erikvanoosten added a commit that referenced this pull request Jan 9, 2025
Document the transactional producer introduced in #1425.
erikvanoosten added a commit that referenced this pull request Jan 9, 2025
Document the transactional producer introduced in #1425.
@erikvanoosten erikvanoosten force-pushed the transaction-rebalance-safe-commits branch from 545743c to c22a55f Compare January 11, 2025 10:18
@erikvanoosten erikvanoosten changed the title Transactional committing and producing with rebalanceSafeCommits Support rebalance-safe-commits with external commits Jan 11, 2025
@erikvanoosten erikvanoosten removed this from the 3.0.0 milestone Jan 11, 2025
@erikvanoosten
Collaborator

As discussed on Discord, we pivot this PR to support rebalance-safe-commits with external commits. The transactional producer changes are moved to a future PR.

@erikvanoosten erikvanoosten self-requested a review January 11, 2025 14:12
svroonland and others added 9 commits January 12, 2025 11:36
@erikvanoosten erikvanoosten force-pushed the transaction-rebalance-safe-commits branch from c22a55f to afc01ee Compare January 12, 2025 10:38
@erikvanoosten
Collaborator

The changes in transactional producing have been pushed in the https://github.com/zio/zio-kafka/tree/transaction-rebalance-safe-commits-followup branch.

Collaborator Author

@svroonland svroonland left a comment

👍

@svroonland
Collaborator Author

So this can be merged and released in 2.10.0?

@erikvanoosten
Collaborator

> So this can be merged and released in 2.10.0?

Yes! Though that reminds me, we need to change the version in the docs.

@erikvanoosten erikvanoosten merged commit c5da73e into master Jan 13, 2025
9 checks passed
@erikvanoosten erikvanoosten deleted the transaction-rebalance-safe-commits branch January 13, 2025 07:41