
Automatically merge commits #1073

Merged 2 commits on Oct 11, 2023
@@ -15,6 +15,8 @@ import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.stream._

import java.util
import java.util.{ Map => JavaMap }
import scala.collection.mutable
import scala.jdk.CollectionConverters._

//noinspection SimplifyWhenInspection,SimplifyUnlessInspection
@@ -110,6 +112,7 @@ private[consumer] final class Runloop private (
}
}

/** This is the implementation behind the user facing api `Offset.commit`. */
private val commit: Map[TopicPartition, Long] => Task[Unit] =
offsets =>
for {
@@ -119,33 +122,57 @@
_ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
} yield ()

private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = {
val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
val cont = (e: Exit[Throwable, Unit]) => cmd.cont.done(e).asInstanceOf[UIO[Unit]]
val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsets))
/** Merge commits and prepare parameters for calling `consumer.commitAsync`. */
private def asyncCommitParameters(
commits: Chunk[RunloopCommand.Commit]
): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = {
val offsets = commits
.foldLeft(mutable.Map.empty[TopicPartition, Long]) { case (acc, commit) =>
commit.offsets.foreach { case (tp, offset) =>
acc += (tp -> acc.get(tp).map(_ max offset).getOrElse(offset))
}
acc
}
.toMap
val offsetsWithMetaData = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e))
val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))
val onFailure: Throwable => UIO[Unit] = {
case _: RebalanceInProgressException =>
ZIO.logDebug(s"Rebalance in progress, retrying commit for offsets $offsets") *>
commandQueue.offer(cmd).unit
case err =>
cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsets, err))
for {
_ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried")
_ <- commandQueue.offerAll(commits)
} yield ()
case err: Throwable =>
cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err))
}
val callback =
new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit =
Unsafe.unsafe { implicit u =>
runtime.unsafe.run(if (exception eq null) onSuccess else onFailure(exception)).getOrThrowFiberFailure()
runtime.unsafe.run {
if (exception eq null) onSuccess else onFailure(exception)
}
.getOrThrowFiberFailure()
}
}
(offsetsWithMetaData.asJava, callback, onFailure)
}

// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
consumer.runloopAccess { c =>
ZIO
.attempt(c.commitAsync(offsets.asJava, callback))
private def handleCommits(state: State, commits: Chunk[RunloopCommand.Commit]): UIO[State] =
if (commits.isEmpty) {
ZIO.succeed(state)
} else {
val (offsets, callback, onFailure) = asyncCommitParameters(commits)
val newState = state.addCommits(commits)
consumer.runloopAccess { c =>
// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
ZIO.attempt(c.commitAsync(offsets, callback))
}
.catchAll(onFailure)
.as(newState)
Comment on lines +166 to +174

@guizmaii (Member) commented on Oct 12, 2023:

🟢 I'd wrap this in a ZIO.suspendSucceed:

   ZIO.suspendSucceed {
      val (offsets, callback, onFailure) = asyncCommitParameters(commits)
      val newState                       = state.addCommits(commits)
      consumer.runloopAccess { c =>
        // We don't wait for the completion of the commit here, because it
        // will only complete once we poll again.
        ZIO.attempt(c.commitAsync(offsets, callback))
      }
        .catchAll(onFailure)
        .as(newState)
  }

So that all the code is evaluated only when the returned IO is evaluated. It makes the code simpler to follow IMO.

The PR author (Collaborator) replied:
Good idea. I'll prepare another PR with that change.

The PR author (Collaborator) added:
Hmmm, this is a pure method; it doesn't return a ZIO, so this suggestion doesn't make sense.
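For readers following this thread, here is a minimal, hypothetical sketch (not part of this PR; the object name and printlns are illustration only, assuming ZIO 2) of what `ZIO.suspendSucceed` does: it defers the code that builds an effect until the effect is actually run, which is why it only helps for code that returns a `ZIO`, not for a pure method.

```scala
import zio._

object SuspendSucceedSketch extends ZIOAppDefault {

  // Side-effecting construction: the println runs as soon as `eager` is *called*,
  // even if the returned effect is never executed.
  def eager(n: Int): UIO[Int] = {
    println(s"preparing $n")
    ZIO.succeed(n * 2)
  }

  // Wrapped in ZIO.suspendSucceed: the println only runs when the returned
  // effect is executed by the runtime.
  def deferred(n: Int): UIO[Int] =
    ZIO.suspendSucceed {
      println(s"preparing $n")
      ZIO.succeed(n * 2)
    }

  val run = {
    val a = eager(1)    // prints "preparing 1" right here, at construction time
    val b = deferred(2) // prints nothing yet
    b *> a              // running `b` prints "preparing 2"; `a` was prepared earlier
  }
}
```

The printlns are deliberately impure so the difference in evaluation order is visible; in real code the construction work would be something like preparing commit parameters.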

}
}

/**
* Does all needed to end revoked partitions:
@@ -235,7 +262,9 @@
.as(tps)
}

// Pause partitions for which there is no demand and resume those for which there is now demand
/**
* Pause partitions for which there is no demand and resume those for which there is now demand.
*/
private def resumeAndPausePartitions(
c: ByteArrayKafkaConsumer,
assignment: Set[TopicPartition],
@@ -337,8 +366,9 @@
}
}
startingStreams <-
if (pollResult.startingTps.isEmpty) ZIO.succeed(Chunk.empty[PartitionStreamControl])
else {
if (pollResult.startingTps.isEmpty) {
ZIO.succeed(Chunk.empty[PartitionStreamControl])
} else {
ZIO
.foreach(Chunk.fromIterable(pollResult.startingTps))(newPartitionStream)
.tap { newStreams =>
@@ -385,7 +415,6 @@

cmd match {
case req: RunloopCommand.Request => ZIO.succeed(state.addRequest(req))
case cmd: RunloopCommand.Commit => doCommit(cmd).as(state.addCommit(cmd))
@svroonland (Collaborator) commented on Oct 11, 2023:
Could we keep this line here and change doCommit to just update the state, and then execute the commits before calling handlePoll? That would avoid the need to collect the commit commands separately, which keeps the core of the Runloop loop a bit cleaner IMO and reduces the diff.

The PR author (Collaborator) replied:
We need to handle commits before the other commands. The other commands might stop the subscription, or stuff like that. We have an open issue (#852) about not handling all the commits when the streams are closed. By handling commits first, this is somewhat prevented (we still need a more fundamental solution though).

Because we need the commit commands first, we need to collect them.

(Aside: in the past we didn't use collect for commands; instead we pattern matched inside the handleCommand method. You can ask @guizmaii about this change.)

IMHO splitting commit handling over multiple methods would decrease legibility; commit handling is already pretty complex.

Finally, please keep in mind that in the next PR the commits will come from a separate queue and the collect will be replaced by pulling from the queue.
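To make the ordering described above concrete, here is a small, self-contained toy sketch (hypothetical types and names, not the PR's actual code) of the pattern: commits are collected out of the command batch and handled before the remaining commands are folded over the state.

```scala
import zio._

object CommitOrderingSketch extends ZIOAppDefault {

  // Toy commands, standing in for RunloopCommand.Commit and the stream commands.
  sealed trait Cmd
  final case class Commit(tp: String, offset: Long) extends Cmd
  case object StopAllStreams                         extends Cmd

  final case class State(committed: Map[String, Long], stopped: Boolean)

  def handleCommits(state: State, commits: Chunk[Commit]): UIO[State] =
    ZIO.succeed(state.copy(committed = state.committed ++ commits.map(c => c.tp -> c.offset)))

  def handleCommand(state: State, cmd: Cmd): UIO[State] =
    cmd match {
      case StopAllStreams => ZIO.succeed(state.copy(stopped = true))
      case _: Commit      => ZIO.succeed(state) // already handled by handleCommits
    }

  // Commits first, then the remaining commands, mirroring the order used in the run loop.
  def processBatch(state: State, batch: Chunk[Cmd]): UIO[State] = {
    val commits = batch.collect { case c: Commit => c }
    val rest    = batch.filter { case _: Commit => false; case _ => true }
    handleCommits(state, commits).flatMap(s => ZIO.foldLeft(rest)(s)(handleCommand))
  }

  val run =
    processBatch(State(Map.empty, stopped = false), Chunk(StopAllStreams, Commit("topic-0", 42L)))
      .debug("state after batch")
}
```

In this toy model the order is inconsequential; in the real Runloop it matters because a stream command in the same batch may stop the subscription before pending commits are issued (see #852 above).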

case cmd @ RunloopCommand.AddSubscription(newSubscription, _) =>
state.subscriptionState match {
case SubscriptionState.NotSubscribed =>
@@ -468,15 +497,15 @@

/**
* Poll behavior:
* - Run until stop is set to true
* - Process commands as soon as they are queued, unless in the middle of polling
* - Process all currently queued commands before polling instead of one by one
* - Immediately after polling, if there are available commands, process them instead of waiting until some periodic
* trigger
* - Poll only when subscribed (leads to exceptions from the Apache Kafka Consumer if not)
* - Poll continuously when there are (still) unfulfilled requests or pending commits
* - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after
* initialization and rebalancing
* - Run until the StopRunloop command is received
* - Process all currently queued Commits
* - Process all currently queued commands
* - Poll the Kafka broker
* - After command handling and after polling, determine whether the runloop should continue:
* - Poll only when subscribed (leads to exceptions from the Apache Kafka Consumer if not)
* - Poll continuously when there are (still) unfulfilled requests or pending commits
* - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after
* initialization and rebalancing
*/
private def run(initialState: State): ZIO[Scope, Throwable, Any] = {
import Runloop.StreamOps
@@ -488,8 +517,10 @@
.runFoldChunksDiscardZIO(initialState) { (state, commands) =>
for {
_ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}")
commitCommands = commands.collect { case cmd: RunloopCommand.Commit => cmd }
stateAfterCommits <- handleCommits(state, commitCommands)
streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd }
stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand)
stateAfterCommands <- ZIO.foldLeft(streamCommands)(stateAfterCommits)(handleCommand)

updatedStateAfterPoll <- if (stateAfterCommands.shouldPoll) handlePoll(stateAfterCommands)
else ZIO.succeed(stateAfterCommands)
@@ -608,8 +639,8 @@
assignedStreams: Chunk[PartitionStreamControl],
subscriptionState: SubscriptionState
) {
def addCommit(c: RunloopCommand.Commit): State = copy(pendingCommits = pendingCommits :+ c)
def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r)
def addCommits(c: Chunk[RunloopCommand.Commit]): State = copy(pendingCommits = pendingCommits ++ c)
def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r)

def shouldPoll: Boolean =
subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty)
@@ -19,7 +19,7 @@ object RunloopCommand {
case object StopRunloop extends Control
case object StopAllStreams extends StreamCommand

final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends StreamCommand {
final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends RunloopCommand {
@inline def isDone: UIO[Boolean] = cont.isDone
@inline def isPending: UIO[Boolean] = isDone.negate
}