From 7b9f3c253d8b4ff9d9d13908db23af8efdce1078 Mon Sep 17 00:00:00 2001 From: svroonland Date: Wed, 3 Apr 2024 07:44:21 +0200 Subject: [PATCH] Do not run user rebalance listener on same thread runtime (#1205) --- .../zio/kafka/consumer/ConsumerSpec.scala | 6 +- .../kafka/consumer/RebalanceConsumer.scala | 21 ------ .../kafka/consumer/RebalanceListener.scala | 66 ++++++++++--------- .../zio/kafka/consumer/internal/Runloop.scala | 31 ++++++--- 4 files changed, 59 insertions(+), 65 deletions(-) delete mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceConsumer.scala diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 80c788dc1..4fae127a0 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1212,15 +1212,15 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { def transactionalRebalanceListener(streamCompleteOnRebalanceRef: Ref[Option[Promise[Nothing, Unit]]]) = RebalanceListener( - onAssigned = (_, _) => ZIO.unit, - onRevoked = (_, _) => + onAssigned = _ => ZIO.unit, + onRevoked = _ => streamCompleteOnRebalanceRef.get.flatMap { case Some(p) => ZIO.logDebug("onRevoked, awaiting stream completion") *> p.await.timeoutFail(new InterruptedException("Timed out waiting stream to complete"))(1.minute) case None => ZIO.unit }, - onLost = (_, _) => ZIO.logDebug("Lost some partitions") + onLost = _ => ZIO.logDebug("Lost some partitions") ) def makeCopyingTransactionalConsumer( diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceConsumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceConsumer.scala deleted file mode 100644 index fd0b90b0b..000000000 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceConsumer.scala +++ /dev/null @@ -1,21 +0,0 @@ -package zio.kafka.consumer - -import org.apache.kafka.clients.consumer.{ Consumer => JConsumer, OffsetAndMetadata } -import org.apache.kafka.common.TopicPartition -import zio.{ Task, ZIO } - -import scala.jdk.CollectionConverters._ - -/** - * A subset of Consumer methods available during rebalances. - */ -trait RebalanceConsumer { - def commit(offsets: Map[TopicPartition, OffsetAndMetadata]): Task[Unit] -} - -object RebalanceConsumer { - final case class Live(consumer: JConsumer[Array[Byte], Array[Byte]]) extends RebalanceConsumer { - def commit(offsets: Map[TopicPartition, OffsetAndMetadata]): Task[Unit] = - ZIO.attemptBlocking(consumer.commitSync(offsets.asJava)) - } -} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala index 3a00c0442..ae43d9b30 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala @@ -2,19 +2,17 @@ package zio.kafka.consumer import org.apache.kafka.clients.consumer.ConsumerRebalanceListener import org.apache.kafka.common.TopicPartition -import zio.{ Runtime, Task, Unsafe, ZIO } +import zio.{ Executor, Runtime, Task, Unsafe, ZIO } + import scala.jdk.CollectionConverters._ /** * ZIO wrapper around Kafka's `ConsumerRebalanceListener` to work with Scala collection types and ZIO effects. - * - * Note that the given ZIO effects are executed directly on the Kafka poll thread. Fork and shift to another executor - * when this is not desired. */ final case class RebalanceListener( - onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit], - onRevoked: (Set[TopicPartition], RebalanceConsumer) => Task[Unit], - onLost: (Set[TopicPartition], RebalanceConsumer) => Task[Unit] + onAssigned: Set[TopicPartition] => Task[Unit], + onRevoked: Set[TopicPartition] => Task[Unit], + onLost: Set[TopicPartition] => Task[Unit] ) { /** @@ -22,21 +20,42 @@ final case class RebalanceListener( */ def ++(that: RebalanceListener): RebalanceListener = RebalanceListener( - (assigned, consumer) => onAssigned(assigned, consumer) *> that.onAssigned(assigned, consumer), - (revoked, consumer) => onRevoked(revoked, consumer) *> that.onRevoked(revoked, consumer), - (lost, consumer) => onLost(lost, consumer) *> that.onLost(lost, consumer) + assigned => onAssigned(assigned) *> that.onAssigned(assigned), + revoked => onRevoked(revoked) *> that.onRevoked(revoked), + lost => onLost(lost) *> that.onLost(lost) ) - def toKafka( - runtime: Runtime[Any], - consumer: RebalanceConsumer + def runOnExecutor(executor: Executor): RebalanceListener = RebalanceListener( + assigned => onAssigned(assigned).onExecutor(executor), + revoked => onRevoked(revoked).onExecutor(executor), + lost => onLost(lost).onExecutor(executor) + ) + +} + +object RebalanceListener { + def apply( + onAssigned: Set[TopicPartition] => Task[Unit], + onRevoked: Set[TopicPartition] => Task[Unit] + ): RebalanceListener = + RebalanceListener(onAssigned, onRevoked, onRevoked) + + val noop: RebalanceListener = RebalanceListener( + _ => ZIO.unit, + _ => ZIO.unit, + _ => ZIO.unit + ) + + private[kafka] def toKafka( + rebalanceListener: RebalanceListener, + runtime: Runtime[Any] ): ConsumerRebalanceListener = new ConsumerRebalanceListener { override def onPartitionsRevoked( partitions: java.util.Collection[TopicPartition] ): Unit = Unsafe.unsafe { implicit u => runtime.unsafe - .run(onRevoked(partitions.asScala.toSet, consumer)) + .run(rebalanceListener.onRevoked(partitions.asScala.toSet)) .getOrThrowFiberFailure() () } @@ -45,7 +64,7 @@ final case class RebalanceListener( partitions: java.util.Collection[TopicPartition] ): Unit = Unsafe.unsafe { implicit u => runtime.unsafe - .run(onAssigned(partitions.asScala.toSet, consumer)) + .run(rebalanceListener.onAssigned(partitions.asScala.toSet)) .getOrThrowFiberFailure() () } @@ -54,24 +73,9 @@ final case class RebalanceListener( partitions: java.util.Collection[TopicPartition] ): Unit = Unsafe.unsafe { implicit u => runtime.unsafe - .run(onLost(partitions.asScala.toSet, consumer)) + .run(rebalanceListener.onLost(partitions.asScala.toSet)) .getOrThrowFiberFailure() () } } - -} - -object RebalanceListener { - def apply( - onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit], - onRevoked: (Set[TopicPartition], RebalanceConsumer) => Task[Unit] - ): RebalanceListener = - RebalanceListener(onAssigned, onRevoked, onRevoked) - - val noop: RebalanceListener = RebalanceListener( - (_, _) => ZIO.unit, - (_, _) => ZIO.unit, - (_, _) => ZIO.unit - ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index de049f62a..98a9f815f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -22,6 +22,7 @@ import scala.jdk.CollectionConverters._ //noinspection SimplifyWhenInspection,SimplifyUnlessInspection private[consumer] final class Runloop private ( settings: ConsumerSettings, + topLevelExecutor: Executor, sameThreadRuntime: Runtime[Any], consumer: ConsumerAccess, maxPollInterval: Duration, @@ -74,7 +75,7 @@ private[consumer] final class Runloop private ( private[internal] def removeSubscription(subscription: Subscription): UIO[Unit] = commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit - private val rebalanceListener: RebalanceListener = { + private def makeRebalanceListener: ConsumerRebalanceListener = { // All code in this block is called from the rebalance listener and therefore runs on the same-thread-runtime. This // is because the Java kafka client requires us to invoke the consumer from the same thread that invoked the // rebalance listener. @@ -92,7 +93,8 @@ private[consumer] final class Runloop private ( else { for { _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) - _ <- if (rebalanceSafeCommits) consumer.rebalanceListenerAccess(doAwaitStreamCommits(_, state, streamsToEnd)) + _ <- if (rebalanceSafeCommits) + consumer.rebalanceListenerAccess(doAwaitStreamCommits(_, state, streamsToEnd)) else ZIO.unit } yield () } @@ -199,7 +201,7 @@ private[consumer] final class Runloop private ( // - updates `lastRebalanceEvent` // val recordRebalanceRebalancingListener = RebalanceListener( - onAssigned = (assignedTps, _) => + onAssigned = assignedTps => for { rebalanceEvent <- lastRebalanceEvent.get _ <- ZIO.logDebug { @@ -213,7 +215,7 @@ private[consumer] final class Runloop private ( _ <- lastRebalanceEvent.set(rebalanceEvent.onAssigned(assignedTps, endedStreams = streamsToEnd)) _ <- ZIO.logTrace("onAssigned done") } yield (), - onRevoked = (revokedTps, _) => + onRevoked = revokedTps => for { rebalanceEvent <- lastRebalanceEvent.get _ <- ZIO.logDebug { @@ -227,7 +229,7 @@ private[consumer] final class Runloop private ( _ <- lastRebalanceEvent.set(rebalanceEvent.onRevoked(revokedTps, endedStreams = streamsToEnd)) _ <- ZIO.logTrace("onRevoked done") } yield (), - onLost = (lostTps, _) => + onLost = lostTps => for { _ <- ZIO.logDebug(s"${lostTps.size} partitions are lost") rebalanceEvent <- lastRebalanceEvent.get @@ -239,7 +241,14 @@ private[consumer] final class Runloop private ( } yield () ) - recordRebalanceRebalancingListener ++ settings.rebalanceListener + // Here we just want to avoid any executor shift if the user provided listener is the noop listener. + val userRebalanceListener = + settings.rebalanceListener match { + case RebalanceListener.noop => RebalanceListener.noop + case _ => settings.rebalanceListener.runOnExecutor(topLevelExecutor) + } + + RebalanceListener.toKafka(recordRebalanceRebalancingListener ++ userRebalanceListener, sameThreadRuntime) } /** This is the implementation behind the user facing api `Offset.commit`. */ @@ -671,14 +680,14 @@ private[consumer] final class Runloop private ( .attempt(c.unsubscribe()) .as(Chunk.empty) case SubscriptionState.Subscribed(_, Subscription.Pattern(pattern)) => - val rc = RebalanceConsumer.Live(c) + val rebalanceListener = makeRebalanceListener ZIO - .attempt(c.subscribe(pattern.pattern, rebalanceListener.toKafka(sameThreadRuntime, rc))) + .attempt(c.subscribe(pattern.pattern, rebalanceListener)) .as(Chunk.empty) case SubscriptionState.Subscribed(_, Subscription.Topics(topics)) => - val rc = RebalanceConsumer.Live(c) + val rebalanceListener = makeRebalanceListener ZIO - .attempt(c.subscribe(topics.asJava, rebalanceListener.toKafka(sameThreadRuntime, rc))) + .attempt(c.subscribe(topics.asJava, rebalanceListener)) .as(Chunk.empty) case SubscriptionState.Subscribed(_, Subscription.Manual(topicPartitions)) => // For manual subscriptions we have to do some manual work before starting the run loop @@ -846,8 +855,10 @@ object Runloop { currentStateRef <- Ref.make(initialState) committedOffsetsRef <- Ref.make(CommitOffsets.empty) sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) + executor <- ZIO.executor runloop = new Runloop( settings = settings, + topLevelExecutor = executor, sameThreadRuntime = sameThreadRuntime, consumer = consumer, maxPollInterval = maxPollInterval,