Commit

Fix timing issue in ConsumerSpec test by adding delay in consumer stream (#1388)

This PR addresses a timing issue in the ConsumerSpec test:

`"it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously"`

# Issue:

When running the entire test suite, this test occasionally fails with
the following assertion error:

```
Assertion failed:
  ✗ 100000 was not less than 100000
  consumed0 did not satisfy isGreaterThan(0L) && isLessThan(numberOfMessages.toLong)
  consumed0 = 100000
```

# Cause:
- The failure occurs because the first consumer sometimes consumes all messages before `consumer.stopConsumption` is called.
- This happens due to timing variations when the full test suite is run, possibly because of system performance or resource contention.
- As a result, `consumed0` equals `numberOfMessages`, causing the assertion `consumed0 < numberOfMessages.toLong` to fail.

# Solution:
- Introduce a small per-message delay (`ZIO.sleep(1.millisecond)` via `.tap`) in the consumer stream to prevent it from consuming all messages too quickly.
- This ensures that `consumer.stopConsumption` is called before all messages are consumed.
- The test can now reliably verify that the consumer can be stopped and restarted.
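
The delay budget behind this fix can be checked with a quick back-of-envelope sketch (plain Scala, no ZIO; `delayPerMessageMillis` and `stopAfterMillis` are illustrative names mirroring the 1 ms sleep and 200 ms stop window in the test, not identifiers from the actual change):

```scala
object DelayBudgetSketch {
  def main(args: Array[String]): Unit = {
    val numberOfMessages = 100000

    // Without a per-message delay, a fast machine can drain all 100000
    // messages in well under the 200 ms window before stopConsumption runs,
    // so consumed0 == numberOfMessages and the strict `<` assertion fails.
    val delayPerMessageMillis = 1   // the ZIO.sleep(1.millisecond) per element
    val stopAfterMillis       = 200 // the ZIO.sleep(200.millis) before stopConsumption

    // With the delay, at most roughly stopAfterMillis / delayPerMessageMillis
    // messages fit into the window, so 0 < consumed0 < numberOfMessages holds.
    val maxConsumedBeforeStop = stopAfterMillis / delayPerMessageMillis
    assert(maxConsumedBeforeStop > 0 && maxConsumedBeforeStop < numberOfMessages)
    println(maxConsumedBeforeStop)
  }
}
```

In practice `consumed0` will be somewhat below this bound because of scheduling overhead, but it stays comfortably within the asserted range.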

# Testing:
- Ran the full test suite multiple times to confirm that the issue is resolved.
- Verified that `consumed0` is greater than 0 and less than `numberOfMessages`, satisfying the original assertions.
AdrielC authored Nov 19, 2024
1 parent 1c5041e commit 78cfa03
Showing 1 changed file with 7 additions and 8 deletions.
```diff
@@ -1480,9 +1480,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
       test(
         "it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously"
       ) {
-        // NOTE:
-        // When this test fails with the message `100000 was not less than 100000`, it's because
-        // your computer is so fast that the first consumer already consumed all 100000 messages.
         val numberOfMessages: Int           = 100000
         val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

@@ -1494,11 +1491,13 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
           consumer <- Consumer.make(settings, diagnostics = diagnostics)
           _        <- produceMany(topic, kvs)
           // Starting a consumption session to start the Runloop.
-          fiber <- consumer
-                     .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
-                     .take(numberOfMessages.toLong)
-                     .runCount
-                     .forkScoped
+          fiber <-
+            consumer
+              .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
+              .tap(_ => ZIO.sleep(1.millisecond)) // sleep to avoid consuming all messages in under 200 millis
+              .take(numberOfMessages.toLong)
+              .runCount
+              .forkScoped
           _         <- ZIO.sleep(200.millis)
           _         <- consumer.stopConsumption
           consumed0 <- fiber.join
```
