Skip to content

Commit

Permalink
Fix benchmark code
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Sep 9, 2023
1 parent fd2a82f commit a60db8e
Showing 1 changed file with 4 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, produceMany, producer }
import zio.{ Chunk, Ref, ZIO, ZLayer }
import zio.{ Ref, ZIO, ZLayer }

import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -63,14 +63,11 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
.logAnnotate("consumer", "1") {
Consumer
.plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
.tap { _ =>
counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == nrMessages))
}
.mapChunksZIO(records => counter.update(_ + records.size) *> Consumer.commit(records).as(Chunk.empty))
.takeUntilZIO((_: Chunk[_]) => counter.get.map(_ >= nrMessages))
.mapChunksZIO(records => counter.update(_ + records.size) *> Consumer.commit(records).as(records))
.takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
.runDrain
.provideSome[Kafka](env)
}
.provideSome[Kafka](env)
} yield ()
}
}

0 comments on commit a60db8e

Please sign in to comment.