Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Apr 14, 2024
1 parent 46c8680 commit 22308d6
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -362,35 +362,34 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
for {
group <- randomGroup
client <- randomClient
_ <- produceMany(topic, kvs)
messagesReceived <- Ref.make[Int](0)
offset <- ZIO.scoped {
for {
stop <- Promise.make[Nothing, Unit]
fib <-
Consumer
.plainStreamWithGracefulShutdown(Subscription.topics(topic), Serde.string, Serde.string) {
stream =>
stream.mapConcatZIO { record =>
for {
nr <- messagesReceived.updateAndGet(_ + 1)
_ <- stop.succeed(()).when(nr == 10)
.plainStreamWithGracefulShutdown(
Subscription.topics(topic),
Serde.string,
Serde.string
) { stream =>
stream.mapConcatZIO { record =>
for {
nr <- messagesReceived.updateAndGet(_ + 1)
_ <- stop.succeed(()).when(nr == 10)
// _ <- ZIO.debug(nr)
} yield if (nr < 10) Seq(record.offset) else Seq.empty
}
.transduce(Consumer.offsetBatches)
.mapZIO(batch =>
ZIO.logInfo("Committing") *> batch.commit *> ZIO.logInfo("Committed")
)
.runDrain *> ZIO.logInfo("Stream done")
} yield if (nr < 10) Seq(record.offset) else Seq.empty
}
.transduce(Consumer.offsetBatches)
.mapZIO(_.commit)
.runDrain
}
.forkScoped
_ <- stop.await *> fib.interrupt
_ <- ZIO.logInfo("Interrupting done")
_ <- ZIO.sleep(1.second) // Some time for the broker to have processed the commit?
_ <- produceMany(topic, kvs)
_ <- stop.await *> fib.interrupt.debug
offset <- Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head)
} yield offset
}.provideSomeLayer[Kafka](consumer(client, Some(group)))
}.provideSomeLayer[Kafka with Producer](consumer(client, Some(group)))
} yield assert(offset.map(_.offset))(isSome(equalTo(9L)))
} @@ nonFlaky(10),
test(
Expand Down

0 comments on commit 22308d6

Please sign in to comment.