Support rebalance-safe-commits with external commits #1425

Merged: 12 commits, Jan 13, 2025
37 changes: 35 additions & 2 deletions docs/preventing-duplicates.md
@@ -26,6 +26,39 @@ a partition.
For this to work correctly, your program must process a chunk of records within max-rebalance-duration. The clock
starts the moment the chunk is pushed into the stream and ends when the commits for these records complete.
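
For example, enabling this mode and tuning the rebalance duration could look like the following sketch (the bootstrap server, group id, and 30-second limit are assumptions for illustration):

```scala
import zio._
import zio.kafka.consumer.ConsumerSettings

// A sketch: enable rebalance-safe-commits and hold up rebalances
// for at most 30 seconds (instead of the default 3/5 of max.poll.interval.ms).
val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withGroupId("my-group")
    .withRebalanceSafeCommits(true)
    .withMaxRebalanceDuration(30.seconds)
```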

In addition, your program must commit the offsets of consumed records. The most straightforward way is to commit to the
Kafka brokers. This is done by calling `.commit` on the offset of consumed records (see the consumer documentation).
However, there are more options: external commits and transactional producing.
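
As a baseline, committing each record's offset to the brokers could look like this minimal sketch (`processRecord` is a hypothetical handler for your business logic):

```scala
import zio.kafka.consumer._
import zio.kafka.serde.Serde

consumer
  .plainStream(Subscription.topics("topic2000"), Serde.string, Serde.string)
  .mapZIO(record => processRecord(record) *> record.offset.commit)
  .runDrain
```

In practice offsets are usually batched before committing (for example with `aggregateAsync(Consumer.offsetBatches)`), but the principle is the same.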

### Commit to an external system

When you commit to an external system (e.g. by writing to a relational database) the zio-kafka consumer needs to know
about those commits before it can work in rebalance-safe-commits mode. Inform zio-kafka about external commits by
invoking `Consumer.registerExternalCommits(offsetBatch: OffsetBatch)` (available since zio-kafka 2.9.2).

Here is what this could look like:

```scala
import zio.kafka.consumer._
import zio.kafka.serde.Serde

consumer
  .plainStream(Subscription.topics("topic2000"), Serde.string, Serde.string)
  .mapZIO { record =>
    database.store(record.offset) *> // <-- the external commit
      consumer.registerExternalCommits(OffsetBatch(record.offset))
  }
  .runDrain
```
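
Note that `registerExternalCommits` does not commit anything to the Kafka brokers; it only informs the consumer's rebalance-safe-commits machinery that these offsets are done, so a rebalance does not wait for them.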

### Commit with a transactional producer

Although transactional producing is possible with zio-kafka, it is not easy and the code is very messy (see
`ConsumerSpec` for an example). Transactional producing cannot be used in combination with rebalance-safe-commits mode.

Zio-kafka v3.0.0 will make transactional producing much easier.

## More information

There is more information in the scaladocs of `ConsumerSettings` and the description of
[pull request #1098](https://github.com/zio/zio-kafka/pull/1098) that introduced this feature.
You can also watch the presentation
[Making ZIO-Kafka Safer And Faster](https://www.youtube.com/watch?v=MJoRwEyyVxM). The relevant part starts at 10:24.
1 change: 1 addition & 0 deletions zio-kafka-test/src/test/resources/logback.xml
@@ -16,6 +16,7 @@

<!-- Some useful loggers to configure while debugging tests: -->
<logger name="zio.kafka" level="INFO" />
<logger name="zio.kafka.consumer.internal.Runloop" level="INFO" />
<logger name="zio.kafka.consumer.ConsumerSpec" level="INFO" />

<root level="INFO">
115 changes: 115 additions & 0 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -28,6 +28,8 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.ExecutionException
import scala.reflect.ClassTag

@@ -888,6 +890,119 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
testForPartitionAssignmentStrategy[CooperativeStickyAssignor]
)
}: _*),
test("external commits are used when rebalanceSafeCommits is enabled") {

/*
* Outline of this test
* - A producer generates some messages on every partition of a topic (2 partitions),
* - Consumer 1 starts reading from the topic. It is the only consumer so it handles all partitions. This
* consumer has `rebalanceSafeCommits` enabled. It does not commit offsets, but it does register external
* commits (it does not actually commit anywhere). In addition we set `maxRebalanceDuration` to 20 seconds.
* - After a few messages consumer 2 is started and a rebalance starts.
* - We measure how long the rebalance takes.
*
* When the rebalance finishes immediately, we know that the external commits were used. If it finishes in 20
* seconds, we know that the external commits were not used.
*/
val partitionCount = 2

def makeConsumer(
clientId: String,
groupId: String,
rebalanceSafeCommits: Boolean,
diagnostics: Diagnostics
): ZIO[Scope with Kafka, Throwable, Consumer] =
for {
settings <- consumerSettings(
clientId = clientId,
groupId = Some(groupId),
`max.poll.records` = 1,
rebalanceSafeCommits = rebalanceSafeCommits,
maxRebalanceDuration = 20.seconds,
commitTimeout = 1.second
)
consumer <- Consumer.make(settings, diagnostics)
} yield consumer

for {
topic <- randomTopic
subscription = Subscription.topics(topic)
clientId1 <- randomClient
clientId2 <- randomClient
groupId <- randomGroup
_ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = partitionCount))
// Produce one message to all partitions, every 500 ms
_ <- ZStream
.fromSchedule(Schedule.fixed(500.millis))
.mapZIO { i =>
ZIO.foreachDiscard(0 until partitionCount) { p =>
produceMany(topic, p, Seq((s"key-$p-$i", s"msg-$p-$i")))
}
}
.runDrain
.fork
_ <- ZIO.logDebug("Starting consumer 1")
rebalanceEndTimePromise <- Promise.make[Nothing, Instant]
c1Diagnostics = new Diagnostics {
override def emit(event: => DiagnosticEvent): UIO[Unit] = event match {
case r: DiagnosticEvent.Rebalance if r.assigned.size == 1 =>
ZIO.logDebug(s"Rebalance finished: $r") *>
Clock.instant.flatMap(rebalanceEndTimePromise.succeed).unit
case r: DiagnosticEvent.Rebalance =>
ZIO.logDebug(s"Rebalance finished: $r")
case _ => ZIO.unit
}
}
c1 <- makeConsumer(clientId1, groupId, rebalanceSafeCommits = true, c1Diagnostics)
c1Started <- Promise.make[Nothing, Unit]
c1Offsets <- Ref.make(Chunk.empty[Offset])
_ <-
ZIO
.logAnnotate("consumer", "1") {
// When the stream ends, the topic subscription ends as well. Because of that the consumer
// shuts down and commits are no longer possible. Therefore, we signal the second consumer in
// such a way that it doesn't close the stream.
c1
.plainStream(subscription, Serde.string, Serde.string)
.tap { record =>
for {
_ <-
ZIO.logDebug(
s"Received record with offset ${record.partition}:${record.offset.offset} and key ${record.key}"
)
// Signal that consumer 2 can start when a record was seen for every partition.
offsets <- c1Offsets.updateAndGet(_ :+ record.offset)
_ <- c1Started.succeed(()).when(offsets.map(_.partition).toSet.size == partitionCount)
// Register an external commit (which we're not actually doing 😀)
_ <- c1.registerExternalCommits(OffsetBatch(record.offset)).unit
} yield ()
}
.runDrain
}
.fork
_ <- c1Started.await
_ <- ZIO.logDebug("Starting consumer 2")
c2 <- makeConsumer(clientId2, groupId, rebalanceSafeCommits = false, Diagnostics.NoOp)
rebalanceStartTime <- Clock.instant
_ <- ZIO
.logAnnotate("consumer", "2") {
c2
.plainStream(subscription, Serde.string, Serde.string)
.tap(msg => ZIO.logDebug(s"Received ${msg.key}"))
.runDrain
}
.fork
_ <- ZIO.logDebug("Waiting for rebalance to end")
rebalanceEndTime <- rebalanceEndTimePromise.await
_ <- c1.stopConsumption *> c2.stopConsumption
rebalanceDuration = Duration
.fromInterval(rebalanceStartTime, rebalanceEndTime)
.truncatedTo(ChronoUnit.SECONDS)
.getSeconds
.toInt
_ <- ZIO.logDebug(s"Rebalance took $rebalanceDuration seconds")
} yield assertTrue(rebalanceDuration <= 2)
},
test("partitions for topic doesn't fail if doesn't exist") {
for {
topic <- randomTopic
@@ -206,7 +206,8 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j {
}

abstract class MockCommitter extends Committer {
  override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit
  override val registerExternalCommits: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit

override def processQueuedCommits(consumer: ByteArrayKafkaConsumer, executeOnEmpty: Boolean): Task[Unit] = ZIO.unit

13 changes: 13 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -158,6 +158,16 @@ trait Consumer {
* Expose internal consumer metrics
*/
def metrics: Task[Map[MetricName, Metric]]

/**
* Register a commit that was done externally, that is, not by this consumer.
*
* This method is useful when you want to use rebalance-safe-commits while committing to an external system, for
* example a relational database, instead of to the Kafka brokers.
*
* See also [[zio.kafka.consumer.ConsumerSettings.withRebalanceSafeCommits]].
*/
def registerExternalCommits(offsetBatch: OffsetBatch): Task[Unit]
}

object Consumer {
@@ -604,4 +614,7 @@ private[consumer] final class ConsumerLive private[consumer] (
override def metrics: Task[Map[MetricName, Metric]] =
consumer.withConsumer(_.metrics().asScala.toMap)

override def registerExternalCommits(externallyCommittedOffsets: OffsetBatch): Task[Unit] =
runloopAccess.registerExternalCommits(externallyCommittedOffsets)

}
@@ -231,6 +231,9 @@ final case class ConsumerSettings(
* Rebalances are held up for at most 3/5 of `maxPollInterval` (see [[withMaxPollInterval]]), by default this
* calculates to 3 minutes. See [[#withMaxRebalanceDuration]] to change the default.
*
* External commits (that is, commits to an external system, e.g. a relational database) must be registered to the
* consumer with [[Consumer.registerExternalCommits]].
*
* When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any
* offset commits from these streams have a high chance of being delayed (commits are not possible during some phases
* of a rebalance). The consumer that takes over the partition will likely not see these delayed commits and will
@@ -24,6 +24,8 @@ sealed trait OffsetBatch {
object OffsetBatch {
val empty: OffsetBatch = EmptyOffsetBatch

def apply(offset: Offset): OffsetBatch = empty.add(offset)

def apply(offsets: Iterable[Offset]): OffsetBatch = offsets.foldLeft(empty)(_ add _)
}

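The new overload makes it easy to wrap a single offset. A minimal sketch of both constructors (`record1` and `record2` stand for hypothetical consumed records):

```scala
import zio.kafka.consumer.OffsetBatch

val single: OffsetBatch = OffsetBatch(record1.offset)
val merged: OffsetBatch = OffsetBatch(Seq(record1.offset, record2.offset))
```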
@@ -11,8 +11,13 @@ import java.lang.Math.max
import scala.collection.mutable

private[internal] trait Committer {

/** A function to commit offsets. */
val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit]

/** A function to register offsets that have been committed externally. */
val registerExternalCommits: Map[TopicPartition, OffsetAndMetadata] => Task[Unit]

/**
* Takes commits from the queue, commits them and adds them to pending commits
*
@@ -23,6 +23,15 @@ private[consumer] final class LiveCommitter(
pendingCommits: Ref.Synchronized[Chunk[Commit]]
) extends Committer {

override val registerExternalCommits: Map[
TopicPartition,
OffsetAndMetadata
] => Task[Unit] = offsets =>
committedOffsetsRef.modify {
// The continuation promise can be `null` because this commit is not actually handled by the consumer.
_.addCommits(Chunk(Commit(java.lang.System.nanoTime(), offsets, null)))
}.unit

/** This is the implementation behind the user facing api `Offset.commit`. */
override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] =
offsets =>
@@ -532,6 +532,9 @@ private[consumer] final class Runloop private (
.repeat(runloopMetricsSchedule)
.unit
}

def registerExternalCommits(offsetBatch: OffsetBatch): Task[Unit] =
committer.registerExternalCommits(offsetBatch.offsets)
}

object Runloop {
@@ -6,7 +6,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, OffsetBatch, Subscription }
import zio.stream.{ Stream, Take, UStream, ZStream }
import zio._

@@ -66,6 +66,9 @@ private[consumer] final class RunloopAccess private (
}
} yield stream

def registerExternalCommits(externallyCommittedOffsets: OffsetBatch): Task[Unit] =
withRunloopZIO(requireRunning = true)(_.registerExternalCommits(externallyCommittedOffsets))

}

private[consumer] object RunloopAccess {