From 06195d55d695739668e942dea9675ef1f6bfc266 Mon Sep 17 00:00:00 2001
From: Steven Vroonland
Date: Sat, 30 Sep 2023 14:34:23 +0200
Subject: [PATCH 1/2] WIP on new committing API

---
 .../zio/kafka/bench/ConsumerBenchmark.scala   |  2 +-
 .../src/test/scala/zio/kafka/AdminSpec.scala  | 11 +++--
 .../zio/kafka/consumer/ConsumerSpec.scala     | 20 ++++----
 .../kafka/consumer/SubscriptionsSpec.scala    |  2 +-
 ...setBatch.scala => CommittableOffset.scala} | 41 ++++++++++------
 .../kafka/consumer/CommittableRecord.scala    |  8 +---
 .../scala/zio/kafka/consumer/Consumer.scala   | 38 ++++++++++++++-
 .../scala/zio/kafka/consumer/Offset.scala     | 47 -------------------
 .../zio/kafka/consumer/internal/Runloop.scala | 16 +++----
 .../consumer/internal/RunloopAccess.scala     |  5 +-
 10 files changed, 95 insertions(+), 95 deletions(-)
 rename zio-kafka/src/main/scala/zio/kafka/consumer/{OffsetBatch.scala => CommittableOffset.scala} (69%)
 delete mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala

diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala
index b85b413fd..b41f024e0 100644
--- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala
+++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala
@@ -67,7 +67,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
         .aggregateAsyncWithin(ZSink.collectAll[Offset], Schedule.fixed(100.millis))
         .tap(batch => counter.update(_ + batch.size))
         .map(OffsetBatch.apply)
-        .mapZIO(_.commit)
+        .mapZIO(Consumer.commit)
         .takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
         .runDrain
         .provideSome[Kafka](env)
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
index 33416e7d0..756c7e76c 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
@@ -233,7 +233,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             val records     = committableRecords.map(_.record)
             val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

-            offsetBatch.commit.as(records)
+            Consumer.commit(offsetBatch).as(records)
           }
           .runCollect
           .provideSomeLayer[Kafka](consumer("adminspec-topic10", Some(consumerGroupID)))
@@ -301,7 +301,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
           Consumer
             .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
             .take(count)
-            .foreach(_.offset.commit)
+            .map(_.offset)
+            .foreach(Consumer.commit)
             .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

         KafkaTestUtils.withAdmin { client =>
@@ -344,7 +345,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
           Consumer
             .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
             .take(count)
-            .foreach(_.offset.commit)
+            .map(_.offset)
+            .foreach(Consumer.commit)
             .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

         KafkaTestUtils.withAdmin { client =>
@@ -645,7 +647,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
     groupInstanceId: Option[String] = None
   ): ZIO[Kafka, Throwable, Unit] = Consumer
     .plainStream(Subscription.topics(topicName), Serde.string, Serde.string)
-    .foreach(_.offset.commit)
+    .map(_.offset)
+    .foreach(Consumer.commit)
     .provideSomeLayer(consumer(clientId, Some(groupId), groupInstanceId))

   private def getStableConsumerGroupDescription(
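The pattern above repeats throughout this patch: `offset.commit` becomes `Consumer.commit(offset)`, i.e. committing moves from the offset value to the `Consumer` service. A minimal sketch of the new call style, assuming the `Consumer.commit` accessor introduced later in this patch; the topic name and serdes are illustrative:

    import zio._
    import zio.kafka.consumer.{ Consumer, Subscription }
    import zio.kafka.serde.Serde

    // Commit every record's offset through the Consumer service.
    val commitEach: RIO[Consumer, Unit] =
      Consumer
        .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
        .map(_.offset)           // extract the committable offset
        .mapZIO(Consumer.commit) // commit via the service, not via the offset
        .runDrain
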
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index d68b93a8a..95b2784f2 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -196,7 +196,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                          val records     = committableRecords.map(_.record)
                          val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

-                         offsetBatch.commit.as(records)
+                         Consumer.commit(offsetBatch).as(records)
                        }
                        .runCollect
                        .provideSomeLayer[Kafka](
@@ -214,7 +214,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                          val records     = committableRecords.map(_.record)
                          val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

-                         offsetBatch.commit.as(records)
+                         Consumer.commit(offsetBatch).as(records)
                        }
                        .runCollect
                        .provideSomeLayer[Kafka](
@@ -287,7 +287,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 .zipWithIndex
                 .tap { case (record, idx) =>
                   (Consumer.stopConsumption <* ZIO.logDebug("Stopped consumption")).when(idx == 3) *>
-                    record.offset.commit <* ZIO.logDebug(s"Committed $idx")
+                    Consumer.commit(record.offset) <* ZIO.logDebug(s"Committed $idx")
                 }
                 .tap { case (_, idx) => ZIO.logDebug(s"Consumed $idx") }
                 .runDrain
@@ -315,7 +315,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                        } yield if (nr < 10) Seq(record.offset) else Seq.empty
                      }
                      .transduce(Consumer.offsetBatches)
-                     .mapZIO(_.commit)
+                     .mapZIO(Consumer.commit)
                      .runDrain *>
                      Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head))
                  .provideSomeLayer[Kafka](consumer(client, Some(group)))
@@ -343,7 +343,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                    .take(nrMessages.toLong)
                    .transduce(Consumer.offsetBatches)
                    .take(1)
-                   .mapZIO(_.commit)
+                   .mapZIO(Consumer.commit)
                    .runDrain *>
                    Consumer.committed((0 until nrPartitions).map(new TopicPartition(topic, _)).toSet))
                  .provideSomeLayer[Kafka](consumer(client, Some(group)))
@@ -465,7 +465,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                          val records     = committableRecords.map(_.record)
                          val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

-                         offsetBatch.commit.as(records)
+                         Consumer.commit(offsetBatch).as(records)
                        }
                        .runCollect
                        .provideSomeLayer[Kafka](consumer(client1, Some(group)))
@@ -675,7 +675,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             .flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
               ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *>
                 partitionStream.mapChunksZIO { records =>
-                  OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition)
+                  Consumer.commit(OffsetBatch(records.map(_.offset))) *> messagesReceived(tp.partition)
                     .update(_ + records.size)
                     .as(records)
                 }
@@ -814,7 +814,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                     .mapZIO(record => messagesReceivedConsumer1.update(_ + 1).as(record))
                     .map(_.offset)
                     .aggregateAsync(Consumer.offsetBatches)
-                    .mapZIO(offsetBatch => offsetBatch.commit)
+                    .mapZIO(Consumer.commit)
                     .runDrain
                 }
                 .mapZIO(_ => drainCount.updateAndGet(_ + 1))
@@ -839,7 +839,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 .mapZIO(record => messagesReceivedConsumer2.update(_ + 1).as(record))
                 .map(_.offset)
                 .aggregateAsync(Consumer.offsetBatches)
-                .mapZIO(offsetBatch => offsetBatch.commit)
+                .mapZIO(Consumer.commit)
                 .runDrain
                 .provideSomeLayer[Kafka](
                   customConsumer("consumer2", Some(group))
@@ -1207,7 +1207,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 .take(11)
                 .map(_.offset)
                 .aggregateAsync(Consumer.offsetBatches)
-                .mapZIO(_.commit) // Hangs without timeout
+                .mapZIO(Consumer.commit) // Hangs without timeout
                 .runDrain
                 .exit
                 .provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds))
customConsumer("consumer2", Some(group)) @@ -1207,7 +1207,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(11) .map(_.offset) .aggregateAsync(Consumer.offsetBatches) - .mapZIO(_.commit) // Hangs without timeout + .mapZIO(Consumer.commit) // Hangs without timeout .runDrain .exit .provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds)) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala index d33bdab52..60234e670 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala @@ -270,7 +270,7 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { Consumer.offsetBatches.contramap[CommittableRecord[String, String]](_.offset) <&> ZSink .collectAll[CommittableRecord[String, String]] ) - .mapZIO { case (offsetBatch, records) => offsetBatch.commit.as(records) } + .mapZIO { case (offsetBatch, records) => Consumer.commit(offsetBatch).as(records) } .flattenChunks .runCollect .tap(records => recordsConsumed.update(_ ++ records)) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala similarity index 69% rename from zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala rename to zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala index 3c0c0f6cc..276ab8400 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala @@ -2,23 +2,18 @@ package zio.kafka.consumer import org.apache.kafka.clients.consumer.ConsumerGroupMetadata import org.apache.kafka.common.TopicPartition -import zio.{ RIO, Schedule, Task, ZIO } -sealed trait OffsetBatch { +trait CommittableOffset { def offsets: Map[TopicPartition, Long] - def commit: Task[Unit] +} + +// TODO remove the trait and just keep the implementation..? 
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala
similarity index 69%
rename from zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
rename to zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala
index 3c0c0f6cc..276ab8400 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala
@@ -2,23 +2,18 @@ package zio.kafka.consumer

 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata
 import org.apache.kafka.common.TopicPartition
-import zio.{ RIO, Schedule, Task, ZIO }

-sealed trait OffsetBatch {
+trait CommittableOffset {
   def offsets: Map[TopicPartition, Long]
-  def commit: Task[Unit]
+}
+
+// TODO remove the trait and just keep the implementation..?
+sealed trait OffsetBatch extends CommittableOffset {
   def add(offset: Offset): OffsetBatch
   @deprecated("Use add(Offset) instead", "2.1.4")
   def merge(offset: Offset): OffsetBatch
   def merge(offsets: OffsetBatch): OffsetBatch
   def consumerGroupMetadata: Option[ConsumerGroupMetadata]
-
-  /**
-   * Attempts to commit and retries according to the given policy when the commit fails with a
-   * RetriableCommitFailedException
-   */
-  def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
-    Offset.commitOrRetry(commit, policy)
 }

 object OffsetBatch {
@@ -29,11 +24,8 @@ object OffsetBatch {

 private final case class OffsetBatchImpl(
   offsets: Map[TopicPartition, Long],
-  commitHandle: Map[TopicPartition, Long] => Task[Unit],
   consumerGroupMetadata: Option[ConsumerGroupMetadata]
 ) extends OffsetBatch {
-  override def commit: Task[Unit] = commitHandle(offsets)
-
   override def add(offset: Offset): OffsetBatch =
     copy(
       offsets = offsets + (offset.topicPartition -> (offsets
@@ -57,9 +49,30 @@ private final case class OffsetBatchImpl(
 case object EmptyOffsetBatch extends OffsetBatch {
   override val offsets: Map[TopicPartition, Long] = Map.empty
-  override val commit: Task[Unit] = ZIO.unit
   override def add(offset: Offset): OffsetBatch = offset.batch
   override def merge(offset: Offset): OffsetBatch = add(offset)
   override def merge(offsets: OffsetBatch): OffsetBatch = offsets
   override def consumerGroupMetadata: Option[ConsumerGroupMetadata] = None
 }
+
+sealed trait Offset extends CommittableOffset {
+  def topic: String
+  def partition: Int
+  def offset: Long
+  def batch: OffsetBatch
+  def consumerGroupMetadata: Option[ConsumerGroupMetadata]
+
+  final lazy val topicPartition: TopicPartition = new TopicPartition(topic, partition)
+}
+
+object Offset {}
+
+private final case class OffsetImpl(
+  topic: String,
+  partition: Int,
+  offset: Long,
+  consumerGroupMetadata: Option[ConsumerGroupMetadata]
+) extends Offset {
+  override def batch: OffsetBatch = OffsetBatchImpl(offsets, consumerGroupMetadata)
+  override def offsets: Map[TopicPartition, Long] = Map(topicPartition -> offset)
+}
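The point of the new `CommittableOffset` supertype is that a single `Offset` and an `OffsetBatch` can now be committed through one code path. A sketch against the traits as introduced above and the `Consumer.commit` method added below:

    import zio.Task
    import zio.kafka.consumer.{ CommittableOffset, Consumer }

    // One signature covers both a single Offset and a whole OffsetBatch.
    def commitAny(consumer: Consumer, committable: CommittableOffset): Task[Unit] =
      consumer.commit(committable)
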
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
index f9583f5a7..c1c6870eb 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
@@ -1,13 +1,12 @@
 package zio.kafka.consumer

 import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord }
-import org.apache.kafka.common.TopicPartition
+import zio.RIO
 import zio.kafka.serde.Deserializer
-import zio.{ RIO, Task }

+// TODO name CommittableRecord is now a bit meaningless
 final case class CommittableRecord[K, V](
   record: ConsumerRecord[K, V],
-  private val commitHandle: Map[TopicPartition, Long] => Task[Unit],
   private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
 ) {
   def deserializeWith[R, K1, V1](
@@ -43,7 +42,6 @@ final case class CommittableRecord[K, V](
       topic = record.topic(),
       partition = record.partition(),
       offset = record.offset(),
-      commitHandle = commitHandle,
       consumerGroupMetadata = consumerGroupMetadata
     )
 }
@@ -51,12 +49,10 @@ object CommittableRecord {
   def apply[K, V](
     record: ConsumerRecord[K, V],
-    commitHandle: Map[TopicPartition, Long] => Task[Unit],
     consumerGroupMetadata: Option[ConsumerGroupMetadata]
   ): CommittableRecord[K, V] =
     new CommittableRecord(
       record = record,
-      commitHandle = commitHandle,
       consumerGroupMetadata = consumerGroupMetadata
     )
 }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index a76169763..06996fb86 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -4,7 +4,8 @@ import org.apache.kafka.clients.consumer.{
   Consumer => JConsumer,
   ConsumerRecord,
   OffsetAndMetadata,
-  OffsetAndTimestamp
+  OffsetAndTimestamp,
+  RetriableCommitFailedException
 }
 import org.apache.kafka.common._
 import zio._
@@ -157,6 +158,14 @@ trait Consumer {
    * Expose internal consumer metrics
    */
   def metrics: Task[Map[MetricName, Metric]]
+
+  def commit(offset: CommittableOffset): Task[Unit]
+
+  /**
+   * Attempts to commit and retries according to the given policy when the commit fails with a
+   * RetriableCommitFailedException
+   */
+  def commitOrRetry[R](offset: CommittableOffset, policy: Schedule[R, Throwable, Any]): RIO[R, Unit]
 }

 object Consumer {
@@ -392,6 +401,21 @@ object Consumer {
   def metrics: RIO[Consumer, Map[MetricName, Metric]] =
     ZIO.serviceWithZIO(_.metrics)

+  /**
+   * Accessor method
+   */
+  def commit(offset: CommittableOffset): RIO[Consumer, Unit] =
+    ZIO.serviceWithZIO(_.commit(offset))
+
+  /**
+   * Accessor method
+   */
+  def commitOrRetry[R](
+    offset: CommittableOffset,
+    policy: Schedule[R, Throwable, Any]
+  ): ZIO[R with Consumer, Throwable, Unit] =
+    ZIO.serviceWithZIO[Consumer](_.commitOrRetry[R](offset, policy))
+
   sealed trait OffsetRetrieval
   object OffsetRetrieval {
     final case class Auto(reset: AutoOffsetStrategy = AutoOffsetStrategy.Latest) extends OffsetRetrieval
@@ -517,7 +541,7 @@ private[consumer] final class ConsumerLive private[consumer] (
             }
             .provideEnvironment(r)
             .aggregateAsync(offsetBatches)
-            .mapZIO(_.commitOrRetry(commitRetryPolicy))
+            .mapZIO(commitOrRetry(_, commitRetryPolicy))
             .runDrain
       } yield ()

@@ -550,4 +574,14 @@ private[consumer] final class ConsumerLive private[consumer] (

   override def metrics: Task[Map[MetricName, Metric]] =
     consumer.withConsumer(_.metrics().asScala.toMap)
+
+  override def commit(offset: CommittableOffset): Task[Unit] = runloopAccess.commit(offset.offsets)
+
+  override def commitOrRetry[R](offset: CommittableOffset, policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
+    commit(offset)
+      .retry(
+        Schedule.recurWhile[Throwable] {
+          case _: RetriableCommitFailedException => true
+          case _                                 => false
+        } && policy
+      )
 }
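With commit logic on the `Consumer`, retrying is configured per call instead of per offset. A sketch of the new `commitOrRetry`, assuming the accessor above; the schedule and topic are illustrative:

    import zio._
    import zio.kafka.consumer.{ Consumer, Subscription }
    import zio.kafka.serde.Serde

    // Retries only on RetriableCommitFailedException, per the implementation above;
    // any other failure fails the stream.
    val commitWithRetry: ZIO[Consumer, Throwable, Unit] =
      Consumer
        .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
        .map(_.offset)
        .aggregateAsync(Consumer.offsetBatches)
        .mapZIO(Consumer.commitOrRetry(_, Schedule.exponential(10.millis) && Schedule.recurs(5)))
        .runDrain
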
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
deleted file mode 100644
index 69f8b9842..000000000
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-package zio.kafka.consumer
-
-import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, RetriableCommitFailedException }
-import org.apache.kafka.common.TopicPartition
-import zio.{ RIO, Schedule, Task }
-
-sealed trait Offset {
-  def topic: String
-  def partition: Int
-  def offset: Long
-  def commit: Task[Unit]
-  def batch: OffsetBatch
-  def consumerGroupMetadata: Option[ConsumerGroupMetadata]
-
-  /**
-   * Attempts to commit and retries according to the given policy when the commit fails with a
-   * RetriableCommitFailedException
-   */
-  final def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
-    Offset.commitOrRetry(commit, policy)
-
-  final lazy val topicPartition: TopicPartition = new TopicPartition(topic, partition)
-}
-
-object Offset {
-
-  private[consumer] def commitOrRetry[R, B](
-    commit: Task[Unit],
-    policy: Schedule[R, Throwable, B]
-  ): RIO[R, Unit] =
-    commit.retry(
-      Schedule.recurWhile[Throwable] {
-        case _: RetriableCommitFailedException => true
-        case _                                 => false
-      } && policy
-    )
-}
-
-private final case class OffsetImpl(
-  topic: String,
-  partition: Int,
-  offset: Long,
-  commitHandle: Map[TopicPartition, Long] => Task[Unit],
-  consumerGroupMetadata: Option[ConsumerGroupMetadata]
-) extends Offset {
-  def commit: Task[Unit] = commitHandle(Map(topicPartition -> offset))
-  def batch: OffsetBatch = OffsetBatchImpl(Map(topicPartition -> offset), commitHandle, consumerGroupMetadata)
-}
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index 13d718803..e7524648f 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -115,14 +115,13 @@ private[consumer] final class Runloop private (
       }
     }

-  private val commit: Map[TopicPartition, Long] => Task[Unit] =
-    offsets =>
-      for {
-        p <- Promise.make[Throwable, Unit]
-        _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit
-        _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
-        _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
-      } yield ()
+  def commit(offsets: Map[TopicPartition, Long]): Task[Unit] =
+    for {
+      p <- Promise.make[Throwable, Unit]
+      _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit
+      _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
+      _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
+    } yield ()

   private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = {
     val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
@@ -214,7 +213,6 @@ private[consumer] final class Runloop private (
           builder += CommittableRecord[Array[Byte], Array[Byte]](
             record = consumerRecord,
-            commitHandle = commit,
             consumerGroupMetadata = consumerGroupMetadata
           )
         }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
index 938029890..ba088222c 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
@@ -7,7 +7,7 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
 import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
 import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription }
 import zio.stream.{ Stream, Take, UStream, ZStream }
-import zio.{ Hub, IO, Ref, Scope, UIO, ZIO, ZLayer }
+import zio.{ Hub, IO, Ref, Scope, Task, UIO, ZIO, ZLayer }

 private[internal] sealed trait RunloopState
 private[internal] object RunloopState {
@@ -65,6 +65,9 @@ private[consumer] final class RunloopAccess private (
       }
     } yield stream

+  def commit(offsets: Map[TopicPartition, Long]): Task[Unit] =
+    withRunloopZIO(true)(_.commit(offsets))
+
 }

 private[consumer] object RunloopAccess {
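The commit hand-off that used to be captured in each record's `commitHandle` closure is now a plain method reached through `RunloopAccess`. In miniature, the mechanism is: enqueue the offsets together with a `Promise`, let the run loop complete the promise once the broker responds, and fail the caller on timeout. A self-contained sketch, not the actual `Runloop` code; the command type and error below are simplified stand-ins:

    import zio._

    // Simplified stand-ins for RunloopCommand.Commit and the CommitTimeout error.
    final case class Commit(offsets: Map[String, Long], done: Promise[Throwable, Unit])
    object CommitTimeout extends RuntimeException("Commit timed out")

    def commitVia(
      commandQueue: Queue[Commit],
      commitTimeout: Duration
    )(offsets: Map[String, Long]): Task[Unit] =
      for {
        p <- Promise.make[Throwable, Unit]          // completed by the run loop
        _ <- commandQueue.offer(Commit(offsets, p)) // hand the request to the run loop
        _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
      } yield ()
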
From d213f3a1310b4eeaec71783c946b3f362fcd21cd Mon Sep 17 00:00:00 2001
From: Steven Vroonland
Date: Sun, 29 Oct 2023 09:05:31 +0100
Subject: [PATCH 2/2] Cleanup some methods on OffsetBatch

---
 .../kafka/consumer/CommittableOffset.scala    | 71 ++++++++++---------
 .../scala/zio/kafka/consumer/Consumer.scala   |  2 +-
 .../zio/kafka/producer/Transaction.scala      |  2 +-
 .../producer/TransactionalProducer.scala      |  6 +-
 4 files changed, 42 insertions(+), 39 deletions(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala
index fc7a2988a..cdf8f5b4c 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala
@@ -3,17 +3,16 @@ package zio.kafka.consumer
 import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
 import org.apache.kafka.common.TopicPartition

-trait CommittableOffset {
-  def offsets: Map[TopicPartition, OffsetAndMetadata]
+/**
+ * Supertype for a single offset or a batch of offsets for multiple topic-partitions
+ */
+sealed trait CommittableOffset {
+  def offsetsAsMap: Map[TopicPartition, OffsetAndMetadata]
+  def consumerGroupMetadata: Option[ConsumerGroupMetadata]
 }

-// TODO remove the trait and just keep the implementation..?
 sealed trait OffsetBatch extends CommittableOffset {
-  def add(offset: Offset): OffsetBatch
-  @deprecated("Use add(Offset) instead", "2.1.4")
-  def merge(offset: Offset): OffsetBatch
-  def merge(offsets: OffsetBatch): OffsetBatch
-  def consumerGroupMetadata: Option[ConsumerGroupMetadata]
+  def add(offset: CommittableOffset): OffsetBatch
 }

 object OffsetBatch {
@@ -23,42 +22,46 @@ object OffsetBatch {
 }

 private final case class OffsetBatchImpl(
-  offsets: Map[TopicPartition, OffsetAndMetadata],
+  offsetsAsMap: Map[TopicPartition, OffsetAndMetadata],
   consumerGroupMetadata: Option[ConsumerGroupMetadata]
 ) extends OffsetBatch {
-  override def add(offset: Offset): OffsetBatch = {
-    val maxOffsetAndMetadata = offsets.get(offset.topicPartition) match {
-      case Some(existing) if existing.offset > offset.offset => existing
-      case _                                                 => offset.asJavaOffsetAndMetadata
+  def add(offset: CommittableOffset): OffsetBatch =
+    offset match {
+      case batch: OffsetBatch => merge(batch)
+      case offset: Offset =>
+        val maxOffsetAndMetadata = offsetsAsMap.get(offset.topicPartition) match {
+          case Some(existing) if existing.offset > offset.offset => existing
+          case _                                                 => offset.asJavaOffsetAndMetadata
+        }
+
+        copy(
+          offsetsAsMap = offsetsAsMap + (offset.topicPartition -> maxOffsetAndMetadata)
+        )
     }

-    copy(
-      offsets = offsets + (offset.topicPartition -> maxOffsetAndMetadata)
-    )
-  }
-
-  override def merge(offset: Offset): OffsetBatch = add(offset)
-
-  override def merge(otherOffsets: OffsetBatch): OffsetBatch = {
+  private def merge(otherOffsets: OffsetBatch): OffsetBatch = {
     val newOffsets = Map.newBuilder[TopicPartition, OffsetAndMetadata]
-    newOffsets ++= offsets
-    otherOffsets.offsets.foreach { case (tp, offset) =>
-      val laterOffset = offsets.get(tp) match {
+    newOffsets ++= offsetsAsMap
+    otherOffsets.offsetsAsMap.foreach { case (tp, offset) =>
+      val laterOffset = offsetsAsMap.get(tp) match {
         case Some(existing) => if (existing.offset < offset.offset) offset else existing
         case None           => offset
       }
       newOffsets += tp -> laterOffset
     }
-    copy(offsets = newOffsets.result())
+    copy(offsetsAsMap = newOffsets.result())
   }
 }

 case object EmptyOffsetBatch extends OffsetBatch {
-  override val offsets: Map[TopicPartition, OffsetAndMetadata] = Map.empty
-  override def add(offset: Offset): OffsetBatch = offset.batch
-  override def merge(offset: Offset): OffsetBatch = add(offset)
-  override def merge(offsets: OffsetBatch): OffsetBatch = offsets
+  override val offsetsAsMap: Map[TopicPartition, OffsetAndMetadata] = Map.empty
+
+  override def add(offset: CommittableOffset): OffsetBatch = offset match {
+    case batch: OffsetBatch => batch
+    case o: Offset          => OffsetBatchImpl(Map(o.topicPartition -> o.asJavaOffsetAndMetadata), consumerGroupMetadata)
+  }
+
   override def consumerGroupMetadata: Option[ConsumerGroupMetadata] = None
 }
@@ -76,16 +79,14 @@ sealed trait Offset extends CommittableOffset {
   final lazy val topicPartition: TopicPartition = new TopicPartition(topic, partition)
 }

-object Offset {}
-
-private final case class OffsetImpl(
+final case class OffsetImpl(
   topic: String,
   partition: Int,
   offset: Long,
   consumerGroupMetadata: Option[ConsumerGroupMetadata],
   metadata: Option[String] = None
 ) extends Offset {
-  override def batch: OffsetBatch = OffsetBatchImpl(offsets, consumerGroupMetadata)
-  override def offsets: Map[TopicPartition, OffsetAndMetadata] = Map(topicPartition -> asJavaOffsetAndMetadata)
-  def withMetadata(metadata: String): OffsetImpl = copy(metadata = Some(metadata))
+  def batch: OffsetBatch = OffsetBatchImpl(offsetsAsMap, consumerGroupMetadata)
+  override def offsetsAsMap: Map[TopicPartition, OffsetAndMetadata] = Map(topicPartition -> asJavaOffsetAndMetadata)
+  def withMetadata(metadata: String): Offset = copy(metadata = Some(metadata))
 }
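After this cleanup, `add` is the single entry point: adding an `Offset` keeps the highest offset seen per topic-partition, and adding another `OffsetBatch` merges the two. An illustrative sketch with made-up offsets, assuming `OffsetImpl` and `EmptyOffsetBatch` remain public as in this patch:

    import zio.kafka.consumer.{ EmptyOffsetBatch, OffsetBatch, OffsetImpl }

    val first: OffsetBatch  = EmptyOffsetBatch.add(OffsetImpl("topic", 0, 10L, None))
    val second: OffsetBatch = first.add(OffsetImpl("topic", 0, 7L, None))  // ignored: 7 < 10
    val third: OffsetBatch  = second.add(OffsetImpl("topic", 1, 3L, None)) // new partition
    // third.offsetsAsMap now holds offset 10 for partition 0 and offset 3 for partition 1
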
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index 06996fb86..5d0dba83d 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -574,7 +574,7 @@ private[consumer] final class ConsumerLive private[consumer] (
   override def metrics: Task[Map[MetricName, Metric]] =
     consumer.withConsumer(_.metrics().asScala.toMap)

-  override def commit(offset: CommittableOffset): Task[Unit] = runloopAccess.commit(offset.offsets)
+  override def commit(offset: CommittableOffset): Task[Unit] = runloopAccess.commit(offset.offsetsAsMap)

   override def commitOrRetry[R](offset: CommittableOffset, policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
     commit(offset)
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala
index 428575818..065e86571 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala
@@ -82,7 +82,7 @@ final private[producer] class TransactionImpl(
     offsets: OffsetBatch
   ): RIO[R, Chunk[RecordMetadata]] =
     haltIfClosed *>
-      offsetBatchRef.update(_ merge offsets) *>
+      offsetBatchRef.update(_ add offsets) *>
      producer.produceChunk[R, K, V](records, keySerializer, valueSerializer)

   def abort: IO[TransactionalProducer.UserInitiatedAbort.type, Nothing] =
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
index 98380254b..a7fc1b5c9 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
@@ -40,7 +40,7 @@ object TransactionalProducer {
         case None => invalidGroupIdException
         case Some(consumerGroupMetadata) =>
           val offsets: util.Map[TopicPartition, OffsetAndMetadata] =
-            offsetBatch.offsets.map { case (topicPartition, offset) =>
+            offsetBatch.offsetsAsMap.map { case (topicPartition, offset) =>
               topicPartition -> new OffsetAndMetadata(offset.offset + 1, offset.metadata)
             }.asJava

@@ -48,7 +48,9 @@ object TransactionalProducer {
         }
       }

-    sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction())
+    sendOffsetsToTransaction.when(offsetBatch.offsetsAsMap.nonEmpty) *> ZIO.attemptBlocking(
+      live.p.commitTransaction()
+    )
   }

   private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): UIO[Unit] =
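For reference, the shape of what the producer now sends on commit: the batch's `offsetsAsMap`, with each offset bumped by one so the consumer group resumes after the last committed record, and skipped entirely when the batch is empty. A standalone sketch of that conversion, assuming the patch's `offsetsAsMap`; the producer value and group metadata are illustrative:

    import java.util
    import scala.jdk.CollectionConverters._
    import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.common.TopicPartition
    import zio._
    import zio.kafka.consumer.OffsetBatch

    def sendOffsets(
      producer: KafkaProducer[Array[Byte], Array[Byte]],
      batch: OffsetBatch,
      metadata: ConsumerGroupMetadata
    ): Task[Unit] =
      ZIO
        .attemptBlocking {
          val offsets: util.Map[TopicPartition, OffsetAndMetadata] =
            batch.offsetsAsMap.map { case (tp, om) =>
              tp -> new OffsetAndMetadata(om.offset + 1, om.metadata) // commit the *next* offset
            }.asJava
          producer.sendOffsetsToTransaction(offsets, metadata)
        }
        .when(batch.offsetsAsMap.nonEmpty)
        .unit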