Skip to content

Commit

Permalink
Merge pull request #63 from asynchronics/feature/misc_api_changes
Browse files Browse the repository at this point in the history
Feature/misc api changes
  • Loading branch information
jauhien authored Nov 15, 2024
2 parents 1cefe4b + 3c1056d commit c749a49
Show file tree
Hide file tree
Showing 32 changed files with 885 additions and 1,031 deletions.
17 changes: 7 additions & 10 deletions asynchronix-util/examples/observables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,11 @@ impl Processor {
}

/// Process data for dt milliseconds.
pub async fn process(&mut self, dt: u64, context: &Context<Self>) {
pub async fn process(&mut self, dt: u64, cx: &mut Context<Self>) {
if matches!(self.state.observe(), ModeId::Idle | ModeId::Processing) {
self.state
.set(State::Processing(
context
.scheduler
.schedule_keyed_event(
Duration::from_millis(dt),
Self::finish_processing,
(),
)
cx.schedule_keyed_event(Duration::from_millis(dt), Self::finish_processing, ())
.unwrap()
.into_auto(),
))
Expand All @@ -155,7 +149,7 @@ impl Processor {

impl Model for Processor {
/// Propagate all internal states.
async fn init(mut self, _: &Context<Self>) -> InitializedModel<Self> {
async fn init(mut self, _: &mut Context<Self>) -> InitializedModel<Self> {
self.state.propagate().await;
self.acc.propagate().await;
self.elc.propagate().await;
Expand Down Expand Up @@ -188,7 +182,10 @@ fn main() -> Result<(), SimulationError> {
let t0 = MonotonicTime::EPOCH;

// Assembly and initialization.
let mut simu = SimInit::new().add_model(proc, proc_mbox, "proc").init(t0)?;
let mut simu = SimInit::new()
.add_model(proc, proc_mbox, "proc")
.init(t0)?
.0;

// ----------
// Simulation.
Expand Down
10 changes: 4 additions & 6 deletions asynchronix/examples/assembly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Model for MotorAssembly {}
impl ProtoModel for ProtoMotorAssembly {
type Model = MotorAssembly;

fn build(self, ctx: &mut BuildContext<Self>) -> MotorAssembly {
fn build(self, cx: &mut BuildContext<Self>) -> MotorAssembly {
let mut assembly = MotorAssembly::new();
let mut motor = Motor::new(self.init_pos);
let mut driver = Driver::new(1.0);
Expand All @@ -105,8 +105,8 @@ impl ProtoModel for ProtoMotorAssembly {
motor.position = self.position;

// Add the submodels to the simulation.
ctx.add_submodel(driver, driver_mbox, "driver");
ctx.add_submodel(motor, motor_mbox, "motor");
cx.add_submodel(driver, driver_mbox, "driver");
cx.add_submodel(motor, motor_mbox, "motor");

assembly
}
Expand All @@ -133,12 +133,10 @@ fn main() -> Result<(), SimulationError> {
let t0 = MonotonicTime::EPOCH;

// Assembly and initialization.
let mut simu = SimInit::new()
let (mut simu, scheduler) = SimInit::new()
.add_model(assembly, assembly_mbox, "assembly")
.init(t0)?;

let scheduler = simu.scheduler();

// ----------
// Simulation.
// ----------
Expand Down
31 changes: 12 additions & 19 deletions asynchronix/examples/espresso_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl Controller {
}

/// Starts brewing or cancels the current brew -- input port.
pub async fn brew_cmd(&mut self, _: (), context: &Context<Self>) {
pub async fn brew_cmd(&mut self, _: (), cx: &mut Context<Self>) {
// If a brew was ongoing, sending the brew command is interpreted as a
// request to cancel it.
if let Some(key) = self.stop_brew_key.take() {
Expand All @@ -139,9 +139,7 @@ impl Controller {

// Schedule the `stop_brew()` method and turn on the pump.
self.stop_brew_key = Some(
context
.scheduler
.schedule_keyed_event(self.brew_time, Self::stop_brew, ())
cx.schedule_keyed_event(self.brew_time, Self::stop_brew, ())
.unwrap(),
);
self.pump_cmd.send(PumpCommand::On).await;
Expand Down Expand Up @@ -189,7 +187,7 @@ impl Tank {
}

/// Water volume added [m³] -- input port.
pub async fn fill(&mut self, added_volume: f64, context: &Context<Self>) {
pub async fn fill(&mut self, added_volume: f64, cx: &mut Context<Self>) {
// Ignore zero and negative values. We could also impose a maximum based
// on tank capacity.
if added_volume <= 0.0 {
Expand All @@ -207,11 +205,11 @@ impl Tank {
state.set_empty_key.cancel();

// Update the volume, saturating at 0 in case of rounding errors.
let time = context.scheduler.time();
let time = cx.time();
let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64();
self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0);

self.schedule_empty(state.flow_rate, time, context).await;
self.schedule_empty(state.flow_rate, time, cx).await;

// There is no need to broadcast the state of the water sense since
// it could not be previously `Empty` (otherwise the dynamic state
Expand All @@ -229,10 +227,10 @@ impl Tank {
/// # Panics
///
/// This method will panic if the flow rate is negative.
pub async fn set_flow_rate(&mut self, flow_rate: f64, context: &Context<Self>) {
pub async fn set_flow_rate(&mut self, flow_rate: f64, cx: &mut Context<Self>) {
assert!(flow_rate >= 0.0);

let time = context.scheduler.time();
let time = cx.time();

// If the flow rate was non-zero up to now, update the volume.
if let Some(state) = self.dynamic_state.take() {
Expand All @@ -244,7 +242,7 @@ impl Tank {
self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0);
}

self.schedule_empty(flow_rate, time, context).await;
self.schedule_empty(flow_rate, time, cx).await;
}

/// Schedules a callback for when the tank becomes empty.
Expand All @@ -257,7 +255,7 @@ impl Tank {
&mut self,
flow_rate: f64,
time: MonotonicTime,
context: &Context<Self>,
cx: &mut Context<Self>,
) {
// Determine when the tank will be empty at the current flow rate.
let duration_until_empty = if self.volume == 0.0 {
Expand All @@ -274,10 +272,7 @@ impl Tank {
let duration_until_empty = Duration::from_secs_f64(duration_until_empty);

// Schedule the next update.
match context
.scheduler
.schedule_keyed_event(duration_until_empty, Self::set_empty, ())
{
match cx.schedule_keyed_event(duration_until_empty, Self::set_empty, ()) {
Ok(set_empty_key) => {
let state = TankDynamicState {
last_volume_update: time,
Expand All @@ -304,7 +299,7 @@ impl Tank {

impl Model for Tank {
/// Broadcasts the initial state of the water sense.
async fn init(mut self, _: &Context<Self>) -> InitializedModel<Self> {
async fn init(mut self, _: &mut Context<Self>) -> InitializedModel<Self> {
self.water_sense
.send(if self.volume == 0.0 {
WaterSenseState::Empty
Expand Down Expand Up @@ -371,14 +366,12 @@ fn main() -> Result<(), SimulationError> {
let t0 = MonotonicTime::EPOCH;

// Assembly and initialization.
let mut simu = SimInit::new()
let (mut simu, scheduler) = SimInit::new()
.add_model(controller, controller_mbox, "controller")
.add_model(pump, pump_mbox, "pump")
.add_model(tank, tank_mbox, "tank")
.init(t0)?;

let scheduler = simu.scheduler();

// ----------
// Simulation.
// ----------
Expand Down
9 changes: 4 additions & 5 deletions asynchronix/examples/external_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,9 @@ impl Listener {

impl Model for Listener {
/// Initialize model.
async fn init(self, context: &Context<Self>) -> InitializedModel<Self> {
async fn init(self, cx: &mut Context<Self>) -> InitializedModel<Self> {
// Schedule periodic function that processes external events.
context
.scheduler
.schedule_periodic_event(DELTA, PERIOD, Listener::process, ())
cx.schedule_periodic_event(DELTA, PERIOD, Listener::process, ())
.unwrap();

self.into()
Expand Down Expand Up @@ -212,7 +210,8 @@ fn main() -> Result<(), SimulationError> {
let mut simu = SimInit::new()
.add_model(listener, listener_mbox, "listener")
.set_clock(AutoSystemClock::new())
.init(t0)?;
.init(t0)?
.0;

// ----------
// Simulation.
Expand Down
3 changes: 2 additions & 1 deletion asynchronix/examples/power_supply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ fn main() -> Result<(), SimulationError> {
.add_model(load1, load1_mbox, "load1")
.add_model(load2, load2_mbox, "load2")
.add_model(load3, load3_mbox, "load3")
.init(t0)?;
.init(t0)?
.0;

// ----------
// Simulation.
Expand Down
16 changes: 6 additions & 10 deletions asynchronix/examples/stepper_motor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Motor {

impl Model for Motor {
/// Broadcasts the initial position of the motor.
async fn init(mut self, _: &Context<Self>) -> InitializedModel<Self> {
async fn init(mut self, _: &mut Context<Self>) -> InitializedModel<Self> {
self.position.send(self.pos).await;
self.into()
}
Expand Down Expand Up @@ -126,7 +126,7 @@ impl Driver {
}

/// Pulse rate (sign = direction) [Hz] -- input port.
pub async fn pulse_rate(&mut self, pps: f64, context: &Context<Self>) {
pub async fn pulse_rate(&mut self, pps: f64, cx: &mut Context<Self>) {
let pps = pps.signum() * pps.abs().clamp(Self::MIN_PPS, Self::MAX_PPS);
if pps == self.pps {
return;
Expand All @@ -138,7 +138,7 @@ impl Driver {
// Trigger the rotation if the motor is currently idle. Otherwise the
// new value will be accounted for at the next pulse.
if is_idle {
self.send_pulse((), context).await;
self.send_pulse((), cx).await;
}
}

Expand All @@ -149,7 +149,7 @@ impl Driver {
fn send_pulse<'a>(
&'a mut self,
_: (),
context: &'a Context<Self>,
cx: &'a mut Context<Self>,
) -> impl Future<Output = ()> + Send + 'a {
async move {
let current_out = match self.next_phase {
Expand All @@ -170,9 +170,7 @@ impl Driver {
let pulse_duration = Duration::from_secs_f64(1.0 / self.pps.abs());

// Schedule the next pulse.
context
.scheduler
.schedule_event(pulse_duration, Self::send_pulse, ())
cx.schedule_event(pulse_duration, Self::send_pulse, ())
.unwrap();
}
}
Expand Down Expand Up @@ -208,13 +206,11 @@ fn main() -> Result<(), asynchronix::simulation::SimulationError> {
let t0 = MonotonicTime::EPOCH;

// Assembly and initialization.
let mut simu = SimInit::new()
let (mut simu, scheduler) = SimInit::new()
.add_model(driver, driver_mbox, "driver")
.add_model(motor, motor_mbox, "motor")
.init(t0)?;

let scheduler = simu.scheduler();

// ----------
// Simulation.
// ----------
Expand Down
18 changes: 9 additions & 9 deletions asynchronix/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<M: 'static> Inner<M> {
}

/// A receiver which can asynchronously execute `async` message that take an
/// argument of type `&mut M` and an optional `&Context<M>` argument.
/// argument of type `&mut M` and an optional `&mut Context<M>` argument.
pub(crate) struct Receiver<M> {
/// Shared data.
inner: Arc<Inner<M>>,
Expand Down Expand Up @@ -105,7 +105,7 @@ impl<M: Model> Receiver<M> {
pub(crate) async fn recv(
&mut self,
model: &mut M,
context: &Context<M>,
cx: &mut Context<M>,
) -> Result<(), RecvError> {
let msg = unsafe {
self.inner
Expand All @@ -124,7 +124,7 @@ impl<M: Model> Receiver<M> {
THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1));

// Take the message to obtain a boxed future.
let fut = msg.call_once(model, context, self.future_box.take().unwrap());
let fut = msg.call_once(model, cx, self.future_box.take().unwrap());

// Now that the message was taken, drop `msg` to free its slot
// in the queue and signal to one awaiting sender that a slot is
Expand Down Expand Up @@ -207,7 +207,7 @@ impl<M: Model> Sender<M> {
where
F: for<'a> FnOnce(
&'a mut M,
&'a Context<M>,
&'a mut Context<M>,
RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>
+ Send
Expand Down Expand Up @@ -364,7 +364,7 @@ impl<M> fmt::Debug for Sender<M> {
}

/// A closure that can be called once to create a future boxed in a `RecycleBox`
/// from an `&mut M`, a `&Context<M>` and an empty `RecycleBox`.
/// from an `&mut M`, a `&mut Context<M>` and an empty `RecycleBox`.
///
/// This is basically a workaround to emulate an `FnOnce` with the equivalent of
/// an `FnMut` so that it is possible to call it as a `dyn` trait stored in a
Expand All @@ -380,7 +380,7 @@ trait MessageFn<M: Model>: Send {
fn call_once<'a>(
&mut self,
model: &'a mut M,
context: &'a Context<M>,
cx: &'a mut Context<M>,
recycle_box: RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>;
}
Expand All @@ -402,20 +402,20 @@ impl<F, M: Model> MessageFn<M> for MessageFnOnce<F, M>
where
F: for<'a> FnOnce(
&'a mut M,
&'a Context<M>,
&'a mut Context<M>,
RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>
+ Send,
{
fn call_once<'a>(
&mut self,
model: &'a mut M,
context: &'a Context<M>,
cx: &'a mut Context<M>,
recycle_box: RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a> {
let closure = self.msg_fn.take().unwrap();

(closure)(model, context, recycle_box)
(closure)(model, cx, recycle_box)
}
}

Expand Down
4 changes: 2 additions & 2 deletions asynchronix/src/dev_hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl Executor {
///
/// The maximum number of threads is set with the `pool_size` parameter.
pub fn new(pool_size: usize) -> Self {
let dummy_context = crate::executor::SimulationContext {
let dummy_cx = crate::executor::SimulationContext {
#[cfg(feature = "tracing")]
time_reader: crate::util::sync_cell::SyncCell::new(
crate::time::TearableAtomicTime::new(crate::time::MonotonicTime::EPOCH),
Expand All @@ -25,7 +25,7 @@ impl Executor {
};
Self(executor::Executor::new_multi_threaded(
pool_size,
dummy_context,
dummy_cx,
executor::Signal::new(),
))
}
Expand Down
3 changes: 2 additions & 1 deletion asynchronix/src/grpc/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ impl simulation_server::Simulation for GrpcSimulationService {

let (reply, bench) = self.initializer().init(request);

if let Some((simulation, endpoint_registry)) = bench {
if let Some((simulation, scheduler, endpoint_registry)) = bench {
*self.controller() = ControllerService::Started {
simulation,
scheduler,
event_source_registry: endpoint_registry.event_source_registry,
query_source_registry: endpoint_registry.query_source_registry,
key_registry: KeyRegistry::default(),
Expand Down
Loading

0 comments on commit c749a49

Please sign in to comment.