Extract Committer and RebalanceCoordinator classes from Runloop + unit tests (#1375)

The code is mostly just moved to a different place; the logic was left
largely intact. The 'interface' between the components has been decoupled
further: the rebalance listener no longer accesses the full Runloop State,
and the pending commits are stored internally in the Committer.

Care has been taken to make the Committer usable during rebalancing as
well, for example by giving it the proper access to the Consumer. The part
that waits for the end of the streams and their commits has been changed
to use the Committer.

Includes unit tests for the two new components.
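
For orientation, the new Committer surface, as exercised by the specs below,
looks roughly like this. This is a reconstruction from the tests in this
commit, not the literal source; the method names appear in the specs, while
the ZIO effect types (Task/UIO) are assumptions:

import java.util.{ Map => JavaMap }
import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetCommitCallback }
import org.apache.kafka.common.TopicPartition
import zio.{ Task, UIO }
import zio.kafka.consumer.internal.Committer.CommitOffsets

trait Committer {
  // Queues a commit; the returned effect completes once the commit
  // succeeds or fails.
  def commit(offsets: Map[TopicPartition, OffsetAndMetadata]): Task[Unit]

  // Called from the poll loop: drains queued commits and hands the merged
  // offsets to a commitAsync-style function together with a completion callback.
  def processQueuedCommits(
    commitAsync: (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => Task[Unit]
  ): Task[Unit]

  def pendingCommitCount: UIO[Int]
  def cleanupPendingCommits: UIO[Unit]
  def getCommittedOffsets: UIO[CommitOffsets]
  def keepCommitsForPartitions(assignedPartitions: Set[TopicPartition]): UIO[Unit]
}

The CommitterSpec tests drive the live implementation via LiveCommitter.make
with a custom onCommitAvailable effect, so the signalling between committer
and poll loop can be observed directly.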

---------

Co-authored-by: Erik van Oosten <[email protected]>
svroonland and erikvanoosten authored Nov 26, 2024
1 parent b861c99 commit bdde6e3
Showing 10 changed files with 1,099 additions and 493 deletions.
31 changes: 18 additions & 13 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -28,6 +28,7 @@ import zio.test.Assertion._
 import zio.test.TestAspect._
 import zio.test._
 
+import java.util.concurrent.ExecutionException
 import scala.reflect.ClassTag
 
 //noinspection SimplifyAssertInspection
@@ -323,21 +324,20 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
       } yield assert(offset.map(_.offset))(isSome(equalTo(9L)))
     },
     test("process outstanding commits after a graceful shutdown with aggregateAsync using `maxRebalanceDuration`") {
-      val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
       for {
-        topic            <- randomTopic
-        group            <- randomGroup
-        client           <- randomClient
-        _                <- produceMany(topic, kvs)
-        messagesReceived <- Ref.make[Int](0)
+        topic               <- randomTopic
+        group               <- randomGroup
+        client              <- randomClient
+        _                   <- scheduledProduce(topic, Schedule.fixed(50.millis).jittered).runDrain.forkScoped
+        lastProcessedOffset <- Ref.make(0L)
         offset <- (
                     Consumer
                       .plainStream(Subscription.topics(topic), Serde.string, Serde.string)
-                      .mapConcatZIO { record =>
+                      .mapZIO { record =>
                         for {
-                          nr <- messagesReceived.updateAndGet(_ + 1)
+                          nr <- lastProcessedOffset.updateAndGet(_ + 1)
                           _  <- Consumer.stopConsumption.when(nr == 10)
-                        } yield if (nr < 10) Seq(record.offset) else Seq.empty
+                        } yield record.offset
                       }
                       .aggregateAsync(Consumer.offsetBatches)
                       .mapZIO(_.commit)
@@ -353,7 +353,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                       maxRebalanceDuration = 6.seconds
                     )
                   )
-      } yield assertTrue(offset.map(_.offset).contains(9L))
+        lastOffset <- lastProcessedOffset.get
+      } yield assertTrue(offset.map(_.offset).contains(lastOffset))
     } @@ TestAspect.nonFlaky(2),
     test("a consumer timeout interrupts the stream and shuts down the consumer") {
       // Setup of this test:
@@ -1399,9 +1400,13 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
       }: _*) @@ TestAspect.nonFlaky(2),
     test("running streams don't stall after a poll timeout") {
       for {
         topic    <- randomTopic
         clientId <- randomClient
-        _        <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic))
+        _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic)).flatMap(ZIO.fromTry(_)).retryN(3).catchSome {
+               case e: ExecutionException
+                   if e.getCause.isInstanceOf[org.apache.kafka.common.errors.TopicExistsException] =>
+                 ZIO.unit
+             }
         settings   <- consumerSettings(clientId)
         consumer   <- Consumer.make(settings.withPollTimeout(50.millis))
         recordsOut <- Queue.unbounded[Unit]
zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala → zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitOffsetsSpec.scala
@@ -1,11 +1,13 @@
 package zio.kafka.consumer.internal
 
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
 import zio._
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import zio.kafka.consumer.internal.Committer.CommitOffsets
+import zio.kafka.consumer.internal.LiveCommitter.Commit
 import zio.test._
 
-object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
+object CommitOffsetsSpec extends ZIOSpecDefault {
 
   private val tp10 = new TopicPartition("t1", 0)
   private val tp11 = new TopicPartition("t1", 1)
@@ -14,49 +16,49 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
   private val tp22 = new TopicPartition("t2", 2)
 
   override def spec: Spec[TestEnvironment with Scope, Any] =
-    suite("Runloop.CommitOffsets spec")(
+    suite("CommitOffsets spec")(
       test("addCommits adds to empty CommitOffsets") {
-        val s1        = Runloop.CommitOffsets(Map.empty)
+        val s1        = CommitOffsets(Map.empty)
         val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10))))
         assertTrue(
           inc == 0,
           s2.offsets == Map(tp10 -> 10L)
         )
       },
       test("addCommits updates offset when it is higher") {
-        val s1        = Runloop.CommitOffsets(Map(tp10 -> 4L))
+        val s1        = CommitOffsets(Map(tp10 -> 4L))
         val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10))))
         assertTrue(
           inc == 10 - 4,
           s2.offsets == Map(tp10 -> 10L)
         )
       },
       test("addCommits ignores an offset when it is lower") {
-        val s1        = Runloop.CommitOffsets(Map(tp10 -> 10L))
+        val s1        = CommitOffsets(Map(tp10 -> 10L))
         val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 5))))
         assertTrue(
           inc == 0,
           s2.offsets == Map(tp10 -> 10L)
         )
       },
       test("addCommits keeps unrelated partitions") {
-        val s1        = Runloop.CommitOffsets(Map(tp10 -> 10L))
+        val s1        = CommitOffsets(Map(tp10 -> 10L))
         val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11))))
         assertTrue(
           inc == 0,
           s2.offsets == Map(tp10 -> 10L, tp11 -> 11L)
         )
       },
       test("addCommits does it all at once") {
-        val s1        = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L))
+        val s1        = CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L))
         val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11, tp20 -> 206L, tp21 -> 209L, tp22 -> 220L))))
         assertTrue(
           inc == /* tp10 */ 0 + /* tp11 */ 0 + /* tp20 */ 1 + /* tp21 */ 0 + /* tp22 */ 0,
           s2.offsets == Map(tp10 -> 10L, tp11 -> 11L, tp20 -> 206L, tp21 -> 210L, tp22 -> 220L)
         )
       },
       test("addCommits adds multiple commits") {
-        val s1        = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L))
+        val s1        = CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L))
         val (inc, s2) = s1.addCommits(
           Chunk(
             makeCommit(Map(tp11 -> 11, tp20 -> 199L, tp21 -> 211L, tp22 -> 219L)),
@@ -69,35 +71,35 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
         )
       },
       test("keepPartitions removes some partitions") {
-        val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
+        val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
         val s2 = s1.keepPartitions(Set(tp10))
         assertTrue(s2.offsets == Map(tp10 -> 10L))
       },
       test("does not 'contain' offset when tp is not present") {
-        val s1     = Runloop.CommitOffsets(Map(tp10 -> 10L))
+        val s1     = CommitOffsets(Map(tp10 -> 10L))
         val result = s1.contains(tp20, 10)
         assertTrue(!result)
       },
       test("does not 'contain' a higher offset") {
-        val s1     = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
+        val s1     = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
         val result = s1.contains(tp10, 11)
         assertTrue(!result)
       },
       test("does 'contain' equal offset") {
-        val s1     = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
+        val s1     = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
         val result = s1.contains(tp10, 10)
         assertTrue(result)
       },
       test("does 'contain' lower offset") {
-        val s1     = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
+        val s1     = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
         val result = s1.contains(tp20, 19)
         assertTrue(result)
       }
     )
 
-  private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = {
+  private def makeCommit(offsets: Map[TopicPartition, Long]): Commit = {
     val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }
     val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None))
-    Runloop.Commit(0L, o, p)
+    Commit(0L, o, p)
   }
 }
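
The behaviour these tests pin down is small enough to restate. A hypothetical
sketch of the CommitOffsets semantics, inferred only from the assertions above
(the real implementation lives in Committer.CommitOffsets and may differ):

import org.apache.kafka.common.TopicPartition
import zio.Chunk
import zio.kafka.consumer.internal.LiveCommitter.Commit

final case class CommitOffsets(offsets: Map[TopicPartition, Long]) {

  // Merge commits into the map, keeping the highest offset per partition.
  // Returns how far already-tracked partitions advanced; newly seen
  // partitions are added but contribute nothing to the increment.
  def addCommits(commits: Chunk[Commit]): (Long, CommitOffsets) = {
    var acc = offsets
    var inc = 0L
    commits.foreach { commit =>
      commit.offsets.foreach { case (tp, meta) =>
        val offset = meta.offset()
        acc.get(tp) match {
          case Some(old) if offset > old => inc += offset - old; acc = acc.updated(tp, offset)
          case Some(_)                   => () // equal or lower: already covered, ignore
          case None                      => acc = acc.updated(tp, offset)
        }
      }
    }
    (inc, CommitOffsets(acc))
  }

  // Drop partitions that are no longer assigned.
  def keepPartitions(tps: Set[TopicPartition]): CommitOffsets =
    CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) })

  // An offset is 'contained' when a commit at or beyond it was recorded.
  def contains(tp: TopicPartition, offset: Long): Boolean =
    offsets.get(tp).exists(_ >= offset)
}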
zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala
@@ -0,0 +1,204 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RebalanceInProgressException
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.test._
import zio.{ durationInt, Promise, Ref, ZIO }

import java.util.{ Map => JavaMap }
import scala.jdk.CollectionConverters.MapHasAsJava

object CommitterSpec extends ZIOSpecDefault {
override def spec = suite("Committer")(
test("signals that a new commit is available") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter
.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
_ <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
} yield assertCompletes
},
test("handles a successful commit by completing the commit effect") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
_ <- commitFiber.join
} yield assertCompletes
},
test("handles a failed commit by completing the commit effect with a failure") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) =>
ZIO.attempt(callback.onComplete(offsets, new RuntimeException("Commit failed")))
)
result <- commitFiber.await
} yield assertTrue(result.isFailure)
},
test("retries when rebalancing") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) =>
ZIO.attempt(callback.onComplete(offsets, new RebalanceInProgressException("Rebalance in progress")))
)
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
result <- commitFiber.await
} yield assertTrue(result.isSuccess)
},
test("adds 1 to the committed last offset") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
_ <- committer.commit(Map(tp -> new OffsetAndMetadata(1))).forkScoped
_ <- commitAvailable.await
committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]]
_ <- committer.processQueuedCommits((offsets, callback) =>
committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null))
)
offsetsCommitted <- committedOffsets.await
} yield assertTrue(
offsetsCommitted == Map(tp -> new OffsetAndMetadata(2)).asJava
)
},
test("batches commits from multiple partitions and offsets") {
for {
runtime <- ZIO.runtime[Any]
commitsAvailable <- Promise.make[Nothing, Unit]
nrCommitsDone <- Ref.make(0)
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable =
ZIO.whenZIO(nrCommitsDone.updateAndGet(_ + 1).map(_ == 3))(commitsAvailable.succeed(())).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
tp2 = new TopicPartition("topic", 1)
commitFiber1 <- committer.commit(Map(tp -> new OffsetAndMetadata(1))).forkScoped
commitFiber2 <- committer.commit(Map(tp -> new OffsetAndMetadata(2))).forkScoped
commitFiber3 <- committer.commit(Map(tp2 -> new OffsetAndMetadata(3))).forkScoped
_ <- commitsAvailable.await
committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]]
_ <- committer.processQueuedCommits((offsets, callback) =>
committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null))
)
_ <- commitFiber1.join zip commitFiber2.join zip commitFiber3.join
offsetsCommitted <- committedOffsets.await
} yield assertTrue(
offsetsCommitted == Map(tp -> new OffsetAndMetadata(3), tp2 -> new OffsetAndMetadata(4)).asJava
)
},
test("keeps track of pending commits") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
pendingCommitsDuringCommit <- committer.pendingCommitCount
_ <- committer.cleanupPendingCommits
pendingCommitsAfterCommit <- committer.pendingCommitCount
_ <- commitFiber.join
} yield assertTrue(pendingCommitsDuringCommit == 1 && pendingCommitsAfterCommit == 0)
},
test("keep track of committed offsets") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
committedOffsets <- committer.getCommittedOffsets
_ <- commitFiber.join
} yield assertTrue(committedOffsets.offsets == Map(tp -> 0L))
},
test("clean committed offsets of no-longer assigned partitions") {
for {
runtime <- ZIO.runtime[Any]
commitAvailable <- Promise.make[Nothing, Unit]
committer <- LiveCommitter.make(
10.seconds,
Diagnostics.NoOp,
new DummyMetrics,
onCommitAvailable = commitAvailable.succeed(()).unit,
sameThreadRuntime = runtime
)
tp = new TopicPartition("topic", 0)
commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped
_ <- commitAvailable.await
_ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null)))
_ <- committer.keepCommitsForPartitions(Set.empty)
committedOffsets <- committer.getCommittedOffsets
_ <- commitFiber.join
} yield assertTrue(committedOffsets.offsets.isEmpty)
}
) @@ TestAspect.withLiveClock @@ TestAspect.nonFlaky(100)
}
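
For completeness, the interaction pattern these specs encode: callers fork
`commit` and wait on its result, `onCommitAvailable` wakes the poll loop, and
the poll loop drains the queue by handing the merged offsets to the consumer.
A hypothetical driver, under the assumption that the poll loop owns a raw
KafkaConsumer (the actual Runloop wiring differs in detail):

import org.apache.kafka.clients.consumer.KafkaConsumer
import zio.{ Task, ZIO }

// Sketch only: called on the poll thread after the Committer signalled
// onCommitAvailable. The Committer batches the queued offsets (advancing
// each by +1, per the "adds 1 to the committed last offset" test above)
// and completes the pending commit promises via the callback.
def handleQueuedCommits(
  committer: Committer,
  consumer: KafkaConsumer[Array[Byte], Array[Byte]]
): Task[Unit] =
  committer.processQueuedCommits { (offsets, callback) =>
    ZIO.attempt(consumer.commitAsync(offsets, callback))
  }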