Skip to content

Commit

Permalink
Reduce diff
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Nov 26, 2024
1 parent 7050acc commit 208cd46
Showing 1 changed file with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ private[consumer] final class LiveCommitter(
}
} yield ()

private def mergeCommitOffsets(commits: Chunk[Commit]): Map[TopicPartition, OffsetAndMetadata] =
commits
.foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) =>
commit.offsets.foreach { case (tp, offset) =>
acc += (tp -> acc
.get(tp)
.map(current => if (current.offset() > offset.offset()) current else offset)
.getOrElse(offset))
}
acc
}
.toMap

private def handleCommitCompletion(
commits: Chunk[Commit],
offsets: Map[TopicPartition, OffsetAndMetadata],
Expand Down Expand Up @@ -100,19 +113,6 @@ private[consumer] final class LiveCommitter(
}
.unit

private def mergeCommitOffsets(commits: Chunk[Commit]): Map[TopicPartition, OffsetAndMetadata] =
commits
.foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) =>
commit.offsets.foreach { case (tp, offset) =>
acc += (tp -> acc
.get(tp)
.map(current => if (current.offset() > offset.offset()) current else offset)
.getOrElse(offset))
}
acc
}
.toMap

/**
* Wrapper that converts KafkaConsumer#commitAsync to ZIO
*
Expand Down

0 comments on commit 208cd46

Please sign in to comment.