diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 08bef604..34a3a30c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,9 +49,9 @@ jobs: - name: Run grpc-web e2e test if: ${{ matrix.scala != 3 }} run: | - docker-compose -f ./examples/fullapp/docker-compose.yaml up -d + docker compose -f ./examples/fullapp/docker-compose.yaml up -d sbt e2eWebJS${{matrix.scala}}/fastOptJS/webpack e2eWebJVM${{matrix.scala}}/test - docker-compose -f ./examples/fullapp/docker-compose.yaml down + docker compose -f ./examples/fullapp/docker-compose.yaml down - name: Examples run: | diff --git a/.jvmopts b/.jvmopts new file mode 100644 index 00000000..f68da4ae --- /dev/null +++ b/.jvmopts @@ -0,0 +1,2 @@ +-Xms1g +-Xmx2g diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e71db65..424d1b5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## 0.6.3 +* Delay sending response headers until the first message is ready. * Add configurable backpressure to streaming clients. ## 0.6.1 diff --git a/core/src/main/scalajvm/scalapb/zio_grpc/server/ListenerDriver.scala b/core/src/main/scalajvm/scalapb/zio_grpc/server/ListenerDriver.scala index 5a775698..0eda4f85 100644 --- a/core/src/main/scalajvm/scalapb/zio_grpc/server/ListenerDriver.scala +++ b/core/src/main/scalajvm/scalapb/zio_grpc/server/ListenerDriver.scala @@ -1,7 +1,7 @@ package scalapb.zio_grpc.server import io.grpc.ServerCall.Listener -import io.grpc.{Metadata, Status, StatusException} +import io.grpc.{Status, StatusException} import zio._ import zio.stream.{Stream, ZStream} import scalapb.zio_grpc.RequestContext @@ -27,7 +27,6 @@ object ListenerDriver { ( call.request(2) *> completed.await *> - call.sendHeaders(new Metadata) *> request.await flatMap writeResponse ).onExit(ex => call.close(ListenerDriver.exitToStatus(ex), requestContext.responseMetadata.metadata).ignore) .ignore @@ -113,7 +112,6 @@ object ListenerDriver { .collectWhileSome (call.request(1) *> - call.sendHeaders(new Metadata) *> writeResponse(requestStream)) .onExit(ex => call.close(ListenerDriver.exitToStatus(ex), requestContext.responseMetadata.metadata).ignore) .ignore diff --git a/core/src/main/scalajvm/scalapb/zio_grpc/server/ZServerCallHandler.scala b/core/src/main/scalajvm/scalapb/zio_grpc/server/ZServerCallHandler.scala index 67417bb1..33be5142 100644 --- a/core/src/main/scalajvm/scalapb/zio_grpc/server/ZServerCallHandler.scala +++ b/core/src/main/scalajvm/scalapb/zio_grpc/server/ZServerCallHandler.scala @@ -72,7 +72,10 @@ object ZServerCallHandler { ): ServerCallHandler[Req, Res] = unaryInput[Req, Res]( runtime, - (req, requestContext, call) => impl(req, requestContext).flatMap[Any, StatusException, Unit](call.sendMessage) + (req, requestContext, call) => + impl(req, requestContext) + .zipLeft(call.sendHeaders(new Metadata)) + .flatMap[Any, StatusException, Unit](call.sendMessage) ) def serverStreamingCallHandler[Req, Res]( @@ -91,7 +94,10 @@ object ZServerCallHandler { ): ServerCallHandler[Req, Res] = streamingInput[Req, Res]( runtime, - (req, requestContext, call) => impl(req, requestContext).flatMap[Any, StatusException, Unit](call.sendMessage) + (req, requestContext, call) => + impl(req, requestContext) + .zipLeft(call.sendHeaders(new Metadata)) + .flatMap[Any, StatusException, Unit](call.sendMessage) ) def bidiCallHandler[Req, Res]( @@ -108,16 +114,17 @@ object ZServerCallHandler { stream: ZStream[Any, StatusException, Res] ): ZIO[Any, StatusException, Unit] = { val backpressureSink = { - def go: ZChannel[Any, ZNothing, Chunk[Res], Any, StatusException, Chunk[Res], Unit] = + def go(metadata: Option[Metadata]): ZChannel[Any, ZNothing, Chunk[Res], Any, StatusException, Chunk[Res], Unit] = ZChannel.readWithCause( xs => - ZChannel.fromZIO(GIO.attempt(xs.foreach(call.call.sendMessage))) *> - ZChannel.suspend(if (call.call.isReady()) go else ZChannel.fromZIO(call.awaitReady) *> go), + ZChannel.fromZIO(ZIO.foreachDiscard(metadata)(call.sendHeaders)) *> + ZChannel.fromZIO(GIO.attempt(xs.foreach(call.call.sendMessage))) *> + ZChannel.suspend(if (call.call.isReady()) go(None) else ZChannel.fromZIO(call.awaitReady) *> go(None)), c => ZChannel.failCause(c), _ => ZChannel.unit ) - ZSink.fromChannel(go) + ZSink.fromChannel(go(Some(new Metadata))) } for { diff --git a/e2e/protos/src/main/protobuf/testservice.proto b/e2e/protos/src/main/protobuf/testservice.proto index 691aeee2..9165ff65 100644 --- a/e2e/protos/src/main/protobuf/testservice.proto +++ b/e2e/protos/src/main/protobuf/testservice.proto @@ -11,6 +11,7 @@ message Request { ERROR_AFTER = 2; // for server streaming, error after two responses DELAY = 3; // do not return a response. for testing cancellations DIE = 4; // fail + UNAVAILABLE = 5; // fail with UNAVAILABLE, to test client retries } Scenario scenario = 1; int32 in = 2; diff --git a/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala b/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala index 3f6e85fc..a39b9e3a 100644 --- a/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala +++ b/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala @@ -8,6 +8,8 @@ import scalapb.zio_grpc.testservice.Request.Scenario import zio.stream.{Stream, ZStream} import zio.ZEnvironment +import java.util.concurrent.atomic.AtomicInteger + package object server { import zio.Schedule @@ -22,17 +24,22 @@ package object server { exit: zio.Promise[Nothing, Exit[StatusException, Response]] )(clock: Clock, console: Console) extends testservice.ZioTestservice.TestService { + val rpcRunsCounter: AtomicInteger = new AtomicInteger(0) + def unary(request: Request): ZIO[Any, StatusException, Response] = - (requestReceived.succeed(()) *> (request.scenario match { - case Scenario.OK => - ZIO.succeed( - Response(out = "Res" + request.in.toString) - ) - case Scenario.ERROR_NOW => + (requestReceived.succeed(()) *> ZIO.succeed(rpcRunsCounter.incrementAndGet()) *> (request.scenario match { + case Scenario.OK => + ZIO.succeed(Response(out = "Res" + request.in.toString)) + case Scenario.ERROR_NOW => ZIO.fail(Status.INTERNAL.withDescription("FOO!").asException()) - case Scenario.DELAY => ZIO.never - case Scenario.DIE => ZIO.die(new RuntimeException("FOO")) - case _ => ZIO.fail(Status.UNKNOWN.asException()) + case Scenario.DELAY => + ZIO.never + case Scenario.DIE => + ZIO.die(new RuntimeException("FOO")) + case Scenario.UNAVAILABLE => + ZIO.fail(Status.UNAVAILABLE.withDescription(rpcRunsCounter.get().toString).asException()) + case _ => + ZIO.fail(Status.UNKNOWN.asException()) })).onExit(exit.succeed(_)) def unaryTypeMapped(request: Request): ZIO[Any, StatusException, WrappedString] = @@ -42,14 +49,15 @@ package object server { request: Request ): ZStream[Any, StatusException, Response] = ZStream - .acquireReleaseExitWith(requestReceived.succeed(())) { (_, ex) => - ex.foldExit( - failed => - if (failed.isInterrupted || failed.isInterruptedOnly) - exit.succeed(Exit.fail(Status.CANCELLED.asException())) - else exit.succeed(Exit.fail(Status.UNKNOWN.asException())), - _ => exit.succeed(Exit.succeed(Response())) - ) + .acquireReleaseExitWith(requestReceived.succeed(()) *> ZIO.succeed(rpcRunsCounter.incrementAndGet())) { + (_, ex) => + ex.foldExit( + failed => + if (failed.isInterrupted || failed.isInterruptedOnly) + exit.succeed(Exit.fail(Status.CANCELLED.asException())) + else exit.succeed(Exit.fail(Status.UNKNOWN.asException())), + _ => exit.succeed(Exit.succeed(Response())) + ) } .flatMap { _ => request.scenario match { @@ -78,15 +86,14 @@ package object server { def clientStreaming( request: Stream[StatusException, Request] ): ZIO[Any, StatusException, Response] = - requestReceived.succeed(()) *> + requestReceived.succeed(()) *> ZIO.succeed(rpcRunsCounter.incrementAndGet()) *> request .runFoldZIO(0)((state, req) => req.scenario match { case Scenario.OK => ZIO.succeed(state + req.in) case Scenario.DELAY => delayReceived.succeed(()) *> ZIO.never case Scenario.DIE => ZIO.die(new RuntimeException("foo")) - case Scenario.ERROR_NOW => - ZIO.fail((Status.INTERNAL.withDescription("InternalError").asException())) + case Scenario.ERROR_NOW => ZIO.fail((Status.INTERNAL.withDescription("InternalError").asException())) case _: Scenario => ZIO.fail(Status.UNKNOWN.asException()) } ) @@ -96,7 +103,7 @@ package object server { def bidiStreaming( request: Stream[StatusException, Request] ): Stream[StatusException, Response] = - (ZStream.fromZIO(requestReceived.succeed(())).drain ++ + (ZStream.fromZIO(requestReceived.succeed(()) *> ZIO.succeed(rpcRunsCounter.incrementAndGet())).drain ++ (request.flatMap { r => r.scenario match { case Scenario.OK => diff --git a/e2e/src/test/scalajvm/scalapb/zio_grpc/TestServiceSpec.scala b/e2e/src/test/scalajvm/scalapb/zio_grpc/TestServiceSpec.scala index 4b4bd114..982342b5 100644 --- a/e2e/src/test/scalajvm/scalapb/zio_grpc/TestServiceSpec.scala +++ b/e2e/src/test/scalajvm/scalapb/zio_grpc/TestServiceSpec.scala @@ -1,6 +1,7 @@ package scalapb.zio_grpc import io.grpc.{ManagedChannelBuilder, ServerBuilder, Status, StatusException} +import scala.jdk.CollectionConverters._ import scalapb.zio_grpc.server.TestServiceImpl import scalapb.zio_grpc.testservice.Request.Scenario import scalapb.zio_grpc.testservice.ZioTestservice.TestServiceClient @@ -16,11 +17,27 @@ object TestServiceSpec extends ZIOSpecDefault with CommonTestServiceSpec { val serverLayer: ZLayer[TestServiceImpl, Throwable, Server] = ServerLayer.fromEnvironment[TestServiceImpl.Service](ServerBuilder.forPort(0)) + // https://github.com/grpc/proposal/blob/master/A6-client-retries.md + val serviceConfig = Map( + "methodConfig" -> List( + Map( + "name" -> List(Map("service" -> "scalapb.zio_grpc.TestService", "method" -> "Unary").asJava).asJava, + "retryPolicy" -> Map[String, Any]( + "maxAttempts" -> "5", + "initialBackoff" -> "0.1s", + "maxBackoff" -> "30s", + "backoffMultiplier" -> "1", + "retryableStatusCodes" -> List("UNAVAILABLE").asJava + ).asJava + ).asJava + ).asJava + ).asJava + def clientLayer(prefetch: Option[Int]): ZLayer[Server, Nothing, TestServiceClient] = ZLayer.scoped[Server](for { ss <- ZIO.service[Server] port <- ss.port.orDie - ch = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext() + ch = ManagedChannelBuilder.forAddress("localhost", port).defaultServiceConfig(serviceConfig).usePlaintext() client <- TestServiceClient.scoped(ZManagedChannel(ch, prefetch, Nil)).orDie } yield client) @@ -46,7 +63,12 @@ object TestServiceSpec extends ZIOSpecDefault with CommonTestServiceSpec { // The timeout below protects the test from getting hang if the call is discarded by grpc. exit <- TestServiceImpl.awaitExit.timeout(3.seconds) } yield assert(r)(fails(hasStatusCode(Status.DEADLINE_EXCEEDED))) && assert(exit.get.isInterrupted)(isTrue) - } @@ flaky(100) @@ withLiveClock + } @@ flaky(100) @@ withLiveClock, + test("let clients retry") { + assertZIO(TestServiceClient.unary(Request(Request.Scenario.UNAVAILABLE, in = 12)).exit) { + fails(hasStatusCode(Status.UNAVAILABLE) && hasDescription("5")) + } + } ) def serverStreamingSuiteJVM = diff --git a/examples/fullapp/README.md b/examples/fullapp/README.md index 382519b4..d9553c57 100644 --- a/examples/fullapp/README.md +++ b/examples/fullapp/README.md @@ -23,7 +23,7 @@ To get this project running: 3. On a different tab, start envoyproxy in a docker container: ``` - $ docker-compose up + $ docker compose up ``` 4. In a browser open `index.html` in this directory. In Google Chrome, you can open a file by pressing