From d8e5c27475aa882959683806d39f24ff2070f87b Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Wed, 20 Nov 2024 11:58:39 +0800 Subject: [PATCH] fix(interactive): add logs when return result in GRPC (#4328) ## What do these changes do? As titled. ## Related issue number Fixes --- .../executor/engine/pegasus/server/src/rpc.rs | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs index 33f9d0b1688c..85cffdb74ed0 100644 --- a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs +++ b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs @@ -82,8 +82,10 @@ impl FromStream> for RpcSink { fn on_next(&mut self, resp: Vec) -> FnResult<()> { // todo: use bytes to alleviate copy & allocate cost; let res = pb::JobResponse { job_id: self.job_id, resp }; - self.tx.send(Ok(res)).ok(); - Ok(()) + debug!("rpc send response for job {}", self.job_id); + self.tx + .send(Ok(res)) + .map_err(|e| Box::new(e) as Box) } } @@ -115,7 +117,11 @@ impl FromStreamExt> for RpcSink { Status::unknown(format!("{:?}", server_error)) }; - self.tx.send(Err(status)).ok(); + if let Err(e) = self.tx.send(Err(status)) { + error!("rpc send error failure for job {}: {:?}", self.job_id, e); + } else { + info!("rpc send error success for job {}", self.job_id); + } } } @@ -124,8 +130,14 @@ impl Drop for RpcSink { let before_sub = self.peers.fetch_sub(1, Ordering::SeqCst); if before_sub == 1 { if !self.had_error.load(Ordering::SeqCst) { - self.tx.send(Err(Status::ok("ok"))).ok(); + if let Err(e) = self.tx.send(Err(Status::ok("ok"))) { + error!("rpc send complete failure for job {}: {:?}", self.job_id, e); + } else { + info!("rpc send complete success for job {}", self.job_id); + } } + } else { + debug!("rpc send success for job {}, {} left;", self.job_id, before_sub - 1); } } }