diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 41a14e1415..14bbdb3cf1 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -12,7 +12,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess } import zio.kafka.serde.{ Deserializer, Serde } -import zio.kafka.utils.SslHelper +import zio.kafka.utils.{ Awaitable, SslHelper } import zio.stream._ import scala.jdk.CollectionConverters._ @@ -51,7 +51,7 @@ trait Consumer { def commitAccumBatch[R]( commitschedule: Schedule[R, Any, Any] - ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] + ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Awaitable[Throwable, Unit]]] def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] @@ -465,7 +465,7 @@ private[consumer] final class ConsumerLive private[consumer] ( override def commitAccumBatch[R]( commitSchedule: Schedule[R, Any, Any] - ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] = + ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Awaitable[Throwable, Unit]]] = runloopAccess.commitAccumBatch(commitSchedule) override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] = diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 078b75971f..4b909f3624 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -11,6 +11,7 @@ import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.fetch.FetchStrategy import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment +import zio.kafka.utils.Awaitable import zio.stream._ import java.util @@ -132,7 +133,7 @@ private[consumer] final class Runloop private ( // noinspection YieldingZIOEffectInspection private[internal] def commitAccumBatch[R]( commitSchedule: Schedule[R, Any, Any] - ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] = + ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Awaitable[Throwable, Unit]]] = for { acc <- Ref.Synchronized.make(Map.empty[TopicPartition, Long] -> List.empty[Promise[Throwable, Unit]]) _ <- acc.updateZIO { case data @ (offsets, promises) => @@ -155,7 +156,7 @@ private[consumer] final class Runloop private ( val newPromises = promises :+ p (newOffsets, newPromises) } - } yield p.await + } yield Awaitable(p) } private val commit: Map[TopicPartition, Long] => Task[Unit] = diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index fc02723298..11beb876c6 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -8,6 +8,7 @@ import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment import zio.kafka.consumer.{ CommittableRecord, ConsumerSettings, InvalidSubscriptionUnion, Subscription } import zio.stream.{ Stream, Take, UStream, ZStream } import zio._ +import zio.kafka.utils.Awaitable private[internal] sealed trait RunloopState private[internal] object RunloopState { @@ -78,9 +79,11 @@ private[consumer] final class RunloopAccess private ( def commitAccumBatch[R]( commitschedule: Schedule[R, Any, Any] - ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] = + ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Awaitable[Throwable, Unit]]] = withRunloopZIO__(shouldStartIfNot = false)(_.commitAccumBatch(commitschedule))( - ZIO.succeed((_: Chunk[CommittableRecord[_, _]]) => ZIO.succeed(ZIO.unit)) + ZIO.succeed((_: Chunk[CommittableRecord[_, _]]) => + ZIO.succeed(Awaitable.unit.asInstanceOf[Awaitable[Throwable, Unit]]) + ) ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/utils/Awaitable.scala b/zio-kafka/src/main/scala/zio/kafka/utils/Awaitable.scala new file mode 100644 index 0000000000..264b3db717 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/utils/Awaitable.scala @@ -0,0 +1,36 @@ +package zio.kafka.utils + +import zio.{ IO, Promise, Trace, ZIO } + +/** + * Less powerful interface than Promise + * + * Avoid leaking to the use the Promise methods we don't want him/her to have access to. + */ +// noinspection ConvertExpressionToSAM +trait Awaitable[E, A] { self => + + def await(implicit trace: Trace): IO[E, A] + + final def combineDiscard(other: Awaitable[E, _]): Awaitable[E, Unit] = + new Awaitable[E, Unit] { + override def await(implicit trace: Trace): IO[E, Unit] = + for { + _ <- self.await + _ <- other.await + } yield () + } +} + +//noinspection ConvertExpressionToSAM +object Awaitable { + def apply[E, A](p: Promise[E, A]): Awaitable[E, A] = + new Awaitable[E, A] { + override def await(implicit trace: Trace): IO[E, A] = p.await + } + + val unit: Awaitable[Nothing, Unit] = + new Awaitable[Nothing, Unit] { + override def await(implicit trace: Trace): IO[Nothing, Unit] = ZIO.unit + } +}