Skip to content


Pivot to allowing registration of external commits
Browse files Browse the repository at this point in the history
The changes to the transactional producer are removed and will come back in a later PR.

Make `RunloopAccess.withRunloopZIO` private again.
  • Loading branch information
erikvanoosten committed Jan 11, 2025
1 parent 8598fca commit c22a55f
Showing 10 changed files with 124 additions and 81 deletions.
131 changes: 81 additions & 50 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Original file line number Diff line number Diff line change
@@ -1234,67 +1234,96 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
val allMessages = (1 to messageCount).map(i => s"$i" -> f"msg$i%06d")
val (messagesBeforeRebalance, messagesAfterRebalance) = allMessages.splitAt(messageCount / 2)

def transactionalRebalanceListener(streamCompleteOnRebalanceRef: Ref[Option[Promise[Nothing, Unit]]]) =
onAssigned = _ => ZIO.unit,
onRevoked = _ =>
streamCompleteOnRebalanceRef.get.flatMap {
case Some(p) =>
ZIO.logDebug("onRevoked, awaiting stream completion") *>
p.await.timeoutFail(new InterruptedException("Timed out waiting stream to complete"))(1.minute)
case None => ZIO.unit
onLost = _ => ZIO.logDebug("Lost some partitions")

def makeCopyingTransactionalConsumer(
name: String,
consumerGroupId: String,
clientId: String,
fromTopic: String,
toTopic: String,
consumerCreated: Promise[Throwable, Unit]
tProducer: TransactionalProducer,
consumerCreated: Promise[Nothing, Unit]
): ZIO[Kafka, Throwable, Unit] =
ZIO.logAnnotate("consumer", name) {
ZIO.scoped {
(for {
consumedMessagesCounter <- Ref.make(0)
_ <- consumedMessagesCounter.get
.flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed"))

transactionalId <- randomThing("transactional")
tProducerSettings <- transactionalProducerSettings(transactionalId)
tProducer <-

tConsumer <-
.partitionedStream(Subscription.topics(fromTopic), Serde.string, Serde.string)
.flatMapPar(Int.MaxValue) { case (_, partitionStream) =>
ZStream.fromZIO(consumerCreated.succeed(())) *>
partitionStream.mapChunksZIO { records =>
ZIO.scoped {
for {
t <- tProducer.createTransaction
_ <- t.produceChunkBatch( => new ProducerRecord(toTopic, r.key, r.value)),
_ <- consumedMessagesCounter.update(_ + records.size)
} yield Chunk.empty
.tapError(e => ZIO.logError(s"Error: $e") *> <* ZIO.logDebug("Done")
} yield tConsumer)
.provideSome[Kafka & Scope](
rebalanceSafeCommits = true,
properties = Map(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200"
for {
consumedMessagesCounter <- Ref.make(0)
_ <- consumedMessagesCounter.get
.flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed"))
streamCompleteOnRebalanceRef <- Ref.make[Option[Promise[Nothing, Unit]]](None)
tConsumer <-
.partitionedAssignmentStream(Subscription.topics(fromTopic), Serde.string, Serde.string)
.mapZIO { assignedPartitions =>
for {
p <- Promise.make[Nothing, Unit]
_ <- streamCompleteOnRebalanceRef.set(Some(p))
_ <- ZIO.logDebug(s"${assignedPartitions.size} partitions assigned")
_ <- consumerCreated.succeed(())
partitionStreams =
s <- ZStream
.mergeAllUnbounded(64)(partitionStreams: _*)
.mapChunksZIO { records =>
ZIO.scoped {
for {
t <- tProducer.createTransaction
_ <- t.produceChunkBatch( => new ProducerRecord(toTopic, r.key, r.value)),
_ <- consumedMessagesCounter.update(_ + records.size)
} yield Chunk.empty
.ensuring {
for {
_ <- streamCompleteOnRebalanceRef.set(None)
_ <- p.succeed(())
c <- consumedMessagesCounter.get
_ <- ZIO.logDebug(s"Consumed $c messages")
} yield ()
} yield s
restartStreamOnRebalancing = true,
properties = Map(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200"
rebalanceListener = transactionalRebalanceListener(streamCompleteOnRebalanceRef)
.tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done")
} yield tConsumer

for {
transactionalId <- randomThing("transactional")
tProducerSettings <- transactionalProducerSettings(transactionalId)
tProducer <- TransactionalProducer.make(tProducerSettings)

topicA <- randomTopic
topicB <- randomTopic
_ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topicA, partitions = partitionCount))
@@ -1306,26 +1335,28 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {

_ <- ZIO.logDebug("Starting copier 1")
copier1ClientId = copyingGroup + "-1"
copier1Created <- Promise.make[Throwable, Unit]
copier1Created <- Promise.make[Nothing, Unit]
copier1 <- makeCopyingTransactionalConsumer(
_ <- copier1Created.await

_ <- ZIO.logDebug("Starting copier 2")
copier2ClientId = copyingGroup + "-2"
copier2Created <- Promise.make[Throwable, Unit]
copier2Created <- Promise.make[Nothing, Unit]
copier2 <- makeCopyingTransactionalConsumer(
_ <- ZIO.logDebug("Waiting for copier 2 to start")
Original file line number Diff line number Diff line change
@@ -206,8 +206,8 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j {

abstract class MockCommitter extends Committer {
override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit
override val markCommittedInTransaction: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit
override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit
override val registerExternalCommits: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit

override def processQueuedCommits(consumer: ByteArrayKafkaConsumer, executeOnEmpty: Boolean): Task[Unit] = ZIO.unit

Original file line number Diff line number Diff line change
@@ -53,11 +53,11 @@ object KafkaTestUtils {
* Note: to run multiple tests in parallel, you need to use different transactional ids via
* `transactionalProducer(transactionalId)`.
val transactionalProducer: ZLayer[Kafka with Consumer, Throwable, TransactionalProducer] =
val transactionalProducer: ZLayer[Kafka, Throwable, TransactionalProducer] =

def transactionalProducer(transactionalId: String): ZLayer[Kafka with Consumer, Throwable, TransactionalProducer] =
ZLayer.makeSome[Kafka with Consumer, TransactionalProducer](
def transactionalProducer(transactionalId: String): ZLayer[Kafka, Throwable, TransactionalProducer] =
ZLayer.makeSome[Kafka, TransactionalProducer](
15 changes: 9 additions & 6 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
@@ -160,9 +160,14 @@ trait Consumer {
def metrics: Task[Map[MetricName, Metric]]

* Used internally by the [[zio.kafka.producer.TransactionalProducer]]
* Register a commit that was done externally, that is, not by this consumer.
* This method is useful when you want to use rebalance-safe-commits, but you are not committing to the Kafka brokers,
* but to some external system, for example a relational database.
* See also [[zio.kafka.consumer.ConsumerSettings.withRebalanceSafeCommits]].
def registerOffsetsCommittedInTransaction(offsetBatch: OffsetBatch): Task[Unit]
def registerExternalCommits(offsetBatch: OffsetBatch): Task[Unit]

object Consumer {
@@ -609,9 +614,7 @@ private[consumer] final class ConsumerLive private[consumer] (
override def metrics: Task[Map[MetricName, Metric]] =

override def registerOffsetsCommittedInTransaction(
offsetBatch: OffsetBatch
): Task[Unit] =
runloopAccess.withRunloopZIO(true)(runloop => runloop.registerOffsetsCommittedInTransaction(offsetBatch))
override def registerExternalCommits(externallyCommittedOffsets: OffsetBatch): Task[Unit] =

Original file line number Diff line number Diff line change
@@ -230,6 +230,9 @@ final case class ConsumerSettings(
* Rebalances are held up for at most 3/5 of `maxPollInterval` (see [[withMaxPollInterval]]), by default this
* calculates to 3 minutes. See [[#withMaxRebalanceDuration]] to change the default.
* External commits (that is, commits to an external system, e.g. a relational database) must be registered to the
* consumer with [[Consumer.registerExternalCommits]].
* When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any
* offset commits from these streams have a high chance of being delayed (commits are not possible during some phases
* of a rebalance). The consumer that takes over the partition will likely not see these delayed commits and will
Original file line number Diff line number Diff line change
@@ -11,8 +11,12 @@ import java.lang.Math.max
import scala.collection.mutable

private[internal] trait Committer {

/** A function to commit offsets. */
val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit]
val markCommittedInTransaction: Map[TopicPartition, OffsetAndMetadata] => Task[Unit]

/** A function to register offsets that have been committed externally. */
val registerExternalCommits: Map[TopicPartition, OffsetAndMetadata] => Task[Unit]

* Takes commits from the queue, commits them and adds them to pending commits
Original file line number Diff line number Diff line change
@@ -23,11 +23,14 @@ private[consumer] final class LiveCommitter(
pendingCommits: Ref.Synchronized[Chunk[Commit]]
) extends Committer {

override val markCommittedInTransaction: Map[
override val registerExternalCommits: Map[
] => Task[Unit] = offsets =>
committedOffsetsRef.modify(_.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))).unit
committedOffsetsRef.modify {
// The continuation promise can be `null` because this commit is not actually handled by the consumer.
_.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))

/** This is the implementation behind the user facing api `Offset.commit`. */
override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] =
Original file line number Diff line number Diff line change
@@ -533,8 +533,8 @@ private[consumer] final class Runloop private (

def registerOffsetsCommittedInTransaction(offsetBatch: OffsetBatch): Task[Unit] =
def registerExternalCommits(offsetBatch: OffsetBatch): Task[Unit] =

object Runloop {
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription }
import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, OffsetBatch, Subscription }
import{ Stream, Take, UStream, ZStream }
import zio._

@@ -31,7 +31,7 @@ private[consumer] final class RunloopAccess private (
diagnostics: Diagnostics
) {

def withRunloopZIO[E](
private def withRunloopZIO[E](
requireRunning: Boolean
)(whenRunning: Runloop => IO[E, Unit]): IO[E, Unit] =
runloopStateRef.updateSomeAndGetZIO {
@@ -66,6 +66,9 @@ private[consumer] final class RunloopAccess private (
} yield stream

def registerExternalCommits(externallyCommittedOffsets: OffsetBatch): Task[Unit] =
withRunloopZIO(requireRunning = true)(_.registerExternalCommits(externallyCommittedOffsets))


private[consumer] object RunloopAccess {
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException
import org.apache.kafka.common.serialization.ByteArraySerializer
import zio.Cause.Fail
import zio._
import zio.kafka.consumer.{ Consumer, OffsetBatch }
import zio.kafka.consumer.OffsetBatch

import java.util
import scala.jdk.CollectionConverters._
@@ -22,12 +22,11 @@ object TransactionalProducer {

private final class LiveTransactionalProducer(
live: ProducerLive,
semaphore: Semaphore,
consumer: Consumer
semaphore: Semaphore
) extends TransactionalProducer {
private val abortTransaction: Task[Unit] = ZIO.attemptBlocking(live.p.abortTransaction())

private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): ZIO[Any, Throwable, Unit] = {
private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): Task[Unit] = {
val sendOffsetsToTransaction: Task[Unit] =
ZIO.suspend {
@inline def invalidGroupIdException: IO[InvalidGroupIdException, Nothing] =
@@ -49,12 +48,10 @@ object TransactionalProducer {

sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *>
ZIO.attemptBlocking(live.p.commitTransaction()) *>
sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction())

private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): ZIO[Any, Nothing, Unit] =
private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): UIO[Unit] =
exit match {
case Exit.Success(_) =>
@@ -78,15 +75,15 @@ object TransactionalProducer {
def createTransaction: ZIO[TransactionalProducer & Scope, Throwable, Transaction] =

val live: RLayer[TransactionalProducerSettings with Consumer, TransactionalProducer] =
val live: RLayer[TransactionalProducerSettings, TransactionalProducer] =
ZLayer.scoped {
for {
settings <- ZIO.service[TransactionalProducerSettings]
producer <- make(settings)
} yield producer

def make(settings: TransactionalProducerSettings): ZIO[Scope with Consumer, Throwable, TransactionalProducer] =
def make(settings: TransactionalProducerSettings): ZIO[Scope, Throwable, TransactionalProducer] =
for {
rawProducer <- ZIO.acquireRelease(
@@ -105,7 +102,6 @@ object TransactionalProducer {
live = new ProducerLive(rawProducer, runtime, sendQueue)
_ <- ZIO.blocking(live.sendFromQueue).forkScoped
consumer <- ZIO.service[Consumer]
} yield new LiveTransactionalProducer(live, semaphore, consumer)
_ <- ZIO.blocking(live.sendFromQueue).forkScoped
} yield new LiveTransactionalProducer(live, semaphore)

0 comments on commit c22a55f

Please sign in to comment.