Skip to content

Commit

Permalink
Merge branch 'master' into fetchstrategy-unittest
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten authored Oct 14, 2023
2 parents 022c414 + e1c00c8 commit 92309f2
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 34 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ lazy val embeddedKafkaVersion = "3.6.0" // Should be the same as kafkaVersion, e

lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
lazy val jacksonDatabind = "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.2"
lazy val jacksonDatabind = "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.3"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11"

enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
val zioSbtVersion = "0.4.0-alpha.21"
val zioSbtVersion = "0.4.0-alpha.22"

addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion)
addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion)
Expand Down
93 changes: 62 additions & 31 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.stream._

import java.util
import java.util.{ Map => JavaMap }
import scala.collection.mutable
import scala.jdk.CollectionConverters._

//noinspection SimplifyWhenInspection,SimplifyUnlessInspection
Expand Down Expand Up @@ -110,6 +112,7 @@ private[consumer] final class Runloop private (
}
}

/** This is the implementation behind the user facing api `Offset.commit`. */
private val commit: Map[TopicPartition, Long] => Task[Unit] =
offsets =>
for {
Expand All @@ -119,33 +122,57 @@ private[consumer] final class Runloop private (
_ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
} yield ()

private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = {
val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
val cont = (e: Exit[Throwable, Unit]) => cmd.cont.done(e).asInstanceOf[UIO[Unit]]
val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsets))
/** Merge commits and prepare parameters for calling `consumer.commitAsync`. */
private def asyncCommitParameters(
commits: Chunk[RunloopCommand.Commit]
): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = {
val offsets = commits
.foldLeft(mutable.Map.empty[TopicPartition, Long]) { case (acc, commit) =>
commit.offsets.foreach { case (tp, offset) =>
acc += (tp -> acc.get(tp).map(_ max offset).getOrElse(offset))
}
acc
}
.toMap
val offsetsWithMetaData = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e))
val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))
val onFailure: Throwable => UIO[Unit] = {
case _: RebalanceInProgressException =>
ZIO.logDebug(s"Rebalance in progress, retrying commit for offsets $offsets") *>
commandQueue.offer(cmd).unit
case err =>
cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsets, err))
for {
_ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried")
_ <- commandQueue.offerAll(commits)
} yield ()
case err: Throwable =>
cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err))
}
val callback =
new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit =
Unsafe.unsafe { implicit u =>
runtime.unsafe.run(if (exception eq null) onSuccess else onFailure(exception)).getOrThrowFiberFailure()
runtime.unsafe.run {
if (exception eq null) onSuccess else onFailure(exception)
}
.getOrThrowFiberFailure()
}
}
(offsetsWithMetaData.asJava, callback, onFailure)
}

// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
consumer.runloopAccess { c =>
ZIO
.attempt(c.commitAsync(offsets.asJava, callback))
private def handleCommits(state: State, commits: Chunk[RunloopCommand.Commit]): UIO[State] =
if (commits.isEmpty) {
ZIO.succeed(state)
} else {
val (offsets, callback, onFailure) = asyncCommitParameters(commits)
val newState = state.addCommits(commits)
consumer.runloopAccess { c =>
// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
ZIO.attempt(c.commitAsync(offsets, callback))
}
.catchAll(onFailure)
.as(newState)
}
}

/**
* Does all needed to end revoked partitions:
Expand Down Expand Up @@ -235,7 +262,9 @@ private[consumer] final class Runloop private (
.as(tps)
}

// Pause partitions for which there is no demand and resume those for which there is now demand
/**
* Pause partitions for which there is no demand and resume those for which there is now demand.
*/
private def resumeAndPausePartitions(
c: ByteArrayKafkaConsumer,
assignment: Set[TopicPartition],
Expand Down Expand Up @@ -337,8 +366,9 @@ private[consumer] final class Runloop private (
}
}
startingStreams <-
if (pollResult.startingTps.isEmpty) ZIO.succeed(Chunk.empty[PartitionStreamControl])
else {
if (pollResult.startingTps.isEmpty) {
ZIO.succeed(Chunk.empty[PartitionStreamControl])
} else {
ZIO
.foreach(Chunk.fromIterable(pollResult.startingTps))(newPartitionStream)
.tap { newStreams =>
Expand Down Expand Up @@ -385,7 +415,6 @@ private[consumer] final class Runloop private (

cmd match {
case req: RunloopCommand.Request => ZIO.succeed(state.addRequest(req))
case cmd: RunloopCommand.Commit => doCommit(cmd).as(state.addCommit(cmd))
case cmd @ RunloopCommand.AddSubscription(newSubscription, _) =>
state.subscriptionState match {
case SubscriptionState.NotSubscribed =>
Expand Down Expand Up @@ -468,15 +497,15 @@ private[consumer] final class Runloop private (

/**
* Poll behavior:
* - Run until stop is set to true
* - Process commands as soon as they are queued, unless in the middle of polling
* - Process all currently queued commands before polling instead of one by one
* - Immediately after polling, if there are available commands, process them instead of waiting until some periodic
* trigger
* - Poll only when subscribed (leads to exceptions from the Apache Kafka Consumer if not)
* - Poll continuously when there are (still) unfulfilled requests or pending commits
* - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after
* initialization and rebalancing
* - Run until the StopRunloop command is received
* - Process all currently queued Commits
* - Process all currently queued commands
* - Poll the Kafka broker
* - After command handling and after polling, determine whether the runloop should continue:
* - Poll only when subscribed (leads to exceptions from the Apache Kafka Consumer if not)
* - Poll continuously when there are (still) unfulfilled requests or pending commits
* - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after
* initialization and rebalancing
*/
private def run(initialState: State): ZIO[Scope, Throwable, Any] = {
import Runloop.StreamOps
Expand All @@ -488,8 +517,10 @@ private[consumer] final class Runloop private (
.runFoldChunksDiscardZIO(initialState) { (state, commands) =>
for {
_ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}")
commitCommands = commands.collect { case cmd: RunloopCommand.Commit => cmd }
stateAfterCommits <- handleCommits(state, commitCommands)
streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd }
stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand)
stateAfterCommands <- ZIO.foldLeft(streamCommands)(stateAfterCommits)(handleCommand)

updatedStateAfterPoll <- if (stateAfterCommands.shouldPoll) handlePoll(stateAfterCommands)
else ZIO.succeed(stateAfterCommands)
Expand Down Expand Up @@ -608,8 +639,8 @@ private[consumer] object Runloop {
assignedStreams: Chunk[PartitionStreamControl],
subscriptionState: SubscriptionState
) {
def addCommit(c: RunloopCommand.Commit): State = copy(pendingCommits = pendingCommits :+ c)
def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r)
def addCommits(c: Chunk[RunloopCommand.Commit]): State = copy(pendingCommits = pendingCommits ++ c)
def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r)

def shouldPoll: Boolean =
subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object RunloopCommand {
case object StopRunloop extends Control
case object StopAllStreams extends StreamCommand

final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends StreamCommand {
final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends RunloopCommand {
@inline def isDone: UIO[Boolean] = cont.isDone
@inline def isPending: UIO[Boolean] = isDone.negate
}
Expand Down

0 comments on commit 92309f2

Please sign in to comment.