Skip to content

Commit

Permalink
Optimize client calls (#618)
Browse files Browse the repository at this point in the history
* Optimize client calls

* Fix

* Use ZStream.execute
  • Loading branch information
ghostdogpr authored Apr 11, 2024
1 parent c09b79e commit 9d8bdc0
Showing 1 changed file with 25 additions and 37 deletions.
62 changes: 25 additions & 37 deletions core/src/main/scalajvm/scalapb/zio_grpc/client/ClientCalls.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@ object ClientCalls {
StreamingClientCallListener.make[Res](call)
)(anyExitHandler[Req, Res](call))
.flatMap { (listener: StreamingClientCallListener[Res]) =>
ZStream
.fromZIO(
call.start(listener, headers) *>
call.request(1) *>
call.sendMessage(req) *>
call.halfClose()
)
.drain ++ listener.stream
ZStream.unwrap(
(call.start(listener, headers) *>
call.request(1) *>
call.sendMessage(req) *>
call.halfClose()).as(listener.stream)
)
}

def serverStreamingCall[Req, Res](
Expand All @@ -59,25 +57,25 @@ object ClientCalls {
headers: SafeMetadata,
req: Req
): ZStream[Any, StatusException, ResponseFrame[Res]] =
ZStream
.fromZIO(channel.newCall(method, options))
.flatMap(serverStreamingCall(_, headers, req))
ZStream.unwrap(
channel
.newCall(method, options)
.map(serverStreamingCall(_, headers, req))
)

private def clientStreamingCall[Req, Res](
call: ZClientCall[Req, Res],
headers: SafeMetadata,
req: ZStream[Any, StatusException, Req]
): IO[StatusException, ResponseContext[Res]] =
ZIO.acquireReleaseExitWith(UnaryClientCallListener.make[Res])(exitHandler(call)) { listener =>
val callStream = req.tap(call.sendMessage).drain ++ ZStream.fromZIO(call.halfClose()).drain
val resultStream = ZStream.fromZIO(listener.getValue)
val processRequestStream = req.runForeach(call.sendMessage) *> call.halfClose()
val getResult = listener.getValue

call.start(listener, headers) *>
call.request(1) *>
callStream
.merge(resultStream)
.runCollect
.map(res => res.last)
processRequestStream &>
getResult
}

def clientStreamingCall[Req, Res](
Expand All @@ -89,13 +87,7 @@ object ClientCalls {
): IO[StatusException, ResponseContext[Res]] =
channel
.newCall(method, options)
.flatMap(
clientStreamingCall(
_,
headers,
req
)
)
.flatMap(clientStreamingCall(_, headers, req))

private def bidiCall[Req, Res](
call: ZClientCall[Req, Res],
Expand All @@ -107,14 +99,10 @@ object ClientCalls {
StreamingClientCallListener.make[Res](call)
)(anyExitHandler(call))
.flatMap { (listener: StreamingClientCallListener[Res]) =>
val init =
ZStream
.fromZIO(
call.start(listener, headers) *>
call.request(1)
)
val finish = ZStream.fromZIO(call.halfClose())
val sendRequestStream = (init ++ req.tap(call.sendMessage) ++ finish).drain
val init = call.start(listener, headers) *> call.request(1)
val process = req.runForeach(call.sendMessage)
val finish = call.halfClose()
val sendRequestStream = ZStream.execute(init *> process *> finish)
sendRequestStream.merge(listener.stream, ZStream.HaltStrategy.Right)
}

Expand All @@ -125,11 +113,11 @@ object ClientCalls {
headers: SafeMetadata,
req: ZStream[Any, StatusException, Req]
): ZStream[Any, StatusException, ResponseFrame[Res]] =
ZStream
.fromZIO(
channel.newCall(method, options)
)
.flatMap(bidiCall(_, headers, req))
ZStream.unwrap(
channel
.newCall(method, options)
.map(bidiCall(_, headers, req))
)
}

def exitHandler[Req, Res](
Expand Down

0 comments on commit 9d8bdc0

Please sign in to comment.