
Commit

Decouple stream halt detection timeout from max poll interval (#1376)
Fixes #1262

---------

Co-authored-by: Erik van Oosten <[email protected]>
svroonland and erikvanoosten authored Nov 13, 2024
1 parent e1a4144 commit 504074f
Showing 6 changed files with 59 additions and 35 deletions.
@@ -114,7 +114,7 @@ object PartitionStreamControlSpec extends ZIOSpecDefault {
for {
control <- createTestControl
now <- Clock.nanoTime
exceeded <- control.maxPollIntervalExceeded(now)
exceeded <- control.maxStreamPullIntervalExceeded(now)
} yield assertTrue(!exceeded)
},
test("maxPollIntervalExceeded returns true after timeout") {
@@ -123,7 +123,7 @@ object PartitionStreamControlSpec extends ZIOSpecDefault {
_ <- control.offerRecords(createTestRecords(1))
now <- Clock.nanoTime
futureTime = now + Duration.fromSeconds(31).toNanos
exceeded <- control.maxPollIntervalExceeded(futureTime)
exceeded <- control.maxStreamPullIntervalExceeded(futureTime)
} yield assertTrue(exceeded)
}
),
4 changes: 3 additions & 1 deletion zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -221,7 +221,9 @@ object Consumer {
* - creating `access` as a fair semaphore with a single permit,
* - acquiring a permit from `access` before using the consumer, and releasing it afterwards,
* - not using the following consumer methods: `subscribe`, `unsubscribe`, `assign`, `poll`, `commit*`, `seek`,
* `pause`, `resume`, and `enforceRebalance`.
* `pause`, `resume`, and `enforceRebalance`,
* - keeping the consumer config given to the java consumer in sync with the properties in `settings` (for example
* by constructing `settings` with `ConsumerSettings(bootstrapServers).withProperties(config)`).
*
* Any deviation from these rules is likely to cause hard-to-track errors.
*
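A minimal sketch of the wrapping rules described in this scaladoc, assuming a broker at `localhost:9092`; the names (`ExternalConsumerExample`, `makeAccess`) and config values are illustrative and not part of this change.

```scala
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import zio._
import zio.kafka.consumer.ConsumerSettings

import scala.jdk.CollectionConverters._

object ExternalConsumerExample {
  // One shared config map, so the java consumer and `settings` cannot drift apart.
  val config: Map[String, AnyRef] = Map(
    "bootstrap.servers"    -> "localhost:9092",
    "max.poll.interval.ms" -> "300000"
  )

  // The externally created java consumer, built from the shared config.
  val javaConsumer: KafkaConsumer[Array[Byte], Array[Byte]] =
    new KafkaConsumer(config.asJava, new ByteArrayDeserializer, new ByteArrayDeserializer)

  // A semaphore with a single permit guards every use of the consumer.
  val makeAccess: UIO[Semaphore] = Semaphore.make(permits = 1)

  // Settings constructed from the same properties, as recommended above.
  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092")).withProperties(config)

  // Every direct use of the java consumer acquires a permit first and releases it afterwards.
  def currentAssignment(access: Semaphore): Task[java.util.Set[TopicPartition]] =
    access.withPermit(ZIO.attempt(javaConsumer.assignment()))
}
```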
32 changes: 23 additions & 9 deletions zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
@@ -31,7 +31,8 @@ final case class ConsumerSettings(
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy(),
metricLabels: Set[MetricLabel] = Set.empty,
runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis),
authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis),
maxStreamPullIntervalOption: Option[Duration] = None
) {
// Parse booleans in a way compatible with how Kafka does this in org.apache.kafka.common.config.ConfigDef.parseType:
require(
@@ -153,21 +154,34 @@ final case class ConsumerSettings(
* Set Kafka's `max.poll.interval.ms` configuration. See
* https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms for more information.
*
* Zio-kafka uses this value also to determine whether a stream stopped processing. If no chunks are pulled from a
* stream for this interval (while data is available) we consider the stream to be halted. When this happens we
* interrupt the stream with a failure. In addition the entire consumer is shutdown. In future versions of zio-kafka
* we may (instead of a shutdown) stop only the affected subscription.
*
* The default is 5 minutes. Make sure that all records from a single poll can be processed in this interval. The
* maximum number of records in a single poll is configured with the `max.poll.records` configuration (see
* https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records and [[withMaxPollRecords]]).
* The default is 5 minutes. Make sure that all records from a single poll can be processed in this interval. See also
* the [[withMaxPollRecords maxPollRecords]] configuration.
*/
def withMaxPollInterval(maxPollInterval: Duration): ConsumerSettings =
withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)
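As a quick illustration (broker address assumed), the helper above is a convenience for setting the underlying property; both values below end up with the same `max.poll.interval.ms` entry.

```scala
import zio._
import zio.kafka.consumer.ConsumerSettings

// Both settings configure max.poll.interval.ms to 10 minutes (600000 ms).
val viaHelper   = ConsumerSettings(List("localhost:9092")).withMaxPollInterval(10.minutes)
val viaProperty = ConsumerSettings(List("localhost:9092")).withProperty("max.poll.interval.ms", "600000")
```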

/**
* The maximum time a stream may run without pulling a chunk of records.
*
* Zio-kafka uses this value to determine whether a stream stopped processing. This is to safeguard against alive
* consumers in the consumer group which hold partition assignments but make no progress. If no chunks are pulled by
* user code from a partition stream for this interval (while data is available) we consider the stream to be halted.
* When this happens we interrupt the stream with a failure. In addition, the entire consumer is shut down. In future
* versions of zio-kafka we may (instead of a shutdown) stop only the affected subscription.
*
* Make sure that all records from a single poll (see [[withMaxPollRecords maxPollRecords]]) can be processed in this
* interval, even when there is no concurrency because the records are all in the same partition.
*
* The default is equal to [[withMaxPollInterval maxPollInterval]].
*/
def withMaxStreamPullInterval(maxStreamPullInterval: Duration): ConsumerSettings =
copy(maxStreamPullIntervalOption = Some(maxStreamPullInterval))

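A sketch of the decoupling this change enables, with an assumed broker address: Kafka's poll interval stays at its 5-minute default while halted-stream detection gets a longer window.

```scala
import zio._
import zio.kafka.consumer.ConsumerSettings

// max.poll.interval.ms stays at 5 minutes, but a partition stream is only treated
// as halted after 15 minutes without a pull by user code.
val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withMaxPollInterval(5.minutes)
    .withMaxStreamPullInterval(15.minutes)
```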
/**
* Set Kafka's `max.poll.records` configuration. See
* https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records for more information.
*
* The default is 500.
*/
def withMaxPollRecords(maxPollRecords: Int): ConsumerSettings =
withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
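One way the two knobs above might be tuned together, with made-up numbers: fewer records per poll and a stream-pull window sized to the expected per-record processing time.

```scala
import zio._
import zio.kafka.consumer.ConsumerSettings

// Hypothetical tuning: at most 100 records per poll, each taking up to ~1 second to
// process sequentially, so allow a few minutes of stream inactivity before the
// stream counts as halted.
val tuned: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withMaxPollRecords(100)
    .withMaxStreamPullInterval(5.minutes)
```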
@@ -31,8 +31,8 @@ abstract class PartitionStream {
* the last pulled offset (if any). The promise completes when the stream completed.
* @param queueInfoRef
* used to track the stream's pull deadline, its queue size, and last pulled offset
* @param maxPollInterval
* see [[zio.kafka.consumer.ConsumerSettings.withMaxPollInterval()]]
* @param maxStreamPullInterval
* see [[zio.kafka.consumer.ConsumerSettings.withMaxStreamPullInterval()]]
*/
final class PartitionStreamControl private (
val tp: TopicPartition,
@@ -41,9 +41,9 @@ final class PartitionStreamControl private (
interruptionPromise: Promise[Throwable, Nothing],
val completedPromise: Promise[Nothing, Option[Offset]],
queueInfoRef: Ref[QueueInfo],
maxPollInterval: Duration
maxStreamPullInterval: Duration
) extends PartitionStream {
private val maxPollIntervalNanos = maxPollInterval.toNanos
private val maxStreamPullIntervalNanos = maxStreamPullInterval.toNanos

private val logAnnotate = ZIO.logAnnotate(
LogAnnotation("topic", tp.topic()),
@@ -57,7 +57,7 @@ final class PartitionStreamControl private (
} else {
for {
now <- Clock.nanoTime
newPullDeadline = now + maxPollIntervalNanos
newPullDeadline = now + maxStreamPullIntervalNanos
_ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size))
_ <- dataQueue.offer(Take.chunk(data))
} yield ()
@@ -81,14 +81,12 @@ final class PartitionStreamControl private (
* `true` when the stream has data available, but none has been pulled for more than `maxPollInterval` (since data
* became available), `false` otherwise
*/
private[internal] def maxPollIntervalExceeded(now: NanoTime): UIO[Boolean] =
private[internal] def maxStreamPullIntervalExceeded(now: NanoTime): UIO[Boolean] =
queueInfoRef.get.map(_.deadlineExceeded(now))

/** To be invoked when the stream is no longer processing. */
private[internal] def halt: UIO[Unit] = {
val timeOutMessage = s"No records were polled for more than $maxPollInterval for topic partition $tp. " +
"Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " +
"needs more time."
val timeOutMessage = s"No records were pulled for more than $maxStreamPullInterval for topic partition $tp."
val consumeTimeout = new TimeoutException(timeOutMessage) with NoStackTrace
interruptionPromise.fail(consumeTimeout).unit
}
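A simplified model of the deadline bookkeeping in this class, using a made-up `SimpleQueueInfo`; the real `QueueInfo` also tracks the last pulled offset (see the scaladoc above).

```scala
import zio._

// Sketch of the pull-deadline bookkeeping: every offer or pull moves the deadline
// forward by maxStreamPullIntervalNanos, and the stream only counts as halted when
// the deadline has passed while records are still queued.
final case class SimpleQueueInfo(pullDeadline: Long, queueSize: Int) {
  def withOffer(newDeadline: Long, added: Int): SimpleQueueInfo =
    copy(pullDeadline = newDeadline, queueSize = queueSize + added)
  def withPull(newDeadline: Long, pulled: Int): SimpleQueueInfo =
    copy(pullDeadline = newDeadline, queueSize = queueSize - pulled)
  def deadlineExceeded(now: Long): Boolean =
    queueSize > 0 && pullDeadline <= now
}

// Mirrors maxStreamPullIntervalExceeded: compare the stored deadline against the clock.
def pullIntervalExceeded(queueInfo: Ref[SimpleQueueInfo]): UIO[Boolean] =
  Clock.nanoTime.flatMap(now => queueInfo.get.map(_.deadlineExceeded(now)))
```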
@@ -130,14 +128,14 @@ object PartitionStreamControl {
tp: TopicPartition,
requestData: UIO[Unit],
diagnostics: Diagnostics,
maxPollInterval: Duration
maxStreamPullInterval: Duration
): UIO[PartitionStreamControl] = {
val maxPollIntervalNanos = maxPollInterval.toNanos
val maxStreamPullIntervalNanos = maxStreamPullInterval.toNanos

def registerPull(queueInfo: Ref[QueueInfo], records: Chunk[ByteArrayCommittableRecord]): UIO[Unit] =
for {
now <- Clock.nanoTime
newPullDeadline = now + maxPollIntervalNanos
newPullDeadline = now + maxStreamPullIntervalNanos
_ <- queueInfo.update(_.withPull(newPullDeadline, records))
} yield ()

@@ -184,7 +182,7 @@ object PartitionStreamControl {
interruptionPromise,
completedPromise,
queueInfo,
maxPollInterval
maxStreamPullInterval
)
}

27 changes: 18 additions & 9 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -25,12 +25,12 @@ private[consumer] final class Runloop private (
topLevelExecutor: Executor,
sameThreadRuntime: Runtime[Any],
consumer: ConsumerAccess,
maxPollInterval: Duration,
commitQueue: Queue[Commit],
commandQueue: Queue[RunloopCommand],
lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent],
partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
diagnostics: Diagnostics,
maxStreamPullInterval: Duration,
maxRebalanceDuration: Duration,
currentStateRef: Ref[State],
committedOffsetsRef: Ref[CommitOffsets]
@@ -48,7 +48,7 @@ private[consumer] final class Runloop private (
tp,
commandQueue.offer(RunloopCommand.Request(tp)).unit,
diagnostics,
maxPollInterval
maxStreamPullInterval
)

def stopConsumption: UIO[Unit] =
@@ -657,7 +657,7 @@ private[consumer] final class Runloop private (
pollResult.records
)
updatedPendingCommits <- ZIO.filter(state.pendingCommits)(_.isPending)
_ <- checkStreamPollInterval(pollResult.assignedStreams)
_ <- checkStreamPullInterval(pollResult.assignedStreams)
} yield state.copy(
pendingRequests = fulfillResult.pendingRequests,
pendingCommits = updatedPendingCommits,
@@ -666,20 +666,29 @@
}

/**
* Check each stream to see if it exceeded its poll interval. If so, halt it. In addition, if any stream has exceeded
* its poll interval, shutdown the consumer.
* Check each stream to see if it exceeded its pull interval. If so, halt it. In addition, if any stream has exceeded
* its pull interval, shut down the consumer.
*/
private def checkStreamPollInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] =
private def checkStreamPullInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] = {
def logShutdown(stream: PartitionStreamControl): ZIO[Any, Nothing, Unit] =
ZIO.logError(
s"Stream for ${stream.tp} has not pulled chunks for more than $maxStreamPullInterval, shutting down. " +
"Use ConsumerSettings.withMaxPollInterval or .withMaxStreamPullInterval to set a longer interval when " +
"processing a batch of records needs more time."
)

for {
now <- Clock.nanoTime
anyExceeded <- ZIO.foldLeft(streams)(false) { case (acc, stream) =>
stream
.maxPollIntervalExceeded(now)
.maxStreamPullIntervalExceeded(now)
.tap(ZIO.when(_)(logShutdown(stream)))
.tap(exceeded => if (exceeded) stream.halt else ZIO.unit)
.map(acc || _)
}
_ <- shutdown.when(anyExceeded)
} yield ()
}

private def handleCommand(state: State, cmd: RunloopCommand.StreamCommand): Task[State] = {
def doChangeSubscription(newSubscriptionState: SubscriptionState): Task[State] =
@@ -936,7 +945,7 @@ object Runloop {

private[consumer] def make(
settings: ConsumerSettings,
maxPollInterval: Duration,
maxStreamPullInterval: Duration,
maxRebalanceDuration: Duration,
diagnostics: Diagnostics,
consumer: ConsumerAccess,
@@ -957,12 +966,12 @@
topLevelExecutor = executor,
sameThreadRuntime = sameThreadRuntime,
consumer = consumer,
maxPollInterval = maxPollInterval,
commitQueue = commitQueue,
commandQueue = commandQueue,
lastRebalanceEvent = lastRebalanceEvent,
partitionsHub = partitionsHub,
diagnostics = diagnostics,
maxStreamPullInterval = maxStreamPullInterval,
maxRebalanceDuration = maxRebalanceDuration,
currentStateRef = currentStateRef,
committedOffsetsRef = committedOffsetsRef
@@ -78,6 +78,7 @@ private[consumer] object RunloopAccess {
): ZIO[Scope, Throwable, RunloopAccess] =
for {
maxPollInterval <- maxPollIntervalConfig(settings)
maxStreamPullInterval = settings.maxStreamPullIntervalOption.getOrElse(maxPollInterval)
// See scaladoc of [[ConsumerSettings.withMaxRebalanceDuration]]:
maxRebalanceDuration = settings.maxRebalanceDuration.getOrElse(((maxPollInterval.toNanos / 5L) * 3L).nanos)
// This scope allows us to link the lifecycle of the Runloop and of the Hub to the lifecycle of the Consumer
@@ -90,7 +91,7 @@
makeRunloop = Runloop
.make(
settings = settings,
maxPollInterval = maxPollInterval,
maxStreamPullInterval = maxStreamPullInterval,
maxRebalanceDuration = maxRebalanceDuration,
diagnostics = diagnostics,
consumer = consumerAccess,
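A worked example of the fallbacks computed here, assuming `max.poll.interval.ms` is at its 5-minute default and neither option is set explicitly:

```scala
import zio._

val maxPollInterval: Duration       = 5.minutes
val maxStreamPullInterval: Duration = Option.empty[Duration].getOrElse(maxPollInterval) // falls back to 5 minutes
val maxRebalanceDuration: Duration  = ((maxPollInterval.toNanos / 5L) * 3L).nanos       // 3/5 of 5 minutes = 3 minutes
```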
