diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala new file mode 100644 index 000000000..94213604f --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala @@ -0,0 +1,127 @@ +package zio.kafka.consumer.internal + +import org.apache.kafka.clients.consumer.{ ConsumerRecord, MockConsumer, OffsetResetStrategy } +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException } +import zio._ +import zio.kafka.consumer.{ ConsumerSettings, Subscription } +import zio.kafka.consumer.diagnostics.Diagnostics +import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment +import zio.metrics.{ MetricState, Metrics } +import zio.stream.{ Take, ZStream } +import zio.test.TestAspect.withLiveClock +import zio.test._ + +import scala.jdk.CollectionConverters._ + +object RunloopSpec extends ZIOSpecDefault { + + private type BinaryMockConsumer = MockConsumer[Array[Byte], Array[Byte]] + private type PartitionsHub = Hub[Take[Throwable, PartitionAssignment]] + + private val tp10 = new TopicPartition("t1", 0) + private val key123 = "123".getBytes + + private val consumerSettings = ConsumerSettings(List("bootstrap")) + + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("RunloopSpec")( + test("runloop creates a new partition stream and polls for new records") { + withRunloop { (mockConsumer, partitionsHub, runloop) => + mockConsumer.schedulePollTask { () => + mockConsumer.updateEndOffsets(Map(tp10 -> Long.box(0L)).asJava) + mockConsumer.rebalance(Seq(tp10).asJava) + mockConsumer.addRecord(makeConsumerRecord(tp10, key123)) + } + for { + streamStream <- ZStream.fromHubScoped(partitionsHub) + _ <- runloop.addSubscription(Subscription.Topics(Set(tp10.topic()))) + record <- streamStream + .map(_.exit) + .flattenExitOption + .flattenChunks + .take(1) + .mapZIO { case (_, stream) => + stream.runHead + .someOrFail(new AssertionError("Expected at least 1 record")) + } + .runHead + .someOrFail(new AssertionError("Expected at least 1 record from the streams")) + } yield assertTrue( + record.key sameElements key123 + ) + } + }, + test("runloop retries poll upon AuthorizationException and AuthenticationException") { + withRunloop { (mockConsumer, partitionsHub, runloop) => + mockConsumer.schedulePollTask { () => + mockConsumer.updateEndOffsets(Map(tp10 -> Long.box(0L)).asJava) + mockConsumer.rebalance(Seq(tp10).asJava) + } + mockConsumer.schedulePollTask { () => + mockConsumer.setPollException(new AuthorizationException("~~test~~")) + } + mockConsumer.schedulePollTask { () => + mockConsumer.setPollException(new AuthenticationException("~~test~~")) + } + mockConsumer.schedulePollTask { () => + mockConsumer.addRecord(makeConsumerRecord(tp10, key123)) + } + for { + streamStream <- ZStream.fromHubScoped(partitionsHub) + _ <- runloop.addSubscription(Subscription.Topics(Set(tp10.topic()))) + record <- streamStream + .map(_.exit) + .flattenExitOption + .flattenChunks + .take(1) + .mapZIO { case (_, stream) => + stream.runHead + .someOrFail(new AssertionError("Expected at least 1 record")) + } + .runHead + .someOrFail(new AssertionError("Expected at least 1 record from the streams")) + authErrorCount <- ZIO.metrics.map(counterValue("ziokafka_consumer_poll_auth_errors")) + } yield assertTrue( + record.key sameElements key123, + authErrorCount.contains(2d) + ) + } + } + ) @@ withLiveClock + + private def withRunloop( + f: (BinaryMockConsumer, PartitionsHub, Runloop) => ZIO[Scope, Throwable, TestResult] + ): ZIO[Scope, Throwable, TestResult] = + ZIO.scoped { + val mockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) + for { + consumerAccess <- ConsumerAccess.make(mockConsumer) + consumerScope <- ZIO.scope + partitionsHub <- ZIO + .acquireRelease(Hub.unbounded[Take[Throwable, PartitionAssignment]])(_.shutdown) + .provide(ZLayer.succeed(consumerScope)) + runloop <- Runloop.make( + consumerSettings, + 100.millis, + 100.millis, + Diagnostics.NoOp, + consumerAccess, + partitionsHub + ) + result <- f(mockConsumer, partitionsHub, runloop) + } yield result + } + + private def makeConsumerRecord(tp: TopicPartition, key: Array[Byte]): ConsumerRecord[Array[Byte], Array[Byte]] = + new ConsumerRecord[Array[Byte], Array[Byte]](tp.topic(), tp.partition(), 0L, key, "value".getBytes) + + private def counterValue(counterName: String)(metrics: Metrics): Option[Double] = + metrics.metrics + .find(_.metricKey.name == counterName) + .map(_.metricState) + .flatMap { + case MetricState.Counter(count) => Some(count) + case _ => Option.empty[Double] + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index 700fb6602..934454132 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -17,9 +17,6 @@ import zio.metrics.MetricLabel * .withProperties(properties) * .... etc. * }}} - * - * @param bootstrapServers - * the Kafka bootstrap servers */ final case class ConsumerSettings( properties: Map[String, AnyRef] = Map.empty, @@ -33,7 +30,8 @@ final case class ConsumerSettings( maxRebalanceDuration: Option[Duration] = None, fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy(), metricLabels: Set[MetricLabel] = Set.empty, - runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis) + runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis), + authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis) ) { /** @@ -300,6 +298,22 @@ final case class ConsumerSettings( def withRunloopMetricsSchedule(runloopMetricsSchedule: Schedule[Any, Unit, Long]): ConsumerSettings = copy(runloopMetricsSchedule = runloopMetricsSchedule) + /** + * @param authErrorRetrySchedule + * The schedule at which the consumer will retry polling the broker for more records, even though a poll fails with + * an [[org.apache.kafka.common.errors.AuthorizationException]] or + * [[org.apache.kafka.common.errors.AuthenticationException]]. + * + * This setting helps with failed polls due to too slow authorization or authentication in the broker. You may also + * consider increasing `pollTimeout` to reduce auth-work on the broker. + * + * Set to `Schedule.stop` to fail the consumer on the first auth error. + * + * The default is {{{Schedule.recurs(5) && Schedule.spaced(500.millis)}}} which is, to retry 5 times, spaced by 500ms. + */ + def withAuthErrorRetrySchedule(authErrorRetrySchedule: Schedule[Any, Throwable, Any]): ConsumerSettings = + copy(authErrorRetrySchedule = authErrorRetrySchedule) + } object ConsumerSettings { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala index 44aede826..ffe5844df 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala @@ -17,6 +17,7 @@ private[internal] trait ConsumerMetrics { def observeAggregatedCommit(latency: Duration, commitSize: Long): UIO[Unit] def observeRebalance(currentlyAssignedCount: Int, assignedCount: Int, revokedCount: Int, lostCount: Int): UIO[Unit] def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit] + def observePollAuthError(): UIO[Unit] } /** @@ -342,4 +343,20 @@ private[internal] class ZioConsumerMetrics(metricLabels: Set[MetricLabel]) exten _ <- commitQueueSizeHistogram.update(commitQueueSize) } yield () + // ----------------------------------------------------- + // + // Poll auth error metrics + // + + private val pollAuthErrorCounter: Metric.Counter[Int] = + Metric + .counterInt( + "ziokafka_consumer_poll_auth_errors", + "The number of polls that ended with an authentication or authorization error." + ) + .tagged(metricLabels) + + def observePollAuthError(): UIO[Unit] = + pollAuthErrorCounter.increment + } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 98a9f815f..bec04862a 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -2,7 +2,7 @@ package zio.kafka.consumer.internal import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.RebalanceInProgressException +import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException, RebalanceInProgressException } import zio._ import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval } import zio.kafka.consumer._ @@ -450,6 +450,15 @@ private[consumer] final class Runloop private ( if (recordsOrNull eq null) ConsumerRecords.empty[Array[Byte], Array[Byte]]() else recordsOrNull } + // Recover from spurious auth failures: + .retry( + Schedule.recurWhileZIO[Any, Throwable] { + case _: AuthorizationException | _: AuthenticationException => + consumerMetrics.observePollAuthError().as(true) + case _ => ZIO.succeed(false) + } && + settings.authErrorRetrySchedule + ) private def handlePoll(state: State): Task[State] = { for { @@ -924,7 +933,7 @@ object Runloop { val offset = offsetAndMeta.offset() val maxOffset = updatedOffsets.get(tp) match { case Some(existingOffset) => - offsetIncrease += max(0L, (offset - existingOffset)) + offsetIncrease += max(0L, offset - existingOffset) max(existingOffset, offset) case None => // This partition was not committed to from this consumer yet. Therefore we do not know the offset