diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index bab6d13f6..90643afdd 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -42,9 +42,9 @@ trait Consumer { def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] def commit(offsetBatch: OffsetBatch): Task[Unit] - def commitOrRetry[R](policy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] - def commitOrRetry[R](policy: Schedule[R, Throwable, Any], records: Chunk[ConsumerRecord[_, _]]): RIO[R, Unit] - def commitOrRetry[R](policy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] + def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] + def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], records: Chunk[ConsumerRecord[_, _]]): RIO[R, Unit] + def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] /** * Retrieve the last committed offset for the given topic-partitions @@ -468,25 +468,25 @@ private[consumer] final class ConsumerLive private[consumer] ( } override def commit(record: ConsumerRecord[_, _]): Task[Unit] = - runloopAccess.commit(OffsetBatch.from(record)) + commit(OffsetBatch.from(record)) override def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] = - runloopAccess.commit(OffsetBatch.from(records)) + commit(OffsetBatch.from(records)) override def commit(offsetBatch: OffsetBatch): Task[Unit] = runloopAccess.commit(offsetBatch) - override def commitOrRetry[R](policy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] = - runloopAccess.commitOrRetry(policy)(OffsetBatch.from(record)) + override def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] = + commitOrRetry(retryPolicy, OffsetBatch.from(record)) override def commitOrRetry[R]( - policy: Schedule[R, Throwable, Any], + retryPolicy: Schedule[R, Throwable, Any], records: Chunk[ConsumerRecord[_, _]] ): RIO[R, Unit] = - runloopAccess.commitOrRetry(policy)(OffsetBatch.from(records)) + commitOrRetry(retryPolicy, OffsetBatch.from(records)) - override def commitOrRetry[R](policy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] = - runloopAccess.commitOrRetry(policy)(offsetBatch) + override def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] = + runloopAccess.commitOrRetry(retryPolicy, offsetBatch) override def committed( partitions: Set[TopicPartition], diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 86d3d1494..f074fc1fd 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -116,12 +116,15 @@ private[consumer] final class Runloop private ( } } - private[internal] def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] = + private[internal] def commitOrRetry[R]( + retryPolicy: Schedule[R, Throwable, Any], + offsetBatch: OffsetBatch + ): RIO[R, Unit] = commit(offsetBatch).retry( Schedule.recurWhile[Throwable] { case _: RetriableCommitFailedException => true case _ => false - } && policy + } && retryPolicy ) private[internal] def commit(offsets: OffsetBatch): Task[Unit] = diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index ea4c9fc28..db76039f5 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -69,8 +69,8 @@ private[consumer] final class RunloopAccess private ( def commit(offsetBatch: OffsetBatch): Task[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.commit(offsetBatch)) - def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] = - withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(offsetBatch)) + def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] = + withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(retryPolicy, offsetBatch)) }