
Commit 137794f

Merge remote-tracking branch 'origin/master' into bounded-commit-available

svroonland committed Nov 26, 2024
2 parents 5e37292 + bdde6e3
Showing 23 changed files with 1,276 additions and 590 deletions.
13 changes: 3 additions & 10 deletions .github/workflows/profile.yml
@@ -1,16 +1,9 @@
# Copied from Flavio W. Brasil's work on Kyo: https://github.com/fwbrasil/kyo
name: profile
on:
push:
branches:
- main
pull_request:
types: [ opened, reopened, synchronize ]

# Prevent multiple builds at the same time from the same branch (except for 'master').
concurrency:
group: ${{ github.workflow }}-${{ github.ref == format('refs/heads/{0}', github.event.repository.default_branch) && github.run_id || github.ref }}
cancel-in-progress: true
release:
types: [ created ]
workflow_dispatch:

permissions:
contents: write
4 changes: 2 additions & 2 deletions build.sbt
@@ -28,8 +28,8 @@ lazy val binCompatVersionToCompare =
compatVersion
}

lazy val kafkaVersion = "3.8.1"
lazy val embeddedKafkaVersion = "3.8.1" // Should be the same as kafkaVersion, except for the patch part
lazy val kafkaVersion = "3.9.0"
lazy val embeddedKafkaVersion = "3.9.0" // Should be the same as kafkaVersion, except for the patch part

lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.5.12"
3 changes: 1 addition & 2 deletions docs/consumer-tuning.md
@@ -57,8 +57,7 @@ the partition queues. A very rough estimate for the maximum amount of heap needed
The total can be tuned by changing the `partitionPreFetchBufferLimit` and `max.poll.records` settings.

Another option is to write a custom `FetchStrategy`. For example the `ManyPartitionsQueueSizeBasedFetchStrategy` in
[draft PR 970](https://github.com/zio/zio-kafka/pull/970) (merged into zio-kafka since 2.8.1). Note that the fetch strategy API is marked as
experimental and may change without notice in any future zio-kafka version.
[draft PR 970](https://github.com/zio/zio-kafka/pull/970) (merged into zio-kafka since 2.8.1).

## Long processing durations

87 changes: 87 additions & 0 deletions zio-kafka-bench/README.md
@@ -0,0 +1,87 @@
# Comparison Benchmarks

## Results

The benchmarks are run by a GitHub action on every commit. The results are published
on https://zio.github.io/zio-kafka/dev/bench/.

The results are automatically pruned by [a Scala script](https://github.com/zio/zio-kafka/blob/gh-pages/scripts/prune-benchmark-history.sc) on the `gh-pages` branch.

## The consumer benchmarks

When comparing the zio-kafka benchmarks against the regular Kafka clients, keep in mind that these benchmarks represent
the worst possible case for zio-kafka: the consumers only count the received records and do no further processing.
This makes the comparison look bad for zio-kafka because zio-kafka programs normally process records in parallel,
while other Kafka consumers process records serially.

All consumer benchmarks send 50k records of ~512 bytes each per run.

#### zio.kafka.bench.ZioKafkaConsumerBenchmark.throughput

Uses zio-kafka's `plainStream` with a topic subscription. The offsets of the consumed records are _not_ committed.

#### zio.kafka.bench.ZioKafkaConsumerBenchmark.throughputWithCommits

Same as above, but now the offsets of the consumed records are committed.
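
As an indication of what these two benchmarks do, a minimal sketch of the committing variant could look like the
following. This is not the benchmark code itself; the topic name and serdes are taken from the descriptions above.

```scala
import zio._
import zio.kafka.consumer.{ Consumer, OffsetBatch, Subscription }
import zio.kafka.serde.Serde

// A minimal sketch, not the benchmark itself: consume `topic1` via `plainStream`
// and commit the offsets in one batch per pulled chunk.
val consumeAndCommit: ZIO[Consumer, Throwable, Unit] =
  Consumer
    .plainStream(Subscription.topics("topic1"), Serde.byteArray, Serde.byteArray)
    .map(_.offset)          // CommittableRecord -> Offset
    .chunks                 // group offsets per pulled chunk
    .map(OffsetBatch.apply) // merge a chunk of offsets into a single batch
    .mapZIO(_.commit)       // one commit per batch instead of one per record
    .runDrain

// The `throughput` variant is the same stream without the offset/commit steps.
```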

#### zio.kafka.bench.comparison.KafkaClientBenchmarks.kafkaClients

The simplest possible Kafka client that subscribes to a topic. It directly calls the poll method in a tight loop.

#### zio.kafka.bench.comparison.KafkaClientBenchmarks.manualKafkaClients

Same as above, but now using partition assignment instead of topic subscription.
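
To make the comparison concrete, the raw-client loop is roughly the following sketch. It assumes a broker on
`localhost:9092`; the real benchmarks wire their configuration through `ConsumerSettings`.

```scala
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import scala.jdk.CollectionConverters._

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // assumption: a local broker
props.put("group.id", "bench")

val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)

consumer.subscribe(java.util.Arrays.asList("topic1"))            // kafkaClients: topic subscription
// consumer.assign(List(new TopicPartition("topic1", 0)).asJava) // manualKafkaClients: explicit partitions

var count = 0L
while (count < 50000L) { // 50k records per run, as noted above
  val records = consumer.poll(Duration.ofMillis(50))
  count += records.count()
}
consumer.unsubscribe()
consumer.close()
```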

#### zio.kafka.bench.comparison.ZioKafkaBenchmarks.zioKafka

Does the same as `zio.kafka.bench.ZioKafkaConsumerBenchmark.throughput`.

#### zio.kafka.bench.comparison.ZioKafkaBenchmarks.manualZioKafka

Does the same as `zio.kafka.bench.ZioKafkaConsumerBenchmark.throughput`, but uses a partition assignment instead of a
topic subscription.
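
With zio-kafka, such a manual assignment is expressed as a different `Subscription`. A sketch, assuming the
tuple-varargs overload of `Subscription.manual`:

```scala
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde

// Sketch: the same plainStream, but bound to explicit partitions of topic1.
val partitions = (0 until 6).map(p => ("topic1", p)) // the benchmark topic has 6 partitions
val manualStream =
  Consumer.plainStream(
    Subscription.manual(partitions: _*),
    Serde.byteArray,
    Serde.byteArray
  )
```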

## The producer benchmarks

#### zio.kafka.bench.ZioKafkaProducerBenchmark.produceChunkSeq

Sequentially produces 30 batches, where each batch contains 500 small records.

#### zio.kafka.bench.ZioKafkaProducerBenchmark.produceChunkPar

Produces the same batches as the above, but from 4 fibers.

#### zio.kafka.bench.ZioKafkaProducerBenchmark.produceSingleRecordSeq

Sequentially produces 100 small records.

#### zio.kafka.bench.ZioKafkaProducerBenchmark.produceSingleRecordPar

Produces 100 small records from 4 fibers.
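
For the chunked variants, a minimal sketch could look as follows. This is not the benchmark code; the batch layout
follows the descriptions above and the parallel split is illustrative.

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde

// One batch of 500 small records.
val batch: Chunk[ProducerRecord[String, String]] =
  Chunk.tabulate(500)(i => new ProducerRecord("topic1", s"key$i", s"msg$i"))

// produceChunkSeq: 30 batches, one after the other.
val seq: ZIO[Producer, Throwable, Unit] =
  Producer.produceChunk(batch, Serde.string, Serde.string).repeatN(29).unit

// produceChunkPar: the same 30 batches, at most 4 in flight at a time.
val par: ZIO[Producer, Throwable, Unit] =
  ZIO.withParallelism(4) {
    ZIO.foreachParDiscard(1 to 30)(_ => Producer.produceChunk(batch, Serde.string, Serde.string))
  }
```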

## How to run the benchmarks

To run these "comparison" benchmarks, run the following in an sbt console:

```scala
clean
Test/compile
zioKafkaBench/Jmh/run -wi 10 -i 10 -r 1 -w 1 -t 1 -f 5 -foe true .*comparison.*
```

The `.*comparison.*` part is the selector that tells JMH which benchmarks to run.
Here, we only select the benchmarks in the `comparison` package.
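
For example, replacing it with `.*ZioKafkaProducerBenchmark.*` would run only the producer benchmarks described above.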

## Tuning JMH runs

To list all possible options and understand these configurations, run `sbt "zioKafkaBench/Jmh/run -h"`.

Meaning of the options used:

- "-wi 10": 10 warmup iterations
- "-i 10": 10 benchmark iterations
- "-r 1": Minimum time to spend at each measurement iteration. 1 second
- "-w 1": Minimum time to spend at each warmup iteration. 1 second
- "-t 1": Number of worker threads to run with. 1 thread
- "-f 5": How many times to fork a single benchmark. 5 forks
- "-foe true": Should JMH fail immediately if any benchmark had experienced an unrecoverable error?. True
17 changes: 17 additions & 0 deletions zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala
@@ -5,6 +5,23 @@ import zio.{ ZLayer, _ }

import java.util.UUID

trait ConsumerZioBenchmark[Environment] extends ZioBenchmark[Environment] {
private val recordDataSize = 512
private def genString(i: Int): String = i.toString + scala.util.Random.alphanumeric.take(recordDataSize).mkString

protected val recordCount: Int = 50000
protected val kvs: Iterable[(String, String)] = Iterable.tabulate(recordCount)(i => (s"key$i", genString(i)))
protected val topic1 = "topic1"
protected val partitionCount = 6
}

trait ProducerZioBenchmark[Environment] extends ZioBenchmark[Environment] {
protected val recordCount = 500
protected val kvs: List[(String, String)] = List.tabulate(recordCount)(i => (s"key$i", s"msg$i"))
protected val topic1 = "topic1"
protected val partitionCount = 6
}

trait ZioBenchmark[Environment] {
var runtime: Runtime.Scoped[Environment] = _

@@ -16,18 +16,14 @@ import java.util.concurrent.TimeUnit

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
val topic1 = "topic1"
val nrPartitions = 6
val nrMessages = 50000
val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i"))
class ZioKafkaConsumerBenchmark extends ConsumerZioBenchmark[Kafka with Producer] {

override protected def bootstrap: ZLayer[Any, Nothing, Kafka with Producer] =
ZLayer.make[Kafka with Producer](Kafka.embedded, producer).orDie

override def initialize: ZIO[Kafka with Producer, Throwable, Any] = for {
_ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions))
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount))
_ <- produceMany(topic1, kvs)
} yield ()

@@ -47,7 +43,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
_ <- Consumer
.plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
.tap { _ =>
counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == nrMessages))
counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == recordCount))
}
.runDrain
.provideSome[Kafka](env)
@@ -67,7 +63,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
.tap(batch => counter.update(_ + batch.size))
.map(OffsetBatch.apply)
.mapZIO(_.commit)
.takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
.takeUntilZIO(_ => counter.get.map(_ >= recordCount))
.runDrain
.provideSome[Kafka](env)
}
@@ -14,11 +14,7 @@ import java.util.concurrent.TimeUnit

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] {
val topic1 = "topic1"
val nrPartitions = 6
val nrMessages = 500
val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i"))
class ZioKafkaProducerBenchmark extends ProducerZioBenchmark[Kafka with Producer] {
val records: Chunk[ProducerRecord[String, String]] = Chunk.fromIterable(kvs.map { case (k, v) =>
new ProducerRecord(topic1, k, v)
})
@@ -28,7 +24,7 @@ class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] {

override def initialize: ZIO[Kafka with Producer, Throwable, Any] = for {
_ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions))
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount))
} yield ()

@Benchmark
@@ -54,7 +50,7 @@ class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] {
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
def produceSingleRecordSeq(): Any = runZIO {
// Produce 50 records sequentially
// Produce 100 records sequentially
Producer.produce(topic1, "key", "value", Serde.string, Serde.string).repeatN(99)
}

@@ -4,7 +4,7 @@ import io.github.embeddedkafka.EmbeddedKafka
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import zio.kafka.admin.AdminClient.TopicPartition
import zio.kafka.bench.ZioBenchmark
import zio.kafka.bench.ConsumerZioBenchmark
import zio.kafka.bench.ZioBenchmark.randomThing
import zio.kafka.bench.comparison.ComparisonBenchmark._
import zio.kafka.consumer.{ Consumer, ConsumerSettings }
@@ -15,14 +15,10 @@ import zio.{ ULayer, ZIO, ZLayer }

import scala.jdk.CollectionConverters._

trait ComparisonBenchmark extends ZioBenchmark[Env] {
trait ComparisonBenchmark extends ConsumerZioBenchmark[Env] {

protected final val topic1: String = "topic1"
protected final val nrPartitions: Int = 6
protected final val topicPartitions: List[TopicPartition] =
(0 until nrPartitions).map(TopicPartition(topic1, _)).toList
protected final val numberOfMessages: Int = 1000000
protected final val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key$i", s"msg$i"))
(0 until partitionCount).map(TopicPartition(topic1, _)).toList

private val javaKafkaConsumer: ZLayer[ConsumerSettings, Throwable, LowLevelKafka] =
ZLayer.scoped {
@@ -63,7 +59,7 @@ trait ComparisonBenchmark extends ZioBenchmark[Env] {
override final def initialize: ZIO[Env, Throwable, Any] =
for {
_ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions))
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount))
_ <- produceMany(topic1, kvs)
} yield ()

@@ -23,14 +23,14 @@ class KafkaClientBenchmarks extends ComparisonBenchmark {
consumer.subscribe(java.util.Arrays.asList(topic1))

var count = 0L
while (count < numberOfMessages) {
while (count < recordCount) {
val records = consumer.poll(settings.pollTimeout)
count += records.count()
}

consumer.unsubscribe()
count
}.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages"))
}.flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount"))
}
}
}
@@ -46,14 +46,14 @@ class KafkaClientBenchmarks extends ComparisonBenchmark {
consumer.assign(topicPartitions.map(_.asJava).asJava)

var count = 0L
while (count < numberOfMessages) {
while (count < recordCount) {
val records = consumer.poll(settings.pollTimeout)
count += records.count()
}

consumer.unsubscribe()
count
}.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages"))
}.flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount"))
}
}
}

This file was deleted.

@@ -17,9 +17,9 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark {
runZIO {
Consumer
.plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
.take(numberOfMessages.toLong)
.take(recordCount.toLong)
.runCount
.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages"))
.flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount"))
}

@Benchmark
Expand All @@ -32,9 +32,9 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark {
Serde.byteArray,
Serde.byteArray
)
.take(numberOfMessages.toLong)
.take(recordCount.toLong)
.runCount
.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages"))
.flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount"))
}

}
