From 2fed32d45c27ce0ea6f464779426b2e63253c862 Mon Sep 17 00:00:00 2001 From: jczuchnowski Date: Sun, 14 Nov 2021 22:06:06 +0100 Subject: [PATCH] Update ZIO to 2.0.0-M5 --- build.sbt | 2 +- core/src/main/scala/zio/pulsar/Consumer.scala | 6 +++--- core/src/main/scala/zio/pulsar/ConsumerBuilder.scala | 3 +-- core/src/main/scala/zio/pulsar/Producer.scala | 4 ++-- examples/src/main/scala/FanoutStreamExample.scala | 9 +++------ examples/src/main/scala/SingleMessageExample.scala | 2 -- examples/src/main/scala/StreamingExample.scala | 5 +---- 7 files changed, 11 insertions(+), 20 deletions(-) diff --git a/build.sbt b/build.sbt index c82a3f6..b576176 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,4 @@ -val zioVersion = "1.0.8" +val zioVersion = "2.0.0-M5" inThisBuild( List( diff --git a/core/src/main/scala/zio/pulsar/Consumer.scala b/core/src/main/scala/zio/pulsar/Consumer.scala index ba97246..0dea68a 100644 --- a/core/src/main/scala/zio/pulsar/Consumer.scala +++ b/core/src/main/scala/zio/pulsar/Consumer.scala @@ -7,7 +7,7 @@ import org.apache.pulsar.client.api.{ PulsarClientException, } import zio.{ IO, ZIO } -import zio.blocking._ +//import zio.blocking._ import zio.stream._ final class Consumer[M](val consumer: JConsumer[M]): @@ -24,5 +24,5 @@ final class Consumer[M](val consumer: JConsumer[M]): val receiveAsync: IO[PulsarClientException, Message[M]] = ZIO.fromCompletionStage(consumer.receiveAsync).refineToOrDie[PulsarClientException] - val receiveStream: ZStream[Blocking, PulsarClientException, Message[M]] = - ZStream.repeatEffect(effectBlocking(consumer.receive).refineToOrDie[PulsarClientException]) + val receiveStream: Stream[PulsarClientException, Message[M]] = + ZStream.repeatEffect(ZIO.attemptBlocking(consumer.receive).refineToOrDie[PulsarClientException]) diff --git a/core/src/main/scala/zio/pulsar/ConsumerBuilder.scala b/core/src/main/scala/zio/pulsar/ConsumerBuilder.scala index c3e3813..346ad6d 100644 --- a/core/src/main/scala/zio/pulsar/ConsumerBuilder.scala +++ b/core/src/main/scala/zio/pulsar/ConsumerBuilder.scala @@ -14,8 +14,7 @@ import org.apache.pulsar.client.api.{ Schema, SubscriptionInitialPosition, } -import zio.{ Has, ZIO, ZManaged } -import zio.duration.Duration +import zio.{ Duration, Has, ZIO, ZManaged } case class Subscription[K <: SubscriptionKind]( name: String, diff --git a/core/src/main/scala/zio/pulsar/Producer.scala b/core/src/main/scala/zio/pulsar/Producer.scala index c2141f2..252ad6f 100644 --- a/core/src/main/scala/zio/pulsar/Producer.scala +++ b/core/src/main/scala/zio/pulsar/Producer.scala @@ -12,9 +12,9 @@ final class Producer[M] private (val producer: JProducer[M]): def sendAsync(message: M): IO[PulsarClientException, MessageId] = ZIO.fromCompletionStage(producer.sendAsync(message)).refineToOrDie[PulsarClientException] - def asSink: Sink[PulsarClientException, M, M, Unit] = ZSink.foreach(m => send(m)) + def asSink: Sink[PulsarClientException, M, PulsarClientException, M, Unit] = ZSink.foreach(m => send(m)) - def asSinkAsync: Sink[PulsarClientException, M, M, Unit] = ZSink.foreach(m => sendAsync(m)) + def asSinkAsync: Sink[PulsarClientException, M, PulsarClientException, M, Unit] = ZSink.foreach(m => sendAsync(m)) object Producer: diff --git a/examples/src/main/scala/FanoutStreamExample.scala b/examples/src/main/scala/FanoutStreamExample.scala index 83fad36..d6ba0db 100644 --- a/examples/src/main/scala/FanoutStreamExample.scala +++ b/examples/src/main/scala/FanoutStreamExample.scala @@ -1,9 +1,6 @@ package examples import zio._ -import zio.blocking._ -import zio.clock._ -import zio.console._ import zio.pulsar._ import zio.stm._ import zio.stream._ @@ -34,7 +31,7 @@ object FanoutStreamExample extends App: _ <- stream.run(sink).toManaged_ yield () - val consumer: ZManaged[Has[PulsarClient] with Console with Blocking, Throwable, Unit] = + val consumer: ZManaged[Has[PulsarClient] with Has[Console], Throwable, Unit] = for builder <- ConsumerBuilder.make(Schema.STRING).toManaged_ consumer <- builder @@ -42,10 +39,10 @@ object FanoutStreamExample extends App: .pattern(s"$pattern.*") .build _ <- consumer.receiveStream.take(10).foreach { a => - putStrLn("Received: (id: " + a.getMessageId + ") " + a.getValue) *> + Console.putStrLn("Received: (id: " + a.getMessageId + ") " + a.getValue) *> consumer.acknowledge(a.getMessageId) }.toManaged_ - _ <- putStrLn("Finished").toManaged_ + _ <- Console.putStrLn("Finished").toManaged_ yield () val app = diff --git a/examples/src/main/scala/SingleMessageExample.scala b/examples/src/main/scala/SingleMessageExample.scala index ae9e7fb..68eb797 100644 --- a/examples/src/main/scala/SingleMessageExample.scala +++ b/examples/src/main/scala/SingleMessageExample.scala @@ -1,8 +1,6 @@ package examples import zio._ -import zio.clock._ -import zio.console._ import zio.pulsar._ import org.apache.pulsar.client.api.{ PulsarClientException, RegexSubscriptionMode, Schema } import RegexSubscriptionMode._ diff --git a/examples/src/main/scala/StreamingExample.scala b/examples/src/main/scala/StreamingExample.scala index c8e4263..593f9d5 100644 --- a/examples/src/main/scala/StreamingExample.scala +++ b/examples/src/main/scala/StreamingExample.scala @@ -2,9 +2,6 @@ package examples import org.apache.pulsar.client.api.{ PulsarClientException, Schema } import zio._ -import zio.blocking._ -import zio.clock._ -import zio.console._ import zio.pulsar._ import zio.stream._ @@ -26,7 +23,7 @@ object StreamingExample extends App: _ <- stream.run(sink).toManaged_ yield () - val consumer: ZManaged[Has[PulsarClient] with Blocking, PulsarClientException, Unit] = + val consumer: ZManaged[Has[PulsarClient], PulsarClientException, Unit] = for builder <- ConsumerBuilder.make(Schema.STRING).toManaged_ consumer <- builder