Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change scheduler interface and add external inputs example. #30

Merged
merged 1 commit into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion asynchronix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ waker-fn = "1.1"


[dev-dependencies]
atomic-wait = "1.1"
futures-util = "0.3"
futures-executor = "0.3"

mio = { version = "1.0", features = ["os-poll", "net"] }

[build-dependencies]
tonic-build = { version = "0.11", optional = true }
Expand Down
17 changes: 10 additions & 7 deletions asynchronix/examples/assembly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ fn main() {
.add_model(assembly, assembly_mbox, "assembly")
.init(t0);

let scheduler = simu.scheduler();

// ----------
// Simulation.
// ----------
Expand All @@ -120,13 +122,14 @@ fn main() {
assert!(position.next().is_none());

// Start the motor in 2s with a PPS of 10Hz.
simu.schedule_event(
Duration::from_secs(2),
MotorAssembly::pulse_rate,
10.0,
&assembly_addr,
)
.unwrap();
scheduler
.schedule_event(
Duration::from_secs(2),
MotorAssembly::pulse_rate,
10.0,
&assembly_addr,
)
.unwrap();

// Advance simulation time to two next events.
simu.step();
Expand Down
27 changes: 17 additions & 10 deletions asynchronix/examples/espresso_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl Controller {
// Schedule the `stop_brew()` method and turn on the pump.
self.stop_brew_key = Some(
context
.scheduler
.schedule_keyed_event(self.brew_time, Self::stop_brew, ())
.unwrap(),
);
Expand Down Expand Up @@ -206,7 +207,7 @@ impl Tank {
state.set_empty_key.cancel();

// Update the volume, saturating at 0 in case of rounding errors.
let time = context.time();
let time = context.scheduler.time();
let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64();
self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0);

Expand All @@ -231,7 +232,7 @@ impl Tank {
pub async fn set_flow_rate(&mut self, flow_rate: f64, context: &Context<Self>) {
assert!(flow_rate >= 0.0);

let time = context.time();
let time = context.scheduler.time();

// If the flow rate was non-zero up to now, update the volume.
if let Some(state) = self.dynamic_state.take() {
Expand Down Expand Up @@ -273,7 +274,10 @@ impl Tank {
let duration_until_empty = Duration::from_secs_f64(duration_until_empty);

// Schedule the next update.
match context.schedule_keyed_event(duration_until_empty, Self::set_empty, ()) {
match context
.scheduler
.schedule_keyed_event(duration_until_empty, Self::set_empty, ())
{
Ok(set_empty_key) => {
let state = TankDynamicState {
last_volume_update: time,
Expand Down Expand Up @@ -373,6 +377,8 @@ fn main() {
.add_model(tank, tank_mbox, "tank")
.init(t0);

let scheduler = simu.scheduler();

// ----------
// Simulation.
// ----------
Expand Down Expand Up @@ -426,13 +432,14 @@ fn main() {
assert_eq!(flow_rate.next(), Some(0.0));

// Interrupt the brew after 15s by pressing again the brew button.
simu.schedule_event(
Duration::from_secs(15),
Controller::brew_cmd,
(),
&controller_addr,
)
.unwrap();
scheduler
.schedule_event(
Duration::from_secs(15),
Controller::brew_cmd,
(),
&controller_addr,
)
.unwrap();
simu.process_event(Controller::brew_cmd, (), &controller_addr);
assert_eq!(flow_rate.next(), Some(pump_flow_rate));

Expand Down
251 changes: 251 additions & 0 deletions asynchronix/examples/external_input.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
//! Example: a model that reads data from the external world.
//!
//! This example demonstrates in particular:
//!
//! * external world inputs (useful in cosimulation),
//! * system clock,
//! * periodic scheduling.
//!
//! ```text
//! ┌────────────────────────────────┐
//! │ Simulation │
//! ┌────────────┐ ┌────────────┐ │ ┌──────────┐ │
//! │ │ UDP │ │ message │ message │ │ message │ ┌─────────────┐
//! │ UDP Client ├─────────▶│ UDP Server ├──────────▶├─────────▶│ Listener ├─────────▶├──▶│ EventBuffer │
//! │ │ message │ │ │ │ │ │ └─────────────┘
//! └────────────┘ └────────────┘ │ └──────────┘ │
//! └────────────────────────────────┘
//! ```

use std::io::ErrorKind;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread::{self, sleep, JoinHandle};
use std::time::Duration;

use atomic_wait::{wait, wake_one};

use mio::net::UdpSocket as MioUdpSocket;
use mio::{Events, Interest, Poll, Token};

use asynchronix::model::{Context, InitializedModel, Model, SetupContext};
use asynchronix::ports::{EventBuffer, Output};
use asynchronix::simulation::{Mailbox, SimInit};
use asynchronix::time::{AutoSystemClock, MonotonicTime};

const DELTA: Duration = Duration::from_millis(2);
const PERIOD: Duration = Duration::from_millis(20);
const N: u32 = 10;
const SENDER: &str = "127.0.0.1:8000";
const RECEIVER: &str = "127.0.0.1:9000";

/// Model that receives external input.
pub struct Listener {
/// Received message.
pub message: Output<String>,

/// Receiver of external messages.
rx: Receiver<String>,

/// External sender.
tx: Option<Sender<String>>,

/// Synchronization with client.
start: Arc<AtomicU32>,

/// Synchronization with simulation.
stop: Arc<AtomicBool>,

/// Handle to UDP Server.
external_handle: Option<JoinHandle<()>>,
}

impl Listener {
/// Creates a Listener.
pub fn new(start: Arc<AtomicU32>) -> Self {
start.store(0, Ordering::Relaxed);

let (tx, rx) = channel();
Self {
message: Output::default(),
rx,
tx: Some(tx),
start,
stop: Arc::new(AtomicBool::new(false)),
external_handle: None,
}
}

/// Periodically scheduled function that processes external events.
pub async fn process(&mut self) {
loop {
if let Ok(message) = self.rx.try_recv() {
self.message.send(message).await;
} else {
break;
}
}
}

/// UDP server.
///
/// Code is based on the MIO UDP example.
fn listener(tx: Sender<String>, start: Arc<AtomicU32>, stop: Arc<AtomicBool>) {
const UDP_SOCKET: Token = Token(0);
let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(10);
let mut socket = MioUdpSocket::bind(RECEIVER.parse().unwrap()).unwrap();
poll.registry()
.register(&mut socket, UDP_SOCKET, Interest::READABLE)
.unwrap();
let mut buf = [0; 1 << 16];

// Wake up the client.
start.store(1, Ordering::Relaxed);
wake_one(&*start);

'process: loop {
// Wait for UDP packet or end of simulation.
if let Err(err) = poll.poll(&mut events, Some(Duration::from_secs(1))) {
if err.kind() == ErrorKind::Interrupted {
// Exit if simulation is finished.
if stop.load(Ordering::Relaxed) {
break 'process;
}
continue;
}
break 'process;
}

for event in events.iter() {
match event.token() {
UDP_SOCKET => loop {
match socket.recv_from(&mut buf) {
Ok((packet_size, _)) => {
if let Ok(message) = std::str::from_utf8(&buf[..packet_size]) {
// Inject external message into simulation.
if tx.send(message.into()).is_err() {
break 'process;
}
};
}
Err(e) if e.kind() == ErrorKind::WouldBlock => {
break;
}
_ => {
break 'process;
}
}
},
_ => {
panic!("Got event for unexpected token: {:?}", event);
}
}
}
// Exit if simulation is finished.
if stop.load(Ordering::Relaxed) {
break 'process;
}
}

poll.registry().deregister(&mut socket).unwrap();
}
}

impl Model for Listener {
/// Start UDP Server on model setup.
fn setup(&mut self, _: &SetupContext<Self>) {
let tx = self.tx.take().unwrap();
let start = Arc::clone(&self.start);
let stop = Arc::clone(&self.stop);
self.external_handle = Some(thread::spawn(move || {
Self::listener(tx, start, stop);
}));
}

/// Initialize model.
async fn init(self, context: &Context<Self>) -> InitializedModel<Self> {
// Schedule periodic function that processes external events.
context
.scheduler
.schedule_periodic_event(DELTA, PERIOD, Listener::process, ())
.unwrap();

self.into()
}
}

impl Drop for Listener {
/// Notify UDP Server that simulation is over and wait for server shutdown.
fn drop(&mut self) {
self.stop.store(true, Ordering::Relaxed);
let handle = self.external_handle.take();
if let Some(handle) = handle {
handle.join().unwrap();
}
}
}

fn main() {
// ---------------
// Bench assembly.
// ---------------

// Models.

// Client-server synchronization.
let start = Arc::new(AtomicU32::new(0));

let mut listener = Listener::new(Arc::clone(&start));

// Mailboxes.
let listener_mbox = Mailbox::new();

// Model handles for simulation.
let mut message = EventBuffer::new();
listener.message.connect_sink(&message);

// Start time (arbitrary since models do not depend on absolute time).
let t0 = MonotonicTime::EPOCH;

// Assembly and initialization.
let mut simu = SimInit::new()
.add_model(listener, listener_mbox, "listener")
.set_clock(AutoSystemClock::new())
.init(t0);

// ----------
// Simulation.
// ----------

// External client that sends UDP messages.
let sender_handle = thread::spawn(move || {
// Wait until UDP Server is ready.
wait(&start, 0);

for i in 0..N {
let socket = UdpSocket::bind(SENDER).unwrap();
socket.send_to(i.to_string().as_bytes(), RECEIVER).unwrap();
if i % 3 == 0 {
sleep(PERIOD * i)
}
}
});

// Advance simulation, external messages will be collected.
simu.step_by(Duration::from_secs(2));

// Check collected external messages.
let mut packets = 0_u32;
for _ in 0..N {
// UDP can reorder packages, we are expecting that on not too loaded
// localhost packages would not be dropped
packets |= 1 << message.next().unwrap().parse::<u8>().unwrap();
}
assert_eq!(packets, u32::MAX >> 22);
assert_eq!(message.next(), None);

sender_handle.join().unwrap();
}
Loading
Loading