Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect lost messages #68

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nexosim/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_client(false)
.out_dir("src/grpc/codegen/")
.compile(&["simulation.proto"], &["src/grpc/api/"])?;
.compile_protos(&["simulation.proto"], &["src/grpc/api/"])?;

Ok(())
}
4 changes: 2 additions & 2 deletions nexosim/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);

#[derive(Debug)]
pub(crate) enum ExecutorError {
/// The simulation has deadlocked.
Deadlock,
/// Not all messages have been processed.
UnprocessedMessages(usize),
/// The simulation has timed out.
Timeout,
/// The simulation has panicked.
Expand Down
4 changes: 2 additions & 2 deletions nexosim/src/executor/mt_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,9 @@ impl Executor {
if self.context.pool_manager.pool_is_idle() {
let msg_count = self.context.msg_count.load(Ordering::Relaxed);
if msg_count != 0 {
assert!(msg_count > 0);
let msg_count: usize = msg_count.try_into().unwrap();

return Err(ExecutorError::Deadlock);
return Err(ExecutorError::UnprocessedMessages(msg_count));
}

return Ok(());
Expand Down
6 changes: 3 additions & 3 deletions nexosim/src/executor/st_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,12 @@ impl ExecutorInner {
return Err(ExecutorError::Panic(model_id, payload));
}

// Check for deadlock.
// Check for unprocessed messages.
self.context.msg_count = channel::THREAD_MSG_COUNT.replace(msg_count_stash);
if self.context.msg_count != 0 {
assert!(self.context.msg_count > 0);
let msg_count: usize = self.context.msg_count.try_into().unwrap();

return Err(ExecutorError::Deadlock);
return Err(ExecutorError::UnprocessedMessages(msg_count));
}

Ok(())
Expand Down
11 changes: 6 additions & 5 deletions nexosim/src/grpc/api/simulation.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ enum ErrorCode {
SIMULATION_NOT_STARTED = 1;
SIMULATION_TERMINATED = 2;
SIMULATION_DEADLOCK = 3;
SIMULATION_PANIC = 4;
SIMULATION_TIMEOUT = 5;
SIMULATION_OUT_OF_SYNC = 6;
SIMULATION_BAD_QUERY = 7;
SIMULATION_TIME_OUT_OF_RANGE = 8;
SIMULATION_MESSAGE_LOSS = 4;
SIMULATION_PANIC = 5;
SIMULATION_TIMEOUT = 6;
SIMULATION_OUT_OF_SYNC = 7;
SIMULATION_BAD_QUERY = 8;
SIMULATION_TIME_OUT_OF_RANGE = 9;
MISSING_ARGUMENT = 20;
INVALID_TIME = 30;
INVALID_PERIOD = 31;
Expand Down
79 changes: 45 additions & 34 deletions nexosim/src/grpc/codegen/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,12 @@ pub enum ErrorCode {
SimulationNotStarted = 1,
SimulationTerminated = 2,
SimulationDeadlock = 3,
SimulationPanic = 4,
SimulationTimeout = 5,
SimulationOutOfSync = 6,
SimulationBadQuery = 7,
SimulationTimeOutOfRange = 8,
SimulationMessageLoss = 4,
SimulationPanic = 5,
SimulationTimeout = 6,
SimulationOutOfSync = 7,
SimulationBadQuery = 8,
SimulationTimeOutOfRange = 9,
MissingArgument = 20,
InvalidTime = 30,
InvalidPeriod = 31,
Expand All @@ -359,23 +360,24 @@ impl ErrorCode {
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
ErrorCode::InternalError => "INTERNAL_ERROR",
ErrorCode::SimulationNotStarted => "SIMULATION_NOT_STARTED",
ErrorCode::SimulationTerminated => "SIMULATION_TERMINATED",
ErrorCode::SimulationDeadlock => "SIMULATION_DEADLOCK",
ErrorCode::SimulationPanic => "SIMULATION_PANIC",
ErrorCode::SimulationTimeout => "SIMULATION_TIMEOUT",
ErrorCode::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC",
ErrorCode::SimulationBadQuery => "SIMULATION_BAD_QUERY",
ErrorCode::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
ErrorCode::MissingArgument => "MISSING_ARGUMENT",
ErrorCode::InvalidTime => "INVALID_TIME",
ErrorCode::InvalidPeriod => "INVALID_PERIOD",
ErrorCode::InvalidDeadline => "INVALID_DEADLINE",
ErrorCode::InvalidMessage => "INVALID_MESSAGE",
ErrorCode::InvalidKey => "INVALID_KEY",
ErrorCode::SourceNotFound => "SOURCE_NOT_FOUND",
ErrorCode::SinkNotFound => "SINK_NOT_FOUND",
Self::InternalError => "INTERNAL_ERROR",
Self::SimulationNotStarted => "SIMULATION_NOT_STARTED",
Self::SimulationTerminated => "SIMULATION_TERMINATED",
Self::SimulationDeadlock => "SIMULATION_DEADLOCK",
Self::SimulationMessageLoss => "SIMULATION_MESSAGE_LOSS",
Self::SimulationPanic => "SIMULATION_PANIC",
Self::SimulationTimeout => "SIMULATION_TIMEOUT",
Self::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC",
Self::SimulationBadQuery => "SIMULATION_BAD_QUERY",
Self::SimulationTimeOutOfRange => "SIMULATION_TIME_OUT_OF_RANGE",
Self::MissingArgument => "MISSING_ARGUMENT",
Self::InvalidTime => "INVALID_TIME",
Self::InvalidPeriod => "INVALID_PERIOD",
Self::InvalidDeadline => "INVALID_DEADLINE",
Self::InvalidMessage => "INVALID_MESSAGE",
Self::InvalidKey => "INVALID_KEY",
Self::SourceNotFound => "SOURCE_NOT_FOUND",
Self::SinkNotFound => "SINK_NOT_FOUND",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
Expand All @@ -385,6 +387,7 @@ impl ErrorCode {
"SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted),
"SIMULATION_TERMINATED" => Some(Self::SimulationTerminated),
"SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock),
"SIMULATION_MESSAGE_LOSS" => Some(Self::SimulationMessageLoss),
"SIMULATION_PANIC" => Some(Self::SimulationPanic),
"SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout),
"SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync),
Expand All @@ -404,7 +407,13 @@ impl ErrorCode {
}
/// Generated server implementations.
pub mod simulation_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
#![allow(
unused_variables,
dead_code,
missing_docs,
clippy::wildcard_imports,
clippy::let_unit_value,
)]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with SimulationServer.
#[async_trait]
Expand Down Expand Up @@ -1033,17 +1042,19 @@ pub mod simulation_server {
}
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", tonic::Code::Unimplemented as i32)
.header(
http::header::CONTENT_TYPE,
tonic::metadata::GRPC_CONTENT_TYPE,
)
.body(empty_body())
.unwrap(),
)
let mut response = http::Response::new(empty_body());
let headers = response.headers_mut();
headers
.insert(
tonic::Status::GRPC_STATUS,
(tonic::Code::Unimplemented as i32).into(),
);
headers
.insert(
http::header::CONTENT_TYPE,
tonic::metadata::GRPC_CONTENT_TYPE,
);
Ok(response)
})
}
}
Expand Down
1 change: 1 addition & 0 deletions nexosim/src/grpc/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ fn simulation_not_started_error() -> Error {
fn map_execution_error(error: ExecutionError) -> Error {
let error_code = match error {
ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock,
ExecutionError::MessageLoss(_) => ErrorCode::SimulationMessageLoss,
ExecutionError::Panic { .. } => ErrorCode::SimulationPanic,
ExecutionError::Timeout => ErrorCode::SimulationTimeout,
ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync,
Expand Down
23 changes: 19 additions & 4 deletions nexosim/src/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ impl Simulation {
}

self.executor.run(self.timeout).map_err(|e| match e {
ExecutorError::Deadlock => {
ExecutorError::UnprocessedMessages(msg_count) => {
self.is_terminated = true;
let mut deadlock_info = Vec::new();
for (model, observer) in &self.observers {
Expand All @@ -346,7 +346,11 @@ impl Simulation {
}
}

ExecutionError::Deadlock(deadlock_info)
if deadlock_info.is_empty() {
ExecutionError::MessageLoss(msg_count)
} else {
ExecutionError::Deadlock(deadlock_info)
}
}
ExecutorError::Timeout => {
self.is_terminated = true;
Expand Down Expand Up @@ -525,14 +529,22 @@ pub struct DeadlockInfo {
/// An error returned upon simulation execution failure.
#[derive(Debug)]
pub enum ExecutionError {
/// The simulation has been terminated due to an earlier deadlock, model
/// panic, timeout or synchronization loss.
/// The simulation has been terminated due to an earlier deadlock, message
/// loss, model panic, timeout or synchronization loss.
Terminated,
/// The simulation has deadlocked due to the listed models.
///
/// This is a fatal error: any subsequent attempt to run the simulation will
/// return an [`ExecutionError::Terminated`] error.
Deadlock(Vec<DeadlockInfo>),
/// One or more messages were left unprocessed because the recipient's
/// mailbox was not migrated to the simulation.
///
/// The payload indicates the number of lost messages.
///
/// This is a fatal error: any subsequent attempt to run the simulation will
/// return an [`ExecutionError::Terminated`] error.
MessageLoss(usize),
/// A panic was caught during execution.
///
/// This is a fatal error: any subsequent attempt to run the simulation will
Expand Down Expand Up @@ -604,6 +616,9 @@ impl fmt::Display for ExecutionError {

Ok(())
}
Self::MessageLoss(count) => {
write!(f, "{} messages have been lost", count)
}
Self::Panic{model, payload} => {
let msg: &str = if let Some(s) = payload.downcast_ref::<&str>() {
s
Expand Down
1 change: 1 addition & 0 deletions nexosim/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod model_scheduling;
#[cfg(not(miri))]
mod simulation_clock_sync;
mod simulation_deadlock;
mod simulation_message_loss;
mod simulation_panic;
mod simulation_scheduling;
#[cfg(not(miri))]
Expand Down
99 changes: 99 additions & 0 deletions nexosim/tests/integration/simulation_message_loss.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! Message loss detection.

use nexosim::model::Model;
use nexosim::ports::{Output, Requestor};
use nexosim::simulation::{ExecutionError, Mailbox, SimInit};
use nexosim::time::MonotonicTime;

const MT_NUM_THREADS: usize = 4;

/// Minimal model with one event port and one request port, used by the tests
/// below to send messages to a mailbox that is never added to the simulation.
#[derive(Default)]
struct TestModel {
// Event port; `event_loss` connects it to the stray mailbox.
output: Output<()>,
// Request port; `request_loss` connects it to the stray mailbox.
requestor: Requestor<(), ()>,
}
impl TestModel {
    /// Sends two unit events on `output`, one after the other.
    async fn activate_output_twice(&mut self) {
        for _ in 0..2 {
            self.output.send(()).await;
        }
    }

    /// Sends two unit requests on `requestor`, one after the other, discarding
    /// whatever each request yields.
    async fn activate_requestor_twice(&mut self) {
        for _ in 0..2 {
            let _ = self.requestor.send(()).await;
        }
    }
}
// Empty impl: no `Model` trait item is overridden for this test model.
impl Model for TestModel {}

/// Loses events by connecting a model's output to a mailbox that is never
/// added to the simulation.
///
/// Processing `activate_output_twice` on the registered model sends two events
/// to the stray mailbox, so the run is expected to fail with
/// `ExecutionError::MessageLoss(2)`.
fn event_loss(num_threads: usize) {
    let mut model = TestModel::default();
    let mbox = Mailbox::new();
    let addr = mbox.address();
    // Deliberately never handed to `SimInit`: messages sent to this mailbox
    // have no model registered to receive them.
    let bad_mbox = Mailbox::new();

    // Route the model's output to the stray mailbox.
    model
        .output
        .connect(TestModel::activate_output_twice, &bad_mbox);

    let t0 = MonotonicTime::EPOCH;
    let mut simu = SimInit::with_num_threads(num_threads)
        .add_model(model, mbox, "")
        .init(t0)
        .unwrap()
        .0;

    match simu.process_event(TestModel::activate_output_twice, (), addr) {
        Err(ExecutionError::MessageLoss(msg_count)) => {
            assert_eq!(msg_count, 2);
        }
        // Report what was actually returned so a failure is diagnosable.
        other => panic!("message loss not detected, got {:?}", other),
    }
}

/// Loses a request by connecting a model's requestor to a mailbox that is
/// never added to the simulation.
///
/// Processing `activate_requestor_twice` on the registered model is expected
/// to fail with `ExecutionError::MessageLoss(1)`.
// NOTE(review): only one message is expected to be lost, presumably because
// the model stays suspended waiting for a reply to its first request and
// never sends the second one; confirm against the requestor semantics.
fn request_loss(num_threads: usize) {
    let mut model = TestModel::default();
    let mbox = Mailbox::new();
    let addr = mbox.address();
    // Deliberately never handed to `SimInit`: requests sent to this mailbox
    // have no model registered to receive them.
    let bad_mbox = Mailbox::new();

    // Route the model's requestor to the stray mailbox.
    model
        .requestor
        .connect(TestModel::activate_requestor_twice, &bad_mbox);

    let t0 = MonotonicTime::EPOCH;
    let mut simu = SimInit::with_num_threads(num_threads)
        .add_model(model, mbox, "")
        .init(t0)
        .unwrap()
        .0;

    match simu.process_event(TestModel::activate_requestor_twice, (), addr) {
        Err(ExecutionError::MessageLoss(msg_count)) => {
            assert_eq!(msg_count, 1);
        }
        // Report what was actually returned so a failure is diagnosable.
        other => panic!("message loss not detected, got {:?}", other),
    }
}

/// Runs the event loss scenario with a single thread.
#[test]
fn event_loss_st() {
event_loss(1);
}

/// Runs the event loss scenario with `MT_NUM_THREADS` threads.
#[test]
fn event_loss_mt() {
event_loss(MT_NUM_THREADS);
}

/// Runs the request loss scenario with a single thread.
#[test]
fn request_loss_st() {
request_loss(1);
}

/// Runs the request loss scenario with `MT_NUM_THREADS` threads.
#[test]
fn request_loss_mt() {
request_loss(MT_NUM_THREADS);
}