diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 51c4f1279..01123113e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -47,6 +47,8 @@ trait Consumer { timeout: Duration = Duration.Infinity ): Task[Map[TopicPartition, Option[OffsetAndMetadata]]] + def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] + def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] /** @@ -454,6 +456,9 @@ private[consumer] final class ConsumerLive private[consumer] ( _.committed(partitions.asJava, timeout.asJava).asScala.map { case (k, v) => k -> Option(v) }.toMap ) + override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] = + runloopAccess.commitOrRetry(policy)(record) + override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] = consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 34ebaf6e6..1366e60bf 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -118,6 +118,16 @@ private[consumer] final class Runloop private ( private[internal] def commit(record: CommittableRecord[_, _]): Task[Unit] = commit.apply(Map(record.topicPartition -> record.record.offset())) + private[internal] def commitOrRetry[R]( + policy: Schedule[R, Throwable, Any] + )(record: CommittableRecord[_, _]): RIO[R, Unit] = + commit(record).retry( + Schedule.recurWhile[Throwable] { + case _: RetriableCommitFailedException => true + case _ => false + } && policy + ) + private val commit: Map[TopicPartition, Long] => Task[Unit] = offsets => for { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 31e6587fc..a80c9b6e7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -30,9 +30,9 @@ private[consumer] final class RunloopAccess private ( diagnostics: Diagnostics ) { - private def withRunloopZIO[E]( + private def withRunloopZIO[R, E]( shouldStartIfNot: Boolean - )(whenRunning: Runloop => IO[E, Unit]): IO[E, Unit] = + )(whenRunning: Runloop => ZIO[R, E, Unit]): ZIO[R, E, Unit] = runloopStateRef.updateSomeAndGetZIO { case RunloopState.NotStarted if shouldStartIfNot => makeRunloop.map(RunloopState.Started.apply) }.flatMap { @@ -68,6 +68,9 @@ private[consumer] final class RunloopAccess private ( def commit(record: CommittableRecord[_, _]): Task[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.commit(record)) + def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] = + withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(record)) + } private[consumer] object RunloopAccess {