diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8763b7e6..ab85676b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,7 +31,7 @@ jobs: fail-fast: false matrix: java: ['adopt@1.8', 'adopt@1.11'] - scala: ['2.12.13', '2.13.5', '3.0.0-RC1'] + scala: ['2.12.13', '2.13.5', '3.0.0-RC2'] platform: ['JVM', 'JS'] steps: - name: Checkout current branch @@ -45,10 +45,10 @@ jobs: - name: Cache scala dependencies uses: coursier/cache-action@v5 - name: Run tests - if: matrix.scala != '3.0.0-RC1' + if: matrix.scala != '3.0.0-RC2' run: sbt ++${{ matrix.scala }}! test${{ matrix.platform }} - name: Run dotty tests - if: matrix.scala == '3.0.0-RC1' && matrix.platform == 'JVM' + if: matrix.scala == '3.0.0-RC2' && matrix.platform == 'JVM' run: sbt ++${{ matrix.scala }}! testJVM publish: diff --git a/build.sbt b/build.sbt index 7b74c2f9..ef4a7bd5 100644 --- a/build.sbt +++ b/build.sbt @@ -40,7 +40,7 @@ lazy val root = project unusedCompileDependenciesFilter -= moduleFilter("org.scala-js", "scalajs-library") ) -val zioVersion = "1.0.5" +val zioVersion = "1.0.6" lazy val interopCats = crossProject(JSPlatform, JVMPlatform) .in(file("interop-cats")) .enablePlugins(BuildInfoPlugin) @@ -50,25 +50,25 @@ lazy val interopCats = crossProject(JSPlatform, JVMPlatform) libraryDependencies ++= Seq( "dev.zio" %%% "zio" % zioVersion, "dev.zio" %%% "zio-test-sbt" % zioVersion % Test, - "org.typelevel" %%% "cats-testkit" % "2.4.2" % Test, - "org.typelevel" %%% "cats-effect-laws" % "2.4.0" % Test, - "org.typelevel" %%% "cats-mtl-laws" % "1.1.2" % Test, - "org.typelevel" %%% "discipline-scalatest" % "2.1.2" % Test + "org.typelevel" %%% "cats-testkit" % "2.5.0" % Test, + "org.typelevel" %%% "cats-effect-laws" % "2.4.1" % Test, + "org.typelevel" %%% "cats-mtl-laws" % "1.1.3" % Test, + "org.typelevel" %%% "discipline-scalatest" % "2.1.3" % Test ), libraryDependencies ++= { if (isDotty.value) { Seq( "dev.zio" %%% "zio-streams" % zioVersion, "dev.zio" %%% "zio-test" % zioVersion, - "org.typelevel" %%% "cats-effect" % "2.4.0", - "org.typelevel" %%% "cats-mtl" % "1.1.2", + "org.typelevel" %%% "cats-effect" % "2.4.1", + "org.typelevel" %%% "cats-mtl" % "1.1.3", "co.fs2" %%% "fs2-core" % "2.5.4" ) } else { Seq( "dev.zio" %%% "zio-streams" % zioVersion % Optional, "dev.zio" %%% "zio-test" % zioVersion % Optional, - "org.typelevel" %%% "cats-effect" % "2.4.0" % Optional, + "org.typelevel" %%% "cats-effect" % "2.4.1" % Optional, "org.typelevel" %%% "cats-mtl" % "1.1.2" % Optional, "co.fs2" %%% "fs2-core" % "2.5.4" % Optional ) @@ -91,7 +91,7 @@ lazy val coreOnlyTest = crossProject(JSPlatform, JVMPlatform) .settings(skip in publish := true) .settings( libraryDependencies ++= Seq( - "org.typelevel" %%% "cats-core" % "2.4.2" % Test, + "org.typelevel" %%% "cats-core" % "2.5.0" % Test, "dev.zio" %%% "zio-test-sbt" % zioVersion % Test ) ) diff --git a/interop-cats/shared/src/main/scala/zio/interop/CHub.scala b/interop-cats/shared/src/main/scala/zio/interop/CHub.scala new file mode 100644 index 00000000..98b16a44 --- /dev/null +++ b/interop-cats/shared/src/main/scala/zio/interop/CHub.scala @@ -0,0 +1,228 @@ +/* + * Copyright 2021 John A. De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.interop + +import cats.effect.{ Effect, Resource } +import zio.{ Runtime, ZDequeue, ZEnqueue, ZHub, ZQueue } +import zio.interop.catz._ + +/** + * A `CHub[F, A, B]` is an asynchronous message hub. Publishers can publish + * messages of type `A` to the hub and subscribers can subscribe to take + * messages of type `B` from the hub within the context of the effect `F`. + */ +sealed abstract class CHub[F[+_], -A, +B] extends Serializable { + + /** + * Waits for the hub to be shut down. + */ + def awaitShutdown: F[Unit] + + /** + * The maximum capacity of the hub. + */ + def capacity: Int + + /** + * Checks whether the hub is shut down. + */ + def isShutdown: F[Boolean] + + /** + * Publishes a message to the hub, returning whether the message was + * published to the hub. + */ + def publish(a: A): F[Boolean] + + /** + * Publishes all of the specified messages to the hub, returning whether + * they were published to the hub. + */ + def publishAll(as: Iterable[A]): F[Boolean] + + /** + * Shuts down the hub. + */ + def shutdown: F[Unit] + + /** + * The current number of messages in the hub. + */ + def size: F[Int] + + /** + * Subscribes to receive messages from the hub. The resulting subscription + * can be evaluated multiple times within the scope of the resource to take a + * message from the hub each time. + */ + def subscribe: Resource[F, Dequeue[F, B]] + + /** + * Transforms messages published to the hub using the specified function. + */ + def contramap[C](f: C => A): CHub[F, C, B] + + /** + * Transforms messages published to the hub using the specified effectual + * function. + */ + def contramapM[C](f: C => F[A]): CHub[F, C, B] + + /** + * Transforms messages published to and taken from the hub using the + * specified functions. + */ + def dimap[C, D](f: C => A, g: B => D): CHub[F, C, D] + + /** + * Transforms messages published to and taken from the hub using the + * specified effectual functions. + */ + def dimapM[C, D]( + f: C => F[A], + g: B => F[D] + ): CHub[F, C, D] + + /** + * Filters messages published to the hub using the specified function. + */ + def filterInput[A1 <: A](f: A1 => Boolean): CHub[F, A1, B] + + /** + * Filters messages published to the hub using the specified effectual + * function. + */ + def filterInputM[A1 <: A]( + f: A1 => F[Boolean] + ): CHub[F, A1, B] + + /** + * Filters messages taken from the hub using the specified function. + */ + def filterOutput(f: B => Boolean): CHub[F, A, B] + + /** + * Filters messages taken from the hub using the specified effectual + * function. + */ + def filterOutputM( + f: B => F[Boolean] + ): CHub[F, A, B] + + /** + * Transforms messages taken from the hub using the specified function. + */ + def map[C](f: B => C): CHub[F, A, C] + + /** + * Transforms messages taken from the hub using the specified effectual + * function. + */ + def mapM[C](f: B => F[C]): CHub[F, A, C] + + /** + * Views the hub as a queue that can only be written to. + */ + def toQueue: Enqueue[F, A] +} + +object CHub { + + /** + * Creates a bounded hub with the back pressure strategy. The hub will retain + * messages until they have been taken by all subscribers, applying back + * pressure to publishers if the hub is at capacity. + * + * For best performance use capacities that are powers of two. + */ + def bounded[F[+_]: Effect, A](requestedCapacity: Int)(implicit runtime: Runtime[Any]): F[Hub[F, A]] = + toEffect(ZHub.bounded[A](requestedCapacity).map(hub => CHub(hub))) + + /** + * Creates a bounded hub with the dropping strategy. The hub will drop new + * messages if the hub is at capacity. + * + * For best performance use capacities that are powers of two. + */ + def dropping[F[+_]: Effect, A](requestedCapacity: Int)(implicit runtime: Runtime[Any]): F[Hub[F, A]] = + toEffect(ZHub.dropping[A](requestedCapacity).map(hub => CHub(hub))) + + /** + * Creates a bounded hub with the sliding strategy. The hub will add new + * messages and drop old messages if the hub is at capacity. + * + * For best performance use capacities that are powers of two. + */ + def sliding[F[+_]: Effect, A](requestedCapacity: Int)(implicit runtime: Runtime[Any]): F[Hub[F, A]] = + toEffect(ZHub.sliding[A](requestedCapacity).map(hub => CHub(hub))) + + /** + * Creates an unbounded hub. + */ + def unbounded[F[+_]: Effect, A](implicit runtime: Runtime[Any]): F[Hub[F, A]] = + toEffect(ZHub.unbounded[A].map(hub => CHub(hub))) + + private def apply[F[+_]: Effect, A, B]( + hub: ZHub[Any, Any, Throwable, Throwable, A, B] + )(implicit runtime: Runtime[Any]): CHub[F, A, B] = + new CHub[F, A, B] { self => + def awaitShutdown: F[Unit] = + toEffect(hub.awaitShutdown) + def capacity: Int = + hub.capacity + def isShutdown: F[Boolean] = + toEffect(hub.isShutdown) + def publish(a: A): F[Boolean] = + toEffect(hub.publish(a)) + def publishAll(as: Iterable[A]): F[Boolean] = + toEffect(hub.publishAll(as)) + def shutdown: F[Unit] = + toEffect(hub.shutdown) + def size: F[Int] = + toEffect(hub.size) + def subscribe: Resource[F, Dequeue[F, B]] = + hub.subscribe.map(dequeue => Dequeue[F, B](dequeue)).toResource[F] + def contramap[C](f: C => A): CHub[F, C, B] = + CHub(hub.contramap(f)) + def contramapM[C](f: C => F[A]): CHub[F, C, B] = + CHub(hub.contramapM(c => fromEffect(f(c)))) + def dimap[C, D](f: C => A, g: B => D): CHub[F, C, D] = + CHub(hub.dimap(f, g)) + def dimapM[C, D](f: C => F[A], g: B => F[D]): CHub[F, C, D] = + CHub(hub.dimapM(c => fromEffect(f(c)), b => fromEffect(g(b)))) + def filterInput[A1 <: A](f: A1 => Boolean): CHub[F, A1, B] = + CHub(hub.filterInput(f)) + def filterInputM[A1 <: A](f: A1 => F[Boolean]): CHub[F, A1, B] = + CHub(hub.filterInputM(a => fromEffect(f(a)))) + def filterOutput(f: B => Boolean): CHub[F, A, B] = + CHub(hub.filterOutput(f)) + def filterOutputM(f: B => F[Boolean]): CHub[F, A, B] = + CHub(hub.filterOutputM(a => fromEffect(f(a)))) + def map[C](f: B => C): CHub[F, A, C] = + CHub(hub.map(f)) + def mapM[C](f: B => F[C]): CHub[F, A, C] = + CHub(hub.mapM(a => fromEffect(f(a)))) + def toQueue: Enqueue[F, A] = + Enqueue(hub.toQueue) + } + + private def Dequeue[F[+_], A](dequeue: ZDequeue[Any, Throwable, A]): Dequeue[F, A] = + new Dequeue(dequeue.asInstanceOf[ZQueue[Any, Any, Throwable, Throwable, Nothing, A]]) + + private def Enqueue[F[+_], A](enqueue: ZEnqueue[Any, Throwable, A]): Enqueue[F, A] = + new Enqueue(enqueue.asInstanceOf[ZQueue[Any, Any, Throwable, Throwable, A, Nothing]]) +} diff --git a/interop-cats/shared/src/main/scala/zio/interop/package.scala b/interop-cats/shared/src/main/scala/zio/interop/package.scala index aaeea4ba..926cdb58 100644 --- a/interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -24,6 +24,19 @@ package object interop { type Queue[F[+_], A] = CQueue[F, A, A] + /** + * A queue that can only be dequeued. + */ + type Dequeue[F[+_], +A] = CQueue[F, Nothing, A] + + /** + * A queue that can only be enqueued. + */ + type Enqueue[F[+_], -A] = CQueue[F, A, Nothing] + + type Hub[F[+_], A] = CHub[F, A, A] + val Hub: CHub.type = CHub + @inline private[interop] final def exitToExitCase(exit: Exit[Any, Any]): ExitCase[Throwable] = exit match { case Exit.Success(_) => ExitCase.Completed case Exit.Failure(cause) if cause.interrupted => ExitCase.Canceled diff --git a/project/BuildHelper.scala b/project/BuildHelper.scala index bb3f7027..d90aece0 100644 --- a/project/BuildHelper.scala +++ b/project/BuildHelper.scala @@ -12,7 +12,7 @@ object BuildHelper { val Scala212 = "2.12.13" val Scala213 = "2.13.5" - val Scala3 = "3.0.0-RC1" + val Scala3 = "3.0.0-RC2" private val stdOptions = Seq( "-deprecation", diff --git a/project/build.properties b/project/build.properties index dbae93bc..e67343ae 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.4.9 +sbt.version=1.5.0