From ad2c2a210ed7f9d942a0649fb5ebab156ee16c3b Mon Sep 17 00:00:00 2001 From: Flavien Bert Date: Wed, 4 Oct 2023 11:31:59 +0200 Subject: [PATCH 1/4] enable the consumer to commit an offset with metadata --- .github/workflows/ci.yml | 4 +-- .../zio/kafka/consumer/ConsumerSpec.scala | 23 ++++++++++++++++ .../kafka/consumer/CommittableRecord.scala | 9 ++++--- .../scala/zio/kafka/consumer/Offset.scala | 21 +++++++++++---- .../zio/kafka/consumer/OffsetBatch.scala | 27 +++++++++++-------- .../diagnostics/DiagnosticEvent.scala | 2 +- .../zio/kafka/consumer/internal/Runloop.scala | 17 +++++++----- .../consumer/internal/RunloopCommand.scala | 4 ++- .../producer/TransactionalProducer.scala | 2 +- 9 files changed, 77 insertions(+), 32 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f7afd9d27..7135989f0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,9 +13,7 @@ env: branches: - master - series/0.x - pull_request: - branches-ignore: - - gh-pages + pull_request: {} jobs: build: name: Build diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 48a494665..395871792 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -349,6 +349,29 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSomeLayer[Kafka](consumer(client, Some(group))) } yield assert(offsets.values.map(_.map(_.offset)))(forall(isSome(equalTo(nrMessages.toLong / nrPartitions)))) }, + test("commits an offset with metadata") { + for { + topic <- randomTopic + group <- randomGroup + metadata <- randomThing("metadata") + client <- randomClient + _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = 1)) + _ <- produceOne(topic, "key", "msg") + + // Consume messages + subscription = Subscription.topics(topic) + offsets <- (Consumer + .partitionedStream(subscription, Serde.string, Serde.string) + .flatMap(_._2.map(_.offset.withMetadata(metadata))) + .take(1) + .transduce(Consumer.offsetBatches) + .take(1) + .mapZIO(_.commit) + .runDrain *> + Consumer.committed(Set(new TopicPartition(topic, 0)))) + .provideSomeLayer[Kafka](consumer(client, Some(group))) + } yield assert(offsets.values.headOption.flatten.map(_.metadata))(isSome(equalTo(metadata))) + }, test("handle rebalancing by completing topic-partition streams") { val nrMessages = 50 val nrPartitions = 6 // Must be even and strictly positive diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala index f9583f5a7..d138ea6d8 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala @@ -1,13 +1,13 @@ package zio.kafka.consumer -import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord } +import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata } import org.apache.kafka.common.TopicPartition import zio.kafka.serde.Deserializer import zio.{ RIO, Task } final case class CommittableRecord[K, V]( record: ConsumerRecord[K, V], - private val commitHandle: Map[TopicPartition, Long] => Task[Unit], + private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], private val consumerGroupMetadata: Option[ConsumerGroupMetadata] ) { def deserializeWith[R, K1, V1]( @@ -44,14 +44,15 @@ final case class CommittableRecord[K, V]( partition = record.partition(), offset = record.offset(), commitHandle = commitHandle, - consumerGroupMetadata = consumerGroupMetadata + consumerGroupMetadata = consumerGroupMetadata, + metadata = None ) } object CommittableRecord { def apply[K, V]( record: ConsumerRecord[K, V], - commitHandle: Map[TopicPartition, Long] => Task[Unit], + commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], consumerGroupMetadata: Option[ConsumerGroupMetadata] ): CommittableRecord[K, V] = new CommittableRecord( diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala index 69f8b9842..c0cd0b65c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala @@ -1,16 +1,21 @@ package zio.kafka.consumer -import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, RetriableCommitFailedException } +import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata, RetriableCommitFailedException } import org.apache.kafka.common.TopicPartition import zio.{ RIO, Schedule, Task } sealed trait Offset { + def topic: String def partition: Int def offset: Long def commit: Task[Unit] def batch: OffsetBatch def consumerGroupMetadata: Option[ConsumerGroupMetadata] + def withMetadata(metadata: String): Offset + + private[consumer] def metadata: Option[String] + private[consumer] def asJavaOffsetAndMetadata: OffsetAndMetadata = new OffsetAndMetadata(offset, metadata.orNull) /** * Attempts to commit and retries according to the given policy when the commit fails with a @@ -39,9 +44,15 @@ private final case class OffsetImpl( topic: String, partition: Int, offset: Long, - commitHandle: Map[TopicPartition, Long] => Task[Unit], - consumerGroupMetadata: Option[ConsumerGroupMetadata] + commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], + consumerGroupMetadata: Option[ConsumerGroupMetadata], + metadata: Option[String] = None ) extends Offset { - def commit: Task[Unit] = commitHandle(Map(topicPartition -> offset)) - def batch: OffsetBatch = OffsetBatchImpl(Map(topicPartition -> offset), commitHandle, consumerGroupMetadata) + def commit: Task[Unit] = commitHandle(Map(topicPartition -> asJavaOffsetAndMetadata)) + def batch: OffsetBatch = OffsetBatchImpl( + Map(topicPartition -> asJavaOffsetAndMetadata), + commitHandle, + consumerGroupMetadata + ) + def withMetadata(metadata: String): OffsetImpl = copy(metadata = Some(metadata)) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala index 3c0c0f6cc..385e4bcc4 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala @@ -1,11 +1,11 @@ package zio.kafka.consumer -import org.apache.kafka.clients.consumer.ConsumerGroupMetadata +import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata } import org.apache.kafka.common.TopicPartition import zio.{ RIO, Schedule, Task, ZIO } sealed trait OffsetBatch { - def offsets: Map[TopicPartition, Long] + def offsets: Map[TopicPartition, OffsetAndMetadata] def commit: Task[Unit] def add(offset: Offset): OffsetBatch @deprecated("Use add(Offset) instead", "2.1.4") @@ -28,26 +28,31 @@ object OffsetBatch { } private final case class OffsetBatchImpl( - offsets: Map[TopicPartition, Long], - commitHandle: Map[TopicPartition, Long] => Task[Unit], + offsets: Map[TopicPartition, OffsetAndMetadata], + commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit], consumerGroupMetadata: Option[ConsumerGroupMetadata] ) extends OffsetBatch { override def commit: Task[Unit] = commitHandle(offsets) - override def add(offset: Offset): OffsetBatch = + override def add(offset: Offset): OffsetBatch = { + val maxOffsetAndMetadata = offsets.get(offset.topicPartition) match { + case Some(existing) if existing.offset > offset.offset => existing + case _ => offset.asJavaOffsetAndMetadata + } + copy( - offsets = offsets + (offset.topicPartition -> (offsets - .getOrElse(offset.topicPartition, -1L) max offset.offset)) + offsets = offsets + (offset.topicPartition -> maxOffsetAndMetadata) ) + } override def merge(offset: Offset): OffsetBatch = add(offset) override def merge(otherOffsets: OffsetBatch): OffsetBatch = { - val newOffsets = Map.newBuilder[TopicPartition, Long] + val newOffsets = Map.newBuilder[TopicPartition, OffsetAndMetadata] newOffsets ++= offsets otherOffsets.offsets.foreach { case (tp, offset) => - val existing = offsets.getOrElse(tp, -1L) - if (existing < offset) + val existing = offsets.getOrElse(tp, new OffsetAndMetadata(-1L)) + if (existing.offset < offset.offset) newOffsets += tp -> offset } @@ -56,7 +61,7 @@ private final case class OffsetBatchImpl( } case object EmptyOffsetBatch extends OffsetBatch { - override val offsets: Map[TopicPartition, Long] = Map.empty + override val offsets: Map[TopicPartition, OffsetAndMetadata] = Map.empty override val commit: Task[Unit] = ZIO.unit override def add(offset: Offset): OffsetBatch = offset.batch override def merge(offset: Offset): OffsetBatch = add(offset) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala index f22f2d7d5..a268c5c6c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala @@ -15,7 +15,7 @@ object DiagnosticEvent { sealed trait Commit extends DiagnosticEvent object Commit { - final case class Started(offsets: Map[TopicPartition, Long]) extends Commit + final case class Started(offsets: Map[TopicPartition, OffsetAndMetadata]) extends Commit final case class Success(offsets: Map[TopicPartition, OffsetAndMetadata]) extends Commit final case class Failure(offsets: Map[TopicPartition, OffsetAndMetadata], cause: Throwable) extends Commit } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 43cf4ccf9..9de116e0b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -113,7 +113,7 @@ private[consumer] final class Runloop private ( } /** This is the implementation behind the user facing api `Offset.commit`. */ - private val commit: Map[TopicPartition, Long] => Task[Unit] = + private val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = offsets => for { p <- Promise.make[Throwable, Unit] @@ -127,16 +127,21 @@ private[consumer] final class Runloop private ( commits: Chunk[RunloopCommand.Commit] ): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = { val offsets = commits - .foldLeft(mutable.Map.empty[TopicPartition, Long]) { case (acc, commit) => + .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) => commit.offsets.foreach { case (tp, offset) => - acc += (tp -> acc.get(tp).map(_ max offset).getOrElse(offset)) + acc += (tp -> acc + .get(tp) + .map(current => if (current.offset() > offset.offset()) current else offset) + .getOrElse(offset)) } acc } .toMap - val offsetsWithMetaData = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) } - val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e)) - val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)) + val offsetsWithMetaData = offsets.map { case (tp, offset) => + tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata) + } + val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e)) + val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)) val onFailure: Throwable => UIO[Unit] = { case _: RebalanceInProgressException => for { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala index a750b077a..a5259b2c2 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala @@ -1,5 +1,6 @@ package zio.kafka.consumer.internal +import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition import zio._ import zio.kafka.consumer.{ InvalidSubscriptionUnion, Subscription } @@ -19,7 +20,8 @@ object RunloopCommand { case object StopRunloop extends Control case object StopAllStreams extends StreamCommand - final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends RunloopCommand { + final case class Commit(offsets: Map[TopicPartition, OffsetAndMetadata], cont: Promise[Throwable, Unit]) + extends RunloopCommand { @inline def isDone: UIO[Boolean] = cont.isDone @inline def isPending: UIO[Boolean] = isDone.negate } diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala index b5b1d5dce..98380254b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala @@ -41,7 +41,7 @@ object TransactionalProducer { case Some(consumerGroupMetadata) => val offsets: util.Map[TopicPartition, OffsetAndMetadata] = offsetBatch.offsets.map { case (topicPartition, offset) => - topicPartition -> new OffsetAndMetadata(offset + 1) + topicPartition -> new OffsetAndMetadata(offset.offset + 1, offset.metadata) }.asJava ZIO.attemptBlocking(live.p.sendOffsetsToTransaction(offsets, consumerGroupMetadata)) From 5285caa8b149dd1282942af0f99565bbab83d8ba Mon Sep 17 00:00:00 2001 From: Flavien Bert Date: Thu, 12 Oct 2023 14:35:08 +0200 Subject: [PATCH 2/4] revert ci workflow --- .github/workflows/ci.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7135989f0..f7afd9d27 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,9 @@ env: branches: - master - series/0.x - pull_request: {} + pull_request: + branches-ignore: + - gh-pages jobs: build: name: Build From 76ed412d8c2c0b201f4362af3675e5626fd8c72b Mon Sep 17 00:00:00 2001 From: Jules Ivanic Date: Fri, 27 Oct 2023 13:03:06 +0400 Subject: [PATCH 3/4] Update zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala Co-authored-by: Erik van Oosten --- .../src/main/scala/zio/kafka/consumer/OffsetBatch.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala index 385e4bcc4..cd6dafacb 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala @@ -51,9 +51,11 @@ private final case class OffsetBatchImpl( val newOffsets = Map.newBuilder[TopicPartition, OffsetAndMetadata] newOffsets ++= offsets otherOffsets.offsets.foreach { case (tp, offset) => - val existing = offsets.getOrElse(tp, new OffsetAndMetadata(-1L)) - if (existing.offset < offset.offset) - newOffsets += tp -> offset + val laterOffset = offsets.get(tp) match { + case Some(existing) => if (existing.offset < offset.offset) offset else existing + case None => offset + } + newOffsets += tp -> laterOffset } copy(offsets = newOffsets.result()) From 83a75fb3c8e16f5675af0f28db47aca94dd37309 Mon Sep 17 00:00:00 2001 From: Flavien Bert Date: Fri, 27 Oct 2023 17:41:10 +0200 Subject: [PATCH 4/4] fmt code --- zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala index cd6dafacb..6c4b5a977 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala @@ -53,7 +53,7 @@ private final case class OffsetBatchImpl( otherOffsets.offsets.foreach { case (tp, offset) => val laterOffset = offsets.get(tp) match { case Some(existing) => if (existing.offset < offset.offset) offset else existing - case None => offset + case None => offset } newOffsets += tp -> laterOffset }