Skip to content

Commit

Permalink
Refined implementation of #1022 (comment)
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Sep 9, 2023
1 parent 3dedcaf commit d3a5139
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 7 deletions.
6 changes: 3 additions & 3 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess }
import zio.kafka.serde.{ Deserializer, Serde }
import zio.kafka.utils.SslHelper
import zio.kafka.utils.{ Awaitable, SslHelper }
import zio.stream._

import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -51,7 +51,7 @@ trait Consumer {

def commitAccumBatch[R](
commitschedule: Schedule[R, Any, Any]
): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]]
): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Awaitable[Throwable, Unit]]]

def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]]

Expand Down Expand Up @@ -465,7 +465,7 @@ private[consumer] final class ConsumerLive private[consumer] (

override def commitAccumBatch[R](
commitSchedule: Schedule[R, Any, Any]
): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] =
): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Awaitable[Throwable, Unit]]] =
runloopAccess.commitAccumBatch(commitSchedule)

override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.fetch.FetchStrategy
import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.kafka.utils.Awaitable
import zio.stream._

import java.util
Expand Down Expand Up @@ -132,7 +133,7 @@ private[consumer] final class Runloop private (
// noinspection YieldingZIOEffectInspection
private[internal] def commitAccumBatch[R](
commitSchedule: Schedule[R, Any, Any]
): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] =
): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Awaitable[Throwable, Unit]]] =
for {
acc <- Ref.Synchronized.make(Map.empty[TopicPartition, Long] -> List.empty[Promise[Throwable, Unit]])
_ <- acc.updateZIO { case data @ (offsets, promises) =>
Expand All @@ -155,7 +156,7 @@ private[consumer] final class Runloop private (
val newPromises = promises :+ p
(newOffsets, newPromises)
}
} yield p.await
} yield Awaitable(p)
}

private val commit: Map[TopicPartition, Long] => Task[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.kafka.consumer.{ CommittableRecord, ConsumerSettings, InvalidSubscriptionUnion, Subscription }
import zio.stream.{ Stream, Take, UStream, ZStream }
import zio._
import zio.kafka.utils.Awaitable

private[internal] sealed trait RunloopState
private[internal] object RunloopState {
Expand Down Expand Up @@ -78,9 +79,11 @@ private[consumer] final class RunloopAccess private (

def commitAccumBatch[R](
commitschedule: Schedule[R, Any, Any]
): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] =
): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Awaitable[Throwable, Unit]]] =
withRunloopZIO__(shouldStartIfNot = false)(_.commitAccumBatch(commitschedule))(
ZIO.succeed((_: Chunk[CommittableRecord[_, _]]) => ZIO.succeed(ZIO.unit))
ZIO.succeed((_: Chunk[CommittableRecord[_, _]]) =>
ZIO.succeed(Awaitable.unit.asInstanceOf[Awaitable[Throwable, Unit]])
)
)

}
Expand Down
36 changes: 36 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/utils/Awaitable.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package zio.kafka.utils

import zio.{ IO, Promise, Trace, ZIO }

/**
* Less powerful interface than Promise
*
* Avoid leaking to the use the Promise methods we don't want him/her to have access to.
*/
// noinspection ConvertExpressionToSAM
trait Awaitable[E, A] { self =>

def await(implicit trace: Trace): IO[E, A]

final def combineDiscard(other: Awaitable[E, _]): Awaitable[E, Unit] =
new Awaitable[E, Unit] {
override def await(implicit trace: Trace): IO[E, Unit] =
for {
_ <- self.await
_ <- other.await
} yield ()
}
}

//noinspection ConvertExpressionToSAM
object Awaitable {
def apply[E, A](p: Promise[E, A]): Awaitable[E, A] =
new Awaitable[E, A] {
override def await(implicit trace: Trace): IO[E, A] = p.await
}

val unit: Awaitable[Nothing, Unit] =
new Awaitable[Nothing, Unit] {
override def await(implicit trace: Trace): IO[Nothing, Unit] = ZIO.unit
}
}

0 comments on commit d3a5139

Please sign in to comment.