Skip to content

Commit

Permalink
Merge pull request #69 from asynchronics/feature-unirequestor
Browse files Browse the repository at this point in the history
Add UniRequestor port
  • Loading branch information
sbarral authored Nov 26, 2024
2 parents 224aea5 + bb7923f commit d88c527
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 4 deletions.
4 changes: 2 additions & 2 deletions nexosim-util/src/observables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ where

/// Observable state.
///
/// This object encapsulates state. Every state change is propagated to the
/// output.
/// This object encapsulates state. Every state change access is propagated to
/// the output.
#[derive(Debug)]
pub struct ObservableState<S, T>
where
Expand Down
1 change: 1 addition & 0 deletions nexosim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ tracing-subscriber = { version= "0.3.18", optional = true }
futures-util = "0.3"
futures-executor = "0.3"
tracing-subscriber = { version= "0.3.18", features=["env-filter"] }
nexosim-util = {version = "0.1.0", path = "../nexosim-util"}

[target.'cfg(nexosim_loom)'.dev-dependencies]
loom = "0.7"
Expand Down
174 changes: 174 additions & 0 deletions nexosim/examples/uni_requestor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
//! Example: sensor reading data from the environment model.
//!
//! This example demonstrates in particular:
//!
//! * cyclical self-scheduling methods,
//! * model initialization,
//! * simulation monitoring with buffered event sinks,
//! * connection with mapping,
//! * UniRequestor port.
//!
//! ```text
//! ┌─────────────┐ ┌──────────┐
//! │ │ temperature │ │ overheat
//! Temperature ●─────────►│ Environment ├──────────────►│ Sensor ├──────────►
//! │ │ │ │ state
//! └─────────────┘ └──────────┘
//! ```
use std::time::Duration;

use nexosim_util::observables::ObservableValue;

use nexosim::model::{Context, InitializedModel, Model};
use nexosim::ports::{EventBuffer, Output, UniRequestor};
use nexosim::simulation::{Mailbox, SimInit, SimulationError};
use nexosim::time::MonotonicTime;

/// Sensor model
pub struct Sensor {
/// Temperature [deg C] -- requestor port.
pub temp: UniRequestor<(), f64>,

/// Overheat detection [-] -- output port.
pub overheat: Output<bool>,

/// Temperature threshold [deg C] -- parameter.
threshold: f64,

/// Overheat detection [-] -- observable state.
oh: ObservableValue<bool>,
}

impl Sensor {
/// Creates new Sensor with overheat threshold set [deg C].
pub fn new(threshold: f64, temp: UniRequestor<(), f64>) -> Self {
let overheat = Output::new();
Self {
temp,
overheat: overheat.clone(),
threshold,
oh: ObservableValue::new(overheat),
}
}

/// Cyclically scheduled method that reads data from environment and
/// avaluates overheat state.
pub async fn tick(&mut self) {
let temp = self.temp.send(()).await.unwrap();
if temp > self.threshold {
if !self.oh.get() {
self.oh.set(true).await;
}
} else if *self.oh.get() {
self.oh.set(false).await;
}
}
}

impl Model for Sensor {
/// Propagate state and schedule cyclic method.
async fn init(mut self, context: &mut Context<Self>) -> InitializedModel<Self> {
self.oh.propagate().await;

context
.schedule_periodic_event(
Duration::from_millis(500),
Duration::from_millis(500),
Self::tick,
(),
)
.unwrap();

self.into()
}
}

/// Environment model.
pub struct Env {
/// Temperature [deg F] -- internal state.
temp: f64,
}

impl Env {
/// Creates new environment model with the temperature [deg F] set.
pub fn new(temp: f64) -> Self {
Self { temp }
}

/// Sets temperature [deg F].
pub async fn set_temp(&mut self, temp: f64) {
self.temp = temp;
}

/// Gets temperature [deg F].
pub async fn get_temp(&mut self, _: ()) -> f64 {
self.temp
}
}

impl Model for Env {}

/// Converts Fahrenheit to Celsius.
pub fn fahr_to_cels(t: f64) -> f64 {
5.0 * (t - 32.0) / 9.0
}

fn main() -> Result<(), SimulationError> {
// ---------------
// Bench assembly.
// ---------------

// Mailboxes.
let sensor_mbox = Mailbox::new();
let env_mbox = Mailbox::new();

// Connect data line and convert Fahrenheit degrees to Celsius.
let temp_req = UniRequestor::with_map(|x| *x, fahr_to_cels, Env::get_temp, &env_mbox);

// Models.
let mut sensor = Sensor::new(100.0, temp_req);
let env = Env::new(0.0);

// Model handles for simulation.
let env_addr = env_mbox.address();

let mut overheat = EventBuffer::new();
sensor.overheat.connect_sink(&overheat);

// Start time (arbitrary since models do not depend on absolute time).
let t0 = MonotonicTime::EPOCH;

// Assembly and initialization.
let (mut simu, scheduler) = SimInit::new()
.add_model(sensor, sensor_mbox, "sensor")
.add_model(env, env_mbox, "env")
.init(t0)?;

// ----------
// Simulation.
// ----------

// Check initial conditions.
assert_eq!(simu.time(), t0);
assert_eq!(overheat.next(), Some(false));
assert!(overheat.next().is_none());

// Change temperature in 2s.
scheduler
.schedule_event(Duration::from_secs(2), Env::set_temp, 105.0, &env_addr)
.unwrap();

// Change temperature in 4s.
scheduler
.schedule_event(Duration::from_secs(4), Env::set_temp, 213.0, &env_addr)
.unwrap();

simu.step_until(Duration::from_secs(3))?;
assert!(overheat.next().is_none());

simu.step_until(Duration::from_secs(5))?;
assert_eq!(overheat.next(), Some(true));

Ok(())
}
2 changes: 1 addition & 1 deletion nexosim/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ mod source;

pub use input::markers;
pub use input::{InputFn, ReplierFn};
pub use output::{Output, Requestor};
pub use output::{Output, Requestor, UniRequestor};
pub use sink::{
event_buffer::EventBuffer, event_slot::EventSlot, EventSink, EventSinkStream, EventSinkWriter,
};
Expand Down
116 changes: 115 additions & 1 deletion nexosim/src/ports/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::simulation::Address;
use crate::util::cached_rw_lock::CachedRwLock;

use broadcaster::{EventBroadcaster, QueryBroadcaster};
use sender::FilterMapReplierSender;
use sender::{FilterMapReplierSender, Sender};

use self::sender::{
EventSinkSender, FilterMapEventSinkSender, FilterMapInputSender, InputSender,
Expand Down Expand Up @@ -300,3 +300,117 @@ impl<T: Clone + Send + 'static, R: Send + 'static> fmt::Debug for Requestor<T, R
)
}
}

/// A requestor port with exactly one connection.
///
/// A `UniRequestor` port is connected to a unique replier port, i.e. to an
/// asynchronous model method that returns a value.
#[derive(Clone)]
pub struct UniRequestor<T: Clone + Send + 'static, R: Send + 'static> {
sender: Box<dyn Sender<T, R>>,
}

impl<T: Clone + Send + 'static, R: Send + 'static> UniRequestor<T, R> {
/// Creates a new `UniRequestor` port connected to a replier port of the model
/// specified by the address.
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of type `R` and taking as argument a value of type `T`
/// plus, optionally, a context reference.
pub fn new<M, F, S>(replier: F, address: impl Into<Address<M>>) -> Self
where
M: Model,
F: for<'a> ReplierFn<'a, M, T, R, S> + Clone,
S: Send + 'static,
{
let sender = Box::new(ReplierSender::new(replier, address.into().0));

Self { sender }
}

/// Creates a new `UniRequestor` port connected with auto-conversion to a
/// replier port of the model specified by the address.
///
/// Queries and replies are mapped to other types using the closures
/// provided in argument.
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of the type returned by the reply mapping closure and
/// taking as argument a value of the type returned by the query mapping
/// closure plus, optionally, a context reference.
pub fn with_map<M, C, D, F, U, Q, S>(
query_map: C,
reply_map: D,
replier: F,
address: impl Into<Address<M>>,
) -> Self
where
M: Model,
C: Fn(&T) -> U + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Q: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(MapReplierSender::new(
query_map,
reply_map,
replier,
address.into().0,
));

Self { sender }
}

/// Creates a new `UniRequestor` port connected with filtering and
/// auto-conversion to a replier port of the model specified by the address.
///
/// Queries and replies are mapped to other types using the closures
/// provided in argument, or ignored if the query closure returns `None`.
///
/// The replier port must be an asynchronous method of a model of type `M`
/// returning a value of the type returned by the reply mapping closure and
/// taking as argument a value of the type returned by the query mapping
/// closure plus, optionally, a context reference.
pub fn with_filter_map<M, C, D, F, U, Q, S>(
query_filer_map: C,
reply_map: D,
replier: F,
address: impl Into<Address<M>>,
) -> Self
where
M: Model,
C: Fn(&T) -> Option<U> + Send + Sync + 'static,
D: Fn(Q) -> R + Send + Sync + 'static,
F: for<'a> ReplierFn<'a, M, U, Q, S> + Clone,
U: Send + 'static,
Q: Send + 'static,
S: Send + 'static,
{
let sender = Box::new(FilterMapReplierSender::new(
query_filer_map,
reply_map,
replier,
address.into().0,
));

Self { sender }
}

/// Sends a query to the connected replier port.
pub async fn send(&mut self, arg: T) -> Option<R> {
if let Some(fut) = self.sender.send_owned(arg) {
let output = fut.await.unwrap();
Some(output)
} else {
None
}
}
}

impl<T: Clone + Send + 'static, R: Send + 'static> fmt::Debug for UniRequestor<T, R> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "UniRequestor")
}
}

0 comments on commit d88c527

Please sign in to comment.