Skip to content

Commit

Permalink
Raw implementation of #1022 (comment)
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Sep 9, 2023
1 parent f065200 commit 372e92f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 6 deletions.
13 changes: 13 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ trait Consumer {

def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit]

/**
* The `UIO` represents the action of putting the Record(s) in the "commit batch" The `Task` represents the action of
* commiting the batch to Kaka
*/
def commitAccumBatch[R](
commitschedule: Schedule[R, Any, Any]
): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]]

def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]]

/**
Expand Down Expand Up @@ -459,6 +467,11 @@ private[consumer] final class ConsumerLive private[consumer] (
override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] =
runloopAccess.commitOrRetry(policy)(record)

override def commitAccumBatch[R](
commitSchedule: Schedule[R, Any, Any]
): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] =
runloopAccess.commitAccumBatch(commitSchedule)

override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] =
consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap)

Expand Down
36 changes: 34 additions & 2 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ private[consumer] final class Runloop private (
userRebalanceListener: RebalanceListener,
restartStreamsOnRebalancing: Boolean,
currentStateRef: Ref[State],
fetchStrategy: FetchStrategy
fetchStrategy: FetchStrategy,
runloopScope: Scope
) {

private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] =
Expand Down Expand Up @@ -128,6 +129,35 @@ private[consumer] final class Runloop private (
} && policy
)

// noinspection YieldingZIOEffectInspection
private[internal] def commitAccumBatch[R](
commitSchedule: Schedule[R, Any, Any]
): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] =
for {
acc <- Ref.Synchronized.make(Map.empty[TopicPartition, Long] -> List.empty[Promise[Throwable, Unit]])
_ <- acc.updateZIO { case (offsets, promises) =>
if (offsets.isEmpty) ZIO.succeed((offsets, promises))
else
commit(offsets)
.foldZIO(
e => ZIO.foreachDiscard(promises)(_.fail(e)),
_ => ZIO.foreachDiscard(promises)(_.succeed(()))
)
.as((Map.empty[TopicPartition, Long], List.empty[Promise[Throwable, Unit]]))
}
.schedule(commitSchedule)
.forkIn(runloopScope)
} yield { (records: Chunk[CommittableRecord[_, _]]) =>
for {
p <- Promise.make[Throwable, Unit]
_ <- acc.update { case (offsets, promises) =>
val newOffsets = offsets ++ records.map(record => record.topicPartition -> record.record.offset())
val newPromises = promises :+ p
(newOffsets, newPromises)
}
} yield p.await
}

private val commit: Map[TopicPartition, Long] => Task[Unit] =
offsets =>
for {
Expand Down Expand Up @@ -587,6 +617,7 @@ private[consumer] object Runloop {
initialState = State.initial
currentStateRef <- Ref.make(initialState)
runtime <- ZIO.runtime[Any]
scope <- ZIO.scope
runloop = new Runloop(
runtime = runtime,
hasGroupId = hasGroupId,
Expand All @@ -602,7 +633,8 @@ private[consumer] object Runloop {
userRebalanceListener = userRebalanceListener,
restartStreamsOnRebalancing = restartStreamsOnRebalancing,
currentStateRef = currentStateRef,
fetchStrategy = fetchStrategy
fetchStrategy = fetchStrategy,
runloopScope = scope
)
_ <- ZIO.logDebug("Starting Runloop")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,22 @@ private[consumer] final class RunloopAccess private (
diagnostics: Diagnostics
) {

private def withRunloopZIO[R, E](
private def withRunloopZIO__[R, E, A](
shouldStartIfNot: Boolean
)(whenRunning: Runloop => ZIO[R, E, Unit]): ZIO[R, E, Unit] =
)(whenRunning: Runloop => ZIO[R, E, A])(orElse: ZIO[R, E, A]): ZIO[R, E, A] =
runloopStateRef.updateSomeAndGetZIO {
case RunloopState.NotStarted if shouldStartIfNot => makeRunloop.map(RunloopState.Started.apply)
}.flatMap {
case RunloopState.NotStarted => ZIO.unit
case RunloopState.NotStarted => orElse
case RunloopState.Started(runloop) => whenRunning(runloop)
case RunloopState.Finalized => ZIO.unit
case RunloopState.Finalized => orElse
}

private def withRunloopZIO[R, E](shouldStartIfNot: Boolean)(
whenRunning: Runloop => ZIO[R, E, Unit]
): ZIO[R, E, Unit] =
withRunloopZIO__(shouldStartIfNot)(whenRunning)(ZIO.unit)

/**
* No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped.
*/
Expand Down Expand Up @@ -71,6 +76,13 @@ private[consumer] final class RunloopAccess private (
def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] =
withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(record))

def commitAccumBatch[R](
commitschedule: Schedule[R, Any, Any]
): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] =
withRunloopZIO__(shouldStartIfNot = false)(_.commitAccumBatch(commitschedule))(
ZIO.succeed((_: Chunk[CommittableRecord[_, _]]) => ZIO.succeed(ZIO.unit))
)

}

private[consumer] object RunloopAccess {
Expand Down

0 comments on commit 372e92f

Please sign in to comment.