From abbf8111ef6cadab6c2512172a0b3864cf18b014 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 29 Dec 2024 10:09:44 +0100 Subject: [PATCH 01/12] Transactional committing and producing with rebalanceSafeCommits Just a POC for now, this was the easiest interface change I could think of --- .../zio/kafka/consumer/ConsumerSpec.scala | 23 ++----------------- .../internal/PartitionStreamControlSpec.scala | 1 + .../internal/RebalanceCoordinatorSpec.scala | 4 +++- .../kafka/consumer/CommittableRecord.scala | 4 ++++ .../scala/zio/kafka/consumer/Offset.scala | 2 ++ .../zio/kafka/consumer/OffsetBatch.scala | 6 ++++- .../kafka/consumer/internal/Committer.scala | 1 + .../consumer/internal/LiveCommitter.scala | 6 +++++ .../zio/kafka/consumer/internal/Runloop.scala | 1 + .../producer/TransactionalProducer.scala | 4 +++- 10 files changed, 28 insertions(+), 24 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index ce7cbae65..f274a1034 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1234,19 +1234,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { val allMessages = (1 to messageCount).map(i => s"$i" -> f"msg$i%06d") val (messagesBeforeRebalance, messagesAfterRebalance) = allMessages.splitAt(messageCount / 2) - def transactionalRebalanceListener(streamCompleteOnRebalanceRef: Ref[Option[Promise[Nothing, Unit]]]) = - RebalanceListener( - onAssigned = _ => ZIO.unit, - onRevoked = _ => - streamCompleteOnRebalanceRef.get.flatMap { - case Some(p) => - ZIO.logDebug("onRevoked, awaiting stream completion") *> - p.await.timeoutFail(new InterruptedException("Timed out waiting stream to complete"))(1.minute) - case None => ZIO.unit - }, - onLost = _ => ZIO.logDebug("Lost some partitions") - ) - def makeCopyingTransactionalConsumer( name: String, consumerGroupId: String, @@ -1263,14 +1250,11 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed")) .repeat(Schedule.fixed(1.second)) .fork - streamCompleteOnRebalanceRef <- Ref.make[Option[Promise[Nothing, Unit]]](None) tConsumer <- Consumer .partitionedAssignmentStream(Subscription.topics(fromTopic), Serde.string, Serde.string) .mapZIO { assignedPartitions => for { - p <- Promise.make[Nothing, Unit] - _ <- streamCompleteOnRebalanceRef.set(Some(p)) _ <- ZIO.logDebug(s"${assignedPartitions.size} partitions assigned") _ <- consumerCreated.succeed(()) partitionStreams = assignedPartitions.map(_._2) @@ -1293,8 +1277,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .runDrain .ensuring { for { - _ <- streamCompleteOnRebalanceRef.set(None) - _ <- p.succeed(()) c <- consumedMessagesCounter.get _ <- ZIO.logDebug(s"Consumed $c messages") } yield () @@ -1306,13 +1288,12 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { transactionalConsumer( clientId, consumerGroupId, - restartStreamOnRebalancing = true, + rebalanceSafeCommits = true, properties = Map( ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> implicitly[ClassTag[T]].runtimeClass.getName, ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200" - ), - rebalanceListener = transactionalRebalanceListener(streamCompleteOnRebalanceRef) + ) ) ) .tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done") diff --git 
a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala index 9fcc3a979..158ac54d5 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala @@ -177,6 +177,7 @@ object PartitionStreamControlSpec extends ZIOSpecDefault { Array[Byte]() ), commitHandle = _ => ZIO.unit, + markCommittedInTransactionHandle = _ => ZIO.unit, consumerGroupMetadata = None ) ) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala index 514063acf..3947e135e 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala @@ -199,6 +199,7 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j { Array[Byte]() ), commitHandle = _ => ZIO.unit, + markCommittedInTransactionHandle = _ => ZIO.unit, consumerGroupMetadata = None ) ) @@ -206,7 +207,8 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j { } abstract class MockCommitter extends Committer { - override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit + override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit + override val markCommittedInTransaction: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit override def processQueuedCommits(consumer: ByteArrayKafkaConsumer, executeOnEmpty: Boolean): Task[Unit] = ZIO.unit diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala index d138ea6d8..8246becc3 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala @@ -8,6 +8,7 @@ import zio.{ RIO, Task } final case class CommittableRecord[K, V]( record: ConsumerRecord[K, V], private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], + private val markCommittedInTransactionHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], private val consumerGroupMetadata: Option[ConsumerGroupMetadata] ) { def deserializeWith[R, K1, V1]( @@ -44,6 +45,7 @@ final case class CommittableRecord[K, V]( partition = record.partition(), offset = record.offset(), commitHandle = commitHandle, + markCommittedInTransactionHandle = markCommittedInTransactionHandle, consumerGroupMetadata = consumerGroupMetadata, metadata = None ) @@ -53,11 +55,13 @@ object CommittableRecord { def apply[K, V]( record: ConsumerRecord[K, V], commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], + markCommittedInTransactionHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], consumerGroupMetadata: Option[ConsumerGroupMetadata] ): CommittableRecord[K, V] = new CommittableRecord( record = record, commitHandle = commitHandle, + markCommittedInTransactionHandle = markCommittedInTransactionHandle, consumerGroupMetadata = consumerGroupMetadata ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala index 07907e8ff..89b326e4f 100644 --- 
a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala @@ -46,6 +46,7 @@ private final case class OffsetImpl( partition: Int, offset: Long, commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], + markCommittedInTransactionHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], consumerGroupMetadata: Option[ConsumerGroupMetadata], metadata: Option[String] = None ) extends Offset { @@ -53,6 +54,7 @@ private final case class OffsetImpl( def batch: OffsetBatch = OffsetBatchImpl( Map(topicPartition -> asJavaOffsetAndMetadata), commitHandle, + markCommittedInTransactionHandle, consumerGroupMetadata ) def withMetadata(metadata: String): OffsetImpl = copy(metadata = Some(metadata)) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala index 6c4b5a977..1b498a693 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala @@ -7,6 +7,7 @@ import zio.{ RIO, Schedule, Task, ZIO } sealed trait OffsetBatch { def offsets: Map[TopicPartition, OffsetAndMetadata] def commit: Task[Unit] + private[kafka] def markCommittedInTransaction: Task[Unit] def add(offset: Offset): OffsetBatch @deprecated("Use add(Offset) instead", "2.1.4") def merge(offset: Offset): OffsetBatch @@ -30,9 +31,11 @@ object OffsetBatch { private final case class OffsetBatchImpl( offsets: Map[TopicPartition, OffsetAndMetadata], commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], + markCommittedInTransactionHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], consumerGroupMetadata: Option[ConsumerGroupMetadata] ) extends OffsetBatch { - override def commit: Task[Unit] = commitHandle(offsets) + override def commit: Task[Unit] = commitHandle(offsets) + override private[kafka] def markCommittedInTransaction = markCommittedInTransactionHandle(offsets) override def add(offset: Offset): OffsetBatch = { val maxOffsetAndMetadata = offsets.get(offset.topicPartition) match { @@ -65,6 +68,7 @@ private final case class OffsetBatchImpl( case object EmptyOffsetBatch extends OffsetBatch { override val offsets: Map[TopicPartition, OffsetAndMetadata] = Map.empty override val commit: Task[Unit] = ZIO.unit + override val markCommittedInTransaction: Task[Unit] = ZIO.unit override def add(offset: Offset): OffsetBatch = offset.batch override def merge(offset: Offset): OffsetBatch = add(offset) override def merge(offsets: OffsetBatch): OffsetBatch = offsets diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala index a40427dcc..d4efa4455 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala @@ -12,6 +12,7 @@ import scala.collection.mutable private[internal] trait Committer { val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] + val markCommittedInTransaction: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] /** * Takes commits from the queue, commits them and adds them to pending commits diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala index 6ababd40f..21d550bcf 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala +++ 
b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala @@ -23,6 +23,12 @@ private[consumer] final class LiveCommitter( pendingCommits: Ref.Synchronized[Chunk[Commit]] ) extends Committer { + override val markCommittedInTransaction: Map[ + TopicPartition, + OffsetAndMetadata + ] => Task[Unit] = offsets => + committedOffsetsRef.modify(_.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))).unit + /** This is the implementation behind the user facing api `Offset.commit`. */ override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = offsets => diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 3c277d8c0..f7cf53503 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -143,6 +143,7 @@ private[consumer] final class Runloop private ( CommittableRecord[Array[Byte], Array[Byte]]( record = consumerRecord, commitHandle = committer.commit, + markCommittedInTransactionHandle = committer.markCommittedInTransaction, consumerGroupMetadata = consumerGroupMetadata ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala index 173f43100..74d8b6a07 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala @@ -48,7 +48,9 @@ object TransactionalProducer { } } - sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction()) + sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> + ZIO.attemptBlocking(live.p.commitTransaction()) *> + offsetBatch.markCommittedInTransaction } private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): UIO[Unit] = From 833314e5c0c332237a83444f75887017fb5bb229 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Wed, 1 Jan 2025 11:23:57 +0100 Subject: [PATCH 02/12] Simplify stream --- .../zio/kafka/consumer/ConsumerSpec.scala | 47 +++++++------------ 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index f274a1034..8a0fcb3ef 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1252,36 +1252,23 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .fork tConsumer <- Consumer - .partitionedAssignmentStream(Subscription.topics(fromTopic), Serde.string, Serde.string) - .mapZIO { assignedPartitions => - for { - _ <- ZIO.logDebug(s"${assignedPartitions.size} partitions assigned") - _ <- consumerCreated.succeed(()) - partitionStreams = assignedPartitions.map(_._2) - s <- ZStream - .mergeAllUnbounded(64)(partitionStreams: _*) - .mapChunksZIO { records => - ZIO.scoped { - for { - t <- tProducer.createTransaction - _ <- t.produceChunkBatch( - records.map(r => new ProducerRecord(toTopic, r.key, r.value)), - Serde.string, - Serde.string, - OffsetBatch(records.map(_.offset)) - ) - _ <- consumedMessagesCounter.update(_ + records.size) - } yield Chunk.empty - }.uninterruptible - } - .runDrain - .ensuring { - for { - c <- consumedMessagesCounter.get - _ <- ZIO.logDebug(s"Consumed $c messages") 
- } yield () - } - } yield s + .partitionedStream(Subscription.topics(fromTopic), Serde.string, Serde.string) + .flatMapPar(Int.MaxValue) { case (_, partitionStream) => + ZStream.fromZIO(consumerCreated.succeed(())) *> + partitionStream.mapChunksZIO { records => + ZIO.scoped { + for { + t <- tProducer.createTransaction + _ <- t.produceChunkBatch( + records.map(r => new ProducerRecord(toTopic, r.key, r.value)), + Serde.string, + Serde.string, + OffsetBatch(records.map(_.offset)) + ) + _ <- consumedMessagesCounter.update(_ + records.size) + } yield Chunk.empty + } + } } .runDrain .provideSome[Kafka]( From 414175f22d09a2dad790d21a455ed4602b7e82ff Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Wed, 1 Jan 2025 20:50:36 +0100 Subject: [PATCH 03/12] Alternative implementation --- .../zio/kafka/consumer/CommittableRecord.scala | 1 - .../main/scala/zio/kafka/consumer/Consumer.scala | 9 +++++++++ .../src/main/scala/zio/kafka/consumer/Offset.scala | 2 -- .../scala/zio/kafka/consumer/OffsetBatch.scala | 6 +----- .../zio/kafka/consumer/internal/Runloop.scala | 3 +++ .../kafka/consumer/internal/RunloopAccess.scala | 2 +- .../zio/kafka/producer/TransactionalProducer.scala | 14 +++++++------- 7 files changed, 21 insertions(+), 16 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala index 8246becc3..741f60dba 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala @@ -45,7 +45,6 @@ final case class CommittableRecord[K, V]( partition = record.partition(), offset = record.offset(), commitHandle = commitHandle, - markCommittedInTransactionHandle = markCommittedInTransactionHandle, consumerGroupMetadata = consumerGroupMetadata, metadata = None ) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 9d7ab0a22..8eee66294 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -158,6 +158,11 @@ trait Consumer { * Expose internal consumer metrics */ def metrics: Task[Map[MetricName, Metric]] + + /** + * Used internally by the [[zio.kafka.producer.TransactionalProducer]] + */ + def registerOffsetsCommittedInTransaction(offsetBatch: OffsetBatch): Task[Unit] } object Consumer { @@ -604,4 +609,8 @@ private[consumer] final class ConsumerLive private[consumer] ( override def metrics: Task[Map[MetricName, Metric]] = consumer.withConsumer(_.metrics().asScala.toMap) + override def registerOffsetsCommittedInTransaction( + offsetBatch: OffsetBatch + ): Task[Unit] = runloopAccess.withRunloopZIO(true)(runloop => runloop.registerOffsetsCommittedInTransaction(offsetBatch)) + } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala index 89b326e4f..07907e8ff 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala @@ -46,7 +46,6 @@ private final case class OffsetImpl( partition: Int, offset: Long, commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], - markCommittedInTransactionHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], consumerGroupMetadata: Option[ConsumerGroupMetadata], metadata: Option[String] = None ) extends Offset { @@ -54,7 +53,6 @@ private final case class 
OffsetImpl( def batch: OffsetBatch = OffsetBatchImpl( Map(topicPartition -> asJavaOffsetAndMetadata), commitHandle, - markCommittedInTransactionHandle, consumerGroupMetadata ) def withMetadata(metadata: String): OffsetImpl = copy(metadata = Some(metadata)) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala index 1b498a693..6c4b5a977 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala @@ -7,7 +7,6 @@ import zio.{ RIO, Schedule, Task, ZIO } sealed trait OffsetBatch { def offsets: Map[TopicPartition, OffsetAndMetadata] def commit: Task[Unit] - private[kafka] def markCommittedInTransaction: Task[Unit] def add(offset: Offset): OffsetBatch @deprecated("Use add(Offset) instead", "2.1.4") def merge(offset: Offset): OffsetBatch @@ -31,11 +30,9 @@ object OffsetBatch { private final case class OffsetBatchImpl( offsets: Map[TopicPartition, OffsetAndMetadata], commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], - markCommittedInTransactionHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], consumerGroupMetadata: Option[ConsumerGroupMetadata] ) extends OffsetBatch { - override def commit: Task[Unit] = commitHandle(offsets) - override private[kafka] def markCommittedInTransaction = markCommittedInTransactionHandle(offsets) + override def commit: Task[Unit] = commitHandle(offsets) override def add(offset: Offset): OffsetBatch = { val maxOffsetAndMetadata = offsets.get(offset.topicPartition) match { @@ -68,7 +65,6 @@ private final case class OffsetBatchImpl( case object EmptyOffsetBatch extends OffsetBatch { override val offsets: Map[TopicPartition, OffsetAndMetadata] = Map.empty override val commit: Task[Unit] = ZIO.unit - override val markCommittedInTransaction: Task[Unit] = ZIO.unit override def add(offset: Offset): OffsetBatch = offset.batch override def merge(offset: Offset): OffsetBatch = add(offset) override def merge(offsets: OffsetBatch): OffsetBatch = offsets diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index f7cf53503..d65e8310b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -533,6 +533,9 @@ private[consumer] final class Runloop private ( .repeat(runloopMetricsSchedule) .unit } + + def registerOffsetsCommittedInTransaction(offsetBatch: OffsetBatch): Task[Unit] = + committer.markCommittedInTransaction(offsetBatch.offsets) } object Runloop { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 90c787f84..5a0f2980e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -31,7 +31,7 @@ private[consumer] final class RunloopAccess private ( diagnostics: Diagnostics ) { - private def withRunloopZIO[E]( + def withRunloopZIO[E]( requireRunning: Boolean )(whenRunning: Runloop => IO[E, Unit]): IO[E, Unit] = runloopStateRef.updateSomeAndGetZIO { diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala index 74d8b6a07..94a207eb2 100644 --- 
a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala @@ -7,13 +7,13 @@ import org.apache.kafka.common.errors.InvalidGroupIdException import org.apache.kafka.common.serialization.ByteArraySerializer import zio.Cause.Fail import zio._ -import zio.kafka.consumer.OffsetBatch +import zio.kafka.consumer.{ Consumer, OffsetBatch } import java.util import scala.jdk.CollectionConverters._ trait TransactionalProducer { - def createTransaction: ZIO[Scope, Throwable, Transaction] + def createTransaction: ZIO[Scope with Consumer, Throwable, Transaction] } object TransactionalProducer { @@ -26,7 +26,7 @@ object TransactionalProducer { ) extends TransactionalProducer { private val abortTransaction: Task[Unit] = ZIO.attemptBlocking(live.p.abortTransaction()) - private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): Task[Unit] = { + private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): ZIO[Consumer, Throwable, Unit] = { val sendOffsetsToTransaction: Task[Unit] = ZIO.suspend { @inline def invalidGroupIdException: IO[InvalidGroupIdException, Nothing] = @@ -50,10 +50,10 @@ object TransactionalProducer { sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction()) *> - offsetBatch.markCommittedInTransaction + ZIO.serviceWithZIO[Consumer](_.registerOffsetsCommittedInTransaction(offsetBatch)).unit } - private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): UIO[Unit] = + private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): ZIO[Consumer, Nothing, Unit] = exit match { case Exit.Success(_) => transaction.offsetBatchRef.get @@ -62,7 +62,7 @@ object TransactionalProducer { case Exit.Failure(_) => abortTransaction.retryN(5).orDie } - override def createTransaction: ZIO[Scope, Throwable, Transaction] = + override def createTransaction: ZIO[Scope with Consumer, Throwable, Transaction] = semaphore.withPermitScoped *> { ZIO.acquireReleaseExit { for { @@ -74,7 +74,7 @@ object TransactionalProducer { } } - def createTransaction: ZIO[TransactionalProducer & Scope, Throwable, Transaction] = + def createTransaction: ZIO[TransactionalProducer & Scope & Consumer, Throwable, Transaction] = ZIO.service[TransactionalProducer].flatMap(_.createTransaction) val live: RLayer[TransactionalProducerSettings, TransactionalProducer] = From 8a34c12a1373ec0f5d0d812926a4e3abcbef3f45 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Wed, 1 Jan 2025 20:52:45 +0100 Subject: [PATCH 04/12] Fixup --- .../kafka/consumer/internal/PartitionStreamControlSpec.scala | 1 - .../zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala | 1 - .../src/main/scala/zio/kafka/consumer/CommittableRecord.scala | 3 --- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 1 - 4 files changed, 6 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala index 158ac54d5..9fcc3a979 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala @@ -177,7 +177,6 @@ object PartitionStreamControlSpec extends ZIOSpecDefault { Array[Byte]() ), commitHandle = _ => ZIO.unit, - markCommittedInTransactionHandle = _ => ZIO.unit, consumerGroupMetadata = None ) ) diff 
--git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala index 3947e135e..6cab115c4 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala @@ -199,7 +199,6 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j { Array[Byte]() ), commitHandle = _ => ZIO.unit, - markCommittedInTransactionHandle = _ => ZIO.unit, consumerGroupMetadata = None ) ) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala index 741f60dba..d138ea6d8 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala @@ -8,7 +8,6 @@ import zio.{ RIO, Task } final case class CommittableRecord[K, V]( record: ConsumerRecord[K, V], private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], - private val markCommittedInTransactionHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], private val consumerGroupMetadata: Option[ConsumerGroupMetadata] ) { def deserializeWith[R, K1, V1]( @@ -54,13 +53,11 @@ object CommittableRecord { def apply[K, V]( record: ConsumerRecord[K, V], commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], - markCommittedInTransactionHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], consumerGroupMetadata: Option[ConsumerGroupMetadata] ): CommittableRecord[K, V] = new CommittableRecord( record = record, commitHandle = commitHandle, - markCommittedInTransactionHandle = markCommittedInTransactionHandle, consumerGroupMetadata = consumerGroupMetadata ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index d65e8310b..0355f25e6 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -143,7 +143,6 @@ private[consumer] final class Runloop private ( CommittableRecord[Array[Byte], Array[Byte]]( record = consumerRecord, commitHandle = committer.commit, - markCommittedInTransactionHandle = committer.markCommittedInTransaction, consumerGroupMetadata = consumerGroupMetadata ) } From ed96dd922ac9da07670626d0b30b98c17a673006 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Thu, 2 Jan 2025 10:37:08 +0100 Subject: [PATCH 05/12] Formatting --- zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 8eee66294..6ebca93db 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -611,6 +611,7 @@ private[consumer] final class ConsumerLive private[consumer] ( override def registerOffsetsCommittedInTransaction( offsetBatch: OffsetBatch - ): Task[Unit] = runloopAccess.withRunloopZIO(true)(runloop => runloop.registerOffsetsCommittedInTransaction(offsetBatch)) + ): Task[Unit] = + runloopAccess.withRunloopZIO(true)(runloop => runloop.registerOffsetsCommittedInTransaction(offsetBatch)) } From a857bc663fe79527f949b4eb355a129ccff7f223 Mon Sep 17 
00:00:00 2001 From: Steven Vroonland Date: Sat, 4 Jan 2025 17:05:08 +0100 Subject: [PATCH 06/12] Provide Consumer upon creation of TransactionalProducer --- .../zio/kafka/consumer/ConsumerSpec.scala | 100 +++++++++--------- .../zio/kafka/testkit/KafkaTestUtils.scala | 6 +- .../producer/TransactionalProducer.scala | 24 +++-- 3 files changed, 68 insertions(+), 62 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 8a0fcb3ef..aff508271 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -19,7 +19,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{ SubscriptionFinalized } import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } -import zio.kafka.producer.{ Producer, TransactionalProducer } +import zio.kafka.producer.{ Producer, TransactionalProducer, TransactionalProducerSettings } import zio.kafka.serde.Serde import zio.kafka.testkit.KafkaTestUtils._ import zio.kafka.testkit.{ Kafka, KafkaRandom } @@ -1240,62 +1240,66 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { clientId: String, fromTopic: String, toTopic: String, - tProducer: TransactionalProducer, + tProducerSettings: TransactionalProducerSettings, consumerCreated: Promise[Nothing, Unit] ): ZIO[Kafka, Throwable, Unit] = ZIO.logAnnotate("consumer", name) { - for { - consumedMessagesCounter <- Ref.make(0) - _ <- consumedMessagesCounter.get - .flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed")) - .repeat(Schedule.fixed(1.second)) - .fork - tConsumer <- - Consumer - .partitionedStream(Subscription.topics(fromTopic), Serde.string, Serde.string) - .flatMapPar(Int.MaxValue) { case (_, partitionStream) => - ZStream.fromZIO(consumerCreated.succeed(())) *> - partitionStream.mapChunksZIO { records => - ZIO.scoped { - for { - t <- tProducer.createTransaction - _ <- t.produceChunkBatch( - records.map(r => new ProducerRecord(toTopic, r.key, r.value)), - Serde.string, - Serde.string, - OffsetBatch(records.map(_.offset)) - ) - _ <- consumedMessagesCounter.update(_ + records.size) - } yield Chunk.empty + ZIO.scoped { + (for { + consumedMessagesCounter <- Ref.make(0) + _ <- consumedMessagesCounter.get + .flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed")) + .repeat(Schedule.fixed(1.second)) + .fork + + tProducer <- + TransactionalProducer.make(tProducerSettings) + + tConsumer <- + Consumer + .partitionedStream(Subscription.topics(fromTopic), Serde.string, Serde.string) + .flatMapPar(Int.MaxValue) { case (_, partitionStream) => + ZStream.fromZIO(consumerCreated.succeed(())) *> + partitionStream.mapChunksZIO { records => + ZIO.scoped { + for { + t <- tProducer.createTransaction + _ <- t.produceChunkBatch( + records.map(r => new ProducerRecord(toTopic, r.key, r.value)), + Serde.string, + Serde.string, + OffsetBatch(records.map(_.offset)) + ) + _ <- consumedMessagesCounter.update(_ + records.size) + } yield Chunk.empty + } } - } - } - .runDrain - .provideSome[Kafka]( - transactionalConsumer( - clientId, - consumerGroupId, - rebalanceSafeCommits = true, - properties = Map( - ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> - implicitly[ClassTag[T]].runtimeClass.getName, - ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200" - ) + } + .runDrain + .tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done") + } yield tConsumer) + .provideSome[Kafka & 
Scope]( + transactionalConsumer( + clientId, + consumerGroupId, + rebalanceSafeCommits = true, + properties = Map( + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> + implicitly[ClassTag[T]].runtimeClass.getName, + ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200" ) ) - .tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done") - } yield tConsumer + ) + } } for { transactionalId <- randomThing("transactional") tProducerSettings <- transactionalProducerSettings(transactionalId) - tProducer <- TransactionalProducer.make(tProducerSettings) - - topicA <- randomTopic - topicB <- randomTopic - _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topicA, partitions = partitionCount)) - _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topicB, partitions = partitionCount)) + topicA <- randomTopic + topicB <- randomTopic + _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topicA, partitions = partitionCount)) + _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topicB, partitions = partitionCount)) _ <- produceMany(topicA, messagesBeforeRebalance) @@ -1310,7 +1314,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { copier1ClientId, topicA, topicB, - tProducer, + tProducerSettings, copier1Created ).fork _ <- copier1Created.await @@ -1324,7 +1328,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { copier2ClientId, topicA, topicB, - tProducer, + tProducerSettings, copier2Created ).fork _ <- ZIO.logDebug("Waiting for copier 2 to start") diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala index 9d6ad7a8a..aa2a926bb 100644 --- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala +++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala @@ -53,11 +53,11 @@ object KafkaTestUtils { * Note: to run multiple tests in parallel, you need to use different transactional ids via * `transactionalProducer(transactionalId)`. 
*/ - val transactionalProducer: ZLayer[Kafka, Throwable, TransactionalProducer] = + val transactionalProducer: ZLayer[Kafka with Consumer, Throwable, TransactionalProducer] = transactionalProducer("test-transaction") - def transactionalProducer(transactionalId: String): ZLayer[Kafka, Throwable, TransactionalProducer] = - ZLayer.makeSome[Kafka, TransactionalProducer]( + def transactionalProducer(transactionalId: String): ZLayer[Kafka with Consumer, Throwable, TransactionalProducer] = + ZLayer.makeSome[Kafka with Consumer, TransactionalProducer]( ZLayer(transactionalProducerSettings(transactionalId)), TransactionalProducer.live ) diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala index 94a207eb2..431caf103 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala @@ -13,7 +13,7 @@ import java.util import scala.jdk.CollectionConverters._ trait TransactionalProducer { - def createTransaction: ZIO[Scope with Consumer, Throwable, Transaction] + def createTransaction: ZIO[Scope, Throwable, Transaction] } object TransactionalProducer { @@ -22,11 +22,12 @@ object TransactionalProducer { private final class LiveTransactionalProducer( live: ProducerLive, - semaphore: Semaphore + semaphore: Semaphore, + consumer: Consumer ) extends TransactionalProducer { private val abortTransaction: Task[Unit] = ZIO.attemptBlocking(live.p.abortTransaction()) - private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): ZIO[Consumer, Throwable, Unit] = { + private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): ZIO[Any, Throwable, Unit] = { val sendOffsetsToTransaction: Task[Unit] = ZIO.suspend { @inline def invalidGroupIdException: IO[InvalidGroupIdException, Nothing] = @@ -50,10 +51,10 @@ object TransactionalProducer { sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction()) *> - ZIO.serviceWithZIO[Consumer](_.registerOffsetsCommittedInTransaction(offsetBatch)).unit + consumer.registerOffsetsCommittedInTransaction(offsetBatch).unit } - private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): ZIO[Consumer, Nothing, Unit] = + private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): ZIO[Any, Nothing, Unit] = exit match { case Exit.Success(_) => transaction.offsetBatchRef.get @@ -62,7 +63,7 @@ object TransactionalProducer { case Exit.Failure(_) => abortTransaction.retryN(5).orDie } - override def createTransaction: ZIO[Scope with Consumer, Throwable, Transaction] = + override def createTransaction: ZIO[Scope, Throwable, Transaction] = semaphore.withPermitScoped *> { ZIO.acquireReleaseExit { for { @@ -74,10 +75,10 @@ object TransactionalProducer { } } - def createTransaction: ZIO[TransactionalProducer & Scope & Consumer, Throwable, Transaction] = + def createTransaction: ZIO[TransactionalProducer & Scope, Throwable, Transaction] = ZIO.service[TransactionalProducer].flatMap(_.createTransaction) - val live: RLayer[TransactionalProducerSettings, TransactionalProducer] = + val live: RLayer[TransactionalProducerSettings with Consumer, TransactionalProducer] = ZLayer.scoped { for { settings <- ZIO.service[TransactionalProducerSettings] @@ -85,7 +86,7 @@ object TransactionalProducer { } yield producer } - def make(settings: TransactionalProducerSettings): ZIO[Scope, Throwable, TransactionalProducer] = 
+ def make(settings: TransactionalProducerSettings): ZIO[Scope with Consumer, Throwable, TransactionalProducer] = for { rawProducer <- ZIO.acquireRelease( ZIO.attempt( @@ -104,6 +105,7 @@ object TransactionalProducer { settings.producerSettings.sendBufferSize ) live = new ProducerLive(rawProducer, runtime, sendQueue) - _ <- ZIO.blocking(live.sendFromQueue).forkScoped - } yield new LiveTransactionalProducer(live, semaphore) + _ <- ZIO.blocking(live.sendFromQueue).forkScoped + consumer <- ZIO.service[Consumer] + } yield new LiveTransactionalProducer(live, semaphore, consumer) } From 933e50e6b85dcf4e55c06d373a1d31bf91df1a29 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 5 Jan 2025 11:25:48 +0100 Subject: [PATCH 07/12] Random transaction ID per consumer --- .../zio/kafka/consumer/ConsumerSpec.scala | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index aff508271..d3f94deb0 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -19,7 +19,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{ SubscriptionFinalized } import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } -import zio.kafka.producer.{ Producer, TransactionalProducer, TransactionalProducerSettings } +import zio.kafka.producer.{ Producer, TransactionalProducer } import zio.kafka.serde.Serde import zio.kafka.testkit.KafkaTestUtils._ import zio.kafka.testkit.{ Kafka, KafkaRandom } @@ -1240,8 +1240,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { clientId: String, fromTopic: String, toTopic: String, - tProducerSettings: TransactionalProducerSettings, - consumerCreated: Promise[Nothing, Unit] + consumerCreated: Promise[Throwable, Unit] ): ZIO[Kafka, Throwable, Unit] = ZIO.logAnnotate("consumer", name) { ZIO.scoped { @@ -1252,6 +1251,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .repeat(Schedule.fixed(1.second)) .fork + transactionalId <- randomThing("transactional") + tProducerSettings <- transactionalProducerSettings(transactionalId) tProducer <- TransactionalProducer.make(tProducerSettings) @@ -1276,7 +1277,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } } .runDrain - .tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done") + .tapError(e => ZIO.logError(s"Error: $e") *> consumerCreated.fail(e)) <* ZIO.logDebug("Done") } yield tConsumer) .provideSome[Kafka & Scope]( transactionalConsumer( @@ -1294,12 +1295,10 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } for { - transactionalId <- randomThing("transactional") - tProducerSettings <- transactionalProducerSettings(transactionalId) - topicA <- randomTopic - topicB <- randomTopic - _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topicA, partitions = partitionCount)) - _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topicB, partitions = partitionCount)) + topicA <- randomTopic + topicB <- randomTopic + _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topicA, partitions = partitionCount)) + _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topicB, partitions = partitionCount)) _ <- produceMany(topicA, messagesBeforeRebalance) @@ -1307,28 +1306,26 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- ZIO.logDebug("Starting copier 1") 
copier1ClientId = copyingGroup + "-1" - copier1Created <- Promise.make[Nothing, Unit] + copier1Created <- Promise.make[Throwable, Unit] copier1 <- makeCopyingTransactionalConsumer( "1", copyingGroup, copier1ClientId, topicA, topicB, - tProducerSettings, copier1Created ).fork _ <- copier1Created.await _ <- ZIO.logDebug("Starting copier 2") copier2ClientId = copyingGroup + "-2" - copier2Created <- Promise.make[Nothing, Unit] + copier2Created <- Promise.make[Throwable, Unit] copier2 <- makeCopyingTransactionalConsumer( "2", copyingGroup, copier2ClientId, topicA, topicB, - tProducerSettings, copier2Created ).fork _ <- ZIO.logDebug("Waiting for copier 2 to start") From 064e710477bc7db17bdf0b80e9bf8d03c176b57e Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 11 Jan 2025 10:43:07 +0100 Subject: [PATCH 08/12] Pivot to allowing registration of external commits The changes to the transactional producer are removed and will come back in a later PR. Make `RunloopAccess.withRunloopZIO` private again. --- .../zio/kafka/consumer/ConsumerSpec.scala | 131 +++++++++++------- .../internal/RebalanceCoordinatorSpec.scala | 4 +- .../zio/kafka/testkit/KafkaTestUtils.scala | 6 +- .../scala/zio/kafka/consumer/Consumer.scala | 15 +- .../zio/kafka/consumer/ConsumerSettings.scala | 3 + .../kafka/consumer/internal/Committer.scala | 6 +- .../consumer/internal/LiveCommitter.scala | 7 +- .../zio/kafka/consumer/internal/Runloop.scala | 4 +- .../consumer/internal/RunloopAccess.scala | 7 +- .../producer/TransactionalProducer.scala | 22 ++- 10 files changed, 124 insertions(+), 81 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index d3f94deb0..ce7cbae65 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1234,67 +1234,96 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { val allMessages = (1 to messageCount).map(i => s"$i" -> f"msg$i%06d") val (messagesBeforeRebalance, messagesAfterRebalance) = allMessages.splitAt(messageCount / 2) + def transactionalRebalanceListener(streamCompleteOnRebalanceRef: Ref[Option[Promise[Nothing, Unit]]]) = + RebalanceListener( + onAssigned = _ => ZIO.unit, + onRevoked = _ => + streamCompleteOnRebalanceRef.get.flatMap { + case Some(p) => + ZIO.logDebug("onRevoked, awaiting stream completion") *> + p.await.timeoutFail(new InterruptedException("Timed out waiting stream to complete"))(1.minute) + case None => ZIO.unit + }, + onLost = _ => ZIO.logDebug("Lost some partitions") + ) + def makeCopyingTransactionalConsumer( name: String, consumerGroupId: String, clientId: String, fromTopic: String, toTopic: String, - consumerCreated: Promise[Throwable, Unit] + tProducer: TransactionalProducer, + consumerCreated: Promise[Nothing, Unit] ): ZIO[Kafka, Throwable, Unit] = ZIO.logAnnotate("consumer", name) { - ZIO.scoped { - (for { - consumedMessagesCounter <- Ref.make(0) - _ <- consumedMessagesCounter.get - .flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed")) - .repeat(Schedule.fixed(1.second)) - .fork - - transactionalId <- randomThing("transactional") - tProducerSettings <- transactionalProducerSettings(transactionalId) - tProducer <- - TransactionalProducer.make(tProducerSettings) - - tConsumer <- - Consumer - .partitionedStream(Subscription.topics(fromTopic), Serde.string, Serde.string) - .flatMapPar(Int.MaxValue) { case (_, partitionStream) => - 
ZStream.fromZIO(consumerCreated.succeed(())) *> - partitionStream.mapChunksZIO { records => - ZIO.scoped { - for { - t <- tProducer.createTransaction - _ <- t.produceChunkBatch( - records.map(r => new ProducerRecord(toTopic, r.key, r.value)), - Serde.string, - Serde.string, - OffsetBatch(records.map(_.offset)) - ) - _ <- consumedMessagesCounter.update(_ + records.size) - } yield Chunk.empty - } - } - } - .runDrain - .tapError(e => ZIO.logError(s"Error: $e") *> consumerCreated.fail(e)) <* ZIO.logDebug("Done") - } yield tConsumer) - .provideSome[Kafka & Scope]( - transactionalConsumer( - clientId, - consumerGroupId, - rebalanceSafeCommits = true, - properties = Map( - ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> - implicitly[ClassTag[T]].runtimeClass.getName, - ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200" + for { + consumedMessagesCounter <- Ref.make(0) + _ <- consumedMessagesCounter.get + .flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed")) + .repeat(Schedule.fixed(1.second)) + .fork + streamCompleteOnRebalanceRef <- Ref.make[Option[Promise[Nothing, Unit]]](None) + tConsumer <- + Consumer + .partitionedAssignmentStream(Subscription.topics(fromTopic), Serde.string, Serde.string) + .mapZIO { assignedPartitions => + for { + p <- Promise.make[Nothing, Unit] + _ <- streamCompleteOnRebalanceRef.set(Some(p)) + _ <- ZIO.logDebug(s"${assignedPartitions.size} partitions assigned") + _ <- consumerCreated.succeed(()) + partitionStreams = assignedPartitions.map(_._2) + s <- ZStream + .mergeAllUnbounded(64)(partitionStreams: _*) + .mapChunksZIO { records => + ZIO.scoped { + for { + t <- tProducer.createTransaction + _ <- t.produceChunkBatch( + records.map(r => new ProducerRecord(toTopic, r.key, r.value)), + Serde.string, + Serde.string, + OffsetBatch(records.map(_.offset)) + ) + _ <- consumedMessagesCounter.update(_ + records.size) + } yield Chunk.empty + }.uninterruptible + } + .runDrain + .ensuring { + for { + _ <- streamCompleteOnRebalanceRef.set(None) + _ <- p.succeed(()) + c <- consumedMessagesCounter.get + _ <- ZIO.logDebug(s"Consumed $c messages") + } yield () + } + } yield s + } + .runDrain + .provideSome[Kafka]( + transactionalConsumer( + clientId, + consumerGroupId, + restartStreamOnRebalancing = true, + properties = Map( + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> + implicitly[ClassTag[T]].runtimeClass.getName, + ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200" + ), + rebalanceListener = transactionalRebalanceListener(streamCompleteOnRebalanceRef) ) ) - ) - } + .tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done") + } yield tConsumer } for { + transactionalId <- randomThing("transactional") + tProducerSettings <- transactionalProducerSettings(transactionalId) + tProducer <- TransactionalProducer.make(tProducerSettings) + topicA <- randomTopic topicB <- randomTopic _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topicA, partitions = partitionCount)) @@ -1306,26 +1335,28 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- ZIO.logDebug("Starting copier 1") copier1ClientId = copyingGroup + "-1" - copier1Created <- Promise.make[Throwable, Unit] + copier1Created <- Promise.make[Nothing, Unit] copier1 <- makeCopyingTransactionalConsumer( "1", copyingGroup, copier1ClientId, topicA, topicB, + tProducer, copier1Created ).fork _ <- copier1Created.await _ <- ZIO.logDebug("Starting copier 2") copier2ClientId = copyingGroup + "-2" - copier2Created <- Promise.make[Throwable, Unit] + copier2Created <- Promise.make[Nothing, Unit] 
copier2 <- makeCopyingTransactionalConsumer( "2", copyingGroup, copier2ClientId, topicA, topicB, + tProducer, copier2Created ).fork _ <- ZIO.logDebug("Waiting for copier 2 to start") diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala index 6cab115c4..82471dc80 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala @@ -206,8 +206,8 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j { } abstract class MockCommitter extends Committer { - override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit - override val markCommittedInTransaction: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit + override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit + override val registerExternalCommits: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit override def processQueuedCommits(consumer: ByteArrayKafkaConsumer, executeOnEmpty: Boolean): Task[Unit] = ZIO.unit diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala index aa2a926bb..9d6ad7a8a 100644 --- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala +++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala @@ -53,11 +53,11 @@ object KafkaTestUtils { * Note: to run multiple tests in parallel, you need to use different transactional ids via * `transactionalProducer(transactionalId)`. */ - val transactionalProducer: ZLayer[Kafka with Consumer, Throwable, TransactionalProducer] = + val transactionalProducer: ZLayer[Kafka, Throwable, TransactionalProducer] = transactionalProducer("test-transaction") - def transactionalProducer(transactionalId: String): ZLayer[Kafka with Consumer, Throwable, TransactionalProducer] = - ZLayer.makeSome[Kafka with Consumer, TransactionalProducer]( + def transactionalProducer(transactionalId: String): ZLayer[Kafka, Throwable, TransactionalProducer] = + ZLayer.makeSome[Kafka, TransactionalProducer]( ZLayer(transactionalProducerSettings(transactionalId)), TransactionalProducer.live ) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 6ebca93db..3f24a6d48 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -160,9 +160,14 @@ trait Consumer { def metrics: Task[Map[MetricName, Metric]] /** - * Used internally by the [[zio.kafka.producer.TransactionalProducer]] + * Register a commit that was done externally, that is, not by this consumer. + * + * This method is useful when you want to use rebalance-safe-commits, but you are not committing to the Kafka brokers, + * but to some external system, for example a relational database. + * + * See also [[zio.kafka.consumer.ConsumerSettings.withRebalanceSafeCommits]]. 
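+   *
+   * A sketch of the intended pattern (`persistToDatabase` is a hypothetical effect that stores a record and its
+   * offset in the external system in a single database transaction):
+   * {{{
+   *   consumer
+   *     .plainStream(Subscription.topics("topic"), Serde.string, Serde.string)
+   *     .mapZIO { record =>
+   *       persistToDatabase(record) *>
+   *         consumer.registerExternalCommits(OffsetBatch(Seq(record.offset)))
+   *     }
+   *     .runDrain
+   * }}}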
   */
-  def registerOffsetsCommittedInTransaction(offsetBatch: OffsetBatch): Task[Unit]
+  def registerExternalCommits(offsetBatch: OffsetBatch): Task[Unit]
 }
 
 object Consumer {
@@ -609,9 +614,7 @@ private[consumer] final class ConsumerLive private[consumer] (
   override def metrics: Task[Map[MetricName, Metric]] =
     consumer.withConsumer(_.metrics().asScala.toMap)
 
-  override def registerOffsetsCommittedInTransaction(
-    offsetBatch: OffsetBatch
-  ): Task[Unit] =
-    runloopAccess.withRunloopZIO(true)(runloop => runloop.registerOffsetsCommittedInTransaction(offsetBatch))
+  override def registerExternalCommits(externallyCommittedOffsets: OffsetBatch): Task[Unit] =
+    runloopAccess.registerExternalCommits(externallyCommittedOffsets)
 
 }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
index 4b7b017b9..5db7221cd 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
@@ -231,6 +231,9 @@ final case class ConsumerSettings(
   * Rebalances are held up for at most 3/5 of `maxPollInterval` (see [[withMaxPollInterval]]), by default this
   * calculates to 3 minutes. See [[#withMaxRebalanceDuration]] to change the default.
   *
+  * External commits (that is, commits to an external system, e.g. a relational database) must be registered with the
+  * consumer via [[Consumer.registerExternalCommits]].
+  *
   * When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any
   * offset commits from these streams have a high chance of being delayed (commits are not possible during some phases
   * of a rebalance). The consumer that takes over the partition will likely not see these delayed commits and will
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala
index d4efa4455..2a7145dbe 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala
@@ -11,8 +11,12 @@ import java.lang.Math.max
 import scala.collection.mutable
 
 private[internal] trait Committer {
+
+  /** A function to commit offsets. */
   val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit]
-  val markCommittedInTransaction: Map[TopicPartition, OffsetAndMetadata] => Task[Unit]
+
+  /** A function to register offsets that have been committed externally. */
+  val registerExternalCommits: Map[TopicPartition, OffsetAndMetadata] => Task[Unit]
 
   /**
    * Takes commits from the queue, commits them and adds them to pending commits
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala
index 21d550bcf..557b6f2eb 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala
@@ -23,11 +23,14 @@ private[consumer] final class LiveCommitter(
   pendingCommits: Ref.Synchronized[Chunk[Commit]]
 ) extends Committer {
 
-  override val markCommittedInTransaction: Map[
+  override val registerExternalCommits: Map[
     TopicPartition,
     OffsetAndMetadata
   ] => Task[Unit] = offsets =>
-    committedOffsetsRef.modify(_.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))).unit
+    committedOffsetsRef.modify {
+      // The continuation promise can be `null` because this commit is not actually handled by the consumer.
+      _.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))
+    }.unit
 
   /** This is the implementation behind the user facing api `Offset.commit`. */
   override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] =
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index 0355f25e6..843087c5c 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -533,8 +533,8 @@ private[consumer] final class Runloop private (
       .unit
   }
 
-  def registerOffsetsCommittedInTransaction(offsetBatch: OffsetBatch): Task[Unit] =
-    committer.markCommittedInTransaction(offsetBatch.offsets)
+  def registerExternalCommits(offsetBatch: OffsetBatch): Task[Unit] =
+    committer.registerExternalCommits(offsetBatch.offsets)
 }
 
 object Runloop {
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
index 5a0f2980e..6b9941b30 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
@@ -6,7 +6,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
 import zio.kafka.consumer.diagnostics.Diagnostics
 import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
 import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
-import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription }
+import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, OffsetBatch, Subscription }
 import zio.stream.{ Stream, Take, UStream, ZStream }
 import zio._
 
@@ -31,7 +31,7 @@ private[consumer] final class RunloopAccess private (
   diagnostics: Diagnostics
 ) {
 
-  def withRunloopZIO[E](
+  private def withRunloopZIO[E](
     requireRunning: Boolean
   )(whenRunning: Runloop => IO[E, Unit]): IO[E, Unit] =
     runloopStateRef.updateSomeAndGetZIO {
@@ -66,6 +66,9 @@ private[consumer] final class RunloopAccess private (
       }
     } yield stream
 
+  def registerExternalCommits(externallyCommittedOffsets: OffsetBatch): Task[Unit] =
+    withRunloopZIO(requireRunning = true)(_.registerExternalCommits(externallyCommittedOffsets))
+
 }
 
 private[consumer] object RunloopAccess {
b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala index 431caf103..173f43100 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala @@ -7,7 +7,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException import org.apache.kafka.common.serialization.ByteArraySerializer import zio.Cause.Fail import zio._ -import zio.kafka.consumer.{ Consumer, OffsetBatch } +import zio.kafka.consumer.OffsetBatch import java.util import scala.jdk.CollectionConverters._ @@ -22,12 +22,11 @@ object TransactionalProducer { private final class LiveTransactionalProducer( live: ProducerLive, - semaphore: Semaphore, - consumer: Consumer + semaphore: Semaphore ) extends TransactionalProducer { private val abortTransaction: Task[Unit] = ZIO.attemptBlocking(live.p.abortTransaction()) - private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): ZIO[Any, Throwable, Unit] = { + private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): Task[Unit] = { val sendOffsetsToTransaction: Task[Unit] = ZIO.suspend { @inline def invalidGroupIdException: IO[InvalidGroupIdException, Nothing] = @@ -49,12 +48,10 @@ object TransactionalProducer { } } - sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> - ZIO.attemptBlocking(live.p.commitTransaction()) *> - consumer.registerOffsetsCommittedInTransaction(offsetBatch).unit + sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction()) } - private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): ZIO[Any, Nothing, Unit] = + private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): UIO[Unit] = exit match { case Exit.Success(_) => transaction.offsetBatchRef.get @@ -78,7 +75,7 @@ object TransactionalProducer { def createTransaction: ZIO[TransactionalProducer & Scope, Throwable, Transaction] = ZIO.service[TransactionalProducer].flatMap(_.createTransaction) - val live: RLayer[TransactionalProducerSettings with Consumer, TransactionalProducer] = + val live: RLayer[TransactionalProducerSettings, TransactionalProducer] = ZLayer.scoped { for { settings <- ZIO.service[TransactionalProducerSettings] @@ -86,7 +83,7 @@ object TransactionalProducer { } yield producer } - def make(settings: TransactionalProducerSettings): ZIO[Scope with Consumer, Throwable, TransactionalProducer] = + def make(settings: TransactionalProducerSettings): ZIO[Scope, Throwable, TransactionalProducer] = for { rawProducer <- ZIO.acquireRelease( ZIO.attempt( @@ -105,7 +102,6 @@ object TransactionalProducer { settings.producerSettings.sendBufferSize ) live = new ProducerLive(rawProducer, runtime, sendQueue) - _ <- ZIO.blocking(live.sendFromQueue).forkScoped - consumer <- ZIO.service[Consumer] - } yield new LiveTransactionalProducer(live, semaphore, consumer) + _ <- ZIO.blocking(live.sendFromQueue).forkScoped + } yield new LiveTransactionalProducer(live, semaphore) } From afc01ee4bce1bb99ce92625eace03709b9f713cf Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 11 Jan 2025 22:02:00 +0100 Subject: [PATCH 09/12] Add a test --- zio-kafka-test/src/test/resources/logback.xml | 1 + .../zio/kafka/consumer/ConsumerSpec.scala | 112 ++++++++++++++++++ .../zio/kafka/consumer/OffsetBatch.scala | 2 + 3 files changed, 115 insertions(+) diff --git a/zio-kafka-test/src/test/resources/logback.xml b/zio-kafka-test/src/test/resources/logback.xml index 59861dd6b..a3b439724 100644 --- 
a/zio-kafka-test/src/test/resources/logback.xml
+++ b/zio-kafka-test/src/test/resources/logback.xml
@@ -16,6 +16,7 @@
+

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index ce7cbae65..6a04e81b1 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -28,6 +28,8 @@ import zio.test.Assertion._
 import zio.test.TestAspect._
 import zio.test._
+import java.time.Instant
+import java.time.temporal.ChronoUnit
 import java.util.concurrent.ExecutionException
 import scala.reflect.ClassTag
@@ -888,6 +890,116 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
         testForPartitionAssignmentStrategy[CooperativeStickyAssignor]
       )
     }: _*),
+    test("external commits are used when rebalanceSafeCommits is enabled") {
+
+      /*
+       * Outline of this test
+       *   - A producer generates some messages on every partition of a topic (2 partitions),
+       *   - Consumer 1 starts reading from the topic. It is the only consumer, so it handles all partitions. This
+       *     consumer has `rebalanceSafeCommits` enabled. It does not commit offsets, but it does register external
+       *     commits (without actually committing anywhere). In addition, we set `maxRebalanceDuration` to 20 seconds.
+       *   - After a few messages, consumer 2 is started and a rebalance starts.
+       *   - We measure how long the rebalance takes.
+       *
+       * When the rebalance finishes almost immediately, we know that the external commits were used. If it only
+       * finishes after 20 seconds, we know that the external commits were not used.
+       */
+      val partitionCount = 2
+
+      def makeConsumer(
+        clientId: String,
+        groupId: String,
+        rebalanceSafeCommits: Boolean,
+        diagnostics: Diagnostics
+      ): ZIO[Scope with Kafka, Throwable, Consumer] =
+        for {
+          settings <- consumerSettings(
+                        clientId = clientId,
+                        groupId = Some(groupId),
+                        `max.poll.records` = 1,
+                        rebalanceSafeCommits = rebalanceSafeCommits,
+                        maxRebalanceDuration = 20.seconds,
+                        commitTimeout = 1.second
+                      )
+          consumer <- Consumer.make(settings, diagnostics)
+        } yield consumer
+
+      for {
+        topic        <- randomTopic
+        subscription = Subscription.topics(topic)
+        clientId1    <- randomClient
+        clientId2    <- randomClient
+        groupId      <- randomGroup
+        _            <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = partitionCount))
+        // Produce one message to all partitions, every 500 ms
+        _ <- ZStream
+               .fromSchedule(Schedule.fixed(500.millis))
+               .mapZIO { i =>
+                 ZIO.foreachDiscard(0 until partitionCount) { p =>
+                   produceMany(topic, p, Seq((s"key-$p-$i", s"msg-$p-$i")))
+                 }
+               }
+               .runDrain
+               .fork
+        _                       <- ZIO.logDebug("Starting consumer 1")
+        rebalanceEndTimePromise <- Promise.make[Nothing, Instant]
+        c1Diagnostics = new Diagnostics {
+                          override def emit(event: => DiagnosticEvent): UIO[Unit] = event match {
+                            case r: DiagnosticEvent.Rebalance if r.assigned.size == 1 =>
+                              Clock.instant.flatMap(rebalanceEndTimePromise.succeed).unit
+                            case _ => ZIO.unit
+                          }
+                        }
+        c1        <- makeConsumer(clientId1, groupId, rebalanceSafeCommits = true, c1Diagnostics)
+        c1Started <- Promise.make[Nothing, Unit]
+        c1Offsets <- Ref.make(Chunk.empty[Offset])
+        _ <-
+          ZIO
+            .logAnnotate("consumer", "1") {
+              // When the stream ends, the topic subscription ends as well. Because of that, the consumer
+              // shuts down and commits are no longer possible. Therefore, we signal the second consumer to
+              // start in a way that doesn't close this stream.
+              c1
+                .plainStream(subscription, Serde.string, Serde.string)
+                .tap { record =>
+                  for {
+                    _ <-
+                      ZIO.logDebug(
+                        s"Received record with offset ${record.partition}:${record.offset.offset} and key ${record.key}"
+                      )
+                    // Signal that consumer 2 can start once a record has been seen for every partition.
+                    offsets <- c1Offsets.updateAndGet(_ :+ record.offset)
+                    _       <- c1Started.succeed(()).when(offsets.map(_.partition).toSet.size == partitionCount)
+                    // Register an external commit (which we're not actually doing 😀)
+                    _ <- c1.registerExternalCommits(OffsetBatch(record.offset))
+                  } yield ()
+                }
+                .runDrain
+            }
+            .fork
+        _                  <- c1Started.await
+        _                  <- ZIO.logDebug("Starting consumer 2")
+        c2                 <- makeConsumer(clientId2, groupId, rebalanceSafeCommits = false, Diagnostics.NoOp)
+        rebalanceStartTime <- Clock.instant
+        _ <- ZIO
+               .logAnnotate("consumer", "2") {
+                 c2
+                   .plainStream(subscription, Serde.string, Serde.string)
+                   .tap(msg => ZIO.logDebug(s"Received ${msg.key}"))
+                   .runDrain
+               }
+               .fork
+        _                <- ZIO.logDebug("Waiting for rebalance to end")
+        rebalanceEndTime <- rebalanceEndTimePromise.await
+        _                <- c1.stopConsumption *> c2.stopConsumption
+        rebalanceDuration = Duration
+                              .fromInterval(rebalanceStartTime, rebalanceEndTime)
+                              .truncatedTo(ChronoUnit.SECONDS)
+                              .getSeconds
+                              .toInt
+        _ <- ZIO.logError(s"Rebalance took $rebalanceDuration seconds")
+      } yield assertTrue(rebalanceDuration <= 2)
+    },
     test("partitions for topic doesn't fail if doesn't exist") {
       for {
         topic <- randomTopic

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
index 6c4b5a977..7ec264f0b 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
@@ -24,6 +24,8 @@ sealed trait OffsetBatch {
 object OffsetBatch {
   val empty: OffsetBatch = EmptyOffsetBatch
 
+  def apply(offset: Offset): OffsetBatch = empty.add(offset)
+
   def apply(offsets: Iterable[Offset]): OffsetBatch = offsets.foldLeft(empty)(_ add _)
 }

From 442c970c63bcac4ed3336c0f7a47305821819a70 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Sun, 12 Jan 2025 11:41:02 +0100
Subject: [PATCH 10/12] Fix logging in test

---
 .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index 6a04e81b1..8dd44fbee 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -946,7 +946,10 @@
         c1Diagnostics = new Diagnostics {
                           override def emit(event: => DiagnosticEvent): UIO[Unit] = event match {
                             case r: DiagnosticEvent.Rebalance if r.assigned.size == 1 =>
-                              Clock.instant.flatMap(rebalanceEndTimePromise.succeed).unit
+                              ZIO.logDebug(s"Rebalance finished: $r") *>
+                                Clock.instant.flatMap(rebalanceEndTimePromise.succeed).unit
+                            case r: DiagnosticEvent.Rebalance =>
+                              ZIO.logDebug(s"Rebalance finished: $r")
                             case _ => ZIO.unit
                           }
                         }
@@ -997,7 +1000,7 @@
                               .truncatedTo(ChronoUnit.SECONDS)
                               .getSeconds
                               .toInt
-        _ <- ZIO.logError(s"Rebalance took $rebalanceDuration seconds")
+        _ <- ZIO.logDebug(s"Rebalance took $rebalanceDuration seconds")
       } yield assertTrue(rebalanceDuration <= 2)
     },
     test("partitions for
topic doesn't fail if doesn't exist") {

From ef2d73d9bb9f56031f51cb9210b85e10b06a7f16 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Sun, 12 Jan 2025 13:46:59 +0100
Subject: [PATCH 11/12] Add documentation

---
 docs/preventing-duplicates.md | 37 +++++++++++++++++++++++++++++++++--
 1 file changed, 35 insertions(+), 2 deletions(-)

diff --git a/docs/preventing-duplicates.md b/docs/preventing-duplicates.md
index 4b350e974..5b700ec36 100644
--- a/docs/preventing-duplicates.md
+++ b/docs/preventing-duplicates.md
@@ -26,6 +26,39 @@ a partition.
 For this to work correctly, your program must process a chunk of records within max-rebalance-duration. The clock
 starts the moment the chunk is pushed into the stream and ends when the commits for these records complete.
 
-For more information see the scaladocs in `ConsumerSettings`, read the description of
-[pull request #1098](https://github.com/zio/zio-kafka/pull/1098) that introduced this feature, or watch the presentation
+In addition, your program must commit the offsets of consumed records. The most straightforward way is to commit to the
+Kafka brokers. This is done by calling `.commit` on the offset of consumed records (see the consumer documentation).
+However, there are more options: external commits and transactional producing.
+
+### Commit to an external system
+
+When you commit to an external system (e.g. by writing to a relational database), the zio-kafka consumer needs to know
+about those commits before it can work in rebalance-safe-commits mode. Inform zio-kafka about external commits by
+invoking method `Consumer.registerExternalCommits(offsetBatch: OffsetBatch)` (available since zio-kafka 2.9.2).
+
+Here is what this could look like:
+
+```scala
+import zio.kafka.consumer._, zio.kafka.serde._
+
+consumer.plainStream(Subscription.topics("topic2000"), Serde.string, Serde.string)
+  .mapZIO { record =>
+    database.store(record.offset) *> // <-- the external commit
+      consumer.registerExternalCommits(OffsetBatch(record.offset))
+  }
+  .runDrain
+```
+
+### Commit with a transactional producer
+
+Although transactional producing is possible with zio-kafka, it is not easy and the code is very messy (see
+`ConsumerSpec` for an example). Transactional producing cannot be used in combination with rebalance-safe-commits mode.
+
+Zio-kafka v3.0.0 will make transactional producing much easier.
+
+## More information
+
+There is more information in the scaladocs of `ConsumerSettings` and the description of
+[pull request #1098](https://github.com/zio/zio-kafka/pull/1098) that introduced this feature.
+You can also watch the presentation
 [Making ZIO-Kafka Safer And Faster](https://www.youtube.com/watch?v=MJoRwEyyVxM). The relevant part starts at 10:24.

From e60eca1c651ccf599ae53e4377d8d68db19d6df8 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Mon, 13 Jan 2025 08:39:03 +0100
Subject: [PATCH 12/12] fix version in docs

---
 docs/preventing-duplicates.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/preventing-duplicates.md b/docs/preventing-duplicates.md
index 5b700ec36..7c87f1f62 100644
--- a/docs/preventing-duplicates.md
+++ b/docs/preventing-duplicates.md
@@ -34,7 +34,7 @@
 When you commit to an external system (e.g. by writing to a relational database), the zio-kafka consumer needs to know
 about those commits before it can work in rebalance-safe-commits mode.
Inform zio-kafka about external commits by -invoking method `Consumer.registerExternalCommits(offsetBatch: OffsetBatch)` (available since zio-kafka 2.9.2). +invoking method `Consumer.registerExternalCommits(offsetBatch: OffsetBatch)` (available since zio-kafka 2.10.0). Here is what this could look like:
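
Taken together, the patches above enable the following end-to-end setup: a consumer running in rebalance-safe-commits
mode whose offsets are committed to an external store and then registered back with zio-kafka. The sketch below is
illustrative only and assumes zio-kafka 2.10.0 or later (where `registerExternalCommits` is available); `persistOffsets`
is a hypothetical stand-in for an application's own transactional storage, and the bootstrap server, group id, and
topic name are placeholders.

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde._

object ExternalCommitsExample extends ZIOAppDefault {

  // Hypothetical: store the offsets in the same database transaction as the
  // processing results, so that processing and "committing" happen atomically.
  def persistOffsets(offsets: OffsetBatch): Task[Unit] = ???

  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092"))
      .withGroupId("example-group")
      .withRebalanceSafeCommits(true)       // hold up rebalances until commits are done
      .withMaxRebalanceDuration(30.seconds) // upper bound on holding up a rebalance

  override def run: ZIO[Scope, Throwable, Unit] =
    Consumer.make(settings).flatMap { consumer =>
      consumer
        .plainStream(Subscription.topics("example-topic"), Serde.string, Serde.string)
        .mapChunksZIO { records =>
          val batch = OffsetBatch(records.map(_.offset))
          // First the external commit, then register it with zio-kafka so that
          // an ongoing rebalance knows these records are already committed.
          persistOffsets(batch) *>
            consumer.registerExternalCommits(batch).as(records)
        }
        .runDrain
    }
}
```

Registering once per chunk keeps the number of external writes and registrations low; registering per record, as the
documentation example does, works equally well.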