Use same amount of messages for all consumer benchmarks (#1382)
In this change, common benchmark code is moved into `ZioBenchmark`,
`ConsumerZioBenchmark`, and `ProducerZioBenchmark` so that it becomes
easier to make the different benchmarks comparable.

After running the consumer benchmarks with different numbers of records
and different record sizes per run, this PR settled on 50k records of
~512 bytes per run for all consumer benchmarks. With these amounts the
zio-kafka based benchmarks and the 'comparison' benchmarks have roughly
the same scaling elasticity (where 'scaling elasticity' is defined as
the throughput growth factor divided by the record-count growth
factor).
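
As a made-up worked example of that definition (these numbers are illustrative, not measured benchmark results):

```scala
// Scaling elasticity = throughput growth factor / record-count growth factor.
// Hypothetical: records per run goes from 25k to 50k,
// measured throughput goes from 100 to 180 records/ms.
val recordGrowth     = 50000.0 / 25000.0               // 2.0
val throughputGrowth = 180.0 / 100.0                   // 1.8
val elasticity       = throughputGrowth / recordGrowth // 0.9
```

Benchmarks with roughly the same elasticity respond similarly when the per-run record count changes, which is what makes their results comparable.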

After this PR is merged, the benchmark history will be rewritten with
linear scaling so that we can compare historic runs against new runs.
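
(Presumably this rescaling multiplies each historic result by the record-count ratio; for example, a historic comparison run over 1,000,000 records would be scaled by 50,000 / 1,000,000 = 0.05 to line it up with the new 50k-record runs.)
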
erikvanoosten authored Nov 23, 2024
1 parent e23d64a commit b713170
Showing 7 changed files with 46 additions and 31 deletions.
@@ -1,6 +1,16 @@
 # Comparison Benchmarks
 
-## How to run them
+## Results
+
+The benchmarks are run from a GitHub action on every commit. The results are published on https://zio.github.io/zio-kafka/dev/bench/.
+
+The results are automatically pruned by [a scala script](https://github.com/zio/zio-kafka/blob/gh-pages/scripts/prune-benchmark-history.sc) on the `gh-pages` branch.
+
+## Interpreting the benchmarks
+
+To do!
+
+## How to run the benchmarks
 
 To run these "comparison" benchmarks, in a sbt console, run:
 ```scala
zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala (17 additions & 0 deletions)
@@ -5,6 +5,23 @@ import zio.{ ZLayer, _ }
 
 import java.util.UUID
 
+trait ConsumerZioBenchmark[Environment] extends ZioBenchmark[Environment] {
+  private val recordDataSize = 512
+  private def genString(i: Int): String = i.toString + scala.util.Random.alphanumeric.take(recordDataSize).mkString
+
+  protected val recordCount: Int = 50000
+  protected val kvs: Iterable[(String, String)] = Iterable.tabulate(recordCount)(i => (s"key$i", genString(i)))
+  protected val topic1 = "topic1"
+  protected val partitionCount = 6
+}
+
+trait ProducerZioBenchmark[Environment] extends ZioBenchmark[Environment] {
+  protected val recordCount = 500
+  protected val kvs: List[(String, String)] = List.tabulate(recordCount)(i => (s"key$i", s"msg$i"))
+  protected val topic1 = "topic1"
+  protected val partitionCount = 6
+}
+
 trait ZioBenchmark[Environment] {
   var runtime: Runtime.Scoped[Environment] = _
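
For context on the "~512 bytes" in the commit message: the new `genString` prepends the record index to `recordDataSize` (512) random alphanumeric characters, so each value is 512 characters plus a few index digits. A quick standalone sanity check, mirroring the trait's private method:

```scala
// Mirrors ConsumerZioBenchmark.genString, for illustration only.
def genString(i: Int): String =
  i.toString + scala.util.Random.alphanumeric.take(512).mkString

println(genString(49999).length) // 517 = 5 index digits + 512 random characters
```
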
@@ -16,18 +16,14 @@ import java.util.concurrent.TimeUnit
 
 @State(Scope.Benchmark)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
-class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
-  val topic1 = "topic1"
-  val nrPartitions = 6
-  val nrMessages = 50000
-  val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i"))
+class ZioKafkaConsumerBenchmark extends ConsumerZioBenchmark[Kafka with Producer] {
 
   override protected def bootstrap: ZLayer[Any, Nothing, Kafka with Producer] =
     ZLayer.make[Kafka with Producer](Kafka.embedded, producer).orDie
 
   override def initialize: ZIO[Kafka with Producer, Throwable, Any] = for {
     _ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore
-    _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions))
+    _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount))
     _ <- produceMany(topic1, kvs)
   } yield ()
 
@@ -47,7 +43,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
       _ <- Consumer
              .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
              .tap { _ =>
-               counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == nrMessages))
+               counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == recordCount))
              }
              .runDrain
              .provideSome[Kafka](env)
@@ -67,7 +63,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
         .tap(batch => counter.update(_ + batch.size))
         .map(OffsetBatch.apply)
         .mapZIO(_.commit)
-        .takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
+        .takeUntilZIO(_ => counter.get.map(_ >= recordCount))
         .runDrain
         .provideSome[Kafka](env)
   }
@@ -14,11 +14,7 @@ import java.util.concurrent.TimeUnit
 
 @State(Scope.Benchmark)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
-class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] {
-  val topic1 = "topic1"
-  val nrPartitions = 6
-  val nrMessages = 500
-  val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i"))
+class ZioKafkaProducerBenchmark extends ProducerZioBenchmark[Kafka with Producer] {
   val records: Chunk[ProducerRecord[String, String]] = Chunk.fromIterable(kvs.map { case (k, v) =>
     new ProducerRecord(topic1, k, v)
   })
@@ -28,7 +24,7 @@ class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] {
 
   override def initialize: ZIO[Kafka with Producer, Throwable, Any] = for {
     _ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore
-    _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions))
+    _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount))
   } yield ()
 
   @Benchmark
@@ -4,7 +4,7 @@ import io.github.embeddedkafka.EmbeddedKafka
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import zio.kafka.admin.AdminClient.TopicPartition
-import zio.kafka.bench.ZioBenchmark
+import zio.kafka.bench.ConsumerZioBenchmark
 import zio.kafka.bench.ZioBenchmark.randomThing
 import zio.kafka.bench.comparison.ComparisonBenchmark._
 import zio.kafka.consumer.{ Consumer, ConsumerSettings }
@@ -15,14 +15,10 @@ import zio.{ ULayer, ZIO, ZLayer }
 
 import scala.jdk.CollectionConverters._
 
-trait ComparisonBenchmark extends ZioBenchmark[Env] {
+trait ComparisonBenchmark extends ConsumerZioBenchmark[Env] {
 
-  protected final val topic1: String = "topic1"
-  protected final val nrPartitions: Int = 6
   protected final val topicPartitions: List[TopicPartition] =
-    (0 until nrPartitions).map(TopicPartition(topic1, _)).toList
-  protected final val numberOfMessages: Int = 1000000
-  protected final val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key$i", s"msg$i"))
+    (0 until partitionCount).map(TopicPartition(topic1, _)).toList
 
   private val javaKafkaConsumer: ZLayer[ConsumerSettings, Throwable, LowLevelKafka] =
     ZLayer.scoped {
@@ -63,7 +59,7 @@ trait ComparisonBenchmark extends ZioBenchmark[Env] {
   override final def initialize: ZIO[Env, Throwable, Any] =
     for {
       _ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore
-      _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions))
+      _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount))
       _ <- produceMany(topic1, kvs)
     } yield ()
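
Note the workload change hiding in this diff: `ComparisonBenchmark` previously produced 1,000,000 small `msg$i` records, while via `ConsumerZioBenchmark` it now inherits the shared workload of 50,000 records of ~512 bytes, the same amount used by all other consumer benchmarks, which is the point of this PR.
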
@@ -23,14 +23,14 @@ class KafkaClientBenchmarks extends ComparisonBenchmark {
         consumer.subscribe(java.util.Arrays.asList(topic1))
 
         var count = 0L
-        while (count < numberOfMessages) {
+        while (count < recordCount) {
           val records = consumer.poll(settings.pollTimeout)
           count += records.count()
         }
 
         consumer.unsubscribe()
         count
-      }.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages"))
+      }.flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount"))
     }
   }
 }
@@ -46,14 +46,14 @@ class KafkaClientBenchmarks extends ComparisonBenchmark {
         consumer.assign(topicPartitions.map(_.asJava).asJava)
 
         var count = 0L
-        while (count < numberOfMessages) {
+        while (count < recordCount) {
           val records = consumer.poll(settings.pollTimeout)
           count += records.count()
         }
 
         consumer.unsubscribe()
         count
-      }.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages"))
+      }.flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount"))
     }
   }
 }
@@ -17,9 +17,9 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark {
     runZIO {
       Consumer
         .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
-        .take(numberOfMessages.toLong)
+        .take(recordCount.toLong)
         .runCount
-        .flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages"))
+        .flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount"))
     }
 
   @Benchmark
@@ -32,9 +32,9 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark {
           Serde.byteArray,
           Serde.byteArray
         )
-        .take(numberOfMessages.toLong)
+        .take(recordCount.toLong)
         .runCount
-        .flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages"))
+        .flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount"))
     }
 
 }
