diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala index 801801b49..a65727640 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala @@ -42,8 +42,11 @@ class ZioKafkaConsumerBenchmark extends ConsumerZioBenchmark[Kafka with Producer counter <- Ref.make(0) _ <- Consumer .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray) - .tap { _ => - counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == recordCount)) + .chunks + .tap { batch => + counter + .updateAndGet(_ + batch.size) + .flatMap(count => Consumer.stopConsumption.when(count == recordCount)) } .runDrain .provideSome[Kafka](env)