Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Smithy4s integration #58

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .mill-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.10.10
0.10.11
70 changes: 65 additions & 5 deletions build.sc
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import mill.define.Sources
import mill.define.Target
import mill.util.Jvm
import $ivy.`com.lihaoyi::mill-contrib-bloop:$MILL_VERSION`
import $ivy.`com.disneystreaming.smithy4s::smithy4s-mill-codegen-plugin::0.17.4`
import $ivy.`io.github.davidgregory084::mill-tpolecat::0.3.2`
import $ivy.`io.chris-kipp::mill-ci-release::0.1.5`

Expand All @@ -13,16 +15,18 @@ import scalanativelib._
import mill.scalajslib.api._
import io.github.davidgregory084._
import io.kipp.mill.ci.release.CiReleaseModule
import _root_.smithy4s.codegen.mill._

object versions {
val scala212Version = "2.12.16"
val scala213Version = "2.13.10"
val scala3Version = "3.1.2"
val scalaJSVersion = "1.10.1"
val scalaNativeVersion = "0.4.8"
val scala3Version = "3.2.2"
val scalaJSVersion = "1.13.0"
val scalaNativeVersion = "0.4.10"
val munitVersion = "0.7.29"
val munitNativeVersion = "1.0.0-M7"
val fs2 = "3.3.0"
val jsoniterVersion = "2.21.0"
val fs2 = "3.6.1"
val weaver = "0.8.0"

val scala213 = "2.13"
Expand All @@ -40,7 +44,7 @@ import versions._
object core extends RPCCrossPlatformModule { cross =>

def crossPlatformIvyDeps: T[Agg[Dep]] = Agg(
ivy"com.github.plokhotnyuk.jsoniter-scala::jsoniter-scala-macros::2.17.0"
ivy"com.github.plokhotnyuk.jsoniter-scala::jsoniter-scala-macros::$jsoniterVersion"
)

object jvm extends mill.Cross[JvmModule](scala213, scala3)
Expand Down Expand Up @@ -83,6 +87,41 @@ object fs2 extends RPCCrossPlatformModule { cross =>

}

object smithy extends JavaModule {}

object smithy4s extends RPCCrossPlatformModule { cross =>

override def crossPlatformModuleDeps = Seq(fs2)
def crossPlatformIvyDeps: T[Agg[Dep]] = Agg(
ivy"com.disneystreaming.smithy4s::smithy4s-json::${_root_.smithy4s.codegen.BuildInfo.version}"
)

// A module holding the code-generation logic to help cache that task
object gen extends Smithy4sModule {
def scalaVersion = "2.13.10"
def smithy4sInternalDependenciesAsJars = T {
List(smithy.jar())
}
}

object jvm extends mill.Cross[JvmModule](scala213, scala3)
def sharedSmithy = T.sources(T.workspace / "smithy" / "resources" / "META-INF" / "smithy")
class JvmModule(cv: String) extends cross.JVM(cv) with Smithy4sModule {
def smithy4sInputDirs = sharedSmithy
}

object js extends mill.Cross[JsModule](scala213, scala3)
class JsModule(cv: String) extends cross.JS(cv) with Smithy4sModule {
def smithy4sInputDirs = sharedSmithy
}

object native extends mill.Cross[NativeModule](scala3)
class NativeModule(cv: String) extends cross.Native(cv) with Smithy4sModule {
def smithy4sInputDirs = sharedSmithy
}

}

object examples extends mill.define.Module {

object server extends ScalaModule {
Expand All @@ -101,6 +140,27 @@ object examples extends mill.define.Module {
}
}

object smithyShared extends Smithy4sModule {
def moduleDeps = Seq(smithy4s.jvm(versions.scala213))
def scalaVersion = versions.scala213Version
}

object smithyServer extends ScalaModule {
def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}")
def moduleDeps = Seq(fs2.jvm(versions.scala213), smithyShared)
def scalaVersion = versions.scala213Version
}

object smithyClient extends ScalaModule {
def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}")
def moduleDeps = Seq(fs2.jvm(versions.scala213), smithyShared)
def scalaVersion = versions.scala213Version
def forkEnv: Target[Map[String, String]] = T {
val assembledServer = smithyServer.assembly()
super.forkEnv() ++ Map("SERVER_JAR" -> assembledServer.path.toString())
}
}

}

// #############################################################################
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package examples.smithy.client

import fs2.Stream
import cats.effect._
import cats.syntax.all._
import scala.jdk.CollectionConverters._
import java.io.OutputStream

trait ChildProcess[F[_]] {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking forward to killing this off when fs2.io has a solution for it

def stdin: fs2.Pipe[F, Byte, Unit]
def stdout: Stream[F, Byte]
def stderr: Stream[F, Byte]
}

object ChildProcess {

def spawn[F[_]: Async](command: String*): Stream[F, ChildProcess[F]] =
Stream.resource(startRes(command))

val readBufferSize = 512

private def startRes[F[_]: Async](command: Seq[String]) = Resource
.make {
Async[F].interruptible(new java.lang.ProcessBuilder(command.asJava).start())
} { p =>
Sync[F].interruptible(p.destroy())
}
.map { p =>
val done = Async[F].fromCompletableFuture(Sync[F].delay(p.onExit()))
new ChildProcess[F] {
def stdin: fs2.Pipe[F, Byte, Unit] =
writeOutputStreamFlushingChunks[F](Sync[F].interruptible(p.getOutputStream()))

def stdout: fs2.Stream[F, Byte] = fs2.io
.readInputStream[F](Sync[F].interruptible(p.getInputStream()), chunkSize = readBufferSize)

def stderr: fs2.Stream[F, Byte] = fs2.io
.readInputStream[F](Sync[F].blocking(p.getErrorStream()), chunkSize = readBufferSize)
// Avoids broken pipe - we cut off when the program ends.
// Users can decide what to do with the error logs using the exitCode value
.interruptWhen(done.void.attempt)
Comment on lines +34 to +41

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readInputStream is not using interruptible looks like.
https://github.com/typelevel/fs2/blob/56b86022d48d3caf0a9b5f00bf637cac62bb7f5e/io/shared/src/main/scala/fs2/io/io.scala#L46

So that would probably explain why it is hanging. We can try swapping that, but as you mentioned interruptibility might be broken like it is for stdin. In which case I suppose each Process would have to start its own dedicated blocking threads that it can kill 🤔

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I published 3.7-ae0fc02-SNAPSHOT which swapped in interruptible, you can give that a try and see if it helps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't resolve yet, are you sure it's the right version ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doh, my bad. Will try in a bit

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm I only get the initial prints from both the client and server side, but then nothing ... It may be because chunks are not automatically flushed in your implementation. That's why I couldn't use the Davenverse lib directly :

https://github.com/neandertech/jsonrpclib/pull/58/files#diff-281c0b146d38f8e1fc9f7c77eb87773debe12d364454ef6766472a26563a0ee6R47-R59

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woops, I missed that. Thanks!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, try this one 3.7-eacce62-SNAPSHOT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works 🎉 most things that should print actually print (except for the server termination message but that's some userland problem), and the program terminates as expected !

}
}

/** Adds a flush after each chunk
*/
def writeOutputStreamFlushingChunks[F[_]](
fos: F[OutputStream],
closeAfterUse: Boolean = true
)(implicit F: Sync[F]): fs2.Pipe[F, Byte, Nothing] =
s => {
def useOs(os: OutputStream): Stream[F, Nothing] =
s.chunks.foreach(c => F.interruptible(os.write(c.toArray)) >> F.blocking(os.flush()))

val os =
if (closeAfterUse) Stream.bracket(fos)(os => F.blocking(os.close()))
else Stream.eval(fos)
os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush())))
}

}
60 changes: 60 additions & 0 deletions examples/smithyClient/src/examples/smithy/client/ClientMain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package examples.smithy.client

import cats.effect._
import cats.syntax.all._
import fs2.Stream
import fs2.io._
import jsonrpclib.CallId
import jsonrpclib.fs2._
import jsonrpclib.smithy4sinterop.ClientStub
import jsonrpclib.smithy4sinterop.ServerEndpoints
import test._

import java.io.InputStream
import java.io.OutputStream

object SmithyClientMain extends IOApp.Simple {

// Reserving a method for cancelation.
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity)

type IOStream[A] = fs2.Stream[IO, A]
def log(str: String): IOStream[Unit] = Stream.eval(IO.consoleForIO.errorln(str))

// Implementing the generated interface
object Client extends TestClient[IO] {
def pong(pong: String): IO[Unit] = IO.consoleForIO.errorln(s"Client received pong: $pong")
}

def run: IO[Unit] = {
import scala.concurrent.duration._
val run = for {
////////////////////////////////////////////////////////
/////// BOOTSTRAPPING
////////////////////////////////////////////////////////
_ <- log("Starting client")
serverJar <- sys.env.get("SERVER_JAR").liftTo[IOStream](new Exception("SERVER_JAR env var does not exist"))
// Starting the server
rp <- ChildProcess.spawn[IO]("java", "-jar", serverJar)
// Creating a channel that will be used to communicate to the server
fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some)
// Mounting our implementation of the generated interface onto the channel
_ <- fs2Channel.withEndpointsStream(ServerEndpoints(Client))
// Creating stubs to talk to the remote server
server: TestServer[IO] <- ClientStub.stream(test.TestServer, fs2Channel)
_ <- Stream(())
.concurrently(fs2Channel.output.through(lsp.encodeMessages).through(rp.stdin))
.concurrently(rp.stdout.through(lsp.decodeMessages).through(fs2Channel.inputOrBounce))
.concurrently(rp.stderr.through(fs2.io.stderr[IO]))

////////////////////////////////////////////////////////
/////// INTERACTION
////////////////////////////////////////////////////////
result1 <- Stream.eval(server.greet("Client"))
_ <- log(s"Client received $result1")
_ <- Stream.eval(server.ping("Ping"))
} yield ()
run.compile.drain.guarantee(IO.consoleForIO.errorln("Terminating client"))
}

}
42 changes: 42 additions & 0 deletions examples/smithyServer/src/examples/smithy/server/ServerMain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package examples.smithy.server

import jsonrpclib.CallId
import jsonrpclib.fs2._
import cats.effect._
import fs2.io._
import jsonrpclib.Endpoint
import cats.syntax.all._
import test._ // smithy4s-generated package
import jsonrpclib.smithy4sinterop.ClientStub
import jsonrpclib.smithy4sinterop.ServerEndpoints

object ServerMain extends IOApp.Simple {

// Reserving a method for cancelation.
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity)

// Implementing the generated interface
class ServerImpl(client: TestClient[IO]) extends TestServer[IO] {
def greet(name: String): IO[GreetOutput] = IO.pure(GreetOutput(s"Server says: hello $name !"))

def ping(ping: String): IO[Unit] = client.pong(s"Returned to sender: $ping")
}

def printErr(s: String): IO[Unit] = IO.consoleForIO.errorln(s)

def run: IO[Unit] = {
val run = for {
channel <- FS2Channel[IO](cancelTemplate = Some(cancelEndpoint))
testClient <- ClientStub.stream(TestClient, channel)
_ <- channel.withEndpointsStream(ServerEndpoints(new ServerImpl(testClient)))
_ <- fs2.Stream
.eval(IO.never) // running the server forever
.concurrently(stdin[IO](512).through(lsp.decodeMessages).through(channel.inputOrBounce))
.concurrently(channel.output.through(lsp.encodeMessages).through(stdout[IO]))
} yield {}

// Using errorln as stdout is used by the RPC channel
printErr("Starting server") >> run.compile.drain.guarantee(printErr("Terminating server"))
}

}
45 changes: 45 additions & 0 deletions examples/smithyShared/smithy/spec.smithy
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
$version: "2.0"

namespace test

use jsonrpclib#jsonRequest
use jsonrpclib#jsonRPC
use jsonrpclib#jsonNotification

@jsonRPC
service TestServer {
operations: [Greet, Ping]
}

@jsonRPC
service TestClient {
operations: [Pong]
}

@jsonRequest("greet")
operation Greet {
input := {
@required
name: String
}
output := {
@required
message: String
}
}

@jsonNotification("ping")
operation Ping {
input := {
@required
ping: String
}
}

@jsonNotification("pong")
operation Pong {
input := {
@required
pong: String
}
}
23 changes: 23 additions & 0 deletions smithy/resources/META-INF/smithy/jsonrpclib.smithy
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
$version: "2.0"

namespace jsonrpclib
Copy link
Contributor Author

@Baccata Baccata Feb 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this smithy file contains the protocol definition. It kind of echoes a question that @ckipp01 had on my BSP PR about where would be a good "central" for protocols.

I suppose DisneyStreaming could host it in https://github.com/disneystreaming/alloy/, but I'm a bit wary about putting it there and have the BSP depend on it ($work project etc etc).


/// the JSON-RPC protocol,
/// see https://www.jsonrpc.org/specification
@protocolDefinition(traits: [
jsonRequest
jsonNotification
])
@trait(selector: "service")
structure jsonRPC {
}

/// Identifies an operation that abides by request/response semantics
/// https://www.jsonrpc.org/specification#request_object
@trait(selector: "operation")
string jsonRequest

/// Identifies an operation that abides by fire-and-forget semantics
/// see https://www.jsonrpc.org/specification#notification
@trait(selector: "operation")
string jsonNotification
1 change: 1 addition & 0 deletions smithy/resources/META-INF/smithy/manifest
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
jsonrpclib.smithy
Loading