Skip to content

Commit

Permalink
Make access to the java consumer fair (#1109)
Browse files Browse the repository at this point in the history
Replace the semaphore that is used to control access to the java consumer with a reentrant lock that has fairness enabled. This prevents liveness problems for consumers that need to use the consumer, e.g. to fetch committed offsets. Fixes #1107.
  • Loading branch information
flavienbert authored Nov 17, 2023
1 parent dc36c85 commit 2cc9efc
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ lazy val zioKafka =
.settings(
libraryDependencies ++= Seq(
kafkaClients,
scalaCollectionCompat
scalaCollectionCompat,
"dev.zio" %% "zio-concurrent" % zioVersion.value
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,39 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.provideSomeLayer[Kafka](consumer(client, Some(group)))
} yield assert(offsets.values.headOption.flatten.map(_.metadata))(isSome(equalTo(metadata)))
},
test("access to the java consumer must be fair") {
val kvs = (1 to 10).toList.map(i => (s"key$i", s"msg$i"))

val expectedResult = (0 to 9).toList.map(i => i.toLong -> i.toLong)

for {
topic <- randomTopic
client <- randomClient
group <- randomGroup

_ <- produceMany(topic, kvs)
committedOffsetRef <- Ref.make(Seq.empty[(Long, Long)])

topicPartition = new TopicPartition(topic, 0)

_ <- Consumer
.plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string)
.take(10)
.map(_.offset)
.mapZIO(offsetBatch =>
Consumer
.committed(Set(topicPartition))
.flatMap(offset =>
committedOffsetRef.update(map =>
map :+ (offsetBatch.offset -> offset(topicPartition).map(_.offset()).getOrElse(0L))
) *> offsetBatch.commit
)
)
.runDrain
.provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds))
offsets <- committedOffsetRef.get
} yield assert(offsets)(equalTo(expectedResult))
} @@ TestAspect.timeout(20.seconds),
test("handle rebalancing by completing topic-partition streams") {
val nrMessages = 50
val nrPartitions = 6 // Must be even and strictly positive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ import zio.kafka.consumer.ConsumerSettings
import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer

import scala.jdk.CollectionConverters._
import zio.concurrent.ReentrantLock

private[consumer] final class ConsumerAccess(
private[consumer] val consumer: ByteArrayKafkaConsumer,
access: Semaphore
access: ReentrantLock
) {

def withConsumer[A](f: ByteArrayKafkaConsumer => A): Task[A] =
withConsumerZIO[Any, A](c => ZIO.attempt(f(c)))

def withConsumerZIO[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] =
access.withPermit(withConsumerNoPermit(f))
access.lock.zipRight(withConsumerNoPermit(f)).ensuring(access.unlock)

private[consumer] def withConsumerNoPermit[R, A](
f: ByteArrayKafkaConsumer => RIO[R, A]
Expand All @@ -34,7 +36,7 @@ private[consumer] final class ConsumerAccess(
* Do not use this method outside of the Runloop
*/
private[internal] def runloopAccess[R, E, A](f: ByteArrayKafkaConsumer => ZIO[R, E, A]): ZIO[R, E, A] =
access.withPermit(f(consumer))
access.lock.zipRight(f(consumer)).ensuring(access.unlock)
}

private[consumer] object ConsumerAccess {
Expand All @@ -58,6 +60,6 @@ private[consumer] object ConsumerAccess {

def make(consumer: ByteArrayKafkaConsumer): ZIO[Scope, Throwable, ConsumerAccess] =
for {
access <- Semaphore.make(1)
access <- ReentrantLock.make(fairness = true)
} yield new ConsumerAccess(consumer, access)
}

0 comments on commit 2cc9efc

Please sign in to comment.