Skip to content

Commit

Permalink
Add tracing support for simulation timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
sbarral committed Sep 12, 2024
1 parent e376f17 commit 7487a26
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 107 deletions.
6 changes: 3 additions & 3 deletions asynchronix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ autotests = false
[features]
# gRPC service.
grpc = ["dep:bytes", "dep:ciborium", "dep:prost", "dep:prost-types", "dep:serde", "dep:tonic", "dep:tokio", "dep:tonic"]
tracing = ["dep:tracing"]
tracing = ["dep:tracing", "dep:tracing-subscriber"]

# DEVELOPMENT ONLY: API-unstable public exports meant for external test/benchmarking.
dev-hooks = []
Expand Down Expand Up @@ -55,14 +55,14 @@ serde = { version = "1", optional = true }
tokio = { version = "1.0", features=["net", "rt-multi-thread"], optional = true }
tonic = { version = "0.12", default-features = false, features=["codegen", "prost", "server"], optional = true }
tracing = { version= "0.1.40", default-features = false, features=["std"], optional = true }
tracing-subscriber = { version= "0.3.18", optional = true }

[dev-dependencies]
atomic-wait = "1.1"
futures-util = "0.3"
futures-executor = "0.3"
mio = { version = "1.0", features = ["os-poll", "net"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-subscriber = { version= "0.3.18", features=["env-filter"] }

[target.'cfg(asynchronix_loom)'.dev-dependencies]
loom = "0.5"
Expand Down
12 changes: 11 additions & 1 deletion asynchronix/src/dev_hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,17 @@ impl Executor {
///
/// The maximum number of threads is set with the `pool_size` parameter.
pub fn new(pool_size: usize) -> Self {
Self(executor::Executor::new_multi_threaded(pool_size))
let dummy_context = crate::executor::SimulationContext {
#[cfg(feature = "tracing")]
time_reader: crate::util::sync_cell::SyncCell::new(
crate::time::TearableAtomicTime::new(crate::time::MonotonicTime::EPOCH),
)
.reader(),
};
Self(executor::Executor::new_multi_threaded(
pool_size,
dummy_context,
))
}

/// Spawns a task which output will never be retrieved.
Expand Down
43 changes: 34 additions & 9 deletions asynchronix/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,23 @@ mod task;
use std::future::Future;
use std::sync::atomic::AtomicUsize;

use crate::macros::scoped_thread_local::scoped_thread_local;
#[cfg(feature = "tracing")]
use crate::time::AtomicTimeReader;
use task::Promise;

/// Unique identifier for executor instances.
static NEXT_EXECUTOR_ID: AtomicUsize = AtomicUsize::new(0);

/// Context common to all executor types.
#[derive(Clone)]
pub(crate) struct SimulationContext {
#[cfg(feature = "tracing")]
pub(crate) time_reader: AtomicTimeReader,
}

scoped_thread_local!(pub(crate) static SIMULATION_CONTEXT: SimulationContext);

/// A single-threaded or multi-threaded `async` executor.
#[derive(Debug)]
pub(crate) enum Executor {
Expand All @@ -21,8 +33,8 @@ pub(crate) enum Executor {

impl Executor {
/// Creates an executor that runs futures on the current thread.
pub(crate) fn new_single_threaded() -> Self {
Self::StExecutor(st_executor::Executor::new())
pub(crate) fn new_single_threaded(simulation_context: SimulationContext) -> Self {
Self::StExecutor(st_executor::Executor::new(simulation_context))
}

/// Creates an executor that runs futures on a thread pool.
Expand All @@ -33,8 +45,11 @@ impl Executor {
///
/// This will panic if the specified number of threads is zero or is more
/// than `usize::BITS`.
pub(crate) fn new_multi_threaded(num_threads: usize) -> Self {
Self::MtExecutor(mt_executor::Executor::new(num_threads))
pub(crate) fn new_multi_threaded(
num_threads: usize,
simulation_context: SimulationContext,
) -> Self {
Self::MtExecutor(mt_executor::Executor::new(num_threads, simulation_context))
}

/// Spawns a task which output will never be retrieved.
Expand Down Expand Up @@ -88,6 +103,16 @@ mod tests {

use super::*;

fn dummy_simulation_context() -> SimulationContext {
SimulationContext {
#[cfg(feature = "tracing")]
time_reader: crate::util::sync_cell::SyncCell::new(
crate::time::TearableAtomicTime::new(crate::time::MonotonicTime::EPOCH),
)
.reader(),
}
}

/// An object that runs an arbitrary closure when dropped.
struct RunOnDrop<F: FnOnce()> {
drop_fn: Option<F>,
Expand Down Expand Up @@ -208,25 +233,25 @@ mod tests {

#[test]
fn executor_deadlock_st() {
executor_deadlock(Executor::new_single_threaded());
executor_deadlock(Executor::new_single_threaded(dummy_simulation_context()));
}

#[test]
fn executor_deadlock_mt() {
executor_deadlock(Executor::new_multi_threaded(3));
executor_deadlock(Executor::new_multi_threaded(3, dummy_simulation_context()));
}

#[test]
fn executor_deadlock_mt_one_worker() {
executor_deadlock(Executor::new_multi_threaded(1));
executor_deadlock(Executor::new_multi_threaded(1, dummy_simulation_context()));
}
#[test]
fn executor_drop_cycle_st() {
executor_drop_cycle(Executor::new_single_threaded());
executor_drop_cycle(Executor::new_single_threaded(dummy_simulation_context()));
}

#[test]
fn executor_drop_cycle_mt() {
executor_drop_cycle(Executor::new_multi_threaded(3));
executor_drop_cycle(Executor::new_multi_threaded(3, dummy_simulation_context()));
}
}
17 changes: 10 additions & 7 deletions asynchronix/src/executor/mt_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ use std::time::{Duration, Instant};
use crossbeam_utils::sync::{Parker, Unparker};
use slab::Slab;

use super::task::{self, CancelToken, Promise, Runnable};
use super::{SimulationContext, NEXT_EXECUTOR_ID, SIMULATION_CONTEXT};
use crate::macros::scoped_thread_local::scoped_thread_local;
use crate::util::rng::Rng;

use super::task::{self, CancelToken, Promise, Runnable};
use super::NEXT_EXECUTOR_ID;
use pool_manager::PoolManager;

const BUCKET_SIZE: usize = 128;
Expand Down Expand Up @@ -95,7 +94,7 @@ impl Executor {
///
/// This will panic if the specified number of threads is zero or is more
/// than `usize::BITS`.
pub(crate) fn new(num_threads: usize) -> Self {
pub(crate) fn new(num_threads: usize, simulation_context: SimulationContext) -> Self {
let parker = Parker::new();
let unparker = parker.unparker().clone();

Expand Down Expand Up @@ -141,11 +140,15 @@ impl Executor {
.spawn({
let context = context.clone();
let active_tasks = active_tasks.clone();
let simulation_context = simulation_context.clone();
move || {
let worker = Worker::new(local_queue, context);
ACTIVE_TASKS.set(&active_tasks, || {
LOCAL_WORKER
.set(&worker, || run_local_worker(&worker, id, worker_parker))
SIMULATION_CONTEXT.set(&simulation_context, || {
ACTIVE_TASKS.set(&active_tasks, || {
LOCAL_WORKER.set(&worker, || {
run_local_worker(&worker, id, worker_parker)
})
})
});
}
})
Expand Down
30 changes: 18 additions & 12 deletions asynchronix/src/executor/st_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use slab::Slab;
use super::task::{self, CancelToken, Promise, Runnable};
use super::NEXT_EXECUTOR_ID;

use crate::executor::{SimulationContext, SIMULATION_CONTEXT};
use crate::macros::scoped_thread_local::scoped_thread_local;

const QUEUE_MIN_CAPACITY: usize = 32;
Expand All @@ -21,11 +22,13 @@ pub(crate) struct Executor {
context: ExecutorContext,
/// List of tasks that have not completed yet.
active_tasks: RefCell<Slab<CancelToken>>,
/// Read-only handle to the simulation time.
simulation_context: SimulationContext,
}

impl Executor {
/// Creates an executor that runs futures on the current thread.
pub(crate) fn new() -> Self {
pub(crate) fn new(simulation_context: SimulationContext) -> Self {
// Each executor instance has a unique ID inherited by tasks to ensure
// that tasks are scheduled on their parent executor.
let executor_id = NEXT_EXECUTOR_ID.fetch_add(1, Ordering::Relaxed);
Expand All @@ -40,6 +43,7 @@ impl Executor {
Self {
context,
active_tasks,
simulation_context,
}
}

Expand Down Expand Up @@ -102,14 +106,16 @@ impl Executor {
/// Execute spawned tasks, blocking until all futures have completed or
/// until the executor reaches a deadlock.
pub(crate) fn run(&mut self) {
ACTIVE_TASKS.set(&self.active_tasks, || {
EXECUTOR_CONTEXT.set(&self.context, || loop {
let task = match self.context.queue.borrow_mut().pop() {
Some(task) => task,
None => break,
};

task.run();
SIMULATION_CONTEXT.set(&self.simulation_context, || {
ACTIVE_TASKS.set(&self.active_tasks, || {
EXECUTOR_CONTEXT.set(&self.context, || loop {
let task = match self.context.queue.borrow_mut().pop() {
Some(task) => task,
None => break,
};

task.run();
})
})
});
}
Expand Down Expand Up @@ -225,9 +231,9 @@ impl<T: Future> Drop for CancellableFuture<T> {
///
/// # Panics
///
/// This function will panic if called from called outside from the executor
/// work thread or from another executor instance than the one the task for this
/// `Runnable` was spawned on.
/// This function will panic if called from outside the executor worker thread
/// or from another executor instance than the one the task for this `Runnable`
/// was spawned on.
fn schedule_task(task: Runnable, executor_id: usize) {
EXECUTOR_CONTEXT
.map(|context| {
Expand Down
53 changes: 9 additions & 44 deletions asynchronix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,46 +392,7 @@
//! asynchronix = { version = "0.3", features = ["tracing"] }
//! ```
//!
//! Each tracing event or span emitted by a model is then wrapped in a span
//! named `model` with a target `asynchronix` and an attribute `name`. The value
//! of the attribute is the name provided to
//! [`SimInit::add_model`](simulation::SimInit::add_model).
//!
//! Note that model spans are always emitted at
//! [`Level::INFO`](tracing::Level::INFO) .
//!
//! ### Tracing examples
//!
//! The examples below assume that the `tracing` feature flag is activated, the
//! `tracing_subscriber` crate is used with the `env-filter` feature flag
//! activated and the default subscriber is set up, e.g. with:
//!
//! ```ignore
//! tracing_subscriber::fmt::init();
//! ```
//!
//! In order to let only warnings and errors pass through but still see model
//! span information (which is emitted as info), you may run the bench with:
//!
//! ```{.bash}
//! $ RUST_LOG="warn,[model]=info" cargo run --release my_bench
//! 2024-09-09T21:05:47.891984Z WARN model{name="kettle"}: my_bench: water is boiling
//! 2024-09-09T21:08:13.284753Z WARN model{name="timer"}: my_bench: ring ring
//! 2024-09-09T21:08:13.284753Z WARN model{name="kettle"}: my_bench: water is hot
//! ```
//!
//! In order to see warnings or errors for the `kettle` model only, you may
//! instead run the bench with:
//!
//! ```{.bash}
//! $ RUST_LOG="[model{name=kettle}]=warn" cargo run --release my_bench
//! 2024-09-09T21:05:47.891984Z WARN model{name="kettle"}: my_bench: water is boiling
//! 2024-09-09T21:08:13.284753Z WARN model{name="kettle"}: my_bench: water is hot
//! ```
//!
//! If the `model` span name collides with that of spans defined outside
//! `asynchronix`, the above filters can be made more specific using
//! `asynchronix[model]` instead of just `[model]`.
//! See the [`tracing`] module for more information.
//!
//!
//! # Other resources
Expand Down Expand Up @@ -466,17 +427,21 @@

pub(crate) mod channel;
pub(crate) mod executor;
#[cfg(feature = "grpc")]
pub mod grpc;
mod loom_exports;
pub(crate) mod macros;
pub mod model;
pub mod ports;
#[cfg(feature = "grpc")]
pub mod registry;
pub mod simulation;
pub mod time;
pub(crate) mod util;

#[cfg(feature = "grpc")]
pub mod grpc;
#[cfg(feature = "grpc")]
pub mod registry;

#[cfg(feature = "tracing")]
pub mod tracing;

#[cfg(feature = "dev-hooks")]
pub mod dev_hooks;
7 changes: 3 additions & 4 deletions asynchronix/src/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,9 @@ use recycle_box::{coerce_box, RecycleBox};
use crate::executor::Executor;
use crate::model::{Context, Model, SetupContext};
use crate::ports::{InputFn, ReplierFn};
use crate::time::{Clock, MonotonicTime, TearableAtomicTime};
use crate::time::{AtomicTime, Clock, MonotonicTime};
use crate::util::seq_futures::SeqFuture;
use crate::util::slot;
use crate::util::sync_cell::SyncCell;

/// Simulation environment.
///
Expand Down Expand Up @@ -192,7 +191,7 @@ use crate::util::sync_cell::SyncCell;
pub struct Simulation {
executor: Executor,
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: SyncCell<TearableAtomicTime>,
time: AtomicTime,
clock: Box<dyn Clock>,
}

Expand All @@ -201,7 +200,7 @@ impl Simulation {
pub(crate) fn new(
executor: Executor,
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: SyncCell<TearableAtomicTime>,
time: AtomicTime,
clock: Box<dyn Clock + 'static>,
) -> Self {
Self {
Expand Down
10 changes: 3 additions & 7 deletions asynchronix/src/simulation/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,18 @@ use crate::executor::Executor;
use crate::model::Model;
use crate::ports::InputFn;
use crate::simulation::Address;
use crate::time::{MonotonicTime, TearableAtomicTime};
use crate::time::{AtomicTimeReader, MonotonicTime};
use crate::util::priority_queue::PriorityQueue;
use crate::util::sync_cell::SyncCellReader;

/// Scheduler.
#[derive(Clone)]
pub struct Scheduler {
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: SyncCellReader<TearableAtomicTime>,
time: AtomicTimeReader,
}

impl Scheduler {
pub(crate) fn new(
scheduler_queue: Arc<Mutex<SchedulerQueue>>,
time: SyncCellReader<TearableAtomicTime>,
) -> Self {
pub(crate) fn new(scheduler_queue: Arc<Mutex<SchedulerQueue>>, time: AtomicTimeReader) -> Self {
Self {
scheduler_queue,
time,
Expand Down
Loading

0 comments on commit 7487a26

Please sign in to comment.