
Commit

Merge branch 'master' into badge
erikvanoosten authored Jan 21, 2024
2 parents 1929966 + 7b2093e commit 301cda3
Showing 9 changed files with 648 additions and 188 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -19,8 +19,8 @@ Kafka has a mature Java client for producing and consuming events, but it has a
In order to use this library, we need to add the following line in our `build.sbt` file:

```scala
libraryDependencies += "dev.zio" %% "zio-kafka" % "2.7.1"
libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.7.1" % Test
libraryDependencies += "dev.zio" %% "zio-kafka" % "2.7.2"
libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.7.2" % Test
```

## Example
4 changes: 2 additions & 2 deletions build.sbt
@@ -140,7 +140,7 @@ lazy val zioKafkaTest =
libraryDependencies ++= Seq(
kafkaClients,
logback % Test,
"dev.zio" %% "zio-logging-slf4j" % "2.1.16" % Test,
"dev.zio" %% "zio-logging-slf4j" % "2.2.0" % Test,
scalaCollectionCompat
) ++ `embedded-kafka`.value
)
@@ -165,7 +165,7 @@ lazy val zioKafkaExample =
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.0.20",
"dev.zio" %% "zio-kafka" % "2.7.2",
"dev.zio" %% "zio-logging-slf4j2" % "2.1.16",
"dev.zio" %% "zio-logging-slf4j2" % "2.2.0",
"io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion,
logback,
"dev.zio" %% "zio-kafka-testkit" % "2.7.2" % Test,
RunloopCommitOffsetsSpec.scala
@@ -16,39 +16,57 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
override def spec: Spec[TestEnvironment with Scope, Any] =
suite("Runloop.CommitOffsets spec")(
test("addCommits adds to empty CommitOffsets") {
val s1 = Runloop.CommitOffsets(Map.empty)
val s2 = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10))))
assertTrue(s2.offsets == Map(tp10 -> 10L))
val s1 = Runloop.CommitOffsets(Map.empty)
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10))))
assertTrue(
inc == 0,
s2.offsets == Map(tp10 -> 10L)
)
},
test("addCommits updates offset when it is higher") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 5L))
val s2 = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10))))
assertTrue(s2.offsets == Map(tp10 -> 10L))
val s1 = Runloop.CommitOffsets(Map(tp10 -> 4L))
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10))))
assertTrue(
inc == 10 - 4,
s2.offsets == Map(tp10 -> 10L)
)
},
test("addCommits ignores an offset when it is lower") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L))
val s2 = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 5))))
assertTrue(s2.offsets == Map(tp10 -> 10L))
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L))
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 5))))
assertTrue(
inc == 0,
s2.offsets == Map(tp10 -> 10L)
)
},
test("addCommits keeps unrelated partitions") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L))
val s2 = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11))))
assertTrue(s2.offsets == Map(tp10 -> 10L, tp11 -> 11L))
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L))
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11))))
assertTrue(
inc == 0,
s2.offsets == Map(tp10 -> 10L, tp11 -> 11L)
)
},
test("addCommits does it all at once") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L))
val s2 = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11, tp20 -> 206L, tp21 -> 209L, tp22 -> 220L))))
assertTrue(s2.offsets == Map(tp10 -> 10L, tp11 -> 11L, tp20 -> 206L, tp21 -> 210L, tp22 -> 220L))
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L))
val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11, tp20 -> 206L, tp21 -> 209L, tp22 -> 220L))))
assertTrue(
inc == /* tp10 */ 0 + /* tp11 */ 0 + /* tp20 */ 1 + /* tp21 */ 0 + /* tp22 */ 0,
s2.offsets == Map(tp10 -> 10L, tp11 -> 11L, tp20 -> 206L, tp21 -> 210L, tp22 -> 220L)
)
},
test("addCommits adds multiple commits") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L))
val s2 = s1.addCommits(
val (inc, s2) = s1.addCommits(
Chunk(
makeCommit(Map(tp11 -> 11, tp20 -> 199L, tp21 -> 211L, tp22 -> 219L)),
makeCommit(Map(tp20 -> 198L, tp21 -> 209L, tp22 -> 221L))
)
)
assertTrue(s2.offsets == Map(tp10 -> 10L, tp11 -> 11L, tp20 -> 200L, tp21 -> 211L, tp22 -> 221L))
assertTrue(
inc == /* tp10 */ 0 + /* tp11 */ 0 + /* tp20 */ 0 + /* tp21 */ 1 + /* tp22 */ 1,
s2.offsets == Map(tp10 -> 10L, tp11 -> 11L, tp20 -> 200L, tp21 -> 211L, tp22 -> 221L)
)
},
test("keepPartitions removes some partitions") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
@@ -80,6 +98,6 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = {
val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }
val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None))
Runloop.Commit(o, p)
Runloop.Commit(0L, o, p)
}
}
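The updated assertions rely on `addCommits` returning the total offset increase together with the updated `CommitOffsets`. Below is a minimal sketch of that contract as implied by the tests, not the actual zio-kafka implementation; `CommitOffsetsSketch` and `CommitSketch` are illustrative names, and `CommitSketch` ignores the extra fields the real `Runloop.Commit` carries (such as the `0L` now passed in `makeCommit`).

```scala
// Minimal sketch of the behaviour the tests above check; not the zio-kafka code.
import org.apache.kafka.common.TopicPartition
import zio.Chunk

// Stand-in for Runloop.Commit, reduced to the offsets used in these tests.
final case class CommitSketch(offsets: Map[TopicPartition, Long])

final case class CommitOffsetsSketch(offsets: Map[TopicPartition, Long]) {

  /** Merges the given commits, keeping the highest offset per partition, and
    * returns how far already-tracked partitions advanced (new partitions count as 0).
    */
  def addCommits(commits: Chunk[CommitSketch]): (Long, CommitOffsetsSketch) = {
    val merged = commits.foldLeft(offsets) { (acc, commit) =>
      commit.offsets.foldLeft(acc) { case (m, (tp, offset)) =>
        m.updated(tp, m.get(tp).fold(offset)(_ max offset))
      }
    }
    val increase = merged.map { case (tp, offset) =>
      offset - offsets.getOrElse(tp, offset)
    }.sum
    (increase, CommitOffsetsSketch(merged))
  }
}
```

With this semantics the cases above work out as asserted: for example, starting from `tp10 -> 4` and committing `tp10 -> 10` yields an increase of `10 - 4`, while commits for previously unseen or lower offsets contribute 0.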
ConsumerSettings.scala
@@ -5,6 +5,7 @@ import zio._
import zio.kafka.consumer.Consumer.OffsetRetrieval
import zio.kafka.consumer.fetch.{ FetchStrategy, QueueSizeBasedFetchStrategy }
import zio.kafka.security.KafkaCredentialStore
import zio.metrics.MetricLabel

/**
* Settings for the consumer.
@@ -30,7 +31,9 @@ final case class ConsumerSettings(
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
maxRebalanceDuration: Option[Duration] = None,
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy()
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy(),
metricLabels: Set[MetricLabel] = Set.empty,
runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis)
) {

/**
@@ -278,6 +281,29 @@ final case class ConsumerSettings(
*/
def withFetchStrategy(fetchStrategy: FetchStrategy): ConsumerSettings =
copy(fetchStrategy = fetchStrategy)

/**
* @param metricLabels
* The labels given to all metrics collected by zio-kafka. By default no labels are set.
*
* For applications with multiple consumers it is recommended to set some metric labels. For example, when a
* consumer group is used, the consumer group id could be used as a label:
*
* {{{
* consumerSettings.withMetricsLabels(Set(MetricLabel("group-id", groupId)))
* }}}
*/
def withMetricsLabels(metricLabels: Set[MetricLabel]): ConsumerSettings =
copy(metricLabels = metricLabels)

/**
* @param runloopMetricsSchedule
* The schedule at which the runloop metrics are measured. Example runloop metrics are queue sizes and number of
* outstanding commits. The default is to measure every 500ms.
*/
def withRunloopMetricsSchedule(runloopMetricsSchedule: Schedule[Any, Unit, Long]): ConsumerSettings =
copy(runloopMetricsSchedule = runloopMetricsSchedule)

}

object ConsumerSettings {
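A short usage sketch of the two new settings. The `bootstrapServers` and `groupId` values are hypothetical placeholders; `ConsumerSettings(...)`, `withGroupId`, and `Schedule.fixed` are existing APIs, while `withMetricsLabels` and `withRunloopMetricsSchedule` are the methods added in this commit.

```scala
import zio._
import zio.kafka.consumer.ConsumerSettings
import zio.metrics.MetricLabel

object ConsumerMetricsExample {
  // Hypothetical values; replace with your own broker list and consumer group id.
  val bootstrapServers: List[String] = List("localhost:9092")
  val groupId: String                = "my-group"

  val settings: ConsumerSettings =
    ConsumerSettings(bootstrapServers)
      .withGroupId(groupId)
      // Label all zio-kafka metrics with the consumer group id, as the scaladoc
      // above recommends for applications running multiple consumers.
      .withMetricsLabels(Set(MetricLabel("group-id", groupId)))
      // Sample the runloop metrics (queue sizes, outstanding commits) every second
      // instead of the default 500 ms.
      .withRunloopMetricsSchedule(Schedule.fixed(1.second))
}
```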
(diffs for the remaining 5 changed files are not shown)
