Skip to content

Commit

Permalink
clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Sep 9, 2023
1 parent a60db8e commit a73a345
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 15 deletions.
22 changes: 11 additions & 11 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ trait Consumer {
def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit]
def commit(offsetBatch: OffsetBatch): Task[Unit]

def commitOrRetry[R](policy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit]
def commitOrRetry[R](policy: Schedule[R, Throwable, Any], records: Chunk[ConsumerRecord[_, _]]): RIO[R, Unit]
def commitOrRetry[R](policy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit]
def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit]
def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], records: Chunk[ConsumerRecord[_, _]]): RIO[R, Unit]
def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit]

/**
* Retrieve the last committed offset for the given topic-partitions
Expand Down Expand Up @@ -468,25 +468,25 @@ private[consumer] final class ConsumerLive private[consumer] (
}

override def commit(record: ConsumerRecord[_, _]): Task[Unit] =
runloopAccess.commit(OffsetBatch.from(record))
commit(OffsetBatch.from(record))

override def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] =
runloopAccess.commit(OffsetBatch.from(records))
commit(OffsetBatch.from(records))

override def commit(offsetBatch: OffsetBatch): Task[Unit] =
runloopAccess.commit(offsetBatch)

override def commitOrRetry[R](policy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] =
runloopAccess.commitOrRetry(policy)(OffsetBatch.from(record))
override def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] =
commitOrRetry(retryPolicy, OffsetBatch.from(record))

override def commitOrRetry[R](
policy: Schedule[R, Throwable, Any],
retryPolicy: Schedule[R, Throwable, Any],
records: Chunk[ConsumerRecord[_, _]]
): RIO[R, Unit] =
runloopAccess.commitOrRetry(policy)(OffsetBatch.from(records))
commitOrRetry(retryPolicy, OffsetBatch.from(records))

override def commitOrRetry[R](policy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] =
runloopAccess.commitOrRetry(policy)(offsetBatch)
override def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] =
runloopAccess.commitOrRetry(retryPolicy, offsetBatch)

override def committed(
partitions: Set[TopicPartition],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,15 @@ private[consumer] final class Runloop private (
}
}

private[internal] def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] =
private[internal] def commitOrRetry[R](
retryPolicy: Schedule[R, Throwable, Any],
offsetBatch: OffsetBatch
): RIO[R, Unit] =
commit(offsetBatch).retry(
Schedule.recurWhile[Throwable] {
case _: RetriableCommitFailedException => true
case _ => false
} && policy
} && retryPolicy
)

private[internal] def commit(offsets: OffsetBatch): Task[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ private[consumer] final class RunloopAccess private (
def commit(offsetBatch: OffsetBatch): Task[Unit] =
withRunloopZIO(shouldStartIfNot = false)(_.commit(offsetBatch))

def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] =
withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(offsetBatch))
def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] =
withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(retryPolicy, offsetBatch))

}

Expand Down

0 comments on commit a73a345

Please sign in to comment.