Skip to content


Allow for some auth exceptions during poll (#1270)
Browse files Browse the repository at this point in the history
Some brokers are sometimes too slow to authorize or authenticate a poll.
This results in spurious exceptions. With this change we continue
polling unless the error happens too often.

We test the change with a new type of unit test for Runloop.
  • Loading branch information
erikvanoosten authored Jul 10, 2024
1 parent a8596f3 commit c717359
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.{ ConsumerRecord, MockConsumer, OffsetResetStrategy }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
import zio._
import zio.kafka.consumer.{ ConsumerSettings, Subscription }
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.metrics.{ MetricState, Metrics }
import{ Take, ZStream }
import zio.test.TestAspect.withLiveClock
import zio.test._

import scala.jdk.CollectionConverters._

object RunloopSpec extends ZIOSpecDefault {

private type BinaryMockConsumer = MockConsumer[Array[Byte], Array[Byte]]
private type PartitionsHub = Hub[Take[Throwable, PartitionAssignment]]

private val tp10 = new TopicPartition("t1", 0)
private val key123 = "123".getBytes

private val consumerSettings = ConsumerSettings(List("bootstrap"))

override def spec: Spec[TestEnvironment with Scope, Any] =
test("runloop creates a new partition stream and polls for new records") {
withRunloop { (mockConsumer, partitionsHub, runloop) =>
mockConsumer.schedulePollTask { () =>
mockConsumer.updateEndOffsets(Map(tp10 ->
mockConsumer.addRecord(makeConsumerRecord(tp10, key123))
for {
streamStream <- ZStream.fromHubScoped(partitionsHub)
_ <- runloop.addSubscription(Subscription.Topics(Set(tp10.topic())))
record <- streamStream
.mapZIO { case (_, stream) =>
.someOrFail(new AssertionError("Expected at least 1 record"))
.someOrFail(new AssertionError("Expected at least 1 record from the streams"))
} yield assertTrue(
record.key sameElements key123
test("runloop retries poll upon AuthorizationException and AuthenticationException") {
withRunloop { (mockConsumer, partitionsHub, runloop) =>
mockConsumer.schedulePollTask { () =>
mockConsumer.updateEndOffsets(Map(tp10 ->
mockConsumer.schedulePollTask { () =>
mockConsumer.setPollException(new AuthorizationException("~~test~~"))
mockConsumer.schedulePollTask { () =>
mockConsumer.setPollException(new AuthenticationException("~~test~~"))
mockConsumer.schedulePollTask { () =>
mockConsumer.addRecord(makeConsumerRecord(tp10, key123))
for {
streamStream <- ZStream.fromHubScoped(partitionsHub)
_ <- runloop.addSubscription(Subscription.Topics(Set(tp10.topic())))
record <- streamStream
.mapZIO { case (_, stream) =>
.someOrFail(new AssertionError("Expected at least 1 record"))
.someOrFail(new AssertionError("Expected at least 1 record from the streams"))
authErrorCount <-"ziokafka_consumer_poll_auth_errors"))
} yield assertTrue(
record.key sameElements key123,
) @@ withLiveClock

private def withRunloop(
f: (BinaryMockConsumer, PartitionsHub, Runloop) => ZIO[Scope, Throwable, TestResult]
): ZIO[Scope, Throwable, TestResult] =
ZIO.scoped {
val mockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST)
for {
consumerAccess <- ConsumerAccess.make(mockConsumer)
consumerScope <- ZIO.scope
partitionsHub <- ZIO
.acquireRelease(Hub.unbounded[Take[Throwable, PartitionAssignment]])(_.shutdown)
runloop <- Runloop.make(
result <- f(mockConsumer, partitionsHub, runloop)
} yield result

private def makeConsumerRecord(tp: TopicPartition, key: Array[Byte]): ConsumerRecord[Array[Byte], Array[Byte]] =
new ConsumerRecord[Array[Byte], Array[Byte]](tp.topic(), tp.partition(), 0L, key, "value".getBytes)

private def counterValue(counterName: String)(metrics: Metrics): Option[Double] =
.find( == counterName)
.flatMap {
case MetricState.Counter(count) => Some(count)
case _ => Option.empty[Double]
22 changes: 18 additions & 4 deletions zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import zio.metrics.MetricLabel
* .withProperties(properties)
* .... etc.
* }}}
* @param bootstrapServers
* the Kafka bootstrap servers
final case class ConsumerSettings(
properties: Map[String, AnyRef] = Map.empty,
Expand All @@ -33,7 +30,8 @@ final case class ConsumerSettings(
maxRebalanceDuration: Option[Duration] = None,
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy(),
metricLabels: Set[MetricLabel] = Set.empty,
runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis)
runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis),
authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
) {

Expand Down Expand Up @@ -300,6 +298,22 @@ final case class ConsumerSettings(
def withRunloopMetricsSchedule(runloopMetricsSchedule: Schedule[Any, Unit, Long]): ConsumerSettings =
copy(runloopMetricsSchedule = runloopMetricsSchedule)

* @param authErrorRetrySchedule
* The schedule at which the consumer will retry polling the broker for more records, even though a poll fails with
* an [[org.apache.kafka.common.errors.AuthorizationException]] or
* [[org.apache.kafka.common.errors.AuthenticationException]].
* This setting helps with failed polls due to too slow authorization or authentication in the broker. You may also
* consider increasing `pollTimeout` to reduce auth-work on the broker.
* Set to `Schedule.stop` to fail the consumer on the first auth error.
* The default is {{{Schedule.recurs(5) && Schedule.spaced(500.millis)}}} which is, to retry 5 times, spaced by 500ms.
def withAuthErrorRetrySchedule(authErrorRetrySchedule: Schedule[Any, Throwable, Any]): ConsumerSettings =
copy(authErrorRetrySchedule = authErrorRetrySchedule)


object ConsumerSettings {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ private[internal] trait ConsumerMetrics {
def observeAggregatedCommit(latency: Duration, commitSize: Long): UIO[Unit]
def observeRebalance(currentlyAssignedCount: Int, assignedCount: Int, revokedCount: Int, lostCount: Int): UIO[Unit]
def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit]
def observePollAuthError(): UIO[Unit]

Expand Down Expand Up @@ -342,4 +343,20 @@ private[internal] class ZioConsumerMetrics(metricLabels: Set[MetricLabel]) exten
_ <- commitQueueSizeHistogram.update(commitQueueSize)
} yield ()

// -----------------------------------------------------
// Poll auth error metrics

private val pollAuthErrorCounter: Metric.Counter[Int] =
"The number of polls that ended with an authentication or authorization error."

def observePollAuthError(): UIO[Unit] =

13 changes: 11 additions & 2 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RebalanceInProgressException
import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException, RebalanceInProgressException }
import zio._
import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval }
import zio.kafka.consumer._
Expand Down Expand Up @@ -450,6 +450,15 @@ private[consumer] final class Runloop private (
if (recordsOrNull eq null) ConsumerRecords.empty[Array[Byte], Array[Byte]]()
else recordsOrNull
// Recover from spurious auth failures:
Schedule.recurWhileZIO[Any, Throwable] {
case _: AuthorizationException | _: AuthenticationException =>
case _ => ZIO.succeed(false)
} &&

private def handlePoll(state: State): Task[State] = {
for {
Expand Down Expand Up @@ -924,7 +933,7 @@ object Runloop {
val offset = offsetAndMeta.offset()
val maxOffset = updatedOffsets.get(tp) match {
case Some(existingOffset) =>
offsetIncrease += max(0L, (offset - existingOffset))
offsetIncrease += max(0L, offset - existingOffset)
max(existingOffset, offset)
case None =>
// This partition was not committed to from this consumer yet. Therefore we do not know the offset
Expand Down

0 comments on commit c717359

Please sign in to comment.