From c22a55f4ba510cdc17a660f155e962eb714cc12d Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 11 Jan 2025 10:43:07 +0100 Subject: [PATCH] Pivot to allowing registration of external commits The changes to the transactional producer are removed and will come back in a later PR. Make `RunloopAccess.withRunloopZIO` private again. --- .../zio/kafka/consumer/ConsumerSpec.scala | 131 +++++++++++------- .../internal/RebalanceCoordinatorSpec.scala | 4 +- .../zio/kafka/testkit/KafkaTestUtils.scala | 6 +- .../scala/zio/kafka/consumer/Consumer.scala | 15 +- .../zio/kafka/consumer/ConsumerSettings.scala | 3 + .../kafka/consumer/internal/Committer.scala | 6 +- .../consumer/internal/LiveCommitter.scala | 7 +- .../zio/kafka/consumer/internal/Runloop.scala | 4 +- .../consumer/internal/RunloopAccess.scala | 7 +- .../producer/TransactionalProducer.scala | 22 ++- 10 files changed, 124 insertions(+), 81 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index d3f94deb0..ce7cbae65 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1234,67 +1234,96 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { val allMessages = (1 to messageCount).map(i => s"$i" -> f"msg$i%06d") val (messagesBeforeRebalance, messagesAfterRebalance) = allMessages.splitAt(messageCount / 2) + def transactionalRebalanceListener(streamCompleteOnRebalanceRef: Ref[Option[Promise[Nothing, Unit]]]) = + RebalanceListener( + onAssigned = _ => ZIO.unit, + onRevoked = _ => + streamCompleteOnRebalanceRef.get.flatMap { + case Some(p) => + ZIO.logDebug("onRevoked, awaiting stream completion") *> + p.await.timeoutFail(new InterruptedException("Timed out waiting stream to complete"))(1.minute) + case None => ZIO.unit + }, + onLost = _ => ZIO.logDebug("Lost some partitions") + ) + def makeCopyingTransactionalConsumer( name: String, consumerGroupId: String, clientId: String, fromTopic: String, toTopic: String, - consumerCreated: Promise[Throwable, Unit] + tProducer: TransactionalProducer, + consumerCreated: Promise[Nothing, Unit] ): ZIO[Kafka, Throwable, Unit] = ZIO.logAnnotate("consumer", name) { - ZIO.scoped { - (for { - consumedMessagesCounter <- Ref.make(0) - _ <- consumedMessagesCounter.get - .flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed")) - .repeat(Schedule.fixed(1.second)) - .fork - - transactionalId <- randomThing("transactional") - tProducerSettings <- transactionalProducerSettings(transactionalId) - tProducer <- - TransactionalProducer.make(tProducerSettings) - - tConsumer <- - Consumer - .partitionedStream(Subscription.topics(fromTopic), Serde.string, Serde.string) - .flatMapPar(Int.MaxValue) { case (_, partitionStream) => - ZStream.fromZIO(consumerCreated.succeed(())) *> - partitionStream.mapChunksZIO { records => - ZIO.scoped { - for { - t <- tProducer.createTransaction - _ <- t.produceChunkBatch( - records.map(r => new ProducerRecord(toTopic, r.key, r.value)), - Serde.string, - Serde.string, - OffsetBatch(records.map(_.offset)) - ) - _ <- consumedMessagesCounter.update(_ + records.size) - } yield Chunk.empty - } - } - } - .runDrain - .tapError(e => ZIO.logError(s"Error: $e") *> consumerCreated.fail(e)) <* ZIO.logDebug("Done") - } yield tConsumer) - .provideSome[Kafka & Scope]( - transactionalConsumer( - clientId, - consumerGroupId, - rebalanceSafeCommits = true, - properties = Map( - 
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> - implicitly[ClassTag[T]].runtimeClass.getName, - ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200" + for { + consumedMessagesCounter <- Ref.make(0) + _ <- consumedMessagesCounter.get + .flatMap(consumed => ZIO.logDebug(s"Consumed so far: $consumed")) + .repeat(Schedule.fixed(1.second)) + .fork + streamCompleteOnRebalanceRef <- Ref.make[Option[Promise[Nothing, Unit]]](None) + tConsumer <- + Consumer + .partitionedAssignmentStream(Subscription.topics(fromTopic), Serde.string, Serde.string) + .mapZIO { assignedPartitions => + for { + p <- Promise.make[Nothing, Unit] + _ <- streamCompleteOnRebalanceRef.set(Some(p)) + _ <- ZIO.logDebug(s"${assignedPartitions.size} partitions assigned") + _ <- consumerCreated.succeed(()) + partitionStreams = assignedPartitions.map(_._2) + s <- ZStream + .mergeAllUnbounded(64)(partitionStreams: _*) + .mapChunksZIO { records => + ZIO.scoped { + for { + t <- tProducer.createTransaction + _ <- t.produceChunkBatch( + records.map(r => new ProducerRecord(toTopic, r.key, r.value)), + Serde.string, + Serde.string, + OffsetBatch(records.map(_.offset)) + ) + _ <- consumedMessagesCounter.update(_ + records.size) + } yield Chunk.empty + }.uninterruptible + } + .runDrain + .ensuring { + for { + _ <- streamCompleteOnRebalanceRef.set(None) + _ <- p.succeed(()) + c <- consumedMessagesCounter.get + _ <- ZIO.logDebug(s"Consumed $c messages") + } yield () + } + } yield s + } + .runDrain + .provideSome[Kafka]( + transactionalConsumer( + clientId, + consumerGroupId, + restartStreamOnRebalancing = true, + properties = Map( + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> + implicitly[ClassTag[T]].runtimeClass.getName, + ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200" + ), + rebalanceListener = transactionalRebalanceListener(streamCompleteOnRebalanceRef) ) ) - ) - } + .tapError(e => ZIO.logError(s"Error: $e")) <* ZIO.logDebug("Done") + } yield tConsumer } for { + transactionalId <- randomThing("transactional") + tProducerSettings <- transactionalProducerSettings(transactionalId) + tProducer <- TransactionalProducer.make(tProducerSettings) + topicA <- randomTopic topicB <- randomTopic _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topicA, partitions = partitionCount)) @@ -1306,26 +1335,28 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- ZIO.logDebug("Starting copier 1") copier1ClientId = copyingGroup + "-1" - copier1Created <- Promise.make[Throwable, Unit] + copier1Created <- Promise.make[Nothing, Unit] copier1 <- makeCopyingTransactionalConsumer( "1", copyingGroup, copier1ClientId, topicA, topicB, + tProducer, copier1Created ).fork _ <- copier1Created.await _ <- ZIO.logDebug("Starting copier 2") copier2ClientId = copyingGroup + "-2" - copier2Created <- Promise.make[Throwable, Unit] + copier2Created <- Promise.make[Nothing, Unit] copier2 <- makeCopyingTransactionalConsumer( "2", copyingGroup, copier2ClientId, topicA, topicB, + tProducer, copier2Created ).fork _ <- ZIO.logDebug("Waiting for copier 2 to start") diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala index 6cab115c4..82471dc80 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala @@ -206,8 +206,8 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j { 
}

   abstract class MockCommitter extends Committer {
-    override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit]                     = _ => ZIO.unit
-    override val markCommittedInTransaction: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit
+    override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit]                  = _ => ZIO.unit
+    override val registerExternalCommits: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit

     override def processQueuedCommits(consumer: ByteArrayKafkaConsumer, executeOnEmpty: Boolean): Task[Unit] = ZIO.unit

diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
index aa2a926bb..9d6ad7a8a 100644
--- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
+++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
@@ -53,11 +53,11 @@ object KafkaTestUtils {
    * Note: to run multiple tests in parallel, you need to use different transactional ids via
    * `transactionalProducer(transactionalId)`.
    */
-  val transactionalProducer: ZLayer[Kafka with Consumer, Throwable, TransactionalProducer] =
+  val transactionalProducer: ZLayer[Kafka, Throwable, TransactionalProducer] =
     transactionalProducer("test-transaction")

-  def transactionalProducer(transactionalId: String): ZLayer[Kafka with Consumer, Throwable, TransactionalProducer] =
-    ZLayer.makeSome[Kafka with Consumer, TransactionalProducer](
+  def transactionalProducer(transactionalId: String): ZLayer[Kafka, Throwable, TransactionalProducer] =
+    ZLayer.makeSome[Kafka, TransactionalProducer](
       ZLayer(transactionalProducerSettings(transactionalId)),
       TransactionalProducer.live
     )
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index 6ebca93db..3f24a6d48 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -160,9 +160,14 @@ trait Consumer {
   def metrics: Task[Map[MetricName, Metric]]

   /**
-   * Used internally by the [[zio.kafka.producer.TransactionalProducer]]
+   * Register a commit that was done externally, that is, not by this consumer.
+   *
+   * This method is useful when you want to use rebalance-safe-commits, but commit to an external system (for example
+   * a relational database) instead of to the Kafka brokers.
+   *
+   * See also [[zio.kafka.consumer.ConsumerSettings.withRebalanceSafeCommits]].
*/ - def registerOffsetsCommittedInTransaction(offsetBatch: OffsetBatch): Task[Unit] + def registerExternalCommits(offsetBatch: OffsetBatch): Task[Unit] } object Consumer { @@ -609,9 +614,7 @@ private[consumer] final class ConsumerLive private[consumer] ( override def metrics: Task[Map[MetricName, Metric]] = consumer.withConsumer(_.metrics().asScala.toMap) - override def registerOffsetsCommittedInTransaction( - offsetBatch: OffsetBatch - ): Task[Unit] = - runloopAccess.withRunloopZIO(true)(runloop => runloop.registerOffsetsCommittedInTransaction(offsetBatch)) + override def registerExternalCommits(externallyCommittedOffsets: OffsetBatch): Task[Unit] = + runloopAccess.registerExternalCommits(externallyCommittedOffsets) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index 20325fd02..194048adf 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -230,6 +230,9 @@ final case class ConsumerSettings( * Rebalances are held up for at most 3/5 of `maxPollInterval` (see [[withMaxPollInterval]]), by default this * calculates to 3 minutes. See [[#withMaxRebalanceDuration]] to change the default. * + * External commits (that is, commits to an external system, e.g. a relational database) must be registered to the + * consumer with [[Consumer.registerExternalCommits]]. + * * When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any * offset commits from these streams have a high chance of being delayed (commits are not possible during some phases * of a rebalance). The consumer that takes over the partition will likely not see these delayed commits and will diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala index d4efa4455..2a7145dbe 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala @@ -11,8 +11,12 @@ import java.lang.Math.max import scala.collection.mutable private[internal] trait Committer { + + /** A function to commit offsets. */ val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] - val markCommittedInTransaction: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] + + /** A function to register offsets that have been committed externally. 
*/ + val registerExternalCommits: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] /** * Takes commits from the queue, commits them and adds them to pending commits diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala index 21d550bcf..557b6f2eb 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala @@ -23,11 +23,14 @@ private[consumer] final class LiveCommitter( pendingCommits: Ref.Synchronized[Chunk[Commit]] ) extends Committer { - override val markCommittedInTransaction: Map[ + override val registerExternalCommits: Map[ TopicPartition, OffsetAndMetadata ] => Task[Unit] = offsets => - committedOffsetsRef.modify(_.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))).unit + committedOffsetsRef.modify { + // The continuation promise can be `null` because this commit is not actually handled by the consumer. + _.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null))) + }.unit /** This is the implementation behind the user facing api `Offset.commit`. */ override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 0355f25e6..843087c5c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -533,8 +533,8 @@ private[consumer] final class Runloop private ( .unit } - def registerOffsetsCommittedInTransaction(offsetBatch: OffsetBatch): Task[Unit] = - committer.markCommittedInTransaction(offsetBatch.offsets) + def registerExternalCommits(offsetBatch: OffsetBatch): Task[Unit] = + committer.registerExternalCommits(offsetBatch.offsets) } object Runloop { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 5a0f2980e..6b9941b30 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -6,7 +6,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment -import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription } +import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, OffsetBatch, Subscription } import zio.stream.{ Stream, Take, UStream, ZStream } import zio._ @@ -31,7 +31,7 @@ private[consumer] final class RunloopAccess private ( diagnostics: Diagnostics ) { - def withRunloopZIO[E]( + private def withRunloopZIO[E]( requireRunning: Boolean )(whenRunning: Runloop => IO[E, Unit]): IO[E, Unit] = runloopStateRef.updateSomeAndGetZIO { @@ -66,6 +66,9 @@ private[consumer] final class RunloopAccess private ( } } yield stream + def registerExternalCommits(externallyCommittedOffsets: OffsetBatch): Task[Unit] = + withRunloopZIO(requireRunning = true)(_.registerExternalCommits(externallyCommittedOffsets)) + } private[consumer] object RunloopAccess { diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala 
b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala index 431caf103..173f43100 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala @@ -7,7 +7,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException import org.apache.kafka.common.serialization.ByteArraySerializer import zio.Cause.Fail import zio._ -import zio.kafka.consumer.{ Consumer, OffsetBatch } +import zio.kafka.consumer.OffsetBatch import java.util import scala.jdk.CollectionConverters._ @@ -22,12 +22,11 @@ object TransactionalProducer { private final class LiveTransactionalProducer( live: ProducerLive, - semaphore: Semaphore, - consumer: Consumer + semaphore: Semaphore ) extends TransactionalProducer { private val abortTransaction: Task[Unit] = ZIO.attemptBlocking(live.p.abortTransaction()) - private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): ZIO[Any, Throwable, Unit] = { + private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): Task[Unit] = { val sendOffsetsToTransaction: Task[Unit] = ZIO.suspend { @inline def invalidGroupIdException: IO[InvalidGroupIdException, Nothing] = @@ -49,12 +48,10 @@ object TransactionalProducer { } } - sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> - ZIO.attemptBlocking(live.p.commitTransaction()) *> - consumer.registerOffsetsCommittedInTransaction(offsetBatch).unit + sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction()) } - private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): ZIO[Any, Nothing, Unit] = + private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): UIO[Unit] = exit match { case Exit.Success(_) => transaction.offsetBatchRef.get @@ -78,7 +75,7 @@ object TransactionalProducer { def createTransaction: ZIO[TransactionalProducer & Scope, Throwable, Transaction] = ZIO.service[TransactionalProducer].flatMap(_.createTransaction) - val live: RLayer[TransactionalProducerSettings with Consumer, TransactionalProducer] = + val live: RLayer[TransactionalProducerSettings, TransactionalProducer] = ZLayer.scoped { for { settings <- ZIO.service[TransactionalProducerSettings] @@ -86,7 +83,7 @@ object TransactionalProducer { } yield producer } - def make(settings: TransactionalProducerSettings): ZIO[Scope with Consumer, Throwable, TransactionalProducer] = + def make(settings: TransactionalProducerSettings): ZIO[Scope, Throwable, TransactionalProducer] = for { rawProducer <- ZIO.acquireRelease( ZIO.attempt( @@ -105,7 +102,6 @@ object TransactionalProducer { settings.producerSettings.sendBufferSize ) live = new ProducerLive(rawProducer, runtime, sendQueue) - _ <- ZIO.blocking(live.sendFromQueue).forkScoped - consumer <- ZIO.service[Consumer] - } yield new LiveTransactionalProducer(live, semaphore, consumer) + _ <- ZIO.blocking(live.sendFromQueue).forkScoped + } yield new LiveTransactionalProducer(live, semaphore) }
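
For illustration, a minimal sketch of how an application could combine rebalance-safe-commits with
commits to an external system via the new `Consumer.registerExternalCommits` API. The `OffsetStore`
trait, topic name, group id, and bootstrap server below are hypothetical placeholders and not part
of this change; only `registerExternalCommits`, `withRebalanceSafeCommits`, and the stream/offset
APIs come from zio-kafka itself.

import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Hypothetical external store, standing in for e.g. a relational database.
// It is expected to persist both the records and their offsets atomically.
trait OffsetStore {
  def save(records: Chunk[CommittableRecord[String, String]]): Task[Unit]
}

object ExternalCommitExample {

  // Rebalance-safe-commits must be enabled for registration to have any effect;
  // these settings would be used to build the consumer, e.g. via `Consumer.make`.
  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092")) // placeholder bootstrap servers
      .withGroupId("example-group")          // placeholder group id
      .withRebalanceSafeCommits(true)

  def copyToStore(consumer: Consumer, store: OffsetStore): Task[Unit] =
    consumer
      .plainStream(Subscription.topics("example-topic"), Serde.string, Serde.string)
      .mapChunksZIO { records =>
        for {
          _ <- store.save(records) // the actual "commit" happens in the external system
          // Tell the consumer these offsets are committed, so that
          // rebalance-safe-commits does not wait for a broker commit.
          _ <- consumer.registerExternalCommits(OffsetBatch(records.map(_.offset)))
        } yield Chunk.empty
      }
      .runDrain
}

With rebalance-safe-commits enabled, a rebalance is held up (by default for at most 3/5 of
`max.poll.interval.ms`) until the registered commits catch up with the consumed offsets, so offsets
should be registered promptly after the external write succeeds.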