diff --git a/docs/preventing-duplicates.md b/docs/preventing-duplicates.md
index 4b350e974..7c87f1f62 100644
--- a/docs/preventing-duplicates.md
+++ b/docs/preventing-duplicates.md
@@ -26,6 +26,39 @@ a partition.
For this to work correctly, your program must process a chunk of records within max-rebalance-duration. The clock
starts the moment the chunk is pushed into the stream and ends when the commits for these records complete.
-For more information see the scaladocs in `ConsumerSettings`, read the description of
-[pull request #1098](https://github.com/zio/zio-kafka/pull/1098) that introduced this feature, or watch the presentation
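+
+Enabling rebalance-safe-commits could look like this (a minimal sketch; the bootstrap servers, group id, and duration
+are example values):
+
+```scala
+import zio._
+import zio.kafka.consumer._
+
+val settings: ConsumerSettings =
+  ConsumerSettings(List("localhost:9092"))
+    .withGroupId("example-group")
+    .withRebalanceSafeCommits(true)      // hold up rebalances until processing and commits are done
+    .withMaxRebalanceDuration(3.minutes) // upper bound on how long a rebalance may be held up
+```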
+In addition, your program must commit the offsets of consumed records. The most straightforward way is to commit to the
+Kafka brokers by calling `.commit` on the offset of each consumed record (see the consumer documentation).
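+
+For example, committing batches of offsets to the brokers could look like this (a sketch; `processRecord` stands in
+for your own processing logic):
+
+```scala
+import zio.kafka.consumer._
+import zio.kafka.serde.Serde
+
+consumer.plainStream(Subscription.topics("topic2000"), Serde.string, Serde.string)
+  .mapZIO(record => processRecord(record).as(record.offset))
+  .aggregateAsync(Consumer.offsetBatches) // batch offsets to reduce the number of commits
+  .mapZIO(_.commit)
+  .runDrain
+```
+
+However, there are two more options: committing to an external system and transactional producing.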
+
+### Commit to an external system
+
+When you commit to an external system (e.g. by writing to a relational database), the zio-kafka consumer needs to know
+about those commits before it can work in rebalance-safe-commits mode. Inform zio-kafka about external commits by
+invoking the method `Consumer.registerExternalCommits(offsetBatch: OffsetBatch)` (available since zio-kafka 2.10.0).
+
+Here is what this could look like:
+
+```scala
+import zio.kafka.consumer._
+import zio.kafka.serde.Serde
+
+// `database` represents the external system that receives the commits
+consumer.plainStream(Subscription.topics("topic2000"), Serde.string, Serde.string)
+  .mapZIO { record =>
+    database.store(record.offset) *> // <-- the external commit
+      consumer.registerExternalCommits(OffsetBatch(record.offset))
+  }
+  .runDrain
+```
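+
+Just like with broker commits, you could batch the offsets first (for example with
+`.aggregateAsync(Consumer.offsetBatches)`) and then store and register the whole `OffsetBatch` at once.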
+
+### Commit with a transactional producer
+
+Although transactional producing is possible with zio-kafka, it is not easy and the code is very messy (see
+`ConsumerSpec` for an example). Transactional producing cannot be used in combination with rebalance-safe-commits mode.
+
+Zio-kafka v3.0.0 will make transactional producing much easier.
+
+## More information
+
+More information can be found in the scaladocs of `ConsumerSettings` and in the description of
+[pull request #1098](https://github.com/zio/zio-kafka/pull/1098), which introduced this feature.
+You can also watch the presentation
[Making ZIO-Kafka Safer And Faster](https://www.youtube.com/watch?v=MJoRwEyyVxM). The relevant part starts at 10:24.
diff --git a/zio-kafka-test/src/test/resources/logback.xml b/zio-kafka-test/src/test/resources/logback.xml
index 59861dd6b..a3b439724 100644
--- a/zio-kafka-test/src/test/resources/logback.xml
+++ b/zio-kafka-test/src/test/resources/logback.xml
@@ -16,6 +16,7 @@
+
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index ce7cbae65..8dd44fbee 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -28,6 +28,8 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._
+import java.time.Instant
+import java.time.temporal.ChronoUnit
import java.util.concurrent.ExecutionException
import scala.reflect.ClassTag
@@ -888,6 +890,119 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
testForPartitionAssignmentStrategy[CooperativeStickyAssignor]
)
}: _*),
+ test("external commits are used when rebalanceSafeCommits is enabled") {
+
+ /*
+ * Outline of this test:
+ * - A producer generates some messages on every partition of a topic (2 partitions).
+ * - Consumer 1 starts reading from the topic. It is the only consumer, so it handles all partitions. This
+ * consumer has `rebalanceSafeCommits` enabled. It does not commit offsets to the brokers; it only registers
+ * external commits. In addition, we set `maxRebalanceDuration` to 20 seconds.
+ * - After a few messages, consumer 2 is started and a rebalance starts.
+ * - We measure how long the rebalance takes.
+ *
+ * When the rebalance finishes almost immediately, we know that the external commits were used. If it only
+ * finishes after 20 seconds, we know that they were not used.
+ */
+ val partitionCount = 2
+
+ def makeConsumer(
+ clientId: String,
+ groupId: String,
+ rebalanceSafeCommits: Boolean,
+ diagnostics: Diagnostics
+ ): ZIO[Scope with Kafka, Throwable, Consumer] =
+ for {
+ settings <- consumerSettings(
+ clientId = clientId,
+ groupId = Some(groupId),
+ `max.poll.records` = 1,
+ rebalanceSafeCommits = rebalanceSafeCommits,
+ maxRebalanceDuration = 20.seconds,
+ commitTimeout = 1.second
+ )
+ consumer <- Consumer.make(settings, diagnostics)
+ } yield consumer
+
+ for {
+ topic <- randomTopic
+ subscription = Subscription.topics(topic)
+ clientId1 <- randomClient
+ clientId2 <- randomClient
+ groupId <- randomGroup
+ _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = partitionCount))
+ // Produce one message to all partitions, every 500 ms
+ _ <- ZStream
+ .fromSchedule(Schedule.fixed(500.millis))
+ .mapZIO { i =>
+ ZIO.foreachDiscard(0 until partitionCount) { p =>
+ produceMany(topic, p, Seq((s"key-$p-$i", s"msg-$p-$i")))
+ }
+ }
+ .runDrain
+ .fork
+ _ <- ZIO.logDebug("Starting consumer 1")
+ rebalanceEndTimePromise <- Promise.make[Nothing, Instant]
+ c1Diagnostics = new Diagnostics {
+ override def emit(event: => DiagnosticEvent): UIO[Unit] = event match {
+ case r: DiagnosticEvent.Rebalance if r.assigned.size == 1 =>
+ ZIO.logDebug(s"Rebalance finished: $r") *>
+ Clock.instant.flatMap(rebalanceEndTimePromise.succeed).unit
+ case r: DiagnosticEvent.Rebalance =>
+ ZIO.logDebug(s"Rebalance finished: $r")
+ case _ => ZIO.unit
+ }
+ }
+ c1 <- makeConsumer(clientId1, groupId, rebalanceSafeCommits = true, c1Diagnostics)
+ c1Started <- Promise.make[Nothing, Unit]
+ c1Offsets <- Ref.make(Chunk.empty[Offset])
+ _ <-
+ ZIO
+ .logAnnotate("consumer", "1") {
+ // When the stream ends, the topic subscription ends as well. Because of that, the consumer
+ // shuts down and commits are no longer possible. Therefore, we signal consumer 2 to start
+ // via a promise instead of ending this stream.
+ c1
+ .plainStream(subscription, Serde.string, Serde.string)
+ .tap { record =>
+ for {
+ _ <-
+ ZIO.logDebug(
+ s"Received record with offset ${record.partition}:${record.offset.offset} and key ${record.key}"
+ )
+ // Signal that consumer 2 can start when a record was seen for every partition.
+ offsets <- c1Offsets.updateAndGet(_ :+ record.offset)
+ _ <- c1Started.succeed(()).when(offsets.map(_.partition).toSet.size == partitionCount)
+ // Register an external commit (which we're not actually doing 😀)
+ _ <- c1.registerExternalCommits(OffsetBatch(record.offset)).unit
+ } yield ()
+ }
+ .runDrain
+ }
+ .fork
+ _ <- c1Started.await
+ _ <- ZIO.logDebug("Starting consumer 2")
+ c2 <- makeConsumer(clientId2, groupId, rebalanceSafeCommits = false, Diagnostics.NoOp)
+ rebalanceStartTime <- Clock.instant
+ _ <- ZIO
+ .logAnnotate("consumer", "2") {
+ c2
+ .plainStream(subscription, Serde.string, Serde.string)
+ .tap(msg => ZIO.logDebug(s"Received ${msg.key}"))
+ .runDrain
+ }
+ .fork
+ _ <- ZIO.logDebug("Waiting for rebalance to end")
+ rebalanceEndTime <- rebalanceEndTimePromise.await
+ _ <- c1.stopConsumption *> c2.stopConsumption
+ rebalanceDuration = Duration
+ .fromInterval(rebalanceStartTime, rebalanceEndTime)
+ .truncatedTo(ChronoUnit.SECONDS)
+ .getSeconds
+ .toInt
+ _ <- ZIO.logDebug(s"Rebalance took $rebalanceDuration seconds")
+ } yield assertTrue(rebalanceDuration <= 2)
+ },
test("partitions for topic doesn't fail if doesn't exist") {
for {
topic <- randomTopic
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala
index 514063acf..82471dc80 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala
@@ -206,7 +206,8 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j {
}
abstract class MockCommitter extends Committer {
- override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit
+ override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit
+ override val registerExternalCommits: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit
override def processQueuedCommits(consumer: ByteArrayKafkaConsumer, executeOnEmpty: Boolean): Task[Unit] = ZIO.unit
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index 9d7ab0a22..3f24a6d48 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -158,6 +158,16 @@ trait Consumer {
* Expose internal consumer metrics
*/
def metrics: Task[Map[MetricName, Metric]]
+
+ /**
+ * Register a commit that was done externally, that is, not by this consumer.
+ *
+ * This method is useful when you want to use rebalance-safe-commits while committing to an external system (for
+ * example a relational database) instead of to the Kafka brokers.
+ *
+ * See also [[zio.kafka.consumer.ConsumerSettings.withRebalanceSafeCommits]].
+ */
+ def registerExternalCommits(offsetBatch: OffsetBatch): Task[Unit]
}
object Consumer {
@@ -604,4 +614,7 @@ private[consumer] final class ConsumerLive private[consumer] (
override def metrics: Task[Map[MetricName, Metric]] =
consumer.withConsumer(_.metrics().asScala.toMap)
+ override def registerExternalCommits(externallyCommittedOffsets: OffsetBatch): Task[Unit] =
+ runloopAccess.registerExternalCommits(externallyCommittedOffsets)
+
}
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
index 4b7b017b9..5db7221cd 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
@@ -231,6 +231,9 @@ final case class ConsumerSettings(
* Rebalances are held up for at most 3/5 of `maxPollInterval` (see [[withMaxPollInterval]]), by default this
* calculates to 3 minutes. See [[#withMaxRebalanceDuration]] to change the default.
*
+ * External commits (that is, commits to an external system, e.g. a relational database) must be registered with the
+ * consumer through [[Consumer.registerExternalCommits]].
+ *
* When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any
* offset commits from these streams have a high chance of being delayed (commits are not possible during some phases
* of a rebalance). The consumer that takes over the partition will likely not see these delayed commits and will
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
index 6c4b5a977..7ec264f0b 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
@@ -24,6 +24,8 @@ sealed trait OffsetBatch {
object OffsetBatch {
val empty: OffsetBatch = EmptyOffsetBatch
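+
+ /** Creates an [[OffsetBatch]] containing a single offset. */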
+ def apply(offset: Offset): OffsetBatch = empty.add(offset)
+
def apply(offsets: Iterable[Offset]): OffsetBatch = offsets.foldLeft(empty)(_ add _)
}
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala
index a40427dcc..2a7145dbe 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala
@@ -11,8 +11,13 @@ import java.lang.Math.max
import scala.collection.mutable
private[internal] trait Committer {
+
+ /** A function to commit offsets. */
val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit]
+ /** A function to register offsets that have been committed externally. */
+ val registerExternalCommits: Map[TopicPartition, OffsetAndMetadata] => Task[Unit]
+
/**
* Takes commits from the queue, commits them and adds them to pending commits
*
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala
index 6ababd40f..557b6f2eb 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala
@@ -23,6 +23,15 @@ private[consumer] final class LiveCommitter(
pendingCommits: Ref.Synchronized[Chunk[Commit]]
) extends Committer {
+ override val registerExternalCommits: Map[
+ TopicPartition,
+ OffsetAndMetadata
+ ] => Task[Unit] = offsets =>
+ committedOffsetsRef.modify {
+ // The continuation promise can be `null` because this commit is not actually handled by the consumer.
+ _.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))
+ }.unit
+
/** This is the implementation behind the user facing api `Offset.commit`. */
override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] =
offsets =>
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index 3c277d8c0..843087c5c 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -532,6 +532,9 @@ private[consumer] final class Runloop private (
.repeat(runloopMetricsSchedule)
.unit
}
+
+ def registerExternalCommits(offsetBatch: OffsetBatch): Task[Unit] =
+ committer.registerExternalCommits(offsetBatch.offsets)
}
object Runloop {
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
index 90c787f84..6b9941b30 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
@@ -6,7 +6,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
-import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription }
+import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, OffsetBatch, Subscription }
import zio.stream.{ Stream, Take, UStream, ZStream }
import zio._
@@ -66,6 +66,9 @@ private[consumer] final class RunloopAccess private (
}
} yield stream
+ def registerExternalCommits(externallyCommittedOffsets: OffsetBatch): Task[Unit] =
+ withRunloopZIO(requireRunning = true)(_.registerExternalCommits(externallyCommittedOffsets))
+
}
private[consumer] object RunloopAccess {