From 5b2c872e15faae65ab2b1a2dd46f93fe763ec926 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Fri, 22 Nov 2024 20:26:23 +0100 Subject: [PATCH 1/3] Add UniRequestor port --- nexosim-util/src/observables.rs | 4 +- nexosim/Cargo.toml | 1 + nexosim/examples/uni_requestor.rs | 174 ++++++++++++++++++++++++++++++ nexosim/src/ports.rs | 2 +- nexosim/src/ports/output.rs | 118 +++++++++++++++++++- 5 files changed, 295 insertions(+), 4 deletions(-) create mode 100644 nexosim/examples/uni_requestor.rs diff --git a/nexosim-util/src/observables.rs b/nexosim-util/src/observables.rs index 00d19a6..b3277f0 100644 --- a/nexosim-util/src/observables.rs +++ b/nexosim-util/src/observables.rs @@ -25,8 +25,8 @@ where /// Observable state. /// -/// This object encapsulates state. Every state change is propagated to the -/// output. +/// This object encapsulates state. Every state change access is propagated to +/// the output. #[derive(Debug)] pub struct ObservableState where diff --git a/nexosim/Cargo.toml b/nexosim/Cargo.toml index b3d1d30..31dd84e 100644 --- a/nexosim/Cargo.toml +++ b/nexosim/Cargo.toml @@ -63,6 +63,7 @@ tracing-subscriber = { version= "0.3.18", optional = true } futures-util = "0.3" futures-executor = "0.3" tracing-subscriber = { version= "0.3.18", features=["env-filter"] } +nexosim-util = {version = "0.1.0", path = "../nexosim-util"} [target.'cfg(nexosim_loom)'.dev-dependencies] loom = "0.7" diff --git a/nexosim/examples/uni_requestor.rs b/nexosim/examples/uni_requestor.rs new file mode 100644 index 0000000..6c722de --- /dev/null +++ b/nexosim/examples/uni_requestor.rs @@ -0,0 +1,174 @@ +//! Example: sensor reading data from the environment model. +//! +//! This example demonstrates in particular: +//! +//! * cyclical self-scheduling methods, +//! * model initialization, +//! * simulation monitoring with buffered event sinks, +//! * connection with mapping, +//! * UniRequestor port. +//! +//! ```text +//! ┌─────────────┐ ┌──────────┐ +//! │ │ temperature │ │ overheat +//! Temperature ●─────────►│ Environment ├──────────────►│ Sensor ├──────────► +//! │ │ │ │ state +//! └─────────────┘ └──────────┘ +//! ``` + +use std::time::Duration; + +use nexosim_util::observables::ObservableValue; + +use nexosim::model::{Context, InitializedModel, Model}; +use nexosim::ports::{EventBuffer, Output, UniRequestor}; +use nexosim::simulation::{Mailbox, SimInit, SimulationError}; +use nexosim::time::MonotonicTime; + +/// Sensor model +pub struct Sensor { + /// Temperature [deg C] -- requestor port. + pub temp: UniRequestor<(), f64>, + + /// Overheat detection [-] -- output port. + pub overheat: Output, + + /// Temperature threshold [deg C] -- parameter. + threshold: f64, + + /// Overheat detection [-] -- observable state. + oh: ObservableValue, +} + +impl Sensor { + /// Creates new Sensor with overheat threshold set [deg C]. + pub fn new(threshold: f64, temp: UniRequestor<(), f64>) -> Self { + let overheat = Output::new(); + Self { + temp, + overheat: overheat.clone(), + threshold, + oh: ObservableValue::new(overheat), + } + } + + /// Cyclically scheduled method that reads data from environment and + /// avaluates overheat state. + pub async fn tick(&mut self) { + let temp = self.temp.send(()).await.unwrap(); + if temp > self.threshold { + if !self.oh.get() { + self.oh.set(true).await; + } + } else if *self.oh.get() { + self.oh.set(false).await; + } + } +} + +impl Model for Sensor { + /// Propagate state and schedule cyclic method. + async fn init(mut self, context: &mut Context) -> InitializedModel { + self.oh.propagate().await; + + context + .schedule_periodic_event( + Duration::from_millis(500), + Duration::from_millis(500), + Self::tick, + (), + ) + .unwrap(); + + self.into() + } +} + +/// Environment model. +pub struct Env { + /// Temperature [deg F] -- internal state. + temp: f64, +} + +impl Env { + /// Creates new environment model with the temperature [deg F] set. + pub fn new(temp: f64) -> Self { + Self { temp } + } + + /// Sets temperature [deg F]. + pub async fn set_temp(&mut self, temp: f64) { + self.temp = temp; + } + + /// Gets temperature [deg F]. + pub async fn get_temp(&mut self, _: ()) -> f64 { + self.temp + } +} + +impl Model for Env {} + +/// Orbiting Mars and looking for its climate remember to convert units. +pub fn fahr_to_cels(t: f64) -> f64 { + 5.0 * (t - 32.0) / 9.0 +} + +fn main() -> Result<(), SimulationError> { + // --------------- + // Bench assembly. + // --------------- + + // Mailboxes. + let sensor_mbox = Mailbox::new(); + let env_mbox = Mailbox::new(); + + // Connect data line and convert Fahrenheit degrees to Celsius. + let temp_req = UniRequestor::new_map_connected(|x| *x, fahr_to_cels, Env::get_temp, &env_mbox); + + // Models. + let mut sensor = Sensor::new(100.0, temp_req); + let env = Env::new(0.0); + + // Model handles for simulation. + let env_addr = env_mbox.address(); + + let mut overheat = EventBuffer::new(); + sensor.overheat.connect_sink(&overheat); + + // Start time (arbitrary since models do not depend on absolute time). + let t0 = MonotonicTime::EPOCH; + + // Assembly and initialization. + let (mut simu, scheduler) = SimInit::new() + .add_model(sensor, sensor_mbox, "sensor") + .add_model(env, env_mbox, "env") + .init(t0)?; + + // ---------- + // Simulation. + // ---------- + + // Check initial conditions. + assert_eq!(simu.time(), t0); + assert_eq!(overheat.next(), Some(false)); + assert!(overheat.next().is_none()); + + // Change temperature in 2s. + scheduler + .schedule_event(Duration::from_secs(2), Env::set_temp, 105.0, &env_addr) + .unwrap(); + + // Change temperature in 4s. + scheduler + .schedule_event(Duration::from_secs(4), Env::set_temp, 213.0, &env_addr) + .unwrap(); + + simu.step_until(Duration::from_secs(3))?; + assert!(overheat.next().is_none()); + + simu.step_until(Duration::from_secs(5))?; + assert_eq!(overheat.next(), Some(true)); + + Ok(()) +} diff --git a/nexosim/src/ports.rs b/nexosim/src/ports.rs index d1a74b4..f17be4c 100644 --- a/nexosim/src/ports.rs +++ b/nexosim/src/ports.rs @@ -86,7 +86,7 @@ mod source; pub use input::markers; pub use input::{InputFn, ReplierFn}; -pub use output::{Output, Requestor}; +pub use output::{Output, Requestor, UniRequestor}; pub use sink::{ event_buffer::EventBuffer, event_slot::EventSlot, EventSink, EventSinkStream, EventSinkWriter, }; diff --git a/nexosim/src/ports/output.rs b/nexosim/src/ports/output.rs index e8dff63..b8c81ae 100644 --- a/nexosim/src/ports/output.rs +++ b/nexosim/src/ports/output.rs @@ -10,7 +10,7 @@ use crate::simulation::Address; use crate::util::cached_rw_lock::CachedRwLock; use broadcaster::{EventBroadcaster, QueryBroadcaster}; -use sender::FilterMapReplierSender; +use sender::{FilterMapReplierSender, Sender}; use self::sender::{ EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender, @@ -300,3 +300,119 @@ impl fmt::Debug for Requestor { + sender: Box>, +} + +impl UniRequestor { + /// Creates new UniRequestor port connected to a replier port of the model + /// specified by the address. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of type `R` and taking as argument a value of type `T` + /// plus, optionally, a context reference. + pub fn new_connected(replier: F, address: impl Into>) -> Self + where + M: Model, + F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, + S: Send + 'static, + { + let sender = Box::new(ReplierSender::new(replier, address.into().0)); + + Self { sender } + } + + /// Creates new UniRequestor port connected with auto-conversion to a + /// replier port of the model specified by the address. + /// + /// Queries and replies are mapped to other types using the closures + /// provided in argument. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. + pub fn new_map_connected( + query_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> Self + where + M: Model, + C: Fn(&T) -> U + Send + Sync + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(MapReplierSender::new( + query_map, + reply_map, + replier, + address.into().0, + )); + + Self { sender } + } + + /// Creates neq UniRequestor port connected with filtering and + /// auto-conversion to a replier port of the model specified by the address. + /// + /// Queries and replies are mapped to other types using the closures + /// provided in argument, or ignored if the query closure returns `None`. + /// + /// The replier port must be an asynchronous method of a model of type `M` + /// returning a value of the type returned by the reply mapping closure and + /// taking as argument a value of the type returned by the query mapping + /// closure plus, optionally, a context reference. + pub fn new_filter_map_connected( + query_filer_map: C, + reply_map: D, + replier: F, + address: impl Into>, + ) -> Self + where + M: Model, + C: Fn(&T) -> Option + Send + Sync + 'static, + D: Fn(Q) -> R + Send + Sync + 'static, + F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone, + U: Send + 'static, + Q: Send + 'static, + S: Send + 'static, + { + let sender = Box::new(FilterMapReplierSender::new( + query_filer_map, + reply_map, + replier, + address.into().0, + )); + + Self { sender } + } + + /// Send a query to the connected replier port. + pub async fn send(&mut self, arg: T) -> Option { + if let Some(fut) = self.sender.send_owned(arg) { + let output = fut.await.unwrap(); + Some(output) + } else { + None + } + } +} + +impl fmt::Debug for UniRequestor { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "UniRequestor") + } +} From c06233ad74072d10755823c2b5dfebffec212997 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ja=C5=ADhien=20Piatlicki?= Date: Mon, 25 Nov 2024 09:54:42 +0100 Subject: [PATCH 2/3] Change after review --- nexosim/src/ports/output.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nexosim/src/ports/output.rs b/nexosim/src/ports/output.rs index b8c81ae..f36fb0f 100644 --- a/nexosim/src/ports/output.rs +++ b/nexosim/src/ports/output.rs @@ -301,7 +301,7 @@ impl fmt::Debug for Requestor Date: Mon, 25 Nov 2024 10:17:48 +0100 Subject: [PATCH 3/3] Changes after review --- nexosim/examples/uni_requestor.rs | 4 ++-- nexosim/src/ports/output.rs | 18 ++++++++---------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/nexosim/examples/uni_requestor.rs b/nexosim/examples/uni_requestor.rs index 6c722de..bd24044 100644 --- a/nexosim/examples/uni_requestor.rs +++ b/nexosim/examples/uni_requestor.rs @@ -109,7 +109,7 @@ impl Env { impl Model for Env {} -/// Orbiting Mars and looking for its climate remember to convert units. +/// Converts Fahrenheit to Celsius. pub fn fahr_to_cels(t: f64) -> f64 { 5.0 * (t - 32.0) / 9.0 } @@ -124,7 +124,7 @@ fn main() -> Result<(), SimulationError> { let env_mbox = Mailbox::new(); // Connect data line and convert Fahrenheit degrees to Celsius. - let temp_req = UniRequestor::new_map_connected(|x| *x, fahr_to_cels, Env::get_temp, &env_mbox); + let temp_req = UniRequestor::with_map(|x| *x, fahr_to_cels, Env::get_temp, &env_mbox); // Models. let mut sensor = Sensor::new(100.0, temp_req); diff --git a/nexosim/src/ports/output.rs b/nexosim/src/ports/output.rs index f36fb0f..d1f838b 100644 --- a/nexosim/src/ports/output.rs +++ b/nexosim/src/ports/output.rs @@ -303,23 +303,21 @@ impl fmt::Debug for Requestor { sender: Box>, } impl UniRequestor { - /// Creates new UniRequestor port connected to a replier port of the model + /// Creates a new `UniRequestor` port connected to a replier port of the model /// specified by the address. /// /// The replier port must be an asynchronous method of a model of type `M` /// returning a value of type `R` and taking as argument a value of type `T` /// plus, optionally, a context reference. - pub fn new_connected(replier: F, address: impl Into>) -> Self + pub fn new(replier: F, address: impl Into>) -> Self where M: Model, F: for<'a> ReplierFn<'a, M, T, R, S> + Clone, @@ -330,7 +328,7 @@ impl UniRequestor { Self { sender } } - /// Creates new UniRequestor port connected with auto-conversion to a + /// Creates a new `UniRequestor` port connected with auto-conversion to a /// replier port of the model specified by the address. /// /// Queries and replies are mapped to other types using the closures @@ -340,7 +338,7 @@ impl UniRequestor { /// returning a value of the type returned by the reply mapping closure and /// taking as argument a value of the type returned by the query mapping /// closure plus, optionally, a context reference. - pub fn new_map_connected( + pub fn with_map( query_map: C, reply_map: D, replier: F, @@ -365,7 +363,7 @@ impl UniRequestor { Self { sender } } - /// Creates neq UniRequestor port connected with filtering and + /// Creates a new `UniRequestor` port connected with filtering and /// auto-conversion to a replier port of the model specified by the address. /// /// Queries and replies are mapped to other types using the closures @@ -375,7 +373,7 @@ impl UniRequestor { /// returning a value of the type returned by the reply mapping closure and /// taking as argument a value of the type returned by the query mapping /// closure plus, optionally, a context reference. - pub fn new_filter_map_connected( + pub fn with_filter_map( query_filer_map: C, reply_map: D, replier: F, @@ -400,7 +398,7 @@ impl UniRequestor { Self { sender } } - /// Send a query to the connected replier port. + /// Sends a query to the connected replier port. pub async fn send(&mut self, arg: T) -> Option { if let Some(fut) = self.sender.send_owned(arg) { let output = fut.await.unwrap();