Skip to content

Commit

Permalink
Use Semaphore (#1142)
Browse files Browse the repository at this point in the history
Since Zio 2.0.20 Semaphore is fair so we can use it again. This reverts 2cc9efc.
  • Loading branch information
adamgfraser authored Dec 24, 2023
1 parent 8a31ed7 commit 814bac9
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 7 deletions.
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ lazy val zioKafka =
.settings(
libraryDependencies ++= Seq(
kafkaClients,
scalaCollectionCompat,
"dev.zio" %% "zio-concurrent" % zioVersion.value
scalaCollectionCompat
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@ import zio.kafka.consumer.ConsumerSettings
import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer

import scala.jdk.CollectionConverters._
import zio.concurrent.ReentrantLock

private[consumer] final class ConsumerAccess(
private[consumer] val consumer: ByteArrayKafkaConsumer,
access: ReentrantLock
access: Semaphore
) {

def withConsumer[A](f: ByteArrayKafkaConsumer => A): Task[A] =
withConsumerZIO[Any, A](c => ZIO.attempt(f(c)))

def withConsumerZIO[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] =
access.lock.zipRight(withConsumerNoPermit(f)).ensuring(access.unlock)
access.withPermit(withConsumerNoPermit(f))

private def withConsumerNoPermit[R, A](
f: ByteArrayKafkaConsumer => RIO[R, A]
Expand All @@ -36,7 +35,7 @@ private[consumer] final class ConsumerAccess(
* Use this method only from Runloop.
*/
private[internal] def runloopAccess[R, E, A](f: ByteArrayKafkaConsumer => ZIO[R, E, A]): ZIO[R, E, A] =
access.lock.zipRight(f(consumer)).ensuring(access.unlock)
access.withPermit(f(consumer))

/**
* Use this method ONLY from the rebalance listener.
Expand Down Expand Up @@ -67,6 +66,6 @@ private[consumer] object ConsumerAccess {

def make(consumer: ByteArrayKafkaConsumer): ZIO[Scope, Throwable, ConsumerAccess] =
for {
access <- ReentrantLock.make(fairness = true)
access <- Semaphore.make(1)
} yield new ConsumerAccess(consumer, access)
}

0 comments on commit 814bac9

Please sign in to comment.