Skip to content

Commit

Permalink
Update ZIO to 2.0.0-M5
Browse files Browse the repository at this point in the history
  • Loading branch information
jczuchnowski committed Nov 14, 2021
1 parent 63eea20 commit 2fed32d
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 20 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
val zioVersion = "1.0.8"
val zioVersion = "2.0.0-M5"

inThisBuild(
List(
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/zio/pulsar/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.apache.pulsar.client.api.{
PulsarClientException,
}
import zio.{ IO, ZIO }
import zio.blocking._
//import zio.blocking._
import zio.stream._

final class Consumer[M](val consumer: JConsumer[M]):
Expand All @@ -24,5 +24,5 @@ final class Consumer[M](val consumer: JConsumer[M]):
val receiveAsync: IO[PulsarClientException, Message[M]] =
ZIO.fromCompletionStage(consumer.receiveAsync).refineToOrDie[PulsarClientException]

val receiveStream: ZStream[Blocking, PulsarClientException, Message[M]] =
ZStream.repeatEffect(effectBlocking(consumer.receive).refineToOrDie[PulsarClientException])
val receiveStream: Stream[PulsarClientException, Message[M]] =
ZStream.repeatEffect(ZIO.attemptBlocking(consumer.receive).refineToOrDie[PulsarClientException])
3 changes: 1 addition & 2 deletions core/src/main/scala/zio/pulsar/ConsumerBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import org.apache.pulsar.client.api.{
Schema,
SubscriptionInitialPosition,
}
import zio.{ Has, ZIO, ZManaged }
import zio.duration.Duration
import zio.{ Duration, Has, ZIO, ZManaged }

case class Subscription[K <: SubscriptionKind](
name: String,
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/zio/pulsar/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ final class Producer[M] private (val producer: JProducer[M]):
def sendAsync(message: M): IO[PulsarClientException, MessageId] =
ZIO.fromCompletionStage(producer.sendAsync(message)).refineToOrDie[PulsarClientException]

def asSink: Sink[PulsarClientException, M, M, Unit] = ZSink.foreach(m => send(m))
def asSink: Sink[PulsarClientException, M, PulsarClientException, M, Unit] = ZSink.foreach(m => send(m))

def asSinkAsync: Sink[PulsarClientException, M, M, Unit] = ZSink.foreach(m => sendAsync(m))
def asSinkAsync: Sink[PulsarClientException, M, PulsarClientException, M, Unit] = ZSink.foreach(m => sendAsync(m))

object Producer:

Expand Down
9 changes: 3 additions & 6 deletions examples/src/main/scala/FanoutStreamExample.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package examples

import zio._
import zio.blocking._
import zio.clock._
import zio.console._
import zio.pulsar._
import zio.stm._
import zio.stream._
Expand Down Expand Up @@ -34,18 +31,18 @@ object FanoutStreamExample extends App:
_ <- stream.run(sink).toManaged_
yield ()

val consumer: ZManaged[Has[PulsarClient] with Console with Blocking, Throwable, Unit] =
val consumer: ZManaged[Has[PulsarClient] with Has[Console], Throwable, Unit] =
for
builder <- ConsumerBuilder.make(Schema.STRING).toManaged_
consumer <- builder
.subscription(Subscription("my-subscription", SubscriptionType.Exclusive))
.pattern(s"$pattern.*")
.build
_ <- consumer.receiveStream.take(10).foreach { a =>
putStrLn("Received: (id: " + a.getMessageId + ") " + a.getValue) *>
Console.putStrLn("Received: (id: " + a.getMessageId + ") " + a.getValue) *>
consumer.acknowledge(a.getMessageId)
}.toManaged_
_ <- putStrLn("Finished").toManaged_
_ <- Console.putStrLn("Finished").toManaged_
yield ()

val app =
Expand Down
2 changes: 0 additions & 2 deletions examples/src/main/scala/SingleMessageExample.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package examples

import zio._
import zio.clock._
import zio.console._
import zio.pulsar._
import org.apache.pulsar.client.api.{ PulsarClientException, RegexSubscriptionMode, Schema }
import RegexSubscriptionMode._
Expand Down
5 changes: 1 addition & 4 deletions examples/src/main/scala/StreamingExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package examples

import org.apache.pulsar.client.api.{ PulsarClientException, Schema }
import zio._
import zio.blocking._
import zio.clock._
import zio.console._
import zio.pulsar._
import zio.stream._

Expand All @@ -26,7 +23,7 @@ object StreamingExample extends App:
_ <- stream.run(sink).toManaged_
yield ()

val consumer: ZManaged[Has[PulsarClient] with Blocking, PulsarClientException, Unit] =
val consumer: ZManaged[Has[PulsarClient], PulsarClientException, Unit] =
for
builder <- ConsumerBuilder.make(Schema.STRING).toManaged_
consumer <- builder
Expand Down

0 comments on commit 2fed32d

Please sign in to comment.