From d4192e83ed4d72ad6b2ef891bab329b68fabd094 Mon Sep 17 00:00:00 2001 From: Serge Barral Date: Tue, 26 Nov 2024 12:01:55 +0100 Subject: [PATCH] Report an error if a message cannot be delivered --- .github/workflows/ci.yml | 30 ++- nexosim/examples/uni_requestor.rs | 6 +- nexosim/src/grpc/api/simulation.proto | 11 +- nexosim/src/grpc/codegen/simulation.rs | 208 ++++++------------ nexosim/src/grpc/services.rs | 1 + nexosim/src/ports/output.rs | 32 +-- nexosim/src/ports/output/broadcaster.rs | 29 ++- nexosim/src/ports/output/sender.rs | 39 +--- nexosim/src/ports/source.rs | 23 +- nexosim/src/ports/source/broadcaster.rs | 40 ++-- nexosim/src/ports/source/sender.rs | 39 +--- nexosim/src/simulation.rs | 99 ++++++--- nexosim/src/util.rs | 1 + nexosim/src/util/unwrap_or_throw.rs | 22 ++ nexosim/tests/integration/main.rs | 1 + .../integration/simulation_no_recipient.rs | 169 ++++++++++++++ 16 files changed, 425 insertions(+), 325 deletions(-) create mode 100644 nexosim/src/util/unwrap_or_throw.rs create mode 100644 nexosim/tests/integration/simulation_no_recipient.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2b7859e..f733406 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -95,52 +95,62 @@ jobs: env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 - - name: Run cargo miri example1 (single-threaded executor) + - name: Run cargo miri espresso_machine (single-threaded executor) run: cargo miri run --example espresso_machine env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 - - name: Run cargo miri example1 (multi-threaded executor) + - name: Run cargo miri espresso_machine (multi-threaded executor) run: cargo miri run --example espresso_machine env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 - - name: Run cargo miri example2 (single-threaded executor) + - name: Run cargo miri power_supply (single-threaded executor) run: cargo miri run --example power_supply env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 - - name: Run cargo miri example2 (multi-threaded executor) + - name: Run cargo miri power_supply (multi-threaded executor) run: cargo miri run --example power_supply env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 - - name: Run cargo miri example3 (single-threaded executor) + - name: Run cargo miri stepper_motor (single-threaded executor) run: cargo miri run --example stepper_motor env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 - - name: Run cargo miri example3 (multi-threaded executor) + - name: Run cargo miri stepper_motor (multi-threaded executor) run: cargo miri run --example stepper_motor env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 - - name: Run cargo miri example4 (single-threaded executor) + - name: Run cargo miri assembly (single-threaded executor) run: cargo miri run --example assembly env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 - - name: Run cargo miri example4 (multi-threaded executor) + - name: Run cargo miri assembly (multi-threaded executor) run: cargo miri run --example assembly env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 - - name: Run cargo miri example5 (single-threaded executor) + - name: Run cargo miri uni_requestor (single-threaded executor) + run: cargo miri run --example uni_requestor + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 + + - name: Run cargo miri uni_requestor (multi-threaded executor) + run: cargo miri run --example uni_requestor + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 + + - name: Run cargo miri observables (single-threaded executor) run: cargo miri run --example observables env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=1 - - name: Run cargo miri example5 (multi-threaded executor) + - name: Run cargo miri observables (multi-threaded executor) run: cargo miri run --example observables env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-num-cpus=4 diff --git a/nexosim/examples/uni_requestor.rs b/nexosim/examples/uni_requestor.rs index bd24044..509ae80 100644 --- a/nexosim/examples/uni_requestor.rs +++ b/nexosim/examples/uni_requestor.rs @@ -1,4 +1,4 @@ -//! Example: sensor reading data from the environment model. +//! Example: sensor reading data from environment model. //! //! This example demonstrates in particular: //! @@ -11,8 +11,8 @@ //! ```text //! ┌─────────────┐ ┌──────────┐ //! │ │ temperature │ │ overheat -//! Temperature ●─────────►│ Environment ├──────────────►│ Sensor ├──────────► -//! │ │ │ │ state +//! Temperature ●─────────►│ Environment ├──────────────►│ Sensor ├──────────► +//! │ │ │ │ //! └─────────────┘ └──────────┘ //! ``` diff --git a/nexosim/src/grpc/api/simulation.proto b/nexosim/src/grpc/api/simulation.proto index 8a070ba..e26759e 100644 --- a/nexosim/src/grpc/api/simulation.proto +++ b/nexosim/src/grpc/api/simulation.proto @@ -13,11 +13,12 @@ enum ErrorCode { SIMULATION_TERMINATED = 2; SIMULATION_DEADLOCK = 3; SIMULATION_MESSAGE_LOSS = 4; - SIMULATION_PANIC = 5; - SIMULATION_TIMEOUT = 6; - SIMULATION_OUT_OF_SYNC = 7; - SIMULATION_BAD_QUERY = 8; - SIMULATION_TIME_OUT_OF_RANGE = 9; + SIMULATION_NO_RECIPIENT = 5; + SIMULATION_PANIC = 6; + SIMULATION_TIMEOUT = 7; + SIMULATION_OUT_OF_SYNC = 8; + SIMULATION_BAD_QUERY = 9; + SIMULATION_TIME_OUT_OF_RANGE = 10; MISSING_ARGUMENT = 20; INVALID_TIME = 30; INVALID_PERIOD = 31; diff --git a/nexosim/src/grpc/codegen/simulation.rs b/nexosim/src/grpc/codegen/simulation.rs index 1f75c9b..cf75b70 100644 --- a/nexosim/src/grpc/codegen/simulation.rs +++ b/nexosim/src/grpc/codegen/simulation.rs @@ -299,7 +299,10 @@ pub mod close_sink_reply { #[derive(Clone, PartialEq, ::prost::Message)] pub struct AnyRequest { /// Expects exactly 1 variant. - #[prost(oneof = "any_request::Request", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11")] + #[prost( + oneof = "any_request::Request", + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11" + )] pub request: ::core::option::Option, } /// Nested message and enum types in `AnyRequest`. @@ -339,11 +342,12 @@ pub enum ErrorCode { SimulationTerminated = 2, SimulationDeadlock = 3, SimulationMessageLoss = 4, - SimulationPanic = 5, - SimulationTimeout = 6, - SimulationOutOfSync = 7, - SimulationBadQuery = 8, - SimulationTimeOutOfRange = 9, + SimulationNoRecipient = 5, + SimulationPanic = 6, + SimulationTimeout = 7, + SimulationOutOfSync = 8, + SimulationBadQuery = 9, + SimulationTimeOutOfRange = 10, MissingArgument = 20, InvalidTime = 30, InvalidPeriod = 31, @@ -365,6 +369,7 @@ impl ErrorCode { Self::SimulationTerminated => "SIMULATION_TERMINATED", Self::SimulationDeadlock => "SIMULATION_DEADLOCK", Self::SimulationMessageLoss => "SIMULATION_MESSAGE_LOSS", + Self::SimulationNoRecipient => "SIMULATION_NO_RECIPIENT", Self::SimulationPanic => "SIMULATION_PANIC", Self::SimulationTimeout => "SIMULATION_TIMEOUT", Self::SimulationOutOfSync => "SIMULATION_OUT_OF_SYNC", @@ -388,6 +393,7 @@ impl ErrorCode { "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated), "SIMULATION_DEADLOCK" => Some(Self::SimulationDeadlock), "SIMULATION_MESSAGE_LOSS" => Some(Self::SimulationMessageLoss), + "SIMULATION_NO_RECIPIENT" => Some(Self::SimulationNoRecipient), "SIMULATION_PANIC" => Some(Self::SimulationPanic), "SIMULATION_TIMEOUT" => Some(Self::SimulationTimeout), "SIMULATION_OUT_OF_SYNC" => Some(Self::SimulationOutOfSync), @@ -412,7 +418,7 @@ pub mod simulation_server { dead_code, missing_docs, clippy::wildcard_imports, - clippy::let_unit_value, + clippy::let_unit_value )] use tonic::codegen::*; /// Generated trait containing gRPC methods that should be implemented for use with SimulationServer. @@ -437,31 +443,19 @@ pub mod simulation_server { async fn schedule_event( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; async fn cancel_event( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; async fn process_event( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; async fn process_query( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; async fn read_events( &self, request: tonic::Request, @@ -496,10 +490,7 @@ pub mod simulation_server { max_encoding_message_size: None, } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -554,21 +545,15 @@ pub mod simulation_server { "/simulation.Simulation/Init" => { #[allow(non_camel_case_types)] struct InitSvc(pub Arc); - impl tonic::server::UnaryService - for InitSvc { + impl tonic::server::UnaryService for InitSvc { type Response = super::InitReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::init(&inner, request).await - }; + let fut = async move { ::init(&inner, request).await }; Box::pin(fut) } } @@ -597,21 +582,15 @@ pub mod simulation_server { "/simulation.Simulation/Time" => { #[allow(non_camel_case_types)] struct TimeSvc(pub Arc); - impl tonic::server::UnaryService - for TimeSvc { + impl tonic::server::UnaryService for TimeSvc { type Response = super::TimeReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::time(&inner, request).await - }; + let fut = async move { ::time(&inner, request).await }; Box::pin(fut) } } @@ -640,21 +619,15 @@ pub mod simulation_server { "/simulation.Simulation/Step" => { #[allow(non_camel_case_types)] struct StepSvc(pub Arc); - impl tonic::server::UnaryService - for StepSvc { + impl tonic::server::UnaryService for StepSvc { type Response = super::StepReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::step(&inner, request).await - }; + let fut = async move { ::step(&inner, request).await }; Box::pin(fut) } } @@ -683,23 +656,16 @@ pub mod simulation_server { "/simulation.Simulation/StepUntil" => { #[allow(non_camel_case_types)] struct StepUntilSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for StepUntilSvc { + impl tonic::server::UnaryService for StepUntilSvc { type Response = super::StepUntilReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::step_until(&inner, request).await - }; + let fut = + async move { ::step_until(&inner, request).await }; Box::pin(fut) } } @@ -728,15 +694,11 @@ pub mod simulation_server { "/simulation.Simulation/ScheduleEvent" => { #[allow(non_camel_case_types)] struct ScheduleEventSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for ScheduleEventSvc { + impl tonic::server::UnaryService + for ScheduleEventSvc + { type Response = super::ScheduleEventReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -773,15 +735,9 @@ pub mod simulation_server { "/simulation.Simulation/CancelEvent" => { #[allow(non_camel_case_types)] struct CancelEventSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for CancelEventSvc { + impl tonic::server::UnaryService for CancelEventSvc { type Response = super::CancelEventReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -818,15 +774,9 @@ pub mod simulation_server { "/simulation.Simulation/ProcessEvent" => { #[allow(non_camel_case_types)] struct ProcessEventSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for ProcessEventSvc { + impl tonic::server::UnaryService for ProcessEventSvc { type Response = super::ProcessEventReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -863,15 +813,9 @@ pub mod simulation_server { "/simulation.Simulation/ProcessQuery" => { #[allow(non_camel_case_types)] struct ProcessQuerySvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for ProcessQuerySvc { + impl tonic::server::UnaryService for ProcessQuerySvc { type Response = super::ProcessQueryReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -908,15 +852,9 @@ pub mod simulation_server { "/simulation.Simulation/ReadEvents" => { #[allow(non_camel_case_types)] struct ReadEventsSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for ReadEventsSvc { + impl tonic::server::UnaryService for ReadEventsSvc { type Response = super::ReadEventsReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -953,23 +891,16 @@ pub mod simulation_server { "/simulation.Simulation/OpenSink" => { #[allow(non_camel_case_types)] struct OpenSinkSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for OpenSinkSvc { + impl tonic::server::UnaryService for OpenSinkSvc { type Response = super::OpenSinkReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::open_sink(&inner, request).await - }; + let fut = + async move { ::open_sink(&inner, request).await }; Box::pin(fut) } } @@ -998,23 +929,16 @@ pub mod simulation_server { "/simulation.Simulation/CloseSink" => { #[allow(non_camel_case_types)] struct CloseSinkSvc(pub Arc); - impl< - T: Simulation, - > tonic::server::UnaryService - for CloseSinkSvc { + impl tonic::server::UnaryService for CloseSinkSvc { type Response = super::CloseSinkReply; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::close_sink(&inner, request).await - }; + let fut = + async move { ::close_sink(&inner, request).await }; Box::pin(fut) } } @@ -1040,23 +964,19 @@ pub mod simulation_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - let mut response = http::Response::new(empty_body()); - let headers = response.headers_mut(); - headers - .insert( - tonic::Status::GRPC_STATUS, - (tonic::Code::Unimplemented as i32).into(), - ); - headers - .insert( - http::header::CONTENT_TYPE, - tonic::metadata::GRPC_CONTENT_TYPE, - ); - Ok(response) - }) - } + _ => Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers.insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers.insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }), } } } diff --git a/nexosim/src/grpc/services.rs b/nexosim/src/grpc/services.rs index 858bdc1..4907401 100644 --- a/nexosim/src/grpc/services.rs +++ b/nexosim/src/grpc/services.rs @@ -35,6 +35,7 @@ fn map_execution_error(error: ExecutionError) -> Error { let error_code = match error { ExecutionError::Deadlock(_) => ErrorCode::SimulationDeadlock, ExecutionError::MessageLoss(_) => ErrorCode::SimulationMessageLoss, + ExecutionError::NoRecipient { .. } => ErrorCode::SimulationNoRecipient, ExecutionError::Panic { .. } => ErrorCode::SimulationPanic, ExecutionError::Timeout => ErrorCode::SimulationTimeout, ExecutionError::OutOfSync(_) => ErrorCode::SimulationOutOfSync, diff --git a/nexosim/src/ports/output.rs b/nexosim/src/ports/output.rs index d1f838b..f4a4f07 100644 --- a/nexosim/src/ports/output.rs +++ b/nexosim/src/ports/output.rs @@ -8,6 +8,7 @@ use crate::ports::EventSink; use crate::ports::{InputFn, ReplierFn}; use crate::simulation::Address; use crate::util::cached_rw_lock::CachedRwLock; +use crate::util::unwrap_or_throw::UnwrapOrThrow; use broadcaster::{EventBroadcaster, QueryBroadcaster}; use sender::{FilterMapReplierSender, Sender}; @@ -32,7 +33,7 @@ pub struct Output { } impl Output { - /// Creates a new, disconnected `Output` port. + /// Creates a disconnected `Output` port. pub fn new() -> Self { Self::default() } @@ -146,7 +147,7 @@ impl Output { /// Broadcasts an event to all connected input ports. pub async fn send(&mut self, arg: T) { let broadcaster = self.broadcaster.write_scratchpad().unwrap(); - broadcaster.broadcast(arg).await.unwrap(); + broadcaster.broadcast(arg).await.unwrap_or_throw(); } } @@ -183,7 +184,7 @@ pub struct Requestor { } impl Requestor { - /// Creates a new, disconnected `Requestor` port. + /// Creates a disconnected `Requestor` port. pub fn new() -> Self { Self::default() } @@ -250,7 +251,7 @@ impl Requestor { /// closure plus, optionally, a context reference. pub fn filter_map_connect( &mut self, - query_filer_map: C, + query_filter_map: C, reply_map: D, replier: F, address: impl Into>, @@ -264,7 +265,7 @@ impl Requestor { S: Send + 'static, { let sender = Box::new(FilterMapReplierSender::new( - query_filer_map, + query_filter_map, reply_map, replier, address.into().0, @@ -279,7 +280,7 @@ impl Requestor { .unwrap() .broadcast(arg) .await - .unwrap() + .unwrap_or_throw() } } @@ -303,7 +304,7 @@ impl fmt::Debug for Requestor { @@ -311,7 +312,7 @@ pub struct UniRequestor { } impl UniRequestor { - /// Creates a new `UniRequestor` port connected to a replier port of the model + /// Creates a `UniRequestor` port connected to a replier port of the model /// specified by the address. /// /// The replier port must be an asynchronous method of a model of type `M` @@ -328,8 +329,8 @@ impl UniRequestor { Self { sender } } - /// Creates a new `UniRequestor` port connected with auto-conversion to a - /// replier port of the model specified by the address. + /// Creates an auto-converting `UniRequestor` port connected to a replier + /// port of the model specified by the address. /// /// Queries and replies are mapped to other types using the closures /// provided in argument. @@ -363,8 +364,8 @@ impl UniRequestor { Self { sender } } - /// Creates a new `UniRequestor` port connected with filtering and - /// auto-conversion to a replier port of the model specified by the address. + /// Creates an auto-converting, filtered `UniRequestor` port connected to a + /// replier port of the model specified by the address. /// /// Queries and replies are mapped to other types using the closures /// provided in argument, or ignored if the query closure returns `None`. @@ -374,7 +375,7 @@ impl UniRequestor { /// taking as argument a value of the type returned by the query mapping /// closure plus, optionally, a context reference. pub fn with_filter_map( - query_filer_map: C, + query_filter_map: C, reply_map: D, replier: F, address: impl Into>, @@ -389,7 +390,7 @@ impl UniRequestor { S: Send + 'static, { let sender = Box::new(FilterMapReplierSender::new( - query_filer_map, + query_filter_map, reply_map, replier, address.into().0, @@ -401,7 +402,8 @@ impl UniRequestor { /// Sends a query to the connected replier port. pub async fn send(&mut self, arg: T) -> Option { if let Some(fut) = self.sender.send_owned(arg) { - let output = fut.await.unwrap(); + let output = fut.await.unwrap_or_throw(); + Some(output) } else { None diff --git a/nexosim/src/ports/output/broadcaster.rs b/nexosim/src/ports/output/broadcaster.rs index bb3ac47..ee09f54 100644 --- a/nexosim/src/ports/output/broadcaster.rs +++ b/nexosim/src/ports/output/broadcaster.rs @@ -5,7 +5,8 @@ use std::task::{Context, Poll}; use diatomic_waker::WakeSink; -use super::sender::{RecycledFuture, SendError, Sender}; +use super::sender::{RecycledFuture, Sender}; +use crate::channel::SendError; use crate::util::task_set::TaskSet; /// An object that can efficiently broadcast messages to several addresses. @@ -142,7 +143,7 @@ impl EventBroadcaster { } /// Broadcasts an event to all addresses. - pub(super) async fn broadcast(&mut self, arg: T) -> Result<(), BroadcastError> { + pub(super) async fn broadcast(&mut self, arg: T) -> Result<(), SendError> { match self.inner.senders.as_mut_slice() { // No sender. [] => Ok(()), @@ -150,7 +151,7 @@ impl EventBroadcaster { // One sender at most. [sender] => match sender.send_owned(arg) { None => Ok(()), - Some(fut) => fut.await.map_err(|_| BroadcastError {}), + Some(fut) => fut.await, }, // Possibly multiple senders. @@ -158,7 +159,7 @@ impl EventBroadcaster { let (shared, mut futures) = self.inner.futures(arg); match futures.as_mut_slice() { [] => Ok(()), - [fut] => fut.await.map_err(|_| BroadcastError {}), + [fut] => fut.await, _ => BroadcastFuture::new(shared, futures).await, } } @@ -206,7 +207,7 @@ impl QueryBroadcaster { pub(super) async fn broadcast( &mut self, arg: T, - ) -> Result + '_, BroadcastError> { + ) -> Result + '_, SendError> { let output_count = match self.inner.senders.as_mut_slice() { // No sender. [] => 0, @@ -214,7 +215,7 @@ impl QueryBroadcaster { // One sender at most. [sender] => { if let Some(fut) = sender.send_owned(arg) { - let output = fut.await.map_err(|_| BroadcastError {})?; + let output = fut.await?; self.inner.shared.outputs[0] = Some(output); 1 @@ -231,7 +232,7 @@ impl QueryBroadcaster { match futures.as_mut_slice() { [] => {} [fut] => { - let output = fut.await.map_err(|_| BroadcastError {})?; + let output = fut.await?; shared.outputs[0] = Some(output); } _ => { @@ -361,7 +362,7 @@ impl<'a, R> Drop for BroadcastFuture<'a, R> { } impl<'a, R> Future for BroadcastFuture<'a, R> { - type Output = Result<(), BroadcastError>; + type Output = Result<(), SendError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut *self; @@ -392,10 +393,10 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { *output = Some(o); this.pending_futures_count -= 1; } - Poll::Ready(Err(_)) => { + Poll::Ready(Err(SendError)) => { this.state = FutureState::Completed; - return Poll::Ready(Err(BroadcastError {})); + return Poll::Ready(Err(SendError)); } Poll::Pending => {} } @@ -450,10 +451,10 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { *output = Some(o); this.pending_futures_count -= 1; } - Poll::Ready(Err(_)) => { + Poll::Ready(Err(SendError)) => { this.state = FutureState::Completed; - return Poll::Ready(Err(BroadcastError {})); + return Poll::Ready(Err(SendError)); } Poll::Pending => {} } @@ -468,10 +469,6 @@ impl<'a, R> Future for BroadcastFuture<'a, R> { } } -/// Error returned when a message could not be delivered. -#[derive(Debug)] -pub(super) struct BroadcastError {} - #[derive(Debug, PartialEq)] enum FutureState { Uninit, diff --git a/nexosim/src/ports/output/sender.rs b/nexosim/src/ports/output/sender.rs index b6eb963..ce85587 100644 --- a/nexosim/src/ports/output/sender.rs +++ b/nexosim/src/ports/output/sender.rs @@ -1,5 +1,3 @@ -use std::error::Error; -use std::fmt; use std::future::Future; use std::marker::PhantomData; use std::mem::ManuallyDrop; @@ -11,6 +9,7 @@ use dyn_clone::DynClone; use recycle_box::{coerce_box, RecycleBox}; use crate::channel; +use crate::channel::SendError; use crate::model::Model; use crate::ports::{EventSinkWriter, InputFn, ReplierFn}; @@ -75,9 +74,7 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - Some(RecycledFuture::new(&mut self.fut_storage, async move { - fut.await.map_err(|_| SendError {}) - })) + Some(RecycledFuture::new(&mut self.fut_storage, fut)) } } @@ -147,9 +144,7 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - Some(RecycledFuture::new(&mut self.fut_storage, async move { - fut.await.map_err(|_| SendError {}) - })) + Some(RecycledFuture::new(&mut self.fut_storage, fut)) } } @@ -221,9 +216,7 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }); - RecycledFuture::new(&mut self.fut_storage, async move { - fut.await.map_err(|_| SendError {}) - }) + RecycledFuture::new(&mut self.fut_storage, fut) }) } } @@ -474,12 +467,12 @@ where Some(RecycledFuture::new(fut_storage, async move { // Send the message. - send_fut.await.map_err(|_| SendError {})?; + send_fut.await?; // Wait until the message is processed and the reply is sent back. // If an error is received, it most likely means the mailbox was // dropped before the message was processed. - reply_receiver.recv().await.map_err(|_| SendError {}) + reply_receiver.recv().await.map_err(|_| SendError) })) } } @@ -574,7 +567,7 @@ where Some(RecycledFuture::new(fut_storage, async move { // Send the message. - send_fut.await.map_err(|_| SendError {})?; + send_fut.await?; // Wait until the message is processed and the reply is sent back. // If an error is received, it most likely means the mailbox was @@ -582,7 +575,7 @@ where reply_receiver .recv() .await - .map_err(|_| SendError {}) + .map_err(|_| SendError) .map(reply_map) })) } @@ -687,7 +680,7 @@ where RecycledFuture::new(fut_storage, async move { // Send the message. - send_fut.await.map_err(|_| SendError {})?; + send_fut.await?; // Wait until the message is processed and the reply is sent back. // If an error is received, it most likely means the mailbox was @@ -695,7 +688,7 @@ where reply_receiver .recv() .await - .map_err(|_| SendError {}) + .map_err(|_| SendError) .map(reply_map) }) }) @@ -723,18 +716,6 @@ where } } -/// Error returned when the mailbox was closed or dropped. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub(super) struct SendError {} - -impl fmt::Display for SendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "sending message into a closed mailbox") - } -} - -impl Error for SendError {} - pub(super) struct RecycledFuture<'a, T> { fut: ManuallyDrop + Send + 'a>>>, lender_box: &'a mut Option>, diff --git a/nexosim/src/ports/source.rs b/nexosim/src/ports/source.rs index 53bbd44..3eccb8c 100644 --- a/nexosim/src/ports/source.rs +++ b/nexosim/src/ports/source.rs @@ -11,6 +11,7 @@ use crate::simulation::{ Action, ActionKey, Address, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, }; use crate::util::slot; +use crate::util::unwrap_or_throw::UnwrapOrThrow; use broadcaster::{EventBroadcaster, QueryBroadcaster, ReplyIterator}; use sender::{ @@ -31,7 +32,7 @@ pub struct EventSource { } impl EventSource { - /// Creates a new, disconnected `EventSource` port. + /// Creates a disconnected `EventSource` port. pub fn new() -> Self { Self::default() } @@ -106,7 +107,7 @@ impl EventSource { pub fn event(&mut self, arg: T) -> Action { let fut = self.broadcaster.lock().unwrap().broadcast(arg); let fut = async { - fut.await.unwrap(); + fut.await.unwrap_or_throw(); }; Action::new(OnceAction::new(fut)) @@ -123,12 +124,12 @@ impl EventSource { let action = Action::new(KeyedOnceAction::new( // Cancellation is ignored once the action is already spawned on the - // executor. This means the action cannot be cancelled while the - // simulation is running, but since an event source is meant to be - // used outside the simulator, this shouldn't be an issue in - // practice. + // executor. This means the action cannot be cancelled once the + // simulation step targeted by the action is running, but since an + // event source is meant to be used outside the simulator, this + // shouldn't be an issue in practice. |_| async { - fut.await.unwrap(); + fut.await.unwrap_or_throw(); }, action_key.clone(), )); @@ -147,7 +148,7 @@ impl EventSource { Action::new(PeriodicAction::new( || async move { let fut = broadcaster.lock().unwrap().broadcast(arg); - fut.await.unwrap(); + fut.await.unwrap_or_throw(); }, period, )) @@ -171,7 +172,7 @@ impl EventSource { // practice. |_| async move { let fut = broadcaster.lock().unwrap().broadcast(arg); - fut.await.unwrap(); + fut.await.unwrap_or_throw(); }, period, action_key.clone(), @@ -211,7 +212,7 @@ pub struct QuerySource { } impl QuerySource { - /// Creates a new, disconnected `EventSource` port. + /// Creates a disconnected `EventSource` port. pub fn new() -> Self { Self::default() } @@ -309,7 +310,7 @@ impl QuerySource { let (writer, reader) = slot::slot(); let fut = self.broadcaster.lock().unwrap().broadcast(arg); let fut = async move { - let replies = fut.await.unwrap(); + let replies = fut.await.unwrap_or_throw(); let _ = writer.write(replies); }; diff --git a/nexosim/src/ports/source/broadcaster.rs b/nexosim/src/ports/source/broadcaster.rs index 7bd1bce..4f672ea 100644 --- a/nexosim/src/ports/source/broadcaster.rs +++ b/nexosim/src/ports/source/broadcaster.rs @@ -10,6 +10,7 @@ use diatomic_waker::WakeSink; use super::sender::{Sender, SenderFuture}; +use crate::channel::SendError; use crate::util::task_set::TaskSet; /// An object that can efficiently broadcast messages to several addresses. @@ -109,7 +110,7 @@ impl EventBroadcaster { pub(super) fn broadcast( &mut self, arg: T, - ) -> impl Future> + Send { + ) -> impl Future> + Send { enum Fut { Empty, Single(F1), @@ -130,13 +131,13 @@ impl EventBroadcaster { // No sender. Fut::Empty | Fut::Single(None) => Ok(()), - Fut::Single(Some(fut)) => fut.await.map_err(|_| BroadcastError {}), + Fut::Single(Some(fut)) => fut.await, Fut::Multiple(mut futures) => match futures.as_mut_slice() { // No sender. [] => Ok(()), // One sender. - [SenderFutureState::Pending(fut)] => fut.await.map_err(|_| BroadcastError {}), + [SenderFutureState::Pending(fut)] => fut.await, // Multiple senders. _ => BroadcastFuture::new(futures).await.map(|_| ()), }, @@ -185,7 +186,7 @@ impl QueryBroadcaster { pub(super) fn broadcast( &mut self, arg: T, - ) -> impl Future, BroadcastError>> + Send { + ) -> impl Future, SendError>> + Send { enum Fut { Empty, Single(F1), @@ -208,25 +209,19 @@ impl QueryBroadcaster { Fut::Single(Some(fut)) => fut .await - .map(|reply| ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter())) - .map_err(|_| BroadcastError {}), + .map(|reply| ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter())), Fut::Multiple(mut futures) => match futures.as_mut_slice() { // No sender. [] => Ok(ReplyIterator(Vec::new().into_iter())), // One sender. - [SenderFutureState::Pending(fut)] => fut - .await - .map(|reply| { - ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter()) - }) - .map_err(|_| BroadcastError {}), + [SenderFutureState::Pending(fut)] => fut.await.map(|reply| { + ReplyIterator(vec![SenderFutureState::Ready(reply)].into_iter()) + }), // Multiple senders. - _ => BroadcastFuture::new(futures) - .await - .map_err(|_| BroadcastError {}), + _ => BroadcastFuture::new(futures).await, }, } } @@ -280,7 +275,7 @@ impl BroadcastFuture { } impl Future for BroadcastFuture { - type Output = Result, BroadcastError>; + type Output = Result, SendError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut *self; @@ -300,10 +295,10 @@ impl Future for BroadcastFuture { this.future_states[task_idx] = SenderFutureState::Ready(output); this.pending_futures_count -= 1; } - Poll::Ready(Err(_)) => { + Poll::Ready(Err(SendError)) => { this.state = FutureState::Completed; - return Poll::Ready(Err(BroadcastError {})); + return Poll::Ready(Err(SendError)); } Poll::Pending => {} } @@ -353,10 +348,10 @@ impl Future for BroadcastFuture { this.future_states[task_idx] = SenderFutureState::Ready(output); this.pending_futures_count -= 1; } - Poll::Ready(Err(_)) => { + Poll::Ready(Err(SendError)) => { this.state = FutureState::Completed; - return Poll::Ready(Err(BroadcastError {})); + return Poll::Ready(Err(SendError)); } Poll::Pending => {} } @@ -373,10 +368,6 @@ impl Future for BroadcastFuture { } } -/// Error returned when a message could not be delivered. -#[derive(Debug)] -pub(super) struct BroadcastError {} - #[derive(Debug, PartialEq)] enum FutureState { Uninit, @@ -749,7 +740,6 @@ mod tests { use waker_fn::waker_fn; - use super::super::sender::SendError; use super::*; // An event that may be waken spuriously. diff --git a/nexosim/src/ports/source/sender.rs b/nexosim/src/ports/source/sender.rs index 0edc865..9dec914 100644 --- a/nexosim/src/ports/source/sender.rs +++ b/nexosim/src/ports/source/sender.rs @@ -1,5 +1,3 @@ -use std::error::Error; -use std::fmt; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; @@ -9,6 +7,7 @@ use futures_channel::oneshot; use recycle_box::{coerce_box, RecycleBox}; use crate::channel; +use crate::channel::SendError; use crate::model::Model; use crate::ports::{InputFn, ReplierFn}; @@ -73,7 +72,6 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }) .await - .map_err(|_| SendError {}) })) } } @@ -129,7 +127,6 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }) .await - .map_err(|_| SendError {}) })) } } @@ -185,7 +182,6 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }) .await - .map_err(|_| SendError {}) }) as SenderFuture<()> }) } @@ -243,10 +239,9 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }) - .await - .map_err(|_| SendError {})?; + .await?; - reply_receiver.await.map_err(|_| SendError {}) + reply_receiver.await.map_err(|_| SendError) })) } } @@ -314,13 +309,9 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }) - .await - .map_err(|_| SendError {})?; + .await?; - reply_receiver - .await - .map_err(|_| SendError {}) - .map(&*reply_map) + reply_receiver.await.map_err(|_| SendError).map(&*reply_map) })) } } @@ -393,26 +384,10 @@ where coerce_box!(RecycleBox::recycle(recycle_box, fut)) }) - .await - .map_err(|_| SendError {})?; + .await?; - reply_receiver - .await - .map_err(|_| SendError {}) - .map(&*reply_map) + reply_receiver.await.map_err(|_| SendError).map(&*reply_map) }) as SenderFuture }) } } - -/// Error returned when the mailbox was closed or dropped. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub(super) struct SendError {} - -impl fmt::Display for SendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "sending message into a closed mailbox") - } -} - -impl Error for SendError {} diff --git a/nexosim/src/simulation.rs b/nexosim/src/simulation.rs index e8c0cc5..a80271b 100644 --- a/nexosim/src/simulation.rs +++ b/nexosim/src/simulation.rs @@ -80,8 +80,6 @@ mod mailbox; mod scheduler; mod sim_init; -use scheduler::SchedulerQueue; - pub(crate) use scheduler::{ GlobalScheduler, KeyedOnceAction, KeyedPeriodicAction, OnceAction, PeriodicAction, }; @@ -90,7 +88,7 @@ pub use mailbox::{Address, Mailbox}; pub use scheduler::{Action, ActionKey, AutoActionKey, Scheduler, SchedulingError}; pub use sim_init::SimInit; -use std::any::Any; +use std::any::{Any, TypeId}; use std::cell::Cell; use std::error::Error; use std::fmt; @@ -104,7 +102,9 @@ use std::{panic, task}; use pin_project::pin_project; use recycle_box::{coerce_box, RecycleBox}; -use crate::channel::ChannelObserver; +use scheduler::SchedulerQueue; + +use crate::channel::{ChannelObserver, SendError}; use crate::executor::{Executor, ExecutorError, Signal}; use crate::model::{BuildContext, Context, Model, ProtoModel}; use crate::ports::{InputFn, ReplierFn}; @@ -332,42 +332,46 @@ impl Simulation { return Err(ExecutionError::Terminated); } - self.executor.run(self.timeout).map_err(|e| match e { - ExecutorError::UnprocessedMessages(msg_count) => { - self.is_terminated = true; - let mut deadlock_info = Vec::new(); - for (model, observer) in &self.observers { - let mailbox_size = observer.len(); - if mailbox_size != 0 { - deadlock_info.push(DeadlockInfo { - model: model.clone(), - mailbox_size, - }); + self.executor.run(self.timeout).map_err(|e| { + self.is_terminated = true; + + match e { + ExecutorError::UnprocessedMessages(msg_count) => { + let mut deadlock_info = Vec::new(); + for (model, observer) in &self.observers { + let mailbox_size = observer.len(); + if mailbox_size != 0 { + deadlock_info.push(DeadlockInfo { + model: model.clone(), + mailbox_size, + }); + } } - } - if deadlock_info.is_empty() { - ExecutionError::MessageLoss(msg_count) - } else { - ExecutionError::Deadlock(deadlock_info) + if deadlock_info.is_empty() { + ExecutionError::MessageLoss(msg_count) + } else { + ExecutionError::Deadlock(deadlock_info) + } } - } - ExecutorError::Timeout => { - self.is_terminated = true; + ExecutorError::Timeout => ExecutionError::Timeout, + ExecutorError::Panic(model_id, payload) => { + let model = model_id + .get() + .map(|id| self.model_names.get(id).unwrap().clone()); + + // Filter out panics originating from a `SendError`. + if (*payload).type_id() == TypeId::of::() { + return ExecutionError::NoRecipient { model }; + } - ExecutionError::Timeout - } - ExecutorError::Panic(model_id, payload) => { - self.is_terminated = true; + if let Some(model) = model { + return ExecutionError::Panic { model, payload }; + } - let model = match model_id.get() { - // The panic was emitted by a model. - Some(id) => self.model_names.get(id).unwrap().clone(), // The panic is due to an internal issue. - None => panic::resume_unwind(payload), - }; - - ExecutionError::Panic { model, payload } + panic::resume_unwind(payload); + } } }) } @@ -530,7 +534,7 @@ pub struct DeadlockInfo { #[derive(Debug)] pub enum ExecutionError { /// The simulation has been terminated due to an earlier deadlock, message - /// loss, model panic, timeout or synchronization loss. + /// loss, missing recipient, model panic, timeout or synchronization loss. Terminated, /// The simulation has deadlocked due to the enlisted models. /// @@ -545,6 +549,22 @@ pub enum ExecutionError { /// This is a fatal error: any subsequent attempt to run the simulation will /// return an [`ExecutionError::Terminated`] error. MessageLoss(usize), + /// The recipient of a message does not exists. + /// + /// This indicates that the mailbox of the recipient of a message was not + /// migrated to the simulation and was no longer alive when the message was + /// sent. + /// + /// This is a fatal error: any subsequent attempt to run the simulation will + /// return an [`ExecutionError::Terminated`] error. + NoRecipient { + /// The fully qualified name of the model that attempted to send a + /// message, or `None` if the message was sent from the scheduler. + /// + /// The fully qualified name is made of the unqualified model name, if + /// relevant prepended by the dot-separated names of all parent models. + model: Option, + }, /// A panic was caught during execution. /// /// This is a fatal error: any subsequent attempt to run the simulation will @@ -619,6 +639,15 @@ impl fmt::Display for ExecutionError { Self::MessageLoss(count) => { write!(f, "{} messages have been lost", count) } + Self::NoRecipient{model} => { + match model { + Some(model) => write!(f, + "an attempt by model '{}' to send a message failed because the recipient's mailbox is no longer alive", + model + ), + None => f.write_str("an attempt by the scheduler to send a message failed because the recipient's mailbox is no longer alive"), + } + } Self::Panic{model, payload} => { let msg: &str = if let Some(s) = payload.downcast_ref::<&str>() { s diff --git a/nexosim/src/util.rs b/nexosim/src/util.rs index 631479a..2495257 100644 --- a/nexosim/src/util.rs +++ b/nexosim/src/util.rs @@ -7,3 +7,4 @@ pub(crate) mod seq_futures; pub(crate) mod slot; pub(crate) mod sync_cell; pub(crate) mod task_set; +pub(crate) mod unwrap_or_throw; diff --git a/nexosim/src/util/unwrap_or_throw.rs b/nexosim/src/util/unwrap_or_throw.rs new file mode 100644 index 0000000..c4225cf --- /dev/null +++ b/nexosim/src/util/unwrap_or_throw.rs @@ -0,0 +1,22 @@ +//! C++-style exceptions! + +use std::any::Any; +use std::panic; + +/// An extension trait that allows sending the error itself as a panic payload +/// when unwrapping fails. +pub(crate) trait UnwrapOrThrow { + fn unwrap_or_throw(self) -> T; +} + +impl UnwrapOrThrow for Result +where + E: 'static + Any + Send, +{ + fn unwrap_or_throw(self) -> T { + match self { + Ok(v) => v, + Err(e) => panic::panic_any(e), + } + } +} diff --git a/nexosim/tests/integration/main.rs b/nexosim/tests/integration/main.rs index c76016d..b71751a 100644 --- a/nexosim/tests/integration/main.rs +++ b/nexosim/tests/integration/main.rs @@ -6,6 +6,7 @@ mod model_scheduling; mod simulation_clock_sync; mod simulation_deadlock; mod simulation_message_loss; +mod simulation_no_recipient; mod simulation_panic; mod simulation_scheduling; #[cfg(not(miri))] diff --git a/nexosim/tests/integration/simulation_no_recipient.rs b/nexosim/tests/integration/simulation_no_recipient.rs new file mode 100644 index 0000000..d002833 --- /dev/null +++ b/nexosim/tests/integration/simulation_no_recipient.rs @@ -0,0 +1,169 @@ +//! Missing recipient detection. + +use std::time::Duration; + +use nexosim::model::Model; +use nexosim::ports::{EventSource, Output, QuerySource, Requestor}; +use nexosim::simulation::{ExecutionError, Mailbox, SimInit}; +use nexosim::time::MonotonicTime; + +const MT_NUM_THREADS: usize = 4; + +#[derive(Default)] +struct TestModel { + output: Output<()>, + requestor: Requestor<(), ()>, +} +impl TestModel { + async fn activate_output(&mut self) { + self.output.send(()).await; + } + async fn activate_requestor(&mut self) { + let _ = self.requestor.send(()).await; + } +} +impl Model for TestModel {} + +/// Send an event from a model to a dead input. +fn no_input_from_model(num_threads: usize) { + const MODEL_NAME: &str = "testmodel"; + + let mut model = TestModel::default(); + let mbox = Mailbox::new(); + let addr = mbox.address(); + let bad_mbox = Mailbox::new(); + + model.output.connect(TestModel::activate_output, &bad_mbox); + + drop(bad_mbox); + + let t0 = MonotonicTime::EPOCH; + let mut simu = SimInit::with_num_threads(num_threads) + .add_model(model, mbox, MODEL_NAME) + .init(t0) + .unwrap() + .0; + + match simu.process_event(TestModel::activate_output, (), addr) { + Err(ExecutionError::NoRecipient { model }) => { + assert_eq!(model, Some(String::from(MODEL_NAME))); + } + _ => panic!("missing recipient not detected"), + } +} + +/// Send an event from a model to a dead replier. +fn no_replier_from_model(num_threads: usize) { + const MODEL_NAME: &str = "testmodel"; + + let mut model = TestModel::default(); + let mbox = Mailbox::new(); + let addr = mbox.address(); + let bad_mbox = Mailbox::new(); + + model + .requestor + .connect(TestModel::activate_requestor, &bad_mbox); + + drop(bad_mbox); + + let t0 = MonotonicTime::EPOCH; + let mut simu = SimInit::with_num_threads(num_threads) + .add_model(model, mbox, MODEL_NAME) + .init(t0) + .unwrap() + .0; + + match simu.process_event(TestModel::activate_requestor, (), addr) { + Err(ExecutionError::NoRecipient { model }) => { + assert_eq!(model, Some(String::from(MODEL_NAME))); + } + _ => panic!("missing recipient not detected"), + } +} + +/// Send an event from the scheduler to a dead input. +fn no_input_from_scheduler(num_threads: usize) { + let bad_mbox = Mailbox::new(); + + let mut src = EventSource::new(); + src.connect(TestModel::activate_output, &bad_mbox); + let event = src.event(()); + + drop(bad_mbox); + + let t0 = MonotonicTime::EPOCH; + let (mut simu, scheduler) = SimInit::with_num_threads(num_threads).init(t0).unwrap(); + + scheduler.schedule(Duration::from_secs(1), event).unwrap(); + + match simu.step() { + Err(ExecutionError::NoRecipient { model }) => { + assert_eq!(model, None); + } + _ => panic!("missing recipient not detected"), + } +} + +/// Send a query from the scheduler to a dead input. +fn no_replier_from_scheduler(num_threads: usize) { + let bad_mbox = Mailbox::new(); + + let mut src = QuerySource::new(); + src.connect(TestModel::activate_requestor, &bad_mbox); + let query = src.query(()).0; + + drop(bad_mbox); + + let t0 = MonotonicTime::EPOCH; + let (mut simu, scheduler) = SimInit::with_num_threads(num_threads).init(t0).unwrap(); + + scheduler.schedule(Duration::from_secs(1), query).unwrap(); + + match simu.step() { + Err(ExecutionError::NoRecipient { model }) => { + assert_eq!(model, None); + } + _ => panic!("missing recipient not detected"), + } +} + +#[test] +fn no_input_from_model_st() { + no_input_from_model(1); +} + +#[test] +fn no_input_from_model_mt() { + no_input_from_model(MT_NUM_THREADS); +} + +#[test] +fn no_replier_from_model_st() { + no_replier_from_model(1); +} + +#[test] +fn no_replier_from_model_mt() { + no_replier_from_model(MT_NUM_THREADS); +} + +#[test] +fn no_input_from_scheduler_st() { + no_input_from_scheduler(1); +} + +#[test] +fn no_input_from_scheduler_mt() { + no_input_from_scheduler(MT_NUM_THREADS); +} + +#[test] +fn no_replier_from_scheduler_st() { + no_replier_from_scheduler(1); +} + +#[test] +fn no_replier_from_scheduler_mt() { + no_replier_from_scheduler(MT_NUM_THREADS); +}