Skip to content

Commit

Permalink
fix(interactive): refine Insight server error (#4152)
Browse files Browse the repository at this point in the history
Define and throw out more specific Insight Server errors related to
queries, focusing on those that occur during the job build phase and the
job execution phase.

Examples as follows:

Errors in build phase (a mock error):

![image](https://github.com/user-attachments/assets/fc06827a-8245-4045-a432-23e276002ff4)

Errors in execution phase (a mock error):

![image](https://github.com/user-attachments/assets/db8e3bca-9900-47bc-8a7e-d3f34c66e6b2)
  • Loading branch information
BingqingLyu authored Aug 19, 2024
1 parent 08bacf4 commit 8a28040
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 120 deletions.
19 changes: 17 additions & 2 deletions interactive_engine/executor/engine/pegasus/server/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("cargo:rerun-if-changed=proto/job_service.proto");
println!("cargo:rerun-if-changed=proto/job_plan.proto");
println!("cargo:rerun-if-changed=../../../../../proto/error/insight.proto");
codegen_inplace()
}

Expand All @@ -30,7 +31,14 @@ fn codegen_inplace() -> Result<(), Box<dyn std::error::Error>> {
.build_server(true)
.build_client(true)
.out_dir("src/generated")
.compile(&["proto/job_service.proto", "proto/job_plan.proto"], &["proto"])?;
.compile(
&[
"proto/job_service.proto",
"proto/job_plan.proto",
"../../../../../proto/error/insight.proto",
],
&["proto", "../../../../../proto"],
)?;
Ok(())
}

Expand All @@ -39,6 +47,13 @@ fn codegen_inplace() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_server(true)
.build_client(true)
.compile(&["proto/job_service.proto", "proto/job_plan.proto"], &["proto"])?;
.compile(
&[
"proto/job_service.proto",
"proto/job_plan.proto",
"../../../../../proto/error/insight.proto",
],
&["proto", "../../../../../proto"],
)?;
Ok(())
}
116 changes: 116 additions & 0 deletions interactive_engine/executor/engine/pegasus/server/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
//
//! Copyright 2024 Alibaba Group Holding Limited.
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//! See the License for the specific language governing permissions and
//! limitations under the License.
use std::collections::HashMap;

use pegasus::{
errors::{ErrorKind, JobExecError},
JobSubmitError,
};

use crate::insight_error::Code as ErrorCode;

#[derive(Clone)]
pub struct ServerError {
err_code: ErrorCode,
ec: String,
msg: String,
details: HashMap<String, String>,
}

impl ServerError {
pub fn new(err_code: ErrorCode, msg: String) -> Self {
let ec = format!("04-{:04}", err_code as i32);
ServerError { err_code, ec, msg, details: HashMap::new() }
}

pub fn with_details(mut self, key: &str, value: String) -> Self {
self.details.insert(key.to_string(), value);
self
}
}

impl std::fmt::Debug for ServerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "")?;
writeln!(f, "ErrorCode: {:?}", self.err_code)?;
writeln!(f, "EC: {}", self.ec)?;
writeln!(f, "Message: \"{}\"", self.msg)?;
for (k, v) in self.details.iter() {
writeln!(f, "{}: {}", k, v)?;
}
write!(f, "")
}
}

impl From<&JobExecError> for ServerError {
fn from(err: &JobExecError) -> Self {
match err.kind {
ErrorKind::WouldBlock(_) => {
let err_code = ErrorCode::JobExecuteWouldBlock;
ServerError::new(err_code, format!("{}", err))
}
ErrorKind::Interrupted => {
let err_code = ErrorCode::JobExecuteInterrupted;
ServerError::new(err_code, format!("{}", err))
}
ErrorKind::IOError => {
let err_code = ErrorCode::JobExecuteIoError;
ServerError::new(err_code, format!("{}", err))
}
ErrorKind::IllegalScopeInput => {
let err_code = ErrorCode::JobExecuteIlleagalScopeInput;
ServerError::new(err_code, format!("{}", err))
}
ErrorKind::Canceled => {
let err_code = ErrorCode::JobExecuteCancelled;
ServerError::new(err_code, format!("{}", err))
}
ErrorKind::Others => {
let err_code = ErrorCode::JobExecuteOthers;
ServerError::new(err_code, format!("{}", err))
}
}
}
}

impl From<&JobSubmitError> for ServerError {
fn from(err: &JobSubmitError) -> Self {
match err {
JobSubmitError::Build(err) => match err {
pegasus::BuildJobError::Unsupported(e) => {
let err_code = ErrorCode::JobSubmitBuildJobUnsupported;
ServerError::new(err_code, format!("{}", e))
}
pegasus::BuildJobError::InternalError(e) => {
let err_code = ErrorCode::JobSubmitBuildJobInternalError;
ServerError::new(err_code, format!("{}", e))
}
pegasus::BuildJobError::ServerError(e) => {
let err_code = ErrorCode::JobSubmitBuildJobServerError;
ServerError::new(err_code, format!("{}", e))
}
pegasus::BuildJobError::UserError(e) => {
let err_code = ErrorCode::JobSubmitBuildJobUserError;
ServerError::new(err_code, format!("{}", e))
}
},
JobSubmitError::Spawn(e) => {
let err_code = ErrorCode::JobSubmitSpawnJobError;
ServerError::new(err_code, format!("{}", e))
}
}
}
}
7 changes: 7 additions & 0 deletions interactive_engine/executor/engine/pegasus/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ mod generated {
pub mod job_proto {
tonic::include_proto!("job_proto");
}
pub mod insight_error {
tonic::include_proto!("insight_error");
}
}

#[rustfmt::skip]
Expand All @@ -35,8 +38,11 @@ mod generated {
pub mod protocol;
#[path = "job_proto.rs"]
pub mod job_proto;
#[path = "insight_error.rs"]
pub mod insight_error;
}

pub use generated::insight_error;
pub use generated::job_proto as job_pb;
pub use generated::protocol as pb;

Expand All @@ -46,6 +52,7 @@ pub trait AnyData: Data + Eq {}
pub mod client;
pub mod cluster;
pub mod config;
mod error;
pub mod job;
pub mod rpc;

Expand Down
35 changes: 13 additions & 22 deletions interactive_engine/executor/engine/pegasus/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,16 @@ use opentelemetry::{
trace::{Span, SpanKind, Tracer},
KeyValue,
};
use opentelemetry_otlp::{ExportConfig, Protocol, TonicExporterBuilder, WithExportConfig};
use opentelemetry_otlp::{TonicExporterBuilder, WithExportConfig};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::resource::{
EnvResourceDetector, SdkProvidedResourceDetector, TelemetryResourceDetector,
};
use opentelemetry_sdk::trace::BatchConfigBuilder;
use opentelemetry_sdk::Resource;
use pegasus::api::function::FnResult;
use pegasus::api::FromStream;
use pegasus::errors::{ErrorKind, JobExecError};
use pegasus::errors::JobExecError;
use pegasus::result::{FromStreamExt, ResultSink};
use pegasus::{Configuration, Data, JobConf, ServerConf};
use pegasus_network::config::ServerAddr;
Expand All @@ -55,6 +54,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Server;
use tonic::{Code, Request, Response, Status};

use crate::error::ServerError;
use crate::generated::protocol as pb;
use crate::generated::protocol::job_config::Servers;
use crate::job::{JobAssembly, JobDesc};
Expand Down Expand Up @@ -103,24 +103,12 @@ impl FromStreamExt<Vec<u8>> for RpcSink {
fn on_error(&mut self, error: Box<dyn Error + Send>) {
self.had_error.store(true, Ordering::SeqCst);
let status = if let Some(e) = error.downcast_ref::<JobExecError>() {
match e.kind {
ErrorKind::WouldBlock(_) => {
Status::internal(format!("[Execution Error] WouldBlock: {}", error))
}
ErrorKind::Interrupted => {
Status::internal(format!("[Execution Error] Interrupted: {}", error))
}
ErrorKind::IOError => Status::internal(format!("[Execution Error] IOError: {}", error)),
ErrorKind::IllegalScopeInput => {
Status::internal(format!("[Execution Error] IllegalScopeInput: {}", error))
}
ErrorKind::Canceled => {
Status::deadline_exceeded(format!("[Execution Error] Canceled: {}", error))
}
_ => Status::unknown(format!("[Execution Error]: {}", error)),
}
let server_error = ServerError::from(e).with_details("QueryId", self.job_id.to_string());
Status::internal(format!("{:?}", server_error))
} else {
Status::unknown(format!("[Unknown Error]: {}", error))
let server_error =
ServerError::new(crate::insight_error::Code::UnknownError, error.to_string());
Status::unknown(format!("{:?}", server_error))
};

self.tx.send(Err(status)).ok();
Expand Down Expand Up @@ -235,15 +223,18 @@ where
info!("trace_id : {}, job conf {:?}", trace_id_hex, conf);
span.set_attributes(vec![
KeyValue::new("job.name", conf.job_name.clone()),
KeyValue::new("job.id", conf.job_id.to_string()),
KeyValue::new("job.id", job_id.to_string()),
]);
let cx = opentelemetry::Context::current_with_span(span);
let _guard = cx.clone().attach();
let ret = pegasus::run_opt(conf, sink, move |worker| service.assemble(&job, worker));

if let Err(e) = ret {
error!("trace_id:{}, submit job {} failure: {:?}", trace_id_hex, job_id, e);
Err(Status::unknown(format!("submit job error {}", e)))
let server_error = ServerError::from(&e)
.with_details("TraceId", trace_id_hex)
.with_details("QueryId", job_id.to_string());
Err(Status::internal(format!("{:?}", server_error)))
} else {
Ok(Response::new(UnboundedReceiverStream::new(rx)))
}
Expand Down
Loading

0 comments on commit 8a28040

Please sign in to comment.