Skip to content

Commit

Permalink
Improve example app (#1279)
Browse files Browse the repository at this point in the history
This gives consumer settings its own layer and extracts the streaming
logic. This makes the example app more modular and easier to adapt to
larger applications.
  • Loading branch information
erikvanoosten authored Jul 14, 2024
1 parent 8a03653 commit 50f804c
Showing 1 changed file with 27 additions and 25 deletions.
52 changes: 27 additions & 25 deletions zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,34 @@ object Main extends ZIOAppDefault {

private val topic = "test-topic"

private def consumerLayer(kafka: MyKafka): ZLayer[Any, Throwable, Consumer] = {
val consumerSettings =
ConsumerSettings(kafka.bootstrapServers)
.withPollTimeout(500.millis)
.withGroupId("test")

ZLayer.make[Consumer](
ZLayer.succeed(consumerSettings),
ZLayer.succeed(Diagnostics.NoOp),
Consumer.live
)
private def consumerSettings: ZLayer[MyKafka, Throwable, ConsumerSettings] = ZLayer {
for {
kafka <- ZIO.service[MyKafka]
} yield ConsumerSettings(kafka.bootstrapServers)
.withPollTimeout(500.millis)
.withGroupId("test")
}

override def run: ZIO[ZIOAppArgs with Scope, Any, Any] =
ZIO.addFinalizer(ZIO.logInfo("Stopping app")) *>
(
for {
_ <- ZIO.logInfo(s"Starting app")
kafka <- ZIO.service[MyKafka]
stream = Consumer
.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
.provideLayer(consumerLayer(kafka))
_ <- ZIO.logInfo(s"Consuming messages...")
consumed <- stream.take(1000).tap(r => ZIO.logInfo(s"Consumed record $r")).runCount
_ <- ZIO.logInfo(s"Consumed $consumed records")
} yield ()
).provideSomeLayer[ZIOAppArgs with Scope](MyKafka.embedded)
private val runConsumerStream: ZIO[Consumer, Throwable, Unit] =
for {
_ <- ZIO.logInfo("Consuming messages...")
consumed <- Consumer
.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
.take(1000)
.tap(r => ZIO.logInfo(s"Consumed record $r"))
.runCount
_ <- ZIO.logInfo(s"Consumed $consumed records")
} yield ()

override def run: ZIO[Scope, Any, Any] =
ZIO.logInfo("Starting app") *>
ZIO.addFinalizer(ZIO.logInfo("Stopping app")) *>
runConsumerStream
.provide(
consumerSettings,
ZLayer.succeed(Diagnostics.NoOp),
Consumer.live,
MyKafka.embedded
)

}

0 comments on commit 50f804c

Please sign in to comment.