Skip to content

Commit

Permalink
Require a semaphore in Consumer.fromJavaConsumer (#1368)
Browse files Browse the repository at this point in the history
Fixes #1276

---------

Co-authored-by: Erik van Oosten <[email protected]>
  • Loading branch information
svroonland and erikvanoosten authored Nov 10, 2024
1 parent 37ad5e7 commit 1f8d590
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,9 @@ object RunloopSpec extends ZIOSpecDefaultSlf4j {
): ZIO[Scope, Throwable, TestResult] =
ZIO.scoped {
for {
consumerAccess <- ConsumerAccess.make(mockConsumer)
consumerScope <- ZIO.scope
access <- Semaphore.make(1)
consumerAccess = new ConsumerAccess(mockConsumer, access)
consumerScope <- ZIO.scope
partitionsHub <- ZIO
.acquireRelease(Hub.unbounded[Take[Throwable, PartitionAssignment]])(_.shutdown)
.provide(ZLayer.succeed(consumerScope))
Expand Down
45 changes: 42 additions & 3 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,54 @@ object Consumer {
*
* You are responsible for creating and closing the KafkaConsumer. Make sure auto.commit is disabled.
*/
@deprecated("Use fromJavaConsumerWithPermit", since = "2.8.4")
def fromJavaConsumer(
javaConsumer: JConsumer[Array[Byte], Array[Byte]],
settings: ConsumerSettings,
diagnostics: Diagnostics = Diagnostics.NoOp
): ZIO[Scope, Throwable, Consumer] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
consumerAccess <- ConsumerAccess.make(javaConsumer)
runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics)
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
access <- Semaphore.make(1)
consumerAccess = new ConsumerAccess(javaConsumer, access)
runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics)
} yield new ConsumerLive(consumerAccess, runloopAccess)

/**
* Create a zio-kafka [[Consumer]] from an `org.apache.kafka KafkaConsumer`.
*
* You are responsible for all of the following:
* - creating and closing the `KafkaConsumer`,
* - making sure `auto.commit` is disabled,
* - creating `access` as a fair semaphore with a single permit,
* - acquire a permit from `access` before using the consumer, and release if afterwards,
* - not using the following consumer methods: `subscribe`, `unsubscribe`, `assign`, `poll`, `commit*`, `seek`,
* `pause`, `resume`, and `enforceRebalance`.
*
* Any deviation of these rules is likely to cause hard to track errors.
*
* Semaphore `access` is shared between you and the zio-kafka consumer. Use it as short as possible; while you hold a
* permit the zio-kafka consumer is blocked.
*
* @param javaConsumer
* Consumer
* @param settings
* Settings
* @param access
* A Semaphore with 1 permit.
* @param diagnostics
* Optional diagnostics listener
*/
def fromJavaConsumerWithPermit(
javaConsumer: JConsumer[Array[Byte], Array[Byte]],
settings: ConsumerSettings,
access: Semaphore,
diagnostics: Diagnostics = Diagnostics.NoOp
): ZIO[Scope, Throwable, Consumer] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
consumerAccess = new ConsumerAccess(javaConsumer, access)
runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics)
} yield new ConsumerLive(consumerAccess, runloopAccess)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,4 @@ private[consumer] object ConsumerAccess {
ZIO.blocking(access.withPermit(ZIO.attempt(consumer.close(settings.closeTimeout)))).orDie
}
} yield new ConsumerAccess(consumer, access)

def make(consumer: ByteArrayKafkaConsumer): ZIO[Scope, Throwable, ConsumerAccess] =
for {
access <- Semaphore.make(1)
} yield new ConsumerAccess(consumer, access)
}

0 comments on commit 1f8d590

Please sign in to comment.