Always end streams in rebalance listener, support lost partitions (#1089)

Previously, streams were only ended in the rebalance listener for revoked partitions. Now they are ended there even when `restartStreamsOnRebalancing` is used.
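
For illustration, a minimal sketch of a consumer with this option enabled, assuming the zio-kafka 2.x `ConsumerSettings`, `withRestartStreamOnRebalancing` and `Consumer.make` API; the bootstrap server, group id and topic are placeholders:

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

object RestartOnRebalanceSketch {
  // Sketch only: with this flag set, every rebalance ends and restarts all partition streams.
  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092"))
      .withGroupId("example-group")
      .withRestartStreamOnRebalancing(true)

  // Consume and commit records from a topic.
  val consume: ZIO[Consumer, Throwable, Unit] =
    Consumer
      .plainStream(Subscription.topics("example-topic"), Serde.string, Serde.string)
      .mapZIO(record => record.offset.commit.as(record))
      .runDrain

  val program: ZIO[Any, Throwable, Unit] =
    consume.provideLayer(ZLayer.scoped(Consumer.make(settings)))
}
```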

Lost partitions are no longer treated as revoked. With this change, the streams of lost partitions are interrupted, which prevents them from processing and committing any more data.
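
In terms of the Kafka client's `ConsumerRebalanceListener` callbacks, the distinction looks roughly like the sketch below; `endStreamGracefully` and `interruptStream` are hypothetical stand-ins for zio-kafka's internal per-partition stream controls:

```scala
import java.util.{ Collection => JCollection }
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

object RebalanceListenerSketch {
  // Hypothetical stand-ins for zio-kafka's internal stream controls.
  def endStreamGracefully(tp: TopicPartition): Unit = println(s"ending stream for $tp")
  def interruptStream(tp: TopicPartition): Unit     = println(s"interrupting stream for $tp")

  val listener: ConsumerRebalanceListener = new ConsumerRebalanceListener {
    override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()

    // Revoked: end the stream normally so already-fetched records can still be processed and committed.
    override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
      partitions.asScala.foreach(endStreamGracefully)

    // Lost: interrupt the stream so it processes and commits nothing more.
    override def onPartitionsLost(partitions: JCollection[TopicPartition]): Unit =
      partitions.asScala.foreach(interruptStream)
  }
}
```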

A nice side effect is that zio-kafka is now faster when the rebalance listener was _not_ called (the 'fast track').

The main reason for this change is to prepare for awaiting commits from within the rebalance listener, which will prevent duplicate consumption of records (see #830).
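
Sketched as plain effects (hypothetical `endStreams` and `awaitCommitsOf`, not actual zio-kafka API), the intended ordering inside the listener would look like this:

```scala
import zio._
import org.apache.kafka.common.TopicPartition

object AwaitCommitsSketch {
  // Hypothetical effects standing in for zio-kafka internals.
  def endStreams(tps: Set[TopicPartition]): UIO[Unit]     = ZIO.unit
  def awaitCommitsOf(tps: Set[TopicPartition]): UIO[Unit] = ZIO.unit

  // Intended ordering: end the streams, then hold the rebalance listener until their
  // offsets are committed, so the partition's next owner starts from the last committed
  // offset instead of re-consuming records. The 30 second bound is illustrative only.
  def onRevoked(tps: Set[TopicPartition]): UIO[Unit] =
    endStreams(tps) *> awaitCommitsOf(tps).timeout(30.seconds).unit
}
```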

Also: prevent storing pending requests for streams that are no longer assigned.
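
The 'pending requests' bookkeeping, sketched with a hypothetical `Request` type (the real structure lives in zio-kafka's run loop; shown only to illustrate the filter):

```scala
import zio.Chunk
import org.apache.kafka.common.TopicPartition

object PendingRequestsSketch {
  // Hypothetical request type: a partition stream asking the run loop for more records.
  final case class Request(tp: TopicPartition)

  // Keep only requests for partitions that are still assigned; requests from streams
  // whose partitions were revoked or lost are dropped rather than stored.
  def retainAssigned(pending: Chunk[Request], assigned: Set[TopicPartition]): Chunk[Request] =
    pending.filter(request => assigned.contains(request.tp))
}
```
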
erikvanoosten authored Nov 4, 2023
1 parent 2086cb2 commit 21361c1
Showing 4 changed files with 267 additions and 242 deletions.
6 changes: 3 additions & 3 deletions build.sbt
@@ -3,9 +3,9 @@ import sbt.Def
lazy val kafkaVersion = "3.6.0"
lazy val embeddedKafkaVersion = "3.6.0" // Should be the same as kafkaVersion, except for the patch part

lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11"
lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11"

enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)

200 changes: 100 additions & 100 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -738,8 +738,30 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assertCompletes
},
test("restartStreamsOnRebalancing mode closes all partition streams") {
// Test plan:
// - Throughout the test, continuously produce to all partitions of a topic.
// - Start consumer 1:
// - track which partitions are assigned after each rebalance,
// - track which streams stopped.
// - Start consumer 2 but finish after just a few records. This results in 2 rebalances for consumer 1.
// - Verify that in the first rebalance, consumer 1 ends the streams for _all_ partitions,
// and then starts them again.
//
// NOTE: we need to use the cooperative sticky assignor. The default assignor `ConsumerPartitionAssignor`
// revokes all partitions and re-assigns them on every rebalance. This means that all streams are restarted
// on every rebalance, exactly what `restartStreamOnRebalancing` would have caused. In other words, with the
// default assignor the externally visible behavior is the same, regardless of whether
// `restartStreamOnRebalancing` is `true` or `false`.

val nrPartitions = 5
val nrMessages = 100
val partitionIds = Chunk.fromIterable(0 until nrPartitions)

def awaitRebalance[A](partitionAssignments: Ref[Chunk[A]], nr: Int): ZIO[Any, Nothing, Unit] =
partitionAssignments.get
.repeat(
Schedule.recurUntil((_: Chunk[A]).size >= nr) && Schedule.fixed(100.millis)
)
.unit

for {
// Produce messages on several partitions
@@ -750,116 +772,94 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
client2 <- randomClient

_ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions))
_ <- ZIO.foreachDiscard(1 to nrMessages) { i =>
produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i"))
}

// Consume messages
messagesReceived <-
ZIO.foreach((0 until nrPartitions).toList)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap)
drainCount <- Ref.make(0)
subscription = Subscription.topics(topic)
fib <- ZIO
.logAnnotate("consumer", "1") {
Consumer
.partitionedAssignmentStream(subscription, Serde.string, Serde.string)
.rechunk(1)
.mapZIO { partitions =>
ZIO.logDebug(s"Got partition assignment ${partitions.map(_._1).mkString(",")}") *>
ZStream
.fromIterable(partitions)
.flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *>
partitionStream.mapChunksZIO { records =>
OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition)
.update(_ + records.size)
.as(records)
}
}
.runDrain
}
.mapZIO(_ =>
drainCount.updateAndGet(_ + 1).flatMap {
case 2 => ZIO.logDebug("Stopping consumption") *> Consumer.stopConsumption
// 1: when consumer on fib2 starts
// 2: when consumer on fib2 stops, end of test
case _ => ZIO.unit
}
)
.runDrain
.provideSomeLayer[Kafka](
consumer(
client1,
Some(group),
clientInstanceId = Some("consumer1"),
restartStreamOnRebalancing = true,
properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "10")
)
)
// Continuously produce messages throughout the test
_ <- ZStream
.fromSchedule(Schedule.fixed(100.millis))
.mapZIO { i =>
ZIO.foreach(partitionIds) { p =>
produceMany(topic, p, Seq((s"key.$p.$i", s"msg.$p.$i")))
}
.fork
// fib is running, consuming all the published messages from all partitions.
// Waiting until it recorded all messages
_ <- ZIO
.foreach(messagesReceived.values)(_.get)
.map(_.sum)
.repeat(Schedule.recurUntil((n: Int) => n == nrMessages) && Schedule.fixed(100.millis))

// Starting a new consumer that will stop after receiving 20 messages,
// causing two rebalancing events for fib1 consumers on start and stop
fib2 <- ZIO
.logAnnotate("consumer", "2") {
}
.runDrain
.forkScoped

// Consumer 1
streamsStarted <- Ref.make[Chunk[Set[Int]]](Chunk.empty)
streamsStopped <- Ref.make[Chunk[Int]](Chunk.empty)
consumer1Settings <-
consumerSettings(
client1,
Some(group),
restartStreamOnRebalancing = true
).map {
_.withProperties(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
)
}
fib1 <- ZIO
.logAnnotate("consumer", "1") {
Consumer
.plainStream(subscription, Serde.string, Serde.string)
.take(20)
.partitionedAssignmentStream(Subscription.topics(topic), Serde.string, Serde.string)
.rechunk(1)
.mapZIO { assignments =>
ZIO.logDebug(s"Got partition assignment ${assignments.map(_._1).mkString(",")}") *>
streamsStarted.update(_ :+ assignments.map(_._1.partition()).toSet) *>
ZStream
.fromIterable(assignments)
.flatMapPar(Int.MaxValue) { case (tp, partitionStream) =>
ZStream.finalizer {
ZIO.logDebug(s"Stream for ${tp.toString} is done") *>
streamsStopped.update(_ :+ tp.partition())
} *>
partitionStream.mapChunksZIO { records =>
OffsetBatch(records.map(_.offset)).commit.as(records)
}
}
.runDrain
}
.runDrain
.provideSomeLayer[Kafka](
consumer(
client2,
Some(group),
clientInstanceId = Some("consumer2"),
properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "10")
)
ZLayer.succeed(consumer1Settings) >>> minimalConsumer()
)
}
.fork

// Waiting until fib1's partition streams got restarted because of the rebalancing
_ <- drainCount.get.repeat(Schedule.recurUntil((n: Int) => n == 1) && Schedule.fixed(100.millis))
_ <- ZIO.logDebug("Consumer 1 finished rebalancing")
// Wait until consumer 1 was assigned some partitions
_ <- awaitRebalance(streamsStarted, 1)

// All messages processed, the partition streams of fib are still running.
// Saving the values and resetting the counters
messagesReceived0 <-
ZIO
.foreach((0 until nrPartitions).toList) { i =>
messagesReceived(i).get.flatMap { v =>
Ref.make(v).map(r => i -> r)
} <* messagesReceived(i).set(0)
}
.map(_.toMap)
// Consumer 2
// Stop after receiving 20 messages, causing two rebalancing events for consumer 1.
consumer2Settings <- consumerSettings(client2, Some(group))
_ <- ZIO
.logAnnotate("consumer", "2") {
Consumer
.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
.take(20)
.runDrain
.provideSomeLayer[Kafka](
ZLayer.succeed(consumer2Settings) >>> minimalConsumer()
)
}
.forkScoped

// Publishing another N messages - now they will be distributed among the two consumers until
// fib2 stops after 20 messages
_ <- ZIO.foreachDiscard((nrMessages + 1) to (2 * nrMessages)) { i =>
produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i"))
}
_ <- fib2.join
_ <- ZIO.logDebug("Consumer 2 done")
_ <- fib.join
_ <- ZIO.logDebug("Consumer 1 done")
// fib2 terminates after 20 messages, fib terminates after fib2 because of the rebalancing (drainCount==2)
messagesPerPartition0 <-
ZIO.foreach(messagesReceived0.values)(_.get) // counts from the first N messages (single consumer)
messagesPerPartition <-
ZIO.foreach(messagesReceived.values)(_.get) // counts from fib after the second consumer joined

// The first set must contain all the produced messages
// The second set must have at least one and maximum N-20 (because fib2 stops after consuming 20) -
// the exact count cannot be known because fib2's termination triggers fib1's rebalancing asynchronously.
} yield assert(messagesPerPartition0)(forall(equalTo(nrMessages / nrPartitions))) &&
assert(messagesPerPartition.view.sum)(isGreaterThan(0) && isLessThanEqualTo(nrMessages - 20))
} @@ TestAspect.nonFlaky(3),
// Wait until consumer 1's partitions were revoked, and assigned again
_ <- awaitRebalance(streamsStarted, 3)
_ <- fib1.interrupt

// The started streams after each rebalance
streamsStarted <- streamsStarted.get
_ <- ZIO.logDebug(s"partitions for started streams: $streamsStarted")

streamsStopped <- streamsStopped.get
_ <- ZIO.logDebug(s"partitions for stopped streams: $streamsStopped")
} yield assertTrue(
// During the first rebalance, all partitions are stopped:
streamsStopped.take(nrPartitions).toSet == partitionIds.toSet,
// Some streams that were assigned at the beginning, are started after the first rebalance:
(streamsStarted(0) intersect streamsStarted(1)).nonEmpty
)
},
test("handles RebalanceInProgressExceptions transparently") {
val nrPartitions = 5
val nrMessages = 10000
zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
@@ -72,8 +72,10 @@ final class PartitionStreamControl private (
queueInfoRef.get.map(_.deadlineExceeded(now))

/** To be invoked when the partition was lost. */
private[internal] def lost: UIO[Boolean] =
interruptionPromise.fail(new RuntimeException(s"Partition ${tp.toString} was lost"))
private[internal] def lost: UIO[Boolean] = {
val lostException = new RuntimeException(s"Partition ${tp.toString} was lost") with NoStackTrace
interruptionPromise.fail(lostException)
}

/** To be invoked when the stream is no longer processing. */
private[internal] def halt: UIO[Boolean] = {
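
For context on the `interruptionPromise` used above: such a promise can gate a stream via `ZStream#interruptWhen`, so that failing it interrupts the stream. A minimal sketch, not the actual zio-kafka wiring:

```scala
import zio._
import zio.stream._

object InterruptionSketch {
  // Minimal sketch: a partition stream gated by an interruption promise. Failing the
  // promise, as `lost` does above, tears the stream down before it emits more records.
  def gated[A](
    records: ZStream[Any, Throwable, A],
    interruptionPromise: Promise[Throwable, Unit]
  ): ZStream[Any, Throwable, A] =
    records.interruptWhen(interruptionPromise)
}
```
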