Skip to content

Commit

Permalink
Fix concurrent modification exception in Consumer release (#1365)
Browse files Browse the repository at this point in the history
By using a semaphore also for the close operation we guard against
concurrent access during release. This reverts a change made in in
#1011.

The cause of this concurrent access after the Consumer has shutdown is
unfortunately unknown and I haven't been able to reproduce it.

Fixes #1238.
  • Loading branch information
svroonland authored Nov 10, 2024
1 parent 8ae5d79 commit c8f1173
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ private[consumer] object ConsumerAccess {

def make(settings: ConsumerSettings): ZIO[Scope, Throwable, ConsumerAccess] =
for {
access <- Semaphore.make(1)
consumer <- ZIO.acquireRelease {
ZIO.attemptBlocking {
new KafkaConsumer[Array[Byte], Array[Byte]](
Expand All @@ -59,10 +60,9 @@ private[consumer] object ConsumerAccess {
)
}
} { consumer =>
ZIO.blocking(ZIO.attempt(consumer.close(settings.closeTimeout))).orDie
ZIO.blocking(access.withPermit(ZIO.attempt(consumer.close(settings.closeTimeout)))).orDie
}
result <- make(consumer)
} yield result
} yield new ConsumerAccess(consumer, access)

def make(consumer: ByteArrayKafkaConsumer): ZIO[Scope, Throwable, ConsumerAccess] =
for {
Expand Down

0 comments on commit c8f1173

Please sign in to comment.