Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
In test `restartStreamsOnRebalancing mode closes all partition streams` consumer 1 is expected to receive at least 1 message. However, consumer 2 might grab all them through pre-fetching. Fix this by disabling pre-fetching for consumer 2.
  • Loading branch information
erikvanoosten committed Oct 28, 2023
1 parent a1fb1ff commit 9f4b41b
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -811,12 +811,16 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.take(20)
.runDrain
.provideSomeLayer[Kafka](
consumer(
client2,
Some(group),
clientInstanceId = Some("consumer2"),
properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "10")
)
// Reduce `max.poll.records` and disable pre-fetch so that we are sure that consumer 2 does
// not pre-fetch more than it will process.
ZLayer {
consumerSettings(
client2,
Some(group),
clientInstanceId = Some("consumer2"),
`max.poll.records` = 10
).map(_.withFetchStrategy(_ => ZIO.succeed(Set.empty)))
} >>> minimalConsumer()
)
}
.fork
Expand Down

0 comments on commit 9f4b41b

Please sign in to comment.