Skip to content

Commit

Permalink
Add Consumer.commit and Consumer.commitOrRetry methods
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Sep 9, 2023
1 parent d365961 commit f065200
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
5 changes: 5 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ trait Consumer {
timeout: Duration = Duration.Infinity
): Task[Map[TopicPartition, Option[OffsetAndMetadata]]]

def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit]

def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]]

/**
Expand Down Expand Up @@ -454,6 +456,9 @@ private[consumer] final class ConsumerLive private[consumer] (
_.committed(partitions.asJava, timeout.asJava).asScala.map { case (k, v) => k -> Option(v) }.toMap
)

override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] =
runloopAccess.commitOrRetry(policy)(record)

override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] =
consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap)

Expand Down
10 changes: 10 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ private[consumer] final class Runloop private (
private[internal] def commit(record: CommittableRecord[_, _]): Task[Unit] =
commit.apply(Map(record.topicPartition -> record.record.offset()))

private[internal] def commitOrRetry[R](
policy: Schedule[R, Throwable, Any]
)(record: CommittableRecord[_, _]): RIO[R, Unit] =
commit(record).retry(
Schedule.recurWhile[Throwable] {
case _: RetriableCommitFailedException => true
case _ => false
} && policy
)

private val commit: Map[TopicPartition, Long] => Task[Unit] =
offsets =>
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ private[consumer] final class RunloopAccess private (
diagnostics: Diagnostics
) {

private def withRunloopZIO[E](
private def withRunloopZIO[R, E](
shouldStartIfNot: Boolean
)(whenRunning: Runloop => IO[E, Unit]): IO[E, Unit] =
)(whenRunning: Runloop => ZIO[R, E, Unit]): ZIO[R, E, Unit] =
runloopStateRef.updateSomeAndGetZIO {
case RunloopState.NotStarted if shouldStartIfNot => makeRunloop.map(RunloopState.Started.apply)
}.flatMap {
Expand Down Expand Up @@ -68,6 +68,9 @@ private[consumer] final class RunloopAccess private (
def commit(record: CommittableRecord[_, _]): Task[Unit] =
withRunloopZIO(shouldStartIfNot = false)(_.commit(record))

def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] =
withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(record))

}

private[consumer] object RunloopAccess {
Expand Down

0 comments on commit f065200

Please sign in to comment.