Skip to content

Commit

Permalink
Add optional serde support and make MonotonicTime serializable
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed Mar 12, 2024
1 parent 43e4101 commit 71ad4b8
Show file tree
Hide file tree
Showing 26 changed files with 890 additions and 573 deletions.
1 change: 1 addition & 0 deletions .github/workflows/loom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ on:
- 'asynchronix/src/model/ports/broadcaster.rs'
- 'asynchronix/src/model/ports/broadcaster/**'
- 'asynchronix/src/util/slot.rs'
- 'asynchronix/src/util/spsc_queue.rs'
- 'asynchronix/src/util/sync_cell.rs'

jobs:
Expand Down
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
# unreleased

- Add `serde` feature and serialization support for `MonotonicTime`.

# 0.2.1 (2024-03-06)

### Added

- Add support for custom clocks and provide an optional real-time clock
([#9], [#15]).

[#9]: https://github.com/asynchronics/asynchronix/pull/9
[#15]: https://github.com/asynchronics/asynchronix/pull/15

### Misc

- Update copyright in MIT license to include contributors.

# 0.2.0 (2023-08-15)

### Added (API-breaking changes)
Expand Down
2 changes: 1 addition & 1 deletion LICENSE-MIT
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2023 Serge Barral
Copyright (c) 2024 Asynchronics sp. z o.o. and Asynchronix Contributors

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Add this to your `Cargo.toml`:

```toml
[dependencies]
asynchronix = "0.2.0"
asynchronix = "0.2.1"
```


Expand Down
9 changes: 7 additions & 2 deletions asynchronix/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
[package]
name = "asynchronix"
# When incrementing version and releasing to crates.io:
# - Update crate version in this Cargo.toml
# - Update crate version in README.md
# - Update CHANGELOG.md
# - Update if necessary copyright notice in LICENSE-MIT
# - Create a "vX.Y.Z" git tag
authors = ["Serge Barral <[email protected]>"]
version = "0.2.0"
version = "0.2.1"
edition = "2021"
rust-version = "1.64"
license = "MIT OR Apache-2.0"
Expand All @@ -24,11 +25,11 @@ autotests = false
dev-hooks = []
# Logging of performance-related statistics; development only.
dev-logs = []
serde = ["dep:serde"]

[dependencies]
async-event = "0.1"
crossbeam-utils = "0.8"
crossbeam-queue = "0.3"
diatomic-waker = "0.1"
futures-task = "0.3"
multishot = "0.3"
Expand All @@ -39,6 +40,10 @@ slab = "0.4"
spin_sleep = "1"
st3 = "0.4"

[dependencies.serde]
version = "1"
optional = true

[target.'cfg(asynchronix_loom)'.dependencies]
loom = "0.5"
waker-fn = "1.1"
Expand Down
5 changes: 2 additions & 3 deletions asynchronix/examples/espresso_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::pin::Pin;
use std::time::Duration;

use asynchronix::model::{InitializedModel, Model, Output};
use asynchronix::simulation::{EventSlot, Mailbox, SimInit};
use asynchronix::simulation::{Mailbox, SimInit};
use asynchronix::time::{EventKey, MonotonicTime, Scheduler};

/// Water pump.
Expand Down Expand Up @@ -364,8 +364,7 @@ fn main() {
pump.flow_rate.connect(Tank::set_flow_rate, &tank_mbox);

// Model handles for simulation.
let mut flow_rate = EventSlot::new();
pump.flow_rate.connect_sink(&flow_rate);
let mut flow_rate = pump.flow_rate.connect_slot().0;
let controller_addr = controller_mbox.address();
let tank_addr = tank_mbox.address();

Expand Down
14 changes: 5 additions & 9 deletions asynchronix/examples/power_supply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
//! └──────────┘
//! ```
use asynchronix::model::{Model, Output, Requestor};
use asynchronix::simulation::{EventSlot, Mailbox, SimInit};
use asynchronix::simulation::{Mailbox, SimInit};
use asynchronix::time::MonotonicTime;

/// Power supply.
Expand Down Expand Up @@ -124,14 +124,10 @@ fn main() {
psu.pwr_out.connect(Load::pwr_in, &load3_mbox);

// Model handles for simulation.
let mut psu_power = EventSlot::new();
let mut load1_power = EventSlot::new();
let mut load2_power = EventSlot::new();
let mut load3_power = EventSlot::new();
psu.power.connect_sink(&psu_power);
load1.power.connect_sink(&load1_power);
load2.power.connect_sink(&load2_power);
load3.power.connect_sink(&load3_power);
let mut psu_power = psu.power.connect_slot().0;
let mut load1_power = load1.power.connect_slot().0;
let mut load2_power = load2.power.connect_slot().0;
let mut load3_power = load3.power.connect_slot().0;
let psu_addr = psu_mbox.address();

// Start time (arbitrary since models do not depend on absolute time).
Expand Down
5 changes: 2 additions & 3 deletions asynchronix/examples/stepper_motor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::pin::Pin;
use std::time::Duration;

use asynchronix::model::{InitializedModel, Model, Output};
use asynchronix::simulation::{EventQueue, Mailbox, SimInit};
use asynchronix::simulation::{Mailbox, SimInit};
use asynchronix::time::{MonotonicTime, Scheduler};

/// Stepper motor.
Expand Down Expand Up @@ -200,8 +200,7 @@ fn main() {
driver.current_out.connect(Motor::current_in, &motor_mbox);

// Model handles for simulation.
let mut position = EventQueue::new();
motor.position.connect_sink(&position);
let mut position = motor.position.connect_stream().0;
let motor_addr = motor_mbox.address();
let driver_addr = driver_mbox.address();

Expand Down
4 changes: 2 additions & 2 deletions asynchronix/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ impl<M: Model> Sender<M> {
/// All channels are guaranteed to have different identifiers at any given
/// time, but an identifier may be reused after all handles to a channel
/// have been dropped.
pub(crate) fn channel_id(&self) -> NonZeroUsize {
NonZeroUsize::new(Arc::as_ptr(&self.inner) as usize).unwrap()
pub(crate) fn channel_id(&self) -> ChannelId {
ChannelId(NonZeroUsize::new(&*self.inner as *const Inner<M> as usize).unwrap())
}
}

Expand Down
12 changes: 5 additions & 7 deletions asynchronix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@
//! # impl Model for Delay {}
//! # }
//! use std::time::Duration;
//! use asynchronix::simulation::{EventSlot, Mailbox, SimInit};
//! use asynchronix::simulation::{Mailbox, SimInit};
//! use asynchronix::time::MonotonicTime;
//!
//! use models::{Delay, Multiplier};
Expand All @@ -217,8 +217,7 @@
//! delay1.output.connect(Delay::input, &delay2_mbox);
//!
//! // Keep handles to the system input and output for the simulation.
//! let mut output_slot = EventSlot::new();
//! delay2.output.connect_sink(&output_slot);
//! let mut output_slot = delay2.output.connect_slot().0;
//! let input_address = multiplier1_mbox.address();
//!
//! // Pick an arbitrary simulation start time and build the simulation.
Expand Down Expand Up @@ -256,7 +255,7 @@
//!
//! Simulation outputs can be monitored using
//! [`EventSlot`](simulation::EventSlot)s and
//! [`EventQueue`](simulation::EventQueue)s, which can be connected to any
//! [`EventStream`](simulation::EventStream)s, which can be connected to any
//! model's output port. While an event slot only gives access to the last value
//! sent from a port, an event stream is an iterator that yields all events that
//! were sent in first-in-first-out order.
Expand Down Expand Up @@ -294,7 +293,7 @@
//! # impl Model for Delay {}
//! # }
//! # use std::time::Duration;
//! # use asynchronix::simulation::{EventSlot, Mailbox, SimInit};
//! # use asynchronix::simulation::{Mailbox, SimInit};
//! # use asynchronix::time::MonotonicTime;
//! # use models::{Delay, Multiplier};
//! # let mut multiplier1 = Multiplier::default();
Expand All @@ -309,8 +308,7 @@
//! # multiplier1.output.connect(Multiplier::input, &multiplier2_mbox);
//! # multiplier2.output.connect(Delay::input, &delay2_mbox);
//! # delay1.output.connect(Delay::input, &delay2_mbox);
//! # let mut output_slot = EventSlot::new();
//! # delay2.output.connect_sink(&output_slot);
//! # let mut output_slot = delay2.output.connect_slot().0;
//! # let input_address = multiplier1_mbox.address();
//! # let t0 = MonotonicTime::EPOCH;
//! # let mut simu = SimInit::new()
Expand Down
2 changes: 1 addition & 1 deletion asynchronix/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
//! can be connected to input and requestor ports when assembling the simulation
//! bench. However, input ports may instead be defined as private methods if
//! they are only used by the model itself to schedule future actions (see the
//! [`Scheduler`] examples).
//! [`Scheduler`](crate::time::Scheduler) examples).
//!
//! Changing the signature of an input or replier port is not considered to
//! alter the public interface of a model provided that the event, request and
Expand Down
54 changes: 36 additions & 18 deletions asynchronix/src/model/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
//! ports should generally be preferred over requestor ports when possible.
use std::fmt;
use std::sync::{Arc, Mutex};

mod broadcaster;
mod sender;

use crate::model::ports::sender::EventSinkSender;
use crate::model::{InputFn, Model, ReplierFn};
use crate::simulation::{Address, EventSink};
use crate::simulation::{Address, EventSlot, EventStream};
use crate::util::spsc_queue;

use broadcaster::{EventBroadcaster, QueryBroadcaster};
use broadcaster::Broadcaster;

use self::sender::{InputSender, ReplierSender};
use self::sender::{EventSender, EventSlotSender, EventStreamSender, QuerySender};

#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
/// Unique identifier for a connection between two ports.
Expand All @@ -36,7 +37,7 @@ pub struct LineId(u64);
/// methods that return no value. They broadcast events to all connected input
/// ports.
pub struct Output<T: Clone + Send + 'static> {
broadcaster: EventBroadcaster<T>,
broadcaster: Broadcaster<T, ()>,
next_line_id: u64,
}

Expand All @@ -61,23 +62,40 @@ impl<T: Clone + Send + 'static> Output<T> {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let sender = Box::new(InputSender::new(input, address.into().0));
let sender = Box::new(EventSender::new(input, address.into().0));
self.broadcaster.add(sender, line_id);

line_id
}

/// Adds a connection to an event sink such as an
/// [`EventSlot`](crate::simulation::EventSlot) or
/// [`EventQueue`](crate::simulation::EventQueue).
pub fn connect_sink<S: EventSink<T>>(&mut self, sink: &S) -> LineId {
/// Adds a connection to an event stream iterator.
pub fn connect_stream(&mut self) -> (EventStream<T>, LineId) {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let sender = Box::new(EventSinkSender::new(sink.writer()));

let (producer, consumer) = spsc_queue::spsc_queue();
let sender = Box::new(EventStreamSender::new(producer));
let event_stream = EventStream::new(consumer);

self.broadcaster.add(sender, line_id);

line_id
(event_stream, line_id)
}

/// Adds a connection to an event slot.
pub fn connect_slot(&mut self) -> (EventSlot<T>, LineId) {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;

let slot = Arc::new(Mutex::new(None));
let sender = Box::new(EventSlotSender::new(slot.clone()));
let event_slot = EventSlot::new(slot);

self.broadcaster.add(sender, line_id);

(event_slot, line_id)
}

/// Removes the connection specified by the `LineId` parameter.
Expand All @@ -100,14 +118,14 @@ impl<T: Clone + Send + 'static> Output<T> {

/// Broadcasts an event to all connected input ports.
pub async fn send(&mut self, arg: T) {
self.broadcaster.broadcast(arg).await.unwrap();
self.broadcaster.broadcast_event(arg).await.unwrap();
}
}

impl<T: Clone + Send + 'static> Default for Output<T> {
fn default() -> Self {
Self {
broadcaster: EventBroadcaster::default(),
broadcaster: Broadcaster::default(),
next_line_id: 0,
}
}
Expand All @@ -125,7 +143,7 @@ impl<T: Clone + Send + 'static> fmt::Debug for Output<T> {
/// model methods that return a value. They broadcast queries to all connected
/// replier ports.
pub struct Requestor<T: Clone + Send + 'static, R: Send + 'static> {
broadcaster: QueryBroadcaster<T, R>,
broadcaster: Broadcaster<T, R>,
next_line_id: u64,
}

Expand All @@ -150,7 +168,7 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {
assert!(self.next_line_id != u64::MAX);
let line_id = LineId(self.next_line_id);
self.next_line_id += 1;
let sender = Box::new(ReplierSender::new(replier, address.into().0));
let sender = Box::new(QuerySender::new(replier, address.into().0));
self.broadcaster.add(sender, line_id);

line_id
Expand All @@ -176,14 +194,14 @@ impl<T: Clone + Send + 'static, R: Send + 'static> Requestor<T, R> {

/// Broadcasts a query to all connected replier ports.
pub async fn send(&mut self, arg: T) -> impl Iterator<Item = R> + '_ {
self.broadcaster.broadcast(arg).await.unwrap()
self.broadcaster.broadcast_query(arg).await.unwrap()
}
}

impl<T: Clone + Send + 'static, R: Send + 'static> Default for Requestor<T, R> {
fn default() -> Self {
Self {
broadcaster: QueryBroadcaster::default(),
broadcaster: Broadcaster::default(),
next_line_id: 0,
}
}
Expand Down
Loading

0 comments on commit 71ad4b8

Please sign in to comment.