Skip to content

Commit

Permalink
Delay sending response headers (scalapb#635)
Browse files Browse the repository at this point in the history
Send headers only if effect runs successfully for unary and server streaming RPCs

Co-authored-by: Thibault Jeandet <[email protected]>
  • Loading branch information
joroKr21 and Thibault Jeandet authored Sep 25, 2024
1 parent aa3e67e commit 47ec1a3
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 35 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ jobs:
- name: Run grpc-web e2e test
if: ${{ matrix.scala != 3 }}
run: |
docker-compose -f ./examples/fullapp/docker-compose.yaml up -d
docker compose -f ./examples/fullapp/docker-compose.yaml up -d
sbt e2eWebJS${{matrix.scala}}/fastOptJS/webpack e2eWebJVM${{matrix.scala}}/test
docker-compose -f ./examples/fullapp/docker-compose.yaml down
docker compose -f ./examples/fullapp/docker-compose.yaml down
- name: Examples
run: |
Expand Down
2 changes: 2 additions & 0 deletions .jvmopts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-Xms1g
-Xmx2g
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## 0.6.3
* Delay sending response headers until the first message is ready.
* Add configurable backpressure to streaming clients.

## 0.6.1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package scalapb.zio_grpc.server

import io.grpc.ServerCall.Listener
import io.grpc.{Metadata, Status, StatusException}
import io.grpc.{Status, StatusException}
import zio._
import zio.stream.{Stream, ZStream}
import scalapb.zio_grpc.RequestContext
Expand All @@ -27,7 +27,6 @@ object ListenerDriver {
(
call.request(2) *>
completed.await *>
call.sendHeaders(new Metadata) *>
request.await flatMap writeResponse
).onExit(ex => call.close(ListenerDriver.exitToStatus(ex), requestContext.responseMetadata.metadata).ignore)
.ignore
Expand Down Expand Up @@ -113,7 +112,6 @@ object ListenerDriver {
.collectWhileSome

(call.request(1) *>
call.sendHeaders(new Metadata) *>
writeResponse(requestStream))
.onExit(ex => call.close(ListenerDriver.exitToStatus(ex), requestContext.responseMetadata.metadata).ignore)
.ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ object ZServerCallHandler {
): ServerCallHandler[Req, Res] =
unaryInput[Req, Res](
runtime,
(req, requestContext, call) => impl(req, requestContext).flatMap[Any, StatusException, Unit](call.sendMessage)
(req, requestContext, call) =>
impl(req, requestContext)
.zipLeft(call.sendHeaders(new Metadata))
.flatMap[Any, StatusException, Unit](call.sendMessage)
)

def serverStreamingCallHandler[Req, Res](
Expand All @@ -91,7 +94,10 @@ object ZServerCallHandler {
): ServerCallHandler[Req, Res] =
streamingInput[Req, Res](
runtime,
(req, requestContext, call) => impl(req, requestContext).flatMap[Any, StatusException, Unit](call.sendMessage)
(req, requestContext, call) =>
impl(req, requestContext)
.zipLeft(call.sendHeaders(new Metadata))
.flatMap[Any, StatusException, Unit](call.sendMessage)
)

def bidiCallHandler[Req, Res](
Expand All @@ -108,16 +114,17 @@ object ZServerCallHandler {
stream: ZStream[Any, StatusException, Res]
): ZIO[Any, StatusException, Unit] = {
val backpressureSink = {
def go: ZChannel[Any, ZNothing, Chunk[Res], Any, StatusException, Chunk[Res], Unit] =
def go(metadata: Option[Metadata]): ZChannel[Any, ZNothing, Chunk[Res], Any, StatusException, Chunk[Res], Unit] =
ZChannel.readWithCause(
xs =>
ZChannel.fromZIO(GIO.attempt(xs.foreach(call.call.sendMessage))) *>
ZChannel.suspend(if (call.call.isReady()) go else ZChannel.fromZIO(call.awaitReady) *> go),
ZChannel.fromZIO(ZIO.foreachDiscard(metadata)(call.sendHeaders)) *>
ZChannel.fromZIO(GIO.attempt(xs.foreach(call.call.sendMessage))) *>
ZChannel.suspend(if (call.call.isReady()) go(None) else ZChannel.fromZIO(call.awaitReady) *> go(None)),
c => ZChannel.failCause(c),
_ => ZChannel.unit
)

ZSink.fromChannel(go)
ZSink.fromChannel(go(Some(new Metadata)))
}

for {
Expand Down
1 change: 1 addition & 0 deletions e2e/protos/src/main/protobuf/testservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ message Request {
ERROR_AFTER = 2; // for server streaming, error after two responses
DELAY = 3; // do not return a response. for testing cancellations
DIE = 4; // fail
UNAVAILABLE = 5; // fail with UNAVAILABLE, to test client retries
}
Scenario scenario = 1;
int32 in = 2;
Expand Down
49 changes: 28 additions & 21 deletions e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import scalapb.zio_grpc.testservice.Request.Scenario
import zio.stream.{Stream, ZStream}
import zio.ZEnvironment

import java.util.concurrent.atomic.AtomicInteger

package object server {

import zio.Schedule
Expand All @@ -22,17 +24,22 @@ package object server {
exit: zio.Promise[Nothing, Exit[StatusException, Response]]
)(clock: Clock, console: Console)
extends testservice.ZioTestservice.TestService {
val rpcRunsCounter: AtomicInteger = new AtomicInteger(0)

def unary(request: Request): ZIO[Any, StatusException, Response] =
(requestReceived.succeed(()) *> (request.scenario match {
case Scenario.OK =>
ZIO.succeed(
Response(out = "Res" + request.in.toString)
)
case Scenario.ERROR_NOW =>
(requestReceived.succeed(()) *> ZIO.succeed(rpcRunsCounter.incrementAndGet()) *> (request.scenario match {
case Scenario.OK =>
ZIO.succeed(Response(out = "Res" + request.in.toString))
case Scenario.ERROR_NOW =>
ZIO.fail(Status.INTERNAL.withDescription("FOO!").asException())
case Scenario.DELAY => ZIO.never
case Scenario.DIE => ZIO.die(new RuntimeException("FOO"))
case _ => ZIO.fail(Status.UNKNOWN.asException())
case Scenario.DELAY =>
ZIO.never
case Scenario.DIE =>
ZIO.die(new RuntimeException("FOO"))
case Scenario.UNAVAILABLE =>
ZIO.fail(Status.UNAVAILABLE.withDescription(rpcRunsCounter.get().toString).asException())
case _ =>
ZIO.fail(Status.UNKNOWN.asException())
})).onExit(exit.succeed(_))

def unaryTypeMapped(request: Request): ZIO[Any, StatusException, WrappedString] =
Expand All @@ -42,14 +49,15 @@ package object server {
request: Request
): ZStream[Any, StatusException, Response] =
ZStream
.acquireReleaseExitWith(requestReceived.succeed(())) { (_, ex) =>
ex.foldExit(
failed =>
if (failed.isInterrupted || failed.isInterruptedOnly)
exit.succeed(Exit.fail(Status.CANCELLED.asException()))
else exit.succeed(Exit.fail(Status.UNKNOWN.asException())),
_ => exit.succeed(Exit.succeed(Response()))
)
.acquireReleaseExitWith(requestReceived.succeed(()) *> ZIO.succeed(rpcRunsCounter.incrementAndGet())) {
(_, ex) =>
ex.foldExit(
failed =>
if (failed.isInterrupted || failed.isInterruptedOnly)
exit.succeed(Exit.fail(Status.CANCELLED.asException()))
else exit.succeed(Exit.fail(Status.UNKNOWN.asException())),
_ => exit.succeed(Exit.succeed(Response()))
)
}
.flatMap { _ =>
request.scenario match {
Expand Down Expand Up @@ -78,15 +86,14 @@ package object server {
def clientStreaming(
request: Stream[StatusException, Request]
): ZIO[Any, StatusException, Response] =
requestReceived.succeed(()) *>
requestReceived.succeed(()) *> ZIO.succeed(rpcRunsCounter.incrementAndGet()) *>
request
.runFoldZIO(0)((state, req) =>
req.scenario match {
case Scenario.OK => ZIO.succeed(state + req.in)
case Scenario.DELAY => delayReceived.succeed(()) *> ZIO.never
case Scenario.DIE => ZIO.die(new RuntimeException("foo"))
case Scenario.ERROR_NOW =>
ZIO.fail((Status.INTERNAL.withDescription("InternalError").asException()))
case Scenario.ERROR_NOW => ZIO.fail((Status.INTERNAL.withDescription("InternalError").asException()))
case _: Scenario => ZIO.fail(Status.UNKNOWN.asException())
}
)
Expand All @@ -96,7 +103,7 @@ package object server {
def bidiStreaming(
request: Stream[StatusException, Request]
): Stream[StatusException, Response] =
(ZStream.fromZIO(requestReceived.succeed(())).drain ++
(ZStream.fromZIO(requestReceived.succeed(()) *> ZIO.succeed(rpcRunsCounter.incrementAndGet())).drain ++
(request.flatMap { r =>
r.scenario match {
case Scenario.OK =>
Expand Down
26 changes: 24 additions & 2 deletions e2e/src/test/scalajvm/scalapb/zio_grpc/TestServiceSpec.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scalapb.zio_grpc

import io.grpc.{ManagedChannelBuilder, ServerBuilder, Status, StatusException}
import scala.jdk.CollectionConverters._
import scalapb.zio_grpc.server.TestServiceImpl
import scalapb.zio_grpc.testservice.Request.Scenario
import scalapb.zio_grpc.testservice.ZioTestservice.TestServiceClient
Expand All @@ -16,11 +17,27 @@ object TestServiceSpec extends ZIOSpecDefault with CommonTestServiceSpec {
val serverLayer: ZLayer[TestServiceImpl, Throwable, Server] =
ServerLayer.fromEnvironment[TestServiceImpl.Service](ServerBuilder.forPort(0))

// https://github.com/grpc/proposal/blob/master/A6-client-retries.md
val serviceConfig = Map(
"methodConfig" -> List(
Map(
"name" -> List(Map("service" -> "scalapb.zio_grpc.TestService", "method" -> "Unary").asJava).asJava,
"retryPolicy" -> Map[String, Any](
"maxAttempts" -> "5",
"initialBackoff" -> "0.1s",
"maxBackoff" -> "30s",
"backoffMultiplier" -> "1",
"retryableStatusCodes" -> List("UNAVAILABLE").asJava
).asJava
).asJava
).asJava
).asJava

def clientLayer(prefetch: Option[Int]): ZLayer[Server, Nothing, TestServiceClient] =
ZLayer.scoped[Server](for {
ss <- ZIO.service[Server]
port <- ss.port.orDie
ch = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext()
ch = ManagedChannelBuilder.forAddress("localhost", port).defaultServiceConfig(serviceConfig).usePlaintext()
client <- TestServiceClient.scoped(ZManagedChannel(ch, prefetch, Nil)).orDie
} yield client)

Expand All @@ -46,7 +63,12 @@ object TestServiceSpec extends ZIOSpecDefault with CommonTestServiceSpec {
// The timeout below protects the test from getting hang if the call is discarded by grpc.
exit <- TestServiceImpl.awaitExit.timeout(3.seconds)
} yield assert(r)(fails(hasStatusCode(Status.DEADLINE_EXCEEDED))) && assert(exit.get.isInterrupted)(isTrue)
} @@ flaky(100) @@ withLiveClock
} @@ flaky(100) @@ withLiveClock,
test("let clients retry") {
assertZIO(TestServiceClient.unary(Request(Request.Scenario.UNAVAILABLE, in = 12)).exit) {
fails(hasStatusCode(Status.UNAVAILABLE) && hasDescription("5"))
}
}
)

def serverStreamingSuiteJVM =
Expand Down
2 changes: 1 addition & 1 deletion examples/fullapp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ To get this project running:
3. On a different tab, start envoyproxy in a docker container:

```
$ docker-compose up
$ docker compose up
```

4. In a browser open `index.html` in this directory. In Google Chrome, you can open a file by pressing
Expand Down

0 comments on commit 47ec1a3

Please sign in to comment.