diff --git a/.mill-version b/.mill-version index cd47247..223df19 100644 --- a/.mill-version +++ b/.mill-version @@ -1 +1 @@ -0.10.10 \ No newline at end of file +0.10.11 diff --git a/build.sc b/build.sc index 162b5ac..6cb4d6e 100644 --- a/build.sc +++ b/build.sc @@ -1,6 +1,8 @@ +import mill.define.Sources import mill.define.Target import mill.util.Jvm import $ivy.`com.lihaoyi::mill-contrib-bloop:$MILL_VERSION` +import $ivy.`com.disneystreaming.smithy4s::smithy4s-mill-codegen-plugin::0.17.4` import $ivy.`io.github.davidgregory084::mill-tpolecat::0.3.2` import $ivy.`io.chris-kipp::mill-ci-release::0.1.5` @@ -13,16 +15,18 @@ import scalanativelib._ import mill.scalajslib.api._ import io.github.davidgregory084._ import io.kipp.mill.ci.release.CiReleaseModule +import _root_.smithy4s.codegen.mill._ object versions { val scala212Version = "2.12.16" val scala213Version = "2.13.10" - val scala3Version = "3.1.2" - val scalaJSVersion = "1.10.1" - val scalaNativeVersion = "0.4.8" + val scala3Version = "3.2.2" + val scalaJSVersion = "1.13.0" + val scalaNativeVersion = "0.4.10" val munitVersion = "0.7.29" val munitNativeVersion = "1.0.0-M7" - val fs2 = "3.3.0" + val jsoniterVersion = "2.21.0" + val fs2 = "3.6.1" val weaver = "0.8.0" val scala213 = "2.13" @@ -40,7 +44,7 @@ import versions._ object core extends RPCCrossPlatformModule { cross => def crossPlatformIvyDeps: T[Agg[Dep]] = Agg( - ivy"com.github.plokhotnyuk.jsoniter-scala::jsoniter-scala-macros::2.17.0" + ivy"com.github.plokhotnyuk.jsoniter-scala::jsoniter-scala-macros::$jsoniterVersion" ) object jvm extends mill.Cross[JvmModule](scala213, scala3) @@ -83,6 +87,41 @@ object fs2 extends RPCCrossPlatformModule { cross => } +object smithy extends JavaModule {} + +object smithy4s extends RPCCrossPlatformModule { cross => + + override def crossPlatformModuleDeps = Seq(fs2) + def crossPlatformIvyDeps: T[Agg[Dep]] = Agg( + ivy"com.disneystreaming.smithy4s::smithy4s-json::${_root_.smithy4s.codegen.BuildInfo.version}" + ) + + // A module holding the code-generation logic to help cache that task + object gen extends Smithy4sModule { + def scalaVersion = "2.13.10" + def smithy4sInternalDependenciesAsJars = T { + List(smithy.jar()) + } + } + + object jvm extends mill.Cross[JvmModule](scala213, scala3) + def sharedSmithy = T.sources(T.workspace / "smithy" / "resources" / "META-INF" / "smithy") + class JvmModule(cv: String) extends cross.JVM(cv) with Smithy4sModule { + def smithy4sInputDirs = sharedSmithy + } + + object js extends mill.Cross[JsModule](scala213, scala3) + class JsModule(cv: String) extends cross.JS(cv) with Smithy4sModule { + def smithy4sInputDirs = sharedSmithy + } + + object native extends mill.Cross[NativeModule](scala3) + class NativeModule(cv: String) extends cross.Native(cv) with Smithy4sModule { + def smithy4sInputDirs = sharedSmithy + } + +} + object examples extends mill.define.Module { object server extends ScalaModule { @@ -101,6 +140,27 @@ object examples extends mill.define.Module { } } + object smithyShared extends Smithy4sModule { + def moduleDeps = Seq(smithy4s.jvm(versions.scala213)) + def scalaVersion = versions.scala213Version + } + + object smithyServer extends ScalaModule { + def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}") + def moduleDeps = Seq(fs2.jvm(versions.scala213), smithyShared) + def scalaVersion = versions.scala213Version + } + + object smithyClient extends ScalaModule { + def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}") + def moduleDeps = Seq(fs2.jvm(versions.scala213), smithyShared) + def scalaVersion = versions.scala213Version + def forkEnv: Target[Map[String, String]] = T { + val assembledServer = smithyServer.assembly() + super.forkEnv() ++ Map("SERVER_JAR" -> assembledServer.path.toString()) + } + } + } // ############################################################################# diff --git a/examples/smithyClient/src/examples/smithy/client/ChildProcess.scala b/examples/smithyClient/src/examples/smithy/client/ChildProcess.scala new file mode 100644 index 0000000..154e66d --- /dev/null +++ b/examples/smithyClient/src/examples/smithy/client/ChildProcess.scala @@ -0,0 +1,61 @@ +package examples.smithy.client + +import fs2.Stream +import cats.effect._ +import cats.syntax.all._ +import scala.jdk.CollectionConverters._ +import java.io.OutputStream + +trait ChildProcess[F[_]] { + def stdin: fs2.Pipe[F, Byte, Unit] + def stdout: Stream[F, Byte] + def stderr: Stream[F, Byte] +} + +object ChildProcess { + + def spawn[F[_]: Async](command: String*): Stream[F, ChildProcess[F]] = + Stream.resource(startRes(command)) + + val readBufferSize = 512 + + private def startRes[F[_]: Async](command: Seq[String]) = Resource + .make { + Async[F].interruptible(new java.lang.ProcessBuilder(command.asJava).start()) + } { p => + Sync[F].interruptible(p.destroy()) + } + .map { p => + val done = Async[F].fromCompletableFuture(Sync[F].delay(p.onExit())) + new ChildProcess[F] { + def stdin: fs2.Pipe[F, Byte, Unit] = + writeOutputStreamFlushingChunks[F](Sync[F].interruptible(p.getOutputStream())) + + def stdout: fs2.Stream[F, Byte] = fs2.io + .readInputStream[F](Sync[F].interruptible(p.getInputStream()), chunkSize = readBufferSize) + + def stderr: fs2.Stream[F, Byte] = fs2.io + .readInputStream[F](Sync[F].blocking(p.getErrorStream()), chunkSize = readBufferSize) + // Avoids broken pipe - we cut off when the program ends. + // Users can decide what to do with the error logs using the exitCode value + .interruptWhen(done.void.attempt) + } + } + + /** Adds a flush after each chunk + */ + def writeOutputStreamFlushingChunks[F[_]]( + fos: F[OutputStream], + closeAfterUse: Boolean = true + )(implicit F: Sync[F]): fs2.Pipe[F, Byte, Nothing] = + s => { + def useOs(os: OutputStream): Stream[F, Nothing] = + s.chunks.foreach(c => F.interruptible(os.write(c.toArray)) >> F.blocking(os.flush())) + + val os = + if (closeAfterUse) Stream.bracket(fos)(os => F.blocking(os.close())) + else Stream.eval(fos) + os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush()))) + } + +} diff --git a/examples/smithyClient/src/examples/smithy/client/ClientMain.scala b/examples/smithyClient/src/examples/smithy/client/ClientMain.scala new file mode 100644 index 0000000..049e08c --- /dev/null +++ b/examples/smithyClient/src/examples/smithy/client/ClientMain.scala @@ -0,0 +1,60 @@ +package examples.smithy.client + +import cats.effect._ +import cats.syntax.all._ +import fs2.Stream +import fs2.io._ +import jsonrpclib.CallId +import jsonrpclib.fs2._ +import jsonrpclib.smithy4sinterop.ClientStub +import jsonrpclib.smithy4sinterop.ServerEndpoints +import test._ + +import java.io.InputStream +import java.io.OutputStream + +object SmithyClientMain extends IOApp.Simple { + + // Reserving a method for cancelation. + val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity) + + type IOStream[A] = fs2.Stream[IO, A] + def log(str: String): IOStream[Unit] = Stream.eval(IO.consoleForIO.errorln(str)) + + // Implementing the generated interface + object Client extends TestClient[IO] { + def pong(pong: String): IO[Unit] = IO.consoleForIO.errorln(s"Client received pong: $pong") + } + + def run: IO[Unit] = { + import scala.concurrent.duration._ + val run = for { + //////////////////////////////////////////////////////// + /////// BOOTSTRAPPING + //////////////////////////////////////////////////////// + _ <- log("Starting client") + serverJar <- sys.env.get("SERVER_JAR").liftTo[IOStream](new Exception("SERVER_JAR env var does not exist")) + // Starting the server + rp <- ChildProcess.spawn[IO]("java", "-jar", serverJar) + // Creating a channel that will be used to communicate to the server + fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some) + // Mounting our implementation of the generated interface onto the channel + _ <- fs2Channel.withEndpointsStream(ServerEndpoints(Client)) + // Creating stubs to talk to the remote server + server: TestServer[IO] <- ClientStub.stream(test.TestServer, fs2Channel) + _ <- Stream(()) + .concurrently(fs2Channel.output.through(lsp.encodeMessages).through(rp.stdin)) + .concurrently(rp.stdout.through(lsp.decodeMessages).through(fs2Channel.inputOrBounce)) + .concurrently(rp.stderr.through(fs2.io.stderr[IO])) + + //////////////////////////////////////////////////////// + /////// INTERACTION + //////////////////////////////////////////////////////// + result1 <- Stream.eval(server.greet("Client")) + _ <- log(s"Client received $result1") + _ <- Stream.eval(server.ping("Ping")) + } yield () + run.compile.drain.guarantee(IO.consoleForIO.errorln("Terminating client")) + } + +} diff --git a/examples/smithyServer/src/examples/smithy/server/ServerMain.scala b/examples/smithyServer/src/examples/smithy/server/ServerMain.scala new file mode 100644 index 0000000..279ef38 --- /dev/null +++ b/examples/smithyServer/src/examples/smithy/server/ServerMain.scala @@ -0,0 +1,42 @@ +package examples.smithy.server + +import jsonrpclib.CallId +import jsonrpclib.fs2._ +import cats.effect._ +import fs2.io._ +import jsonrpclib.Endpoint +import cats.syntax.all._ +import test._ // smithy4s-generated package +import jsonrpclib.smithy4sinterop.ClientStub +import jsonrpclib.smithy4sinterop.ServerEndpoints + +object ServerMain extends IOApp.Simple { + + // Reserving a method for cancelation. + val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity) + + // Implementing the generated interface + class ServerImpl(client: TestClient[IO]) extends TestServer[IO] { + def greet(name: String): IO[GreetOutput] = IO.pure(GreetOutput(s"Server says: hello $name !")) + + def ping(ping: String): IO[Unit] = client.pong(s"Returned to sender: $ping") + } + + def printErr(s: String): IO[Unit] = IO.consoleForIO.errorln(s) + + def run: IO[Unit] = { + val run = for { + channel <- FS2Channel[IO](cancelTemplate = Some(cancelEndpoint)) + testClient <- ClientStub.stream(TestClient, channel) + _ <- channel.withEndpointsStream(ServerEndpoints(new ServerImpl(testClient))) + _ <- fs2.Stream + .eval(IO.never) // running the server forever + .concurrently(stdin[IO](512).through(lsp.decodeMessages).through(channel.inputOrBounce)) + .concurrently(channel.output.through(lsp.encodeMessages).through(stdout[IO])) + } yield {} + + // Using errorln as stdout is used by the RPC channel + printErr("Starting server") >> run.compile.drain.guarantee(printErr("Terminating server")) + } + +} diff --git a/examples/smithyShared/smithy/spec.smithy b/examples/smithyShared/smithy/spec.smithy new file mode 100644 index 0000000..518c745 --- /dev/null +++ b/examples/smithyShared/smithy/spec.smithy @@ -0,0 +1,45 @@ +$version: "2.0" + +namespace test + +use jsonrpclib#jsonRequest +use jsonrpclib#jsonRPC +use jsonrpclib#jsonNotification + +@jsonRPC +service TestServer { + operations: [Greet, Ping] +} + +@jsonRPC +service TestClient { + operations: [Pong] +} + +@jsonRequest("greet") +operation Greet { + input := { + @required + name: String + } + output := { + @required + message: String + } +} + +@jsonNotification("ping") +operation Ping { + input := { + @required + ping: String + } +} + +@jsonNotification("pong") +operation Pong { + input := { + @required + pong: String + } +} diff --git a/smithy/resources/META-INF/smithy/jsonrpclib.smithy b/smithy/resources/META-INF/smithy/jsonrpclib.smithy new file mode 100644 index 0000000..5d4e6f3 --- /dev/null +++ b/smithy/resources/META-INF/smithy/jsonrpclib.smithy @@ -0,0 +1,23 @@ +$version: "2.0" + +namespace jsonrpclib + +/// the JSON-RPC protocol, +/// see https://www.jsonrpc.org/specification +@protocolDefinition(traits: [ + jsonRequest + jsonNotification +]) +@trait(selector: "service") +structure jsonRPC { +} + +/// Identifies an operation that abides by request/response semantics +/// https://www.jsonrpc.org/specification#request_object +@trait(selector: "operation") +string jsonRequest + +/// Identifies an operation that abides by fire-and-forget semantics +/// see https://www.jsonrpc.org/specification#notification +@trait(selector: "operation") +string jsonNotification diff --git a/smithy/resources/META-INF/smithy/manifest b/smithy/resources/META-INF/smithy/manifest new file mode 100644 index 0000000..94839e2 --- /dev/null +++ b/smithy/resources/META-INF/smithy/manifest @@ -0,0 +1 @@ +jsonrpclib.smithy diff --git a/smithy4s/src/jsonrpclib/smithy4sinterop/ClientStub.scala b/smithy4s/src/jsonrpclib/smithy4sinterop/ClientStub.scala new file mode 100644 index 0000000..07f72ec --- /dev/null +++ b/smithy4s/src/jsonrpclib/smithy4sinterop/ClientStub.scala @@ -0,0 +1,88 @@ +package jsonrpclib.smithy4sinterop + +import cats.MonadThrow +import jsonrpclib.fs2._ +import smithy4s.Service +import smithy4s.http.json.JCodec +import smithy4s.schema._ +import cats.effect.kernel.Async +import smithy4s.kinds.PolyFunction5 +import smithy4s.ShapeId +import cats.syntax.all._ + +object ClientStub { + + def apply[Alg[_[_, _, _, _, _]], F[_]](service: Service[Alg], channel: FS2Channel[F])(implicit + F: Async[F] + ): F[service.Impl[F]] = new ClientStub(service, channel).compile + + def stream[Alg[_[_, _, _, _, _]], F[_]](service: Service[Alg], channel: FS2Channel[F])(implicit + F: Async[F] + ): fs2.Stream[F, service.Impl[F]] = fs2.Stream.eval(new ClientStub(service, channel).compile) +} + +private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg], channel: FS2Channel[F])(implicit + F: Async[F] +) { + + def compile: F[service.Impl[F]] = precompileAll.map { stubCache => + val interpreter = new service.FunctorInterpreter[F] { + def apply[I, E, O, SI, SO](op: service.Operation[I, E, O, SI, SO]): F[O] = { + val (input, smithy4sEndpoint) = service.endpoint(op) + (stubCache(smithy4sEndpoint): F[I => F[O]]).flatMap { stub => + stub(input) + } + } + } + service.fromPolyFunction(interpreter) + } + + private type Stub[I, E, O, SI, SO] = F[I => F[O]] + private val precompileAll: F[PolyFunction5[service.Endpoint, Stub]] = { + F.ref(Map.empty[ShapeId, Any]).flatMap { cache => + service.endpoints + .traverse_ { ep => + val shapeId = ep.id + EndpointSpec.fromHints(ep.hints).liftTo[F](NotJsonRPCEndpoint(shapeId)).flatMap { epSpec => + val stub = jsonRPCStub(ep, epSpec) + cache.update(_ + (shapeId -> stub)) + } + } + .as { + new PolyFunction5[service.Endpoint, Stub] { + def apply[I, E, O, SI, SO](ep: service.Endpoint[I, E, O, SI, SO]): Stub[I, E, O, SI, SO] = { + cache.get.map { c => + c(ep.id).asInstanceOf[I => F[O]] + } + } + } + } + } + } + + def jsonRPCStub[I, E, O, SI, SO]( + smithy4sEndpoint: service.Endpoint[I, E, O, SI, SO], + endpointSpec: EndpointSpec + ): I => F[O] = { + + implicit val inputCodec: JCodec[I] = JCodec.fromSchema(smithy4sEndpoint.input) + implicit val outputCodec: JCodec[O] = JCodec.fromSchema(smithy4sEndpoint.output) + + endpointSpec match { + case EndpointSpec.Notification(methodName) => + val coerce = coerceUnit[O](smithy4sEndpoint.output) + channel.notificationStub[I](methodName).andThen(_ *> coerce) + case EndpointSpec.Request(methodName) => + channel.simpleStub[I, O](methodName) + } + } + + case class NotJsonRPCEndpoint(shapeId: ShapeId) extends Throwable + case object NotUnitReturnType extends Throwable + private def coerceUnit[A](schema: Schema[A]): F[A] = + schema match { + case Schema.PrimitiveSchema(_, _, Primitive.PUnit) => MonadThrow[F].unit + case _ => MonadThrow[F].raiseError[A](NotUnitReturnType) + } + +} diff --git a/smithy4s/src/jsonrpclib/smithy4sinterop/EndpointSpec.scala b/smithy4s/src/jsonrpclib/smithy4sinterop/EndpointSpec.scala new file mode 100644 index 0000000..2e29930 --- /dev/null +++ b/smithy4s/src/jsonrpclib/smithy4sinterop/EndpointSpec.scala @@ -0,0 +1,15 @@ +package jsonrpclib.smithy4sinterop + +import smithy4s.Hints + +sealed trait EndpointSpec +object EndpointSpec { + case class Notification(methodName: String) extends EndpointSpec + case class Request(methodName: String) extends EndpointSpec + + def fromHints(hints: Hints): Option[EndpointSpec] = hints match { + case jsonrpclib.JsonRequest.hint(r) => Some(Request(r.value)) + case jsonrpclib.JsonNotification.hint(r) => Some(Notification(r.value)) + case _ => None + } +} diff --git a/smithy4s/src/jsonrpclib/smithy4sinterop/ServerEndpoints.scala b/smithy4s/src/jsonrpclib/smithy4sinterop/ServerEndpoints.scala new file mode 100644 index 0000000..3aa3c57 --- /dev/null +++ b/smithy4s/src/jsonrpclib/smithy4sinterop/ServerEndpoints.scala @@ -0,0 +1,50 @@ +package jsonrpclib.smithy4sinterop + +import _root_.smithy4s.{Endpoint => Smithy4sEndpoint} +import cats.MonadThrow +import cats.syntax.all._ +import jsonrpclib.Endpoint +import jsonrpclib.fs2._ +import smithy4s.Service +import smithy4s.http.json.JCodec +import smithy4s.kinds.FunctorAlgebra +import smithy4s.kinds.FunctorInterpreter + +object ServerEndpoints { + + def apply[Alg[_[_, _, _, _, _]], F[_]]( + impl: FunctorAlgebra[Alg, F] + )(implicit service: Service[Alg], F: MonadThrow[F]): List[Endpoint[F]] = { + val interpreter: service.FunctorInterpreter[F] = service.toPolyFunction(impl) + service.endpoints.flatMap { smithy4sEndpoint => + EndpointSpec.fromHints(smithy4sEndpoint.hints).map { endpointSpec => + jsonRPCEndpoint(smithy4sEndpoint, endpointSpec, interpreter) + } + } + + } + + // TODO : codify errors at smithy level and handle them. + def jsonRPCEndpoint[F[_]: MonadThrow, Op[_, _, _, _, _], I, E, O, SI, SO]( + smithy4sEndpoint: Smithy4sEndpoint[Op, I, E, O, SI, SO], + endpointSpec: EndpointSpec, + impl: FunctorInterpreter[Op, F] + ): Endpoint[F] = { + + implicit val inputCodec: JCodec[I] = JCodec.fromSchema(smithy4sEndpoint.input) + implicit val outputCodec: JCodec[O] = JCodec.fromSchema(smithy4sEndpoint.output) + + endpointSpec match { + case EndpointSpec.Notification(methodName) => + Endpoint[F](methodName).notification { (input: I) => + val op = smithy4sEndpoint.wrap(input) + impl(op).void + } + case EndpointSpec.Request(methodName) => + Endpoint[F](methodName).simple { (input: I) => + val op = smithy4sEndpoint.wrap(input) + impl(op) + } + } + } +}