diff --git a/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala b/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala index 5e3a9b12b..a4baa348c 100644 --- a/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala +++ b/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala @@ -41,32 +41,34 @@ object Main extends ZIOAppDefault { private val topic = "test-topic" - private def consumerLayer(kafka: MyKafka): ZLayer[Any, Throwable, Consumer] = { - val consumerSettings = - ConsumerSettings(kafka.bootstrapServers) - .withPollTimeout(500.millis) - .withGroupId("test") - - ZLayer.make[Consumer]( - ZLayer.succeed(consumerSettings), - ZLayer.succeed(Diagnostics.NoOp), - Consumer.live - ) + private def consumerSettings: ZLayer[MyKafka, Throwable, ConsumerSettings] = ZLayer { + for { + kafka <- ZIO.service[MyKafka] + } yield ConsumerSettings(kafka.bootstrapServers) + .withPollTimeout(500.millis) + .withGroupId("test") } - override def run: ZIO[ZIOAppArgs with Scope, Any, Any] = - ZIO.addFinalizer(ZIO.logInfo("Stopping app")) *> - ( - for { - _ <- ZIO.logInfo(s"Starting app") - kafka <- ZIO.service[MyKafka] - stream = Consumer - .plainStream(Subscription.topics(topic), Serde.string, Serde.string) - .provideLayer(consumerLayer(kafka)) - _ <- ZIO.logInfo(s"Consuming messages...") - consumed <- stream.take(1000).tap(r => ZIO.logInfo(s"Consumed record $r")).runCount - _ <- ZIO.logInfo(s"Consumed $consumed records") - } yield () - ).provideSomeLayer[ZIOAppArgs with Scope](MyKafka.embedded) + private val runConsumerStream: ZIO[Consumer, Throwable, Unit] = + for { + _ <- ZIO.logInfo("Consuming messages...") + consumed <- Consumer + .plainStream(Subscription.topics(topic), Serde.string, Serde.string) + .take(1000) + .tap(r => ZIO.logInfo(s"Consumed record $r")) + .runCount + _ <- ZIO.logInfo(s"Consumed $consumed records") + } yield () + + override def run: ZIO[Scope, Any, Any] = + ZIO.logInfo("Starting app") *> + ZIO.addFinalizer(ZIO.logInfo("Stopping app")) *> + runConsumerStream + .provide( + consumerSettings, + ZLayer.succeed(Diagnostics.NoOp), + Consumer.live, + MyKafka.embedded + ) }