diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala
index dee0493da..dcd1853ba 100644
--- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala
+++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala
@@ -67,7 +67,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
         .aggregateAsyncWithin(ZSink.collectAll[Offset], Schedule.fixed(100.millis))
         .tap(batch => counter.update(_ + batch.size))
         .map(OffsetBatch.apply)
-        .mapZIO(_.commit)
+        .mapZIO(Consumer.commit)
         .takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
         .runDrain
         .provideSome[Kafka](env)
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
index 33416e7d0..756c7e76c 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
@@ -233,7 +233,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
               val records     = committableRecords.map(_.record)
               val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

-              offsetBatch.commit.as(records)
+              Consumer.commit(offsetBatch).as(records)
             }
             .runCollect
             .provideSomeLayer[Kafka](consumer("adminspec-topic10", Some(consumerGroupID)))
@@ -301,7 +301,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
          Consumer
            .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
            .take(count)
-           .foreach(_.offset.commit)
+           .map(_.offset)
+           .foreach(Consumer.commit)
            .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

        KafkaTestUtils.withAdmin { client =>
@@ -344,7 +345,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
          Consumer
            .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
            .take(count)
-           .foreach(_.offset.commit)
+           .map(_.offset)
+           .foreach(Consumer.commit)
            .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

        KafkaTestUtils.withAdmin { client =>
@@ -645,7 +647,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
    groupInstanceId: Option[String] = None
  ): ZIO[Kafka, Throwable, Unit] = Consumer
    .plainStream(Subscription.topics(topicName), Serde.string, Serde.string)
-   .foreach(_.offset.commit)
+   .map(_.offset)
+   .foreach(Consumer.commit)
    .provideSomeLayer(consumer(clientId, Some(groupId), groupInstanceId))

  private def getStableConsumerGroupDescription(
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index 63cfc269d..3227dcdcb 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -196,7 +196,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 val records     = committableRecords.map(_.record)
                 val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

-                offsetBatch.commit.as(records)
+                Consumer.commit(offsetBatch).as(records)
               }
               .runCollect
               .provideSomeLayer[Kafka](
@@ -214,7 +214,7 @@
                 val records     = committableRecords.map(_.record)
                 val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

-                offsetBatch.commit.as(records)
+                Consumer.commit(offsetBatch).as(records)
               }
               .runCollect
               .provideSomeLayer[Kafka](
@@ -287,7 +287,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
            .zipWithIndex
            .tap { case (record, idx) =>
              (Consumer.stopConsumption <* ZIO.logDebug("Stopped consumption")).when(idx == 3) *>
-               record.offset.commit <* ZIO.logDebug(s"Committed $idx")
+               Consumer.commit(record.offset) <* ZIO.logDebug(s"Committed $idx")
            }
            .tap { case (_, idx) => ZIO.logDebug(s"Consumed $idx") }
            .runDrain
@@ -315,7 +315,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                  } yield if (nr < 10) Seq(record.offset) else Seq.empty
                }
                .transduce(Consumer.offsetBatches)
-               .mapZIO(_.commit)
+               .mapZIO(Consumer.commit)
                .runDrain *>
                Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head))
               .provideSomeLayer[Kafka](consumer(client, Some(group)))
@@ -410,7 +410,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                .take(nrMessages.toLong)
                .transduce(Consumer.offsetBatches)
                .take(1)
-               .mapZIO(_.commit)
+               .mapZIO(Consumer.commit)
                .runDrain *>
                Consumer.committed((0 until nrPartitions).map(new TopicPartition(topic, _)).toSet))
               .provideSomeLayer[Kafka](consumer(client, Some(group)))
@@ -433,7 +433,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                .take(1)
                .transduce(Consumer.offsetBatches)
                .take(1)
-               .mapZIO(_.commit)
+               .mapZIO(Consumer.commit)
                .runDrain *>
                Consumer.committed(Set(new TopicPartition(topic, 0))))
               .provideSomeLayer[Kafka](consumer(client, Some(group)))
@@ -555,7 +555,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 val records     = committableRecords.map(_.record)
                 val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

-                offsetBatch.commit.as(records)
+                Consumer.commit(offsetBatch).as(records)
               }
               .runCollect
               .provideSomeLayer[Kafka](consumer(client1, Some(group)))
@@ -765,7 +765,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
              .flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
                ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *>
                  partitionStream.mapChunksZIO { records =>
-                   OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition)
+                   Consumer.commit(OffsetBatch(records.map(_.offset))) *> messagesReceived(tp.partition)
                     .update(_ + records.size)
                     .as(records)
                  }
@@ -904,7 +904,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                  .mapZIO(record => messagesReceivedConsumer1.update(_ + 1).as(record))
                  .map(_.offset)
                  .aggregateAsync(Consumer.offsetBatches)
-                 .mapZIO(offsetBatch => offsetBatch.commit)
+                 .mapZIO(Consumer.commit)
                  .runDrain
              }
              .mapZIO(_ => drainCount.updateAndGet(_ + 1))
@@ -929,7 +929,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
              .mapZIO(record => messagesReceivedConsumer2.update(_ + 1).as(record))
              .map(_.offset)
              .aggregateAsync(Consumer.offsetBatches)
-             .mapZIO(offsetBatch => offsetBatch.commit)
+             .mapZIO(Consumer.commit)
              .runDrain
              .provideSomeLayer[Kafka](
                customConsumer("consumer2", Some(group))
@@ -1303,7 +1303,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
            .take(11)
            .map(_.offset)
            .aggregateAsync(Consumer.offsetBatches)
-           .mapZIO(_.commit) // Hangs without timeout
+           .mapZIO(Consumer.commit) // Hangs without timeout
            .runDrain
            .exit
            .provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds))
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala
index d33bdab52..60234e670 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala
@@ -270,7 +270,7 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                Consumer.offsetBatches.contramap[CommittableRecord[String, String]](_.offset) <&> ZSink
                  .collectAll[CommittableRecord[String, String]]
              )
-             .mapZIO { case (offsetBatch, records) => offsetBatch.commit.as(records) }
+             .mapZIO { case (offsetBatch, records) => Consumer.commit(offsetBatch).as(records) }
             .flattenChunks
             .runCollect
             .tap(records => recordsConsumed.update(_ ++ records))
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala
new file mode 100644
index 000000000..cdf8f5b4c
--- /dev/null
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableOffset.scala
@@ -0,0 +1,92 @@
+package zio.kafka.consumer
+
+import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Supertype for a single offset or a batch of offsets for multiple topic-partitions
+ */
+sealed trait CommittableOffset {
+  def offsetsAsMap: Map[TopicPartition, OffsetAndMetadata]
+  def consumerGroupMetadata: Option[ConsumerGroupMetadata]
+}
+
+sealed trait OffsetBatch extends CommittableOffset {
+  def add(offset: CommittableOffset): OffsetBatch
+}
+
+object OffsetBatch {
+  val empty: OffsetBatch = EmptyOffsetBatch
+
+  def apply(offsets: Iterable[Offset]): OffsetBatch = offsets.foldLeft(empty)(_ add _)
+}
+
+private final case class OffsetBatchImpl(
+  offsetsAsMap: Map[TopicPartition, OffsetAndMetadata],
+  consumerGroupMetadata: Option[ConsumerGroupMetadata]
+) extends OffsetBatch {
+  def add(offset: CommittableOffset): OffsetBatch =
+    offset match {
+      case batch: OffsetBatch => merge(batch)
+      case offset: Offset =>
+        val maxOffsetAndMetadata = offsetsAsMap.get(offset.topicPartition) match {
+          case Some(existing) if existing.offset > offset.offset => existing
+          case _                                                 => offset.asJavaOffsetAndMetadata
+        }
+
+        copy(
+          offsetsAsMap = offsetsAsMap + (offset.topicPartition -> maxOffsetAndMetadata)
+        )
+    }
+
+  private def merge(otherOffsets: OffsetBatch): OffsetBatch = {
+    val newOffsets = Map.newBuilder[TopicPartition, OffsetAndMetadata]
+    newOffsets ++= offsetsAsMap
+    otherOffsets.offsetsAsMap.foreach { case (tp, offset) =>
+      val laterOffset = offsetsAsMap.get(tp) match {
+        case Some(existing) => if (existing.offset < offset.offset) offset else existing
+        case None           => offset
+      }
+      newOffsets += tp -> laterOffset
+    }
+
+    copy(offsetsAsMap = newOffsets.result())
+  }
+}
+
+case object EmptyOffsetBatch extends OffsetBatch {
+  override val offsetsAsMap: Map[TopicPartition, OffsetAndMetadata] = Map.empty
+
+  override def add(offset: CommittableOffset): OffsetBatch = offset match {
+    case batch: OffsetBatch => batch
+    case o: Offset          => OffsetBatchImpl(Map(o.topicPartition -> o.asJavaOffsetAndMetadata), consumerGroupMetadata)
+  }
+
+  override def consumerGroupMetadata: Option[ConsumerGroupMetadata] = None
+}
+
+sealed trait Offset extends CommittableOffset {
+  def topic: String
+  def partition: Int
+  def offset: Long
+  def batch: OffsetBatch
+  def consumerGroupMetadata: Option[ConsumerGroupMetadata]
+  def withMetadata(metadata: String): Offset
+
+  private[consumer] def metadata: Option[String]
+  private[consumer] def asJavaOffsetAndMetadata: OffsetAndMetadata = new OffsetAndMetadata(offset, metadata.orNull)
+
+  final lazy val topicPartition: TopicPartition = new TopicPartition(topic, partition)
+}
+
+final case class OffsetImpl(
+  topic: String,
+  partition: Int,
+  offset: Long,
+  consumerGroupMetadata: Option[ConsumerGroupMetadata],
+  metadata: Option[String] = None
+) extends Offset {
+  def batch: OffsetBatch = OffsetBatchImpl(offsetsAsMap, consumerGroupMetadata)
+  override def offsetsAsMap: Map[TopicPartition, OffsetAndMetadata] = Map(topicPartition -> asJavaOffsetAndMetadata)
+  def withMetadata(metadata: String): Offset = copy(metadata = Some(metadata))
+}
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
index d138ea6d8..3ef97b81f 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
@@ -1,13 +1,12 @@
 package zio.kafka.consumer

-import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata }
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord }
+import zio.RIO
 import zio.kafka.serde.Deserializer
-import zio.{ RIO, Task }

+// TODO name CommittableRecord is now a bit meaningless
 final case class CommittableRecord[K, V](
   record: ConsumerRecord[K, V],
-  private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
   private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
 ) {
   def deserializeWith[R, K1, V1](
@@ -43,7 +42,6 @@ final case class CommittableRecord[K, V](
       topic = record.topic(),
       partition = record.partition(),
       offset = record.offset(),
-      commitHandle = commitHandle,
       consumerGroupMetadata = consumerGroupMetadata,
       metadata = None
     )
@@ -52,12 +50,10 @@
 object CommittableRecord {
   def apply[K, V](
     record: ConsumerRecord[K, V],
-    commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
     consumerGroupMetadata: Option[ConsumerGroupMetadata]
   ): CommittableRecord[K, V] =
     new CommittableRecord(
       record = record,
-      commitHandle = commitHandle,
       consumerGroupMetadata = consumerGroupMetadata
     )
 }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index a76169763..5d0dba83d 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -4,7 +4,8 @@ import org.apache.kafka.clients.consumer.{
   Consumer => JConsumer,
   ConsumerRecord,
   OffsetAndMetadata,
-  OffsetAndTimestamp
+  OffsetAndTimestamp,
+  RetriableCommitFailedException
 }
 import org.apache.kafka.common._
 import zio._
@@ -157,6 +158,14 @@ trait Consumer {
   /**
    * Expose internal consumer metrics
    */
   def metrics: Task[Map[MetricName, Metric]]
+
+  def commit(offset: CommittableOffset): Task[Unit]
+
+  /**
+   * Attempts to commit and retries according to the given policy when the commit fails with a
+   * RetriableCommitFailedException
+   */
+  def commitOrRetry[R](offset: CommittableOffset, policy: Schedule[R, Throwable, Any]): RIO[R, Unit]
 }

 object Consumer {
@@ -392,6 +401,21 @@
   def metrics: RIO[Consumer, Map[MetricName, Metric]] =
     ZIO.serviceWithZIO(_.metrics)
+  /**
+   * Accessor method
+   */
+  def commit(offset: CommittableOffset): RIO[Consumer, Unit] =
+    ZIO.serviceWithZIO(_.commit(offset))
+
+  /**
+   * Accessor method
+   */
+  def commitOrRetry[R](
+    offset: CommittableOffset,
+    policy: Schedule[R, Throwable, Any]
+  ): ZIO[R with Consumer, Throwable, Unit] =
+    ZIO.serviceWithZIO[Consumer](_.commitOrRetry[R](offset, policy))
+
   sealed trait OffsetRetrieval

   object OffsetRetrieval {
     final case class Auto(reset: AutoOffsetStrategy = AutoOffsetStrategy.Latest) extends OffsetRetrieval
@@ -517,7 +541,7 @@ private[consumer] final class ConsumerLive private[consumer] (
             }
             .provideEnvironment(r)
             .aggregateAsync(offsetBatches)
-            .mapZIO(_.commitOrRetry(commitRetryPolicy))
+            .mapZIO(commitOrRetry(_, commitRetryPolicy))
             .runDrain
         } yield ()

@@ -550,4 +574,14 @@ private[consumer] final class ConsumerLive private[consumer] (
   override def metrics: Task[Map[MetricName, Metric]] =
     consumer.withConsumer(_.metrics().asScala.toMap)

+  override def commit(offset: CommittableOffset): Task[Unit] = runloopAccess.commit(offset.offsetsAsMap)
+
+  override def commitOrRetry[R](offset: CommittableOffset, policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
+    commit(offset)
+      .retry(
+        Schedule.recurWhile[Throwable] {
+          case _: RetriableCommitFailedException => true
+          case _                                 => false
+        } && policy
+      )
 }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
deleted file mode 100644
index c0cd0b65c..000000000
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package zio.kafka.consumer
-
-import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata, RetriableCommitFailedException }
-import org.apache.kafka.common.TopicPartition
-import zio.{ RIO, Schedule, Task }
-
-sealed trait Offset {
-
-  def topic: String
-  def partition: Int
-  def offset: Long
-  def commit: Task[Unit]
-  def batch: OffsetBatch
-  def consumerGroupMetadata: Option[ConsumerGroupMetadata]
-  def withMetadata(metadata: String): Offset
-
-  private[consumer] def metadata: Option[String]
-  private[consumer] def asJavaOffsetAndMetadata: OffsetAndMetadata = new OffsetAndMetadata(offset, metadata.orNull)
-
-  /**
-   * Attempts to commit and retries according to the given policy when the commit fails with a
-   * RetriableCommitFailedException
-   */
-  final def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
-    Offset.commitOrRetry(commit, policy)
-
-  final lazy val topicPartition: TopicPartition = new TopicPartition(topic, partition)
-}
-
-object Offset {
-  private[consumer] def commitOrRetry[R, B](
-    commit: Task[Unit],
-    policy: Schedule[R, Throwable, B]
-  ): RIO[R, Unit] =
-    commit.retry(
-      Schedule.recurWhile[Throwable] {
-        case _: RetriableCommitFailedException => true
-        case _                                 => false
-      } && policy
-    )
-}
-
-private final case class OffsetImpl(
-  topic: String,
-  partition: Int,
-  offset: Long,
-  commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
-  consumerGroupMetadata: Option[ConsumerGroupMetadata],
-  metadata: Option[String] = None
-) extends Offset {
-  def commit: Task[Unit] = commitHandle(Map(topicPartition -> asJavaOffsetAndMetadata))
-  def batch: OffsetBatch = OffsetBatchImpl(
-    Map(topicPartition -> asJavaOffsetAndMetadata),
-    commitHandle,
-    consumerGroupMetadata
-  )
-  def withMetadata(metadata: String): OffsetImpl = copy(metadata = Some(metadata))
-}
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
deleted file mode 100644
index 6c4b5a977..000000000
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-package zio.kafka.consumer
-
-import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
-import org.apache.kafka.common.TopicPartition
-import zio.{ RIO, Schedule, Task, ZIO }
-
-sealed trait OffsetBatch {
-  def offsets: Map[TopicPartition, OffsetAndMetadata]
-  def commit: Task[Unit]
-  def add(offset: Offset): OffsetBatch
-  @deprecated("Use add(Offset) instead", "2.1.4")
-  def merge(offset: Offset): OffsetBatch
-  def merge(offsets: OffsetBatch): OffsetBatch
-  def consumerGroupMetadata: Option[ConsumerGroupMetadata]
-
-  /**
-   * Attempts to commit and retries according to the given policy when the commit fails with a
-   * RetriableCommitFailedException
-   */
-  def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
-    Offset.commitOrRetry(commit, policy)
-}
-
-object OffsetBatch {
-  val empty: OffsetBatch = EmptyOffsetBatch
-
-  def apply(offsets: Iterable[Offset]): OffsetBatch = offsets.foldLeft(empty)(_ add _)
-}
-
-private final case class OffsetBatchImpl(
-  offsets: Map[TopicPartition, OffsetAndMetadata],
-  commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
-  consumerGroupMetadata: Option[ConsumerGroupMetadata]
-) extends OffsetBatch {
-  override def commit: Task[Unit] = commitHandle(offsets)
-
-  override def add(offset: Offset): OffsetBatch = {
-    val maxOffsetAndMetadata = offsets.get(offset.topicPartition) match {
-      case Some(existing) if existing.offset > offset.offset => existing
-      case _                                                 => offset.asJavaOffsetAndMetadata
-    }
-
-    copy(
-      offsets = offsets + (offset.topicPartition -> maxOffsetAndMetadata)
-    )
-  }
-
-  override def merge(offset: Offset): OffsetBatch = add(offset)
-
-  override def merge(otherOffsets: OffsetBatch): OffsetBatch = {
-    val newOffsets = Map.newBuilder[TopicPartition, OffsetAndMetadata]
-    newOffsets ++= offsets
-    otherOffsets.offsets.foreach { case (tp, offset) =>
-      val laterOffset = offsets.get(tp) match {
-        case Some(existing) => if (existing.offset < offset.offset) offset else existing
-        case None           => offset
-      }
-      newOffsets += tp -> laterOffset
-    }
-
-    copy(offsets = newOffsets.result())
-  }
-}
-
-case object EmptyOffsetBatch extends OffsetBatch {
-  override val offsets: Map[TopicPartition, OffsetAndMetadata] = Map.empty
-  override val commit: Task[Unit] = ZIO.unit
-  override def add(offset: Offset): OffsetBatch = offset.batch
-  override def merge(offset: Offset): OffsetBatch = add(offset)
-  override def merge(offsets: OffsetBatch): OffsetBatch = offsets
-  override def consumerGroupMetadata: Option[ConsumerGroupMetadata] = None
-}
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index c7aeb6659..e6ee3d8c0 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -112,15 +112,13 @@ private[consumer] final class Runloop private (
     }
   }

-  /** This is the implementation behind the user facing api `Offset.commit`. */
-  private val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] =
-    offsets =>
-      for {
-        p <- Promise.make[Throwable, Unit]
-        _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit
-        _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
-        _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
-      } yield ()
+  def commit(offsets: Map[TopicPartition, OffsetAndMetadata]): Task[Unit] =
+    for {
+      p <- Promise.make[Throwable, Unit]
+      _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit
+      _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
+      _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
+    } yield ()

   /** Merge commits and prepare parameters for calling `consumer.commitAsync`. */
   private def asyncCommitParameters(
@@ -241,7 +239,6 @@ private[consumer] final class Runloop private (
           builder +=
             CommittableRecord[Array[Byte], Array[Byte]](
               record = consumerRecord,
-              commitHandle = commit,
               consumerGroupMetadata = consumerGroupMetadata
             )
         }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
index 3cacf9164..486bbe85a 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
@@ -8,7 +8,9 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
 import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
 import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription }
 import zio.stream.{ Stream, Take, UStream, ZStream }
+import zio.{ Hub, IO, Ref, Scope, Task, UIO, ZIO, ZLayer }
 import zio._
+import org.apache.kafka.clients.consumer.OffsetAndMetadata

 import scala.util.Try

@@ -68,6 +70,9 @@ private[consumer] final class RunloopAccess private (
         }
       } yield stream

+  def commit(offsets: Map[TopicPartition, OffsetAndMetadata]): Task[Unit] =
+    withRunloopZIO(true)(_.commit(offsets))
+
 }

 private[consumer] object RunloopAccess {
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala
index 428575818..065e86571 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala
@@ -82,7 +82,7 @@ final private[producer] class TransactionImpl(
     offsets: OffsetBatch
   ): RIO[R, Chunk[RecordMetadata]] =
     haltIfClosed *>
-      offsetBatchRef.update(_ merge offsets) *>
+      offsetBatchRef.update(_ add offsets) *>
       producer.produceChunk[R, K, V](records, keySerializer, valueSerializer)

   def abort: IO[TransactionalProducer.UserInitiatedAbort.type, Nothing] =
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
index 98380254b..a7fc1b5c9 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
@@ -40,7 +40,7 @@ object TransactionalProducer {
         case None => invalidGroupIdException
         case Some(consumerGroupMetadata) =>
           val offsets: util.Map[TopicPartition, OffsetAndMetadata] =
-            offsetBatch.offsets.map { case (topicPartition, offset) =>
+            offsetBatch.offsetsAsMap.map { case (topicPartition, offset) =>
               topicPartition -> new OffsetAndMetadata(offset.offset + 1, offset.metadata)
             }.asJava

@@ -48,7 +48,9 @@
           }
       }

-      sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction())
+      sendOffsetsToTransaction.when(offsetBatch.offsetsAsMap.nonEmpty) *> ZIO.attemptBlocking(
+        live.p.commitTransaction()
+      )
     }

     private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): UIO[Unit] =
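
Usage sketch (not part of the patch): after this change, offsets and offset batches no longer carry their own commit handle; commits go through the Consumer service via the new Consumer.commit and Consumer.commitOrRetry accessors added in Consumer.scala. A minimal example against the changed API; the topic name, group setup, and retry schedule below are illustrative, not part of the diff:

    import zio._
    import zio.kafka.consumer._
    import zio.kafka.serde.Serde

    // Stream records, batch their offsets, and commit through the Consumer service.
    val consumeAndCommit: ZIO[Consumer, Throwable, Unit] =
      Consumer
        .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
        .map(_.offset)
        .aggregateAsync(Consumer.offsetBatches)
        .mapZIO(Consumer.commit) // was: .mapZIO(_.commit)
        .runDrain

    // Retrying variant: Consumer.commitOrRetry applies the schedule only while the
    // failure is a RetriableCommitFailedException, matching the removed
    // Offset.commitOrRetry behaviour.
    val consumeAndCommitWithRetry: ZIO[Consumer, Throwable, Unit] =
      Consumer
        .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
        .map(_.offset)
        .aggregateAsync(Consumer.offsetBatches)
        .mapZIO(Consumer.commitOrRetry(_, Schedule.exponential(10.millis) && Schedule.recurs(5)))
        .runDrain

Since OffsetBatch extends the new CommittableOffset supertype, both single offsets and aggregated batches can be passed to the same Consumer.commit accessor.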