Skip to content

Commit

Permalink
fix(interactive): add logs when return result in GRPC (#4328)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

As titled.

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes
  • Loading branch information
BingqingLyu authored Nov 20, 2024
1 parent a8952a4 commit d8e5c27
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions interactive_engine/executor/engine/pegasus/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ impl FromStream<Vec<u8>> for RpcSink {
fn on_next(&mut self, resp: Vec<u8>) -> FnResult<()> {
// todo: use bytes to alleviate copy & allocate cost;
let res = pb::JobResponse { job_id: self.job_id, resp };
self.tx.send(Ok(res)).ok();
Ok(())
debug!("rpc send response for job {}", self.job_id);
self.tx
.send(Ok(res))
.map_err(|e| Box::new(e) as Box<dyn Error + Send>)
}
}

Expand Down Expand Up @@ -115,7 +117,11 @@ impl FromStreamExt<Vec<u8>> for RpcSink {
Status::unknown(format!("{:?}", server_error))
};

self.tx.send(Err(status)).ok();
if let Err(e) = self.tx.send(Err(status)) {
error!("rpc send error failure for job {}: {:?}", self.job_id, e);
} else {
info!("rpc send error success for job {}", self.job_id);
}
}
}

Expand All @@ -124,8 +130,14 @@ impl Drop for RpcSink {
let before_sub = self.peers.fetch_sub(1, Ordering::SeqCst);
if before_sub == 1 {
if !self.had_error.load(Ordering::SeqCst) {
self.tx.send(Err(Status::ok("ok"))).ok();
if let Err(e) = self.tx.send(Err(Status::ok("ok"))) {
error!("rpc send complete failure for job {}: {:?}", self.job_id, e);
} else {
info!("rpc send complete success for job {}", self.job_id);
}
}
} else {
debug!("rpc send success for job {}, {} left;", self.job_id, before_sub - 1);
}
}
}
Expand Down

0 comments on commit d8e5c27

Please sign in to comment.