Skip to content

Commit

Permalink
Move parking from communication into scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Aug 27, 2024
1 parent e0d98d3 commit 2b8ff0d
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 67 deletions.
8 changes: 0 additions & 8 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ impl Allocate for Generic {
fn receive(&mut self) { self.receive(); }
fn release(&mut self) { self.release(); }
fn events(&self) -> &Rc<RefCell<Vec<usize>>> { self.events() }
fn await_events(&self, _duration: Option<std::time::Duration>) {
match self {
Generic::Thread(t) => t.await_events(_duration),
Generic::Process(p) => p.await_events(_duration),
Generic::ProcessBinary(pb) => pb.await_events(_duration),
Generic::ZeroCopy(z) => z.await_events(_duration),
}
}
}


Expand Down
9 changes: 0 additions & 9 deletions communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::time::Duration;

pub use self::thread::Thread;
pub use self::process::Process;
Expand Down Expand Up @@ -51,14 +50,6 @@ pub trait Allocate {
/// into a performance problem.
fn events(&self) -> &Rc<RefCell<Vec<usize>>>;

/// Awaits communication events.
///
/// This method may park the current thread, for at most `duration`,
/// until new events arrive.
/// The method is not guaranteed to wait for any amount of time, but
/// good implementations should use this as a hint to park the thread.
fn await_events(&self, _duration: Option<Duration>) { }

/// Ensure that received messages are surfaced in each channel.
///
/// This method should be called to ensure that received messages are
Expand Down
5 changes: 0 additions & 5 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::rc::Rc;
use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use std::any::Any;
use std::time::Duration;
use std::collections::{HashMap};
use crossbeam_channel::{Sender, Receiver};

Expand Down Expand Up @@ -178,10 +177,6 @@ impl Allocate for Process {
self.inner.events()
}

fn await_events(&self, duration: Option<Duration>) {
self.inner.await_events(duration);
}

fn receive(&mut self) {
let mut events = self.inner.events().borrow_mut();
while let Ok(index) = self.counters_recv.try_recv() {
Expand Down
11 changes: 0 additions & 11 deletions communication/src/allocator/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use std::rc::Rc;
use std::cell::RefCell;
use std::time::Duration;
use std::collections::VecDeque;

use crate::allocator::{Allocate, AllocateBuilder};
Expand Down Expand Up @@ -35,16 +34,6 @@ impl Allocate for Thread {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
&self.events
}
fn await_events(&self, duration: Option<Duration>) {
if self.events.borrow().is_empty() {
if let Some(duration) = duration {
std::thread::park_timeout(duration);
}
else {
std::thread::park();
}
}
}
}

/// Thread-local counting channel push endpoint.
Expand Down
3 changes: 0 additions & 3 deletions communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,4 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
self.inner.events()
}
fn await_events(&self, duration: Option<std::time::Duration>) {
self.inner.await_events(duration);
}
}
10 changes: 0 additions & 10 deletions communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,4 @@ impl Allocate for ProcessAllocator {
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
&self.events
}
fn await_events(&self, duration: Option<std::time::Duration>) {
if self.events.borrow().is_empty() {
if let Some(duration) = duration {
std::thread::park_timeout(duration);
}
else {
std::thread::park();
}
}
}
}
30 changes: 29 additions & 1 deletion timely/src/scheduling/activate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl Activations {
/// This method should be used before putting a worker thread to sleep, as it
/// indicates the amount of time before the thread should be unparked for the
/// next scheduled activation.
pub fn empty_for(&self) -> Option<Duration> {
fn empty_for(&self) -> Option<Duration> {
if !self.bounds.is_empty() || self.timer.is_none() {
Some(Duration::new(0,0))
}
Expand All @@ -189,6 +189,34 @@ impl Activations {
})
}
}

/// Indicates that there is nothing to do for `timeout`, and that the scheduler
/// can allow the thread to sleep until then.
///
/// The method does not *need* to park the thread, and indeed it may elect to
/// unpark earlier if there are deferred activations.
pub fn park_timeout(&self, timeout: Option<Duration>) {
let empty_for = self.empty_for();
let timeout = match (timeout, empty_for) {
(Some(x), Some(y)) => Some(std::cmp::min(x,y)),
(x, y) => x.or(y),
};

if let Some(timeout) = timeout {
std::thread::park_timeout(timeout);
}
else {
std::thread::park();
}
}

/// True iff there are no immediate activations.
///
/// Used by others to guard work done in anticipation of potentially parking.
/// An alternate method name could be `would_park`.
pub fn is_idle(&self) -> bool {
self.bounds.is_empty() || self.timer.is_none()
}
}

/// A thread-safe handle to an `Activations`.
Expand Down
35 changes: 15 additions & 20 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ impl<A: Allocate> Worker<A> {
/// worker.step_or_park(Some(Duration::from_secs(1)));
/// });
/// ```
pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {
pub fn step_or_park(&mut self, timeout: Option<Duration>) -> bool {

{ // Process channel events. Activate responders.
let mut allocator = self.allocator.borrow_mut();
Expand Down Expand Up @@ -362,28 +362,23 @@ impl<A: Allocate> Worker<A> {
.borrow_mut()
.advance();

// Consider parking only if we have no pending events, some dataflows, and a non-zero duration.
let empty_for = self.activations.borrow().empty_for();
// Determine the minimum park duration, where `None` are an absence of a constraint.
let delay = match (duration, empty_for) {
(Some(x), Some(y)) => Some(std::cmp::min(x,y)),
(x, y) => x.or(y),
};
if self.activations.borrow().is_idle() {
// If the timeout is zero, don't bother trying to park.
// More generally, we could put some threshold in here.
if timeout != Some(Duration::new(0, 0)) {
// Log parking and flush log.
if let Some(l) = self.logging().as_mut() {
l.log(crate::logging::ParkEvent::park(timeout));
l.flush();
}

if delay != Some(Duration::new(0,0)) {
// We have just drained `allocator.events()` up above;
// otherwise we should first check it for emptiness.
self.activations.borrow().park_timeout(timeout);

// Log parking and flush log.
if let Some(l) = self.logging().as_mut() {
l.log(crate::logging::ParkEvent::park(delay));
l.flush();
// Log return from unpark.
self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
}

self.allocator
.borrow()
.await_events(delay);

// Log return from unpark.
self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
}
else { // Schedule active dataflows.

Expand Down

0 comments on commit 2b8ff0d

Please sign in to comment.