Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/misc api changes #63

Merged
merged 4 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions asynchronix-util/examples/observables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,11 @@ impl Processor {
}

/// Process data for dt milliseconds.
pub async fn process(&mut self, dt: u64, context: &Context<Self>) {
pub async fn process(&mut self, dt: u64, cx: &mut Context<Self>) {
if matches!(self.state.observe(), ModeId::Idle | ModeId::Processing) {
self.state
.set(State::Processing(
context
.scheduler
.schedule_keyed_event(
Duration::from_millis(dt),
Self::finish_processing,
(),
)
cx.schedule_keyed_event(Duration::from_millis(dt), Self::finish_processing, ())
.unwrap()
.into_auto(),
))
Expand All @@ -155,7 +149,7 @@ impl Processor {

impl Model for Processor {
/// Propagate all internal states.
async fn init(mut self, _: &Context<Self>) -> InitializedModel<Self> {
async fn init(mut self, _: &mut Context<Self>) -> InitializedModel<Self> {
self.state.propagate().await;
self.acc.propagate().await;
self.elc.propagate().await;
Expand Down Expand Up @@ -188,7 +182,10 @@ fn main() -> Result<(), SimulationError> {
let t0 = MonotonicTime::EPOCH;

// Assembly and initialization.
let mut simu = SimInit::new().add_model(proc, proc_mbox, "proc").init(t0)?;
let mut simu = SimInit::new()
.add_model(proc, proc_mbox, "proc")
.init(t0)?
.0;

// ----------
// Simulation.
Expand Down
10 changes: 4 additions & 6 deletions asynchronix/examples/assembly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Model for MotorAssembly {}
impl ProtoModel for ProtoMotorAssembly {
type Model = MotorAssembly;

fn build(self, ctx: &mut BuildContext<Self>) -> MotorAssembly {
fn build(self, cx: &mut BuildContext<Self>) -> MotorAssembly {
let mut assembly = MotorAssembly::new();
let mut motor = Motor::new(self.init_pos);
let mut driver = Driver::new(1.0);
Expand All @@ -105,8 +105,8 @@ impl ProtoModel for ProtoMotorAssembly {
motor.position = self.position;

// Add the submodels to the simulation.
ctx.add_submodel(driver, driver_mbox, "driver");
ctx.add_submodel(motor, motor_mbox, "motor");
cx.add_submodel(driver, driver_mbox, "driver");
cx.add_submodel(motor, motor_mbox, "motor");

assembly
}
Expand All @@ -133,12 +133,10 @@ fn main() -> Result<(), SimulationError> {
let t0 = MonotonicTime::EPOCH;

// Assembly and initialization.
let mut simu = SimInit::new()
let (mut simu, scheduler) = SimInit::new()
.add_model(assembly, assembly_mbox, "assembly")
.init(t0)?;

let scheduler = simu.scheduler();

// ----------
// Simulation.
// ----------
Expand Down
31 changes: 12 additions & 19 deletions asynchronix/examples/espresso_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl Controller {
}

/// Starts brewing or cancels the current brew -- input port.
pub async fn brew_cmd(&mut self, _: (), context: &Context<Self>) {
pub async fn brew_cmd(&mut self, _: (), cx: &mut Context<Self>) {
// If a brew was ongoing, sending the brew command is interpreted as a
// request to cancel it.
if let Some(key) = self.stop_brew_key.take() {
Expand All @@ -139,9 +139,7 @@ impl Controller {

// Schedule the `stop_brew()` method and turn on the pump.
self.stop_brew_key = Some(
context
.scheduler
.schedule_keyed_event(self.brew_time, Self::stop_brew, ())
cx.schedule_keyed_event(self.brew_time, Self::stop_brew, ())
.unwrap(),
);
self.pump_cmd.send(PumpCommand::On).await;
Expand Down Expand Up @@ -189,7 +187,7 @@ impl Tank {
}

/// Water volume added [m³] -- input port.
pub async fn fill(&mut self, added_volume: f64, context: &Context<Self>) {
pub async fn fill(&mut self, added_volume: f64, cx: &mut Context<Self>) {
// Ignore zero and negative values. We could also impose a maximum based
// on tank capacity.
if added_volume <= 0.0 {
Expand All @@ -207,11 +205,11 @@ impl Tank {
state.set_empty_key.cancel();

// Update the volume, saturating at 0 in case of rounding errors.
let time = context.scheduler.time();
let time = cx.time();
let elapsed_time = time.duration_since(state.last_volume_update).as_secs_f64();
self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0);

self.schedule_empty(state.flow_rate, time, context).await;
self.schedule_empty(state.flow_rate, time, cx).await;

// There is no need to broadcast the state of the water sense since
// it could not be previously `Empty` (otherwise the dynamic state
Expand All @@ -229,10 +227,10 @@ impl Tank {
/// # Panics
///
/// This method will panic if the flow rate is negative.
pub async fn set_flow_rate(&mut self, flow_rate: f64, context: &Context<Self>) {
pub async fn set_flow_rate(&mut self, flow_rate: f64, cx: &mut Context<Self>) {
assert!(flow_rate >= 0.0);

let time = context.scheduler.time();
let time = cx.time();

// If the flow rate was non-zero up to now, update the volume.
if let Some(state) = self.dynamic_state.take() {
Expand All @@ -244,7 +242,7 @@ impl Tank {
self.volume = (self.volume - state.flow_rate * elapsed_time).max(0.0);
}

self.schedule_empty(flow_rate, time, context).await;
self.schedule_empty(flow_rate, time, cx).await;
}

/// Schedules a callback for when the tank becomes empty.
Expand All @@ -257,7 +255,7 @@ impl Tank {
&mut self,
flow_rate: f64,
time: MonotonicTime,
context: &Context<Self>,
cx: &mut Context<Self>,
) {
// Determine when the tank will be empty at the current flow rate.
let duration_until_empty = if self.volume == 0.0 {
Expand All @@ -274,10 +272,7 @@ impl Tank {
let duration_until_empty = Duration::from_secs_f64(duration_until_empty);

// Schedule the next update.
match context
.scheduler
.schedule_keyed_event(duration_until_empty, Self::set_empty, ())
{
match cx.schedule_keyed_event(duration_until_empty, Self::set_empty, ()) {
Ok(set_empty_key) => {
let state = TankDynamicState {
last_volume_update: time,
Expand All @@ -304,7 +299,7 @@ impl Tank {

impl Model for Tank {
/// Broadcasts the initial state of the water sense.
async fn init(mut self, _: &Context<Self>) -> InitializedModel<Self> {
async fn init(mut self, _: &mut Context<Self>) -> InitializedModel<Self> {
self.water_sense
.send(if self.volume == 0.0 {
WaterSenseState::Empty
Expand Down Expand Up @@ -371,14 +366,12 @@ fn main() -> Result<(), SimulationError> {
let t0 = MonotonicTime::EPOCH;

// Assembly and initialization.
let mut simu = SimInit::new()
let (mut simu, scheduler) = SimInit::new()
.add_model(controller, controller_mbox, "controller")
.add_model(pump, pump_mbox, "pump")
.add_model(tank, tank_mbox, "tank")
.init(t0)?;

let scheduler = simu.scheduler();

// ----------
// Simulation.
// ----------
Expand Down
9 changes: 4 additions & 5 deletions asynchronix/examples/external_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,9 @@ impl Listener {

impl Model for Listener {
/// Initialize model.
async fn init(self, context: &Context<Self>) -> InitializedModel<Self> {
async fn init(self, cx: &mut Context<Self>) -> InitializedModel<Self> {
// Schedule periodic function that processes external events.
context
.scheduler
.schedule_periodic_event(DELTA, PERIOD, Listener::process, ())
cx.schedule_periodic_event(DELTA, PERIOD, Listener::process, ())
.unwrap();

self.into()
Expand Down Expand Up @@ -212,7 +210,8 @@ fn main() -> Result<(), SimulationError> {
let mut simu = SimInit::new()
.add_model(listener, listener_mbox, "listener")
.set_clock(AutoSystemClock::new())
.init(t0)?;
.init(t0)?
.0;

// ----------
// Simulation.
Expand Down
3 changes: 2 additions & 1 deletion asynchronix/examples/power_supply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ fn main() -> Result<(), SimulationError> {
.add_model(load1, load1_mbox, "load1")
.add_model(load2, load2_mbox, "load2")
.add_model(load3, load3_mbox, "load3")
.init(t0)?;
.init(t0)?
.0;

// ----------
// Simulation.
Expand Down
16 changes: 6 additions & 10 deletions asynchronix/examples/stepper_motor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Motor {

impl Model for Motor {
/// Broadcasts the initial position of the motor.
async fn init(mut self, _: &Context<Self>) -> InitializedModel<Self> {
async fn init(mut self, _: &mut Context<Self>) -> InitializedModel<Self> {
self.position.send(self.pos).await;
self.into()
}
Expand Down Expand Up @@ -126,7 +126,7 @@ impl Driver {
}

/// Pulse rate (sign = direction) [Hz] -- input port.
pub async fn pulse_rate(&mut self, pps: f64, context: &Context<Self>) {
pub async fn pulse_rate(&mut self, pps: f64, cx: &mut Context<Self>) {
let pps = pps.signum() * pps.abs().clamp(Self::MIN_PPS, Self::MAX_PPS);
if pps == self.pps {
return;
Expand All @@ -138,7 +138,7 @@ impl Driver {
// Trigger the rotation if the motor is currently idle. Otherwise the
// new value will be accounted for at the next pulse.
if is_idle {
self.send_pulse((), context).await;
self.send_pulse((), cx).await;
}
}

Expand All @@ -149,7 +149,7 @@ impl Driver {
fn send_pulse<'a>(
&'a mut self,
_: (),
context: &'a Context<Self>,
cx: &'a mut Context<Self>,
) -> impl Future<Output = ()> + Send + 'a {
async move {
let current_out = match self.next_phase {
Expand All @@ -170,9 +170,7 @@ impl Driver {
let pulse_duration = Duration::from_secs_f64(1.0 / self.pps.abs());

// Schedule the next pulse.
context
.scheduler
.schedule_event(pulse_duration, Self::send_pulse, ())
cx.schedule_event(pulse_duration, Self::send_pulse, ())
.unwrap();
}
}
Expand Down Expand Up @@ -208,13 +206,11 @@ fn main() -> Result<(), asynchronix::simulation::SimulationError> {
let t0 = MonotonicTime::EPOCH;

// Assembly and initialization.
let mut simu = SimInit::new()
let (mut simu, scheduler) = SimInit::new()
.add_model(driver, driver_mbox, "driver")
.add_model(motor, motor_mbox, "motor")
.init(t0)?;

let scheduler = simu.scheduler();

// ----------
// Simulation.
// ----------
Expand Down
18 changes: 9 additions & 9 deletions asynchronix/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<M: 'static> Inner<M> {
}

/// A receiver which can asynchronously execute `async` message that take an
/// argument of type `&mut M` and an optional `&Context<M>` argument.
/// argument of type `&mut M` and an optional `&mut Context<M>` argument.
pub(crate) struct Receiver<M> {
/// Shared data.
inner: Arc<Inner<M>>,
Expand Down Expand Up @@ -105,7 +105,7 @@ impl<M: Model> Receiver<M> {
pub(crate) async fn recv(
&mut self,
model: &mut M,
context: &Context<M>,
cx: &mut Context<M>,
) -> Result<(), RecvError> {
let msg = unsafe {
self.inner
Expand All @@ -124,7 +124,7 @@ impl<M: Model> Receiver<M> {
THREAD_MSG_COUNT.set(THREAD_MSG_COUNT.get().wrapping_sub(1));

// Take the message to obtain a boxed future.
let fut = msg.call_once(model, context, self.future_box.take().unwrap());
let fut = msg.call_once(model, cx, self.future_box.take().unwrap());

// Now that the message was taken, drop `msg` to free its slot
// in the queue and signal to one awaiting sender that a slot is
Expand Down Expand Up @@ -207,7 +207,7 @@ impl<M: Model> Sender<M> {
where
F: for<'a> FnOnce(
&'a mut M,
&'a Context<M>,
&'a mut Context<M>,
RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>
+ Send
Expand Down Expand Up @@ -364,7 +364,7 @@ impl<M> fmt::Debug for Sender<M> {
}

/// A closure that can be called once to create a future boxed in a `RecycleBox`
/// from an `&mut M`, a `&Context<M>` and an empty `RecycleBox`.
/// from an `&mut M`, a `&mut Context<M>` and an empty `RecycleBox`.
///
/// This is basically a workaround to emulate an `FnOnce` with the equivalent of
/// an `FnMut` so that it is possible to call it as a `dyn` trait stored in a
Expand All @@ -380,7 +380,7 @@ trait MessageFn<M: Model>: Send {
fn call_once<'a>(
&mut self,
model: &'a mut M,
context: &'a Context<M>,
cx: &'a mut Context<M>,
recycle_box: RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>;
}
Expand All @@ -402,20 +402,20 @@ impl<F, M: Model> MessageFn<M> for MessageFnOnce<F, M>
where
F: for<'a> FnOnce(
&'a mut M,
&'a Context<M>,
&'a mut Context<M>,
RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a>
+ Send,
{
fn call_once<'a>(
&mut self,
model: &'a mut M,
context: &'a Context<M>,
cx: &'a mut Context<M>,
recycle_box: RecycleBox<()>,
) -> RecycleBox<dyn Future<Output = ()> + Send + 'a> {
let closure = self.msg_fn.take().unwrap();

(closure)(model, context, recycle_box)
(closure)(model, cx, recycle_box)
}
}

Expand Down
4 changes: 2 additions & 2 deletions asynchronix/src/dev_hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl Executor {
///
/// The maximum number of threads is set with the `pool_size` parameter.
pub fn new(pool_size: usize) -> Self {
let dummy_context = crate::executor::SimulationContext {
let dummy_cx = crate::executor::SimulationContext {
#[cfg(feature = "tracing")]
time_reader: crate::util::sync_cell::SyncCell::new(
crate::time::TearableAtomicTime::new(crate::time::MonotonicTime::EPOCH),
Expand All @@ -25,7 +25,7 @@ impl Executor {
};
Self(executor::Executor::new_multi_threaded(
pool_size,
dummy_context,
dummy_cx,
executor::Signal::new(),
))
}
Expand Down
3 changes: 2 additions & 1 deletion asynchronix/src/grpc/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ impl simulation_server::Simulation for GrpcSimulationService {

let (reply, bench) = self.initializer().init(request);

if let Some((simulation, endpoint_registry)) = bench {
if let Some((simulation, scheduler, endpoint_registry)) = bench {
*self.controller() = ControllerService::Started {
simulation,
scheduler,
event_source_registry: endpoint_registry.event_source_registry,
query_source_registry: endpoint_registry.query_source_registry,
key_registry: KeyRegistry::default(),
Expand Down
Loading