Skip to content

Commit

Permalink
Leave response and sendError when request is canceled (#3267)
Browse files Browse the repository at this point in the history
* Leave response and sendError when request is canceled

* Add comment referencing the issue.

* fix: catch canceled job before closing too
  • Loading branch information
slashvar authored Jul 31, 2024
1 parent ace390c commit e5db414
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions frontend/server/src/main/java/org/pytorch/serve/job/GRPCJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,14 @@ public GRPCJob(
Arrays.asList("Host", ConfigManager.getInstance().getHostName());
}

private void cancelHandler(ServerCallStreamObserver<PredictionResponse> responseObserver) {
private boolean cancelHandler(ServerCallStreamObserver<PredictionResponse> responseObserver) {
if (responseObserver.isCancelled()) {
logger.warn(
"grpc client call already cancelled, not able to send this response for requestId: {}",
getPayload().getRequestId());
return true;
}
return false;
}

private void logQueueTime() {
Expand Down Expand Up @@ -124,7 +126,11 @@ public void response(
case STREAMPREDICT2:
ServerCallStreamObserver<PredictionResponse> responseObserver =
(ServerCallStreamObserver<PredictionResponse>) predictionResponseObserver;
cancelHandler(responseObserver);
if (cancelHandler(responseObserver)) {
// issue #3087: Leave response early as the request has been canceled.
// Note: trying to continue wil trigger an exception when calling `onNext`.
return;
}
PredictionResponse reply =
PredictionResponse.newBuilder().setPrediction(output).build();
responseObserver.onNext(reply);
Expand All @@ -133,6 +139,9 @@ public void response(
&& responseHeaders
.get(RequestInput.TS_STREAM_NEXT)
.equals("false"))) {
if (cancelHandler(responseObserver)) {
return;
}
responseObserver.onCompleted();
logQueueTime();
} else if (cmd == WorkerCommands.STREAMPREDICT2
Expand Down Expand Up @@ -208,7 +217,11 @@ public void sendError(int status, String error) {
case STREAMPREDICT2:
ServerCallStreamObserver<PredictionResponse> responseObserver =
(ServerCallStreamObserver<PredictionResponse>) predictionResponseObserver;
cancelHandler(responseObserver);
if (cancelHandler(responseObserver)) {
// issue #3087: Leave response early as the request has been canceled.
// Note: trying to continue wil trigger an exception when calling `onNext`.
return;
}
if (cmd == WorkerCommands.PREDICT || cmd == WorkerCommands.STREAMPREDICT) {
responseObserver.onError(
responseStatus
Expand Down

0 comments on commit e5db414

Please sign in to comment.