Consumer.commit method instead of record.offset.commit #1093

Closed · wants to merge 3 commits
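
In short: committing moves off the offset values and onto the `Consumer` service. `Offset` and `OffsetBatch` become plain data (see the new `CommittableOffset` hierarchy below), and call sites change from `record.offset.commit` / `offsetBatch.commit` to `Consumer.commit(...)`, with `Consumer.commitOrRetry(...)` for retrying retriable commit failures. A minimal sketch of the new call style, based on the diffs below; the topic name and serdes are illustrative, not part of the PR:

```scala
import zio._
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde

object BatchCommitExample {
  // Consume, batch offsets, and commit each batch through the Consumer service.
  val run: ZIO[Consumer, Throwable, Unit] =
    Consumer
      .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
      .map(_.offset)
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(Consumer.commit) // was: .mapZIO(_.commit)
      .runDrain
}
```
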
@@ -67,7 +67,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
.aggregateAsyncWithin(ZSink.collectAll[Offset], Schedule.fixed(100.millis))
.tap(batch => counter.update(_ + batch.size))
.map(OffsetBatch.apply)
.mapZIO(_.commit)
.mapZIO(Consumer.commit)
.takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
.runDrain
.provideSome[Kafka](env)
11 changes: 7 additions & 4 deletions zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
@@ -233,7 +233,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
val records = committableRecords.map(_.record)
val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

offsetBatch.commit.as(records)
Consumer.commit(offsetBatch).as(records)
}
.runCollect
.provideSomeLayer[Kafka](consumer("adminspec-topic10", Some(consumerGroupID)))
@@ -301,7 +301,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
Consumer
.plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.take(count)
.foreach(_.offset.commit)
.map(_.offset)
.foreach(Consumer.commit)
.provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

KafkaTestUtils.withAdmin { client =>
@@ -344,7 +345,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
Consumer
.plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.take(count)
.foreach(_.offset.commit)
.map(_.offset)
.foreach(Consumer.commit)
.provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

KafkaTestUtils.withAdmin { client =>
@@ -645,7 +647,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
groupInstanceId: Option[String] = None
): ZIO[Kafka, Throwable, Unit] = Consumer
.plainStream(Subscription.topics(topicName), Serde.string, Serde.string)
.foreach(_.offset.commit)
.map(_.offset)
.foreach(Consumer.commit)
.provideSomeLayer(consumer(clientId, Some(groupId), groupInstanceId))

private def getStableConsumerGroupDescription(
@@ -196,7 +196,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
val records = committableRecords.map(_.record)
val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

offsetBatch.commit.as(records)
Consumer.commit(offsetBatch).as(records)
}
.runCollect
.provideSomeLayer[Kafka](
@@ -214,7 +214,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
val records = committableRecords.map(_.record)
val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

offsetBatch.commit.as(records)
Consumer.commit(offsetBatch).as(records)
}
.runCollect
.provideSomeLayer[Kafka](
@@ -287,7 +287,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.zipWithIndex
.tap { case (record, idx) =>
(Consumer.stopConsumption <* ZIO.logDebug("Stopped consumption")).when(idx == 3) *>
record.offset.commit <* ZIO.logDebug(s"Committed $idx")
Consumer.commit(record.offset) <* ZIO.logDebug(s"Committed $idx")
}
.tap { case (_, idx) => ZIO.logDebug(s"Consumed $idx") }
.runDrain
@@ -315,7 +315,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield if (nr < 10) Seq(record.offset) else Seq.empty
}
.transduce(Consumer.offsetBatches)
.mapZIO(_.commit)
.mapZIO(Consumer.commit)
.runDrain *>
Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head))
.provideSomeLayer[Kafka](consumer(client, Some(group)))
@@ -410,7 +410,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.take(nrMessages.toLong)
.transduce(Consumer.offsetBatches)
.take(1)
.mapZIO(_.commit)
.mapZIO(Consumer.commit)
.runDrain *>
Consumer.committed((0 until nrPartitions).map(new TopicPartition(topic, _)).toSet))
.provideSomeLayer[Kafka](consumer(client, Some(group)))
@@ -433,7 +433,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.take(1)
.transduce(Consumer.offsetBatches)
.take(1)
.mapZIO(_.commit)
.mapZIO(Consumer.commit)
.runDrain *>
Consumer.committed(Set(new TopicPartition(topic, 0))))
.provideSomeLayer[Kafka](consumer(client, Some(group)))
@@ -555,7 +555,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
val records = committableRecords.map(_.record)
val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

offsetBatch.commit.as(records)
Consumer.commit(offsetBatch).as(records)
}
.runCollect
.provideSomeLayer[Kafka](consumer(client1, Some(group)))
@@ -765,7 +765,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *>
partitionStream.mapChunksZIO { records =>
OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition)
Consumer.commit(OffsetBatch(records.map(_.offset))) *> messagesReceived(tp.partition)
.update(_ + records.size)
.as(records)
}
@@ -904,7 +904,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.mapZIO(record => messagesReceivedConsumer1.update(_ + 1).as(record))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(offsetBatch => offsetBatch.commit)
.mapZIO(Consumer.commit)
.runDrain
}
.mapZIO(_ => drainCount.updateAndGet(_ + 1))
@@ -929,7 +929,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.mapZIO(record => messagesReceivedConsumer2.update(_ + 1).as(record))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(offsetBatch => offsetBatch.commit)
.mapZIO(Consumer.commit)
.runDrain
.provideSomeLayer[Kafka](
customConsumer("consumer2", Some(group))
@@ -1303,7 +1303,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.take(11)
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(_.commit) // Hangs without timeout
.mapZIO(Consumer.commit) // Hangs without timeout
.runDrain
.exit
.provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds))
@@ -270,7 +270,7 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
Consumer.offsetBatches.contramap[CommittableRecord[String, String]](_.offset) <&> ZSink
.collectAll[CommittableRecord[String, String]]
)
.mapZIO { case (offsetBatch, records) => offsetBatch.commit.as(records) }
.mapZIO { case (offsetBatch, records) => Consumer.commit(offsetBatch).as(records) }
.flattenChunks
.runCollect
.tap(records => recordsConsumed.update(_ ++ records))
@@ -0,0 +1,92 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition

/**
* Supertype for a single offset or a batch of offsets for multiple topic-partitions
*/
sealed trait CommittableOffset {
def offsetsAsMap: Map[TopicPartition, OffsetAndMetadata]
def consumerGroupMetadata: Option[ConsumerGroupMetadata]
}

sealed trait OffsetBatch extends CommittableOffset {
def add(offset: CommittableOffset): OffsetBatch
}

object OffsetBatch {
val empty: OffsetBatch = EmptyOffsetBatch

def apply(offsets: Iterable[Offset]): OffsetBatch = offsets.foldLeft(empty)(_ add _)
}

private final case class OffsetBatchImpl(
offsetsAsMap: Map[TopicPartition, OffsetAndMetadata],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
) extends OffsetBatch {
def add(offset: CommittableOffset): OffsetBatch =
offset match {
case batch: OffsetBatch => merge(batch)
case offset: Offset =>
val maxOffsetAndMetadata = offsetsAsMap.get(offset.topicPartition) match {
case Some(existing) if existing.offset > offset.offset => existing
case _ => offset.asJavaOffsetAndMetadata
}

copy(
offsetsAsMap = offsetsAsMap + (offset.topicPartition -> maxOffsetAndMetadata)
)
}

private def merge(otherOffsets: OffsetBatch): OffsetBatch = {
val newOffsets = Map.newBuilder[TopicPartition, OffsetAndMetadata]
newOffsets ++= offsetsAsMap
otherOffsets.offsetsAsMap.foreach { case (tp, offset) =>
val laterOffset = offsetsAsMap.get(tp) match {
case Some(existing) => if (existing.offset < offset.offset) offset else existing
case None => offset
}
newOffsets += tp -> laterOffset
}

copy(offsetsAsMap = newOffsets.result())
}
}

case object EmptyOffsetBatch extends OffsetBatch {
override val offsetsAsMap: Map[TopicPartition, OffsetAndMetadata] = Map.empty

override def add(offset: CommittableOffset): OffsetBatch = offset match {
case batch: OffsetBatch => batch
case o: Offset => OffsetBatchImpl(Map(o.topicPartition -> o.asJavaOffsetAndMetadata), consumerGroupMetadata)
}

override def consumerGroupMetadata: Option[ConsumerGroupMetadata] = None
}

sealed trait Offset extends CommittableOffset {
def topic: String
def partition: Int
def offset: Long
def batch: OffsetBatch
def consumerGroupMetadata: Option[ConsumerGroupMetadata]
def withMetadata(metadata: String): Offset

private[consumer] def metadata: Option[String]
private[consumer] def asJavaOffsetAndMetadata: OffsetAndMetadata = new OffsetAndMetadata(offset, metadata.orNull)

final lazy val topicPartition: TopicPartition = new TopicPartition(topic, partition)
}

final case class OffsetImpl(
topic: String,
partition: Int,
offset: Long,
consumerGroupMetadata: Option[ConsumerGroupMetadata],
metadata: Option[String] = None
) extends Offset {
def batch: OffsetBatch = OffsetBatchImpl(offsetsAsMap, consumerGroupMetadata)
override def offsetsAsMap: Map[TopicPartition, OffsetAndMetadata] = Map(topicPartition -> asJavaOffsetAndMetadata)
def withMetadata(metadata: String): Offset = copy(metadata = Some(metadata))
}
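
A small usage sketch of the hierarchy above: `OffsetBatch.add` (and therefore `OffsetBatch.apply`) keeps the highest offset per topic-partition, and the resulting batch is committed through the `Consumer` service. Constructing `OffsetImpl` directly is only for illustration; in application code offsets come from `CommittableRecord#offset`:

```scala
import zio.kafka.consumer.{ Consumer, Offset, OffsetBatch, OffsetImpl }

object OffsetBatchExample {
  // Two offsets for the same topic-partition; `add` keeps the higher one (42).
  val a: Offset = OffsetImpl(topic = "my-topic", partition = 0, offset = 41L, consumerGroupMetadata = None)
  val b: Offset = OffsetImpl(topic = "my-topic", partition = 0, offset = 42L, consumerGroupMetadata = None)

  // OffsetBatch(Iterable[Offset]) folds with `add`, so the batch holds offset 42 for partition 0.
  val batch: OffsetBatch = OffsetBatch(Seq(a, b))

  // Committing the batch now goes through the Consumer service rather than the batch itself.
  val commitBatch = Consumer.commit(batch)
}
```
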
@@ -1,13 +1,12 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord }
import zio.RIO
import zio.kafka.serde.Deserializer
import zio.{ RIO, Task }

// TODO name CommittableRecord is now a bit meaningless
final case class CommittableRecord[K, V](
record: ConsumerRecord[K, V],
private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
) {
def deserializeWith[R, K1, V1](
@@ -43,7 +42,6 @@ final case class CommittableRecord[K, V](
topic = record.topic(),
partition = record.partition(),
offset = record.offset(),
commitHandle = commitHandle,
consumerGroupMetadata = consumerGroupMetadata,
metadata = None
)
@@ -52,12 +50,10 @@
object CommittableRecord {
def apply[K, V](
record: ConsumerRecord[K, V],
commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
): CommittableRecord[K, V] =
new CommittableRecord(
record = record,
commitHandle = commitHandle,
consumerGroupMetadata = consumerGroupMetadata
)
}
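
With the commit handle removed from `CommittableRecord`, a record's `Offset` is plain data, so committing a single record also goes through the `Consumer` service. A minimal per-record sketch; the topic name, serdes, and logging are illustrative:

```scala
import zio._
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde

object PerRecordCommitExample {
  // Process and commit one record at a time; the Offset itself no longer carries a commit handle.
  val run: ZIO[Consumer, Throwable, Unit] =
    Consumer
      .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
      .mapZIO { record =>
        ZIO.logDebug(s"processing ${record.record.value()}") *>
          Consumer.commit(record.offset) // was: record.offset.commit
      }
      .runDrain
}
```
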
38 changes: 36 additions & 2 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -4,7 +4,8 @@ import org.apache.kafka.clients.consumer.{
Consumer => JConsumer,
ConsumerRecord,
OffsetAndMetadata,
OffsetAndTimestamp
OffsetAndTimestamp,
RetriableCommitFailedException
}
import org.apache.kafka.common._
import zio._
@@ -157,6 +158,14 @@ trait Consumer {
* Expose internal consumer metrics
*/
def metrics: Task[Map[MetricName, Metric]]

def commit(offset: CommittableOffset): Task[Unit]

/**
* Attempts to commit and retries according to the given policy when the commit fails with a
* RetriableCommitFailedException
*/
def commitOrRetry[R](offset: CommittableOffset, policy: Schedule[R, Throwable, Any]): RIO[R, Unit]
}

object Consumer {
@@ -392,6 +401,21 @@ object Consumer {
def metrics: RIO[Consumer, Map[MetricName, Metric]] =
ZIO.serviceWithZIO(_.metrics)

/**
* Accessor method
*/
def commit(offset: CommittableOffset): RIO[Consumer, Unit] =
ZIO.serviceWithZIO(_.commit(offset))

/**
* Accessor method
*/
def commitOrRetry[R](
offset: CommittableOffset,
policy: Schedule[R, Throwable, Any]
): ZIO[R with Consumer, Throwable, Unit] =
ZIO.serviceWithZIO[Consumer](_.commitOrRetry[R](offset, policy))

sealed trait OffsetRetrieval
object OffsetRetrieval {
final case class Auto(reset: AutoOffsetStrategy = AutoOffsetStrategy.Latest) extends OffsetRetrieval
@@ -517,7 +541,7 @@ private[consumer] final class ConsumerLive private[consumer] (
}
.provideEnvironment(r)
.aggregateAsync(offsetBatches)
.mapZIO(_.commitOrRetry(commitRetryPolicy))
.mapZIO(commitOrRetry(_, commitRetryPolicy))
.runDrain
} yield ()

@@ -550,4 +574,14 @@ private[consumer] final class ConsumerLive private[consumer] (
override def metrics: Task[Map[MetricName, Metric]] =
consumer.withConsumer(_.metrics().asScala.toMap)

override def commit(offset: CommittableOffset): Task[Unit] = runloopAccess.commit(offset.offsetsAsMap)

override def commitOrRetry[R](offset: CommittableOffset, policy: Schedule[R, Throwable, Any]): RIO[R, Unit] =
commit(offset)
.retry(
Schedule.recurWhile[Throwable] {
case _: RetriableCommitFailedException => true
case _ => false
} && policy
)
}
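
A sketch of the retry variant defined above: `commitOrRetry` retries only failures that are `RetriableCommitFailedException`, bounded by the caller's schedule. The specific backoff policy here is an illustrative choice, not part of the PR:

```scala
import zio._
import zio.kafka.consumer.{ Consumer, OffsetBatch }

object CommitWithRetryExample {
  // Retry failed commits with exponential backoff, at most 5 times; retries happen only
  // while the failure is a RetriableCommitFailedException (per commitOrRetry above).
  def commitBatch(batch: OffsetBatch): ZIO[Consumer, Throwable, Unit] =
    Consumer.commitOrRetry(batch, Schedule.exponential(100.millis) && Schedule.recurs(5))
}
```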