Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: move block_on to handle #3097

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 15 additions & 50 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::future::poll_fn;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
Expand All @@ -11,7 +10,7 @@ use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::task::Poll::Ready;
use std::time::Duration;

/// Executes tasks on the current thread
Expand All @@ -20,10 +19,6 @@ pub(crate) struct BasicScheduler<P: Park> {
/// between all `block_on` calls.
inner: Mutex<Option<Inner<P>>>,

/// Notifier for waking up other threads to steal the
/// parker.
notify: Notify,

/// Sendable task spawner
spawner: Spawner,
}
Expand Down Expand Up @@ -70,6 +65,10 @@ struct Shared {

/// Unpark the blocked thread
unpark: Box<dyn Unpark>,

/// Notifier for waking up other threads to steal the
/// parker.
notify: Notify,
}

/// Thread-local context.
Expand Down Expand Up @@ -101,6 +100,7 @@ impl<P: Park> BasicScheduler<P> {
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
unpark: unpark as Box<dyn Unpark>,
notify: Notify::new(),
}),
};

Expand All @@ -114,53 +114,14 @@ impl<P: Park> BasicScheduler<P> {
park,
}));

BasicScheduler {
inner,
notify: Notify::new(),
spawner,
}
BasicScheduler { inner, spawner }
}

pub(crate) fn spawner(&self) -> &Spawner {
&self.spawner
}

pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
pin!(future);

// Attempt to steal the dedicated parker and block_on the future if we can there,
// otherwise, lets select on a notification that the parker is available
// or the future is complete.
loop {
if let Some(inner) = &mut self.take_inner() {
return inner.block_on(future);
} else {
let mut enter = crate::runtime::enter(false);

let notified = self.notify.notified();
pin!(notified);

if let Some(out) = enter
.block_on(poll_fn(|cx| {
if notified.as_mut().poll(cx).is_ready() {
return Ready(None);
}

if let Ready(out) = future.as_mut().poll(cx) {
return Ready(Some(out));
}

Pending
}))
.expect("Failed to `Enter::block_on`")
{
return out;
}
}
}
}

fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
pub(crate) fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
let inner = self.inner.lock().take()?;

Some(InnerGuard {
Expand Down Expand Up @@ -324,6 +285,10 @@ impl Spawner {
handle
}

pub(crate) fn notify(&self) -> &Notify {
&self.shared.notify
}

fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
self.shared.queue.lock().pop_front()
}
Expand Down Expand Up @@ -393,13 +358,13 @@ impl Wake for Shared {
/// Used to ensure we always place the Inner value
/// back into its slot in `BasicScheduler`, even if the
/// future panics.
struct InnerGuard<'a, P: Park> {
pub(crate) struct InnerGuard<'a, P: Park> {
inner: Option<Inner<P>>,
basic_scheduler: &'a BasicScheduler<P>,
}

impl<P: Park> InnerGuard<'_, P> {
fn block_on<F: Future>(&mut self, future: F) -> F::Output {
pub(crate) fn block_on<F: Future>(&mut self, future: F) -> F::Output {
// The only time inner gets set to `None` is if we have dropped
// already so this unwrap is safe.
self.inner.as_mut().unwrap().block_on(future)
Expand All @@ -417,7 +382,7 @@ impl<P: Park> Drop for InnerGuard<'_, P> {

// Wake up other possible threads that could steal
// the dedicated parker P.
self.basic_scheduler.notify.notify_one()
self.basic_scheduler.spawner.shared.notify.notify_one()
}
}
}
104 changes: 103 additions & 1 deletion tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::future::poll_fn;
use crate::park::Park;
use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{blocking, context, driver, Spawner};
use crate::runtime::{basic_scheduler, blocking, context, driver, Spawner};
use crate::util::error::CONTEXT_MISSING_ERROR;

use std::future::Future;
use std::task::Poll::{Pending, Ready};
use std::{error, fmt};

/// Handle to the runtime.
Expand Down Expand Up @@ -201,6 +204,105 @@ impl Handle {
let _ = self.blocking_spawner.spawn(task, &self);
handle
}

/// Run a future to completion on the Tokio runtime. This is the
/// runtime's entry point.
Comment on lines +208 to +209
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incorrect, this is not the entry point but just an additional block_on?

///
/// This runs the given future on the runtime, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers
/// which the future spawns internally will be executed on the runtime.
///
/// # Multi thread scheduler
///
/// When the multi thread scheduler is used this will allow futures
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would mention it will also run spawned tasks on the runtime worker threads.

/// to run within the io driver and timer context of the overall runtime.
///
/// # Current thread scheduler
///
/// When the current thread scheduler is enabled `block_on`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this is the behavior of Runtime::block_on I don't think this should be the behavior of Handle::block_on? @carllerche What do you think? I don't think the handle's block on should be able to steal the runtime. In that case it is the same as Runtime except it removes the ownership of the runtime type.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, my initial thinking was that the handle should follow the Runtime's behavior, but it does complicate the shutdown logic as well as seems a bit weird given the "current_thread" name... Perhaps, block_on in this case should just block the thread and enter the runtime context...

/// can be called concurrently from multiple threads. The first call
/// will take ownership of the io and timer drivers. This means
/// other threads which do not own the drivers will hook into that one.
/// When the first `block_on` completes, other threads will be able to
/// "steal" the driver to allow continued execution of their futures.
///
/// # Panics
///
/// This function panics if the provided future panics, or if called within an
/// asynchronous execution context.
///
/// # Examples
///
/// ```no_run
/// use tokio::runtime::Runtime;
///
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
/// // Get a handle to rhe runtime
/// let handle = rt.handle();
///
/// // Execute the future, blocking the current thread until completion using the handle
/// handle.block_on(async {
/// println!("hello");
/// });
/// ```
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
match &self.spawner {
Spawner::Basic(exec) => self
.block_on_with_basic_scheduler::<F, crate::park::thread::ParkThread>(
future, exec, None,
),
#[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(_) => {
let _enter = self.enter();
let mut enter = crate::runtime::enter(true);
enter.block_on(future).expect("failed to park thread")
}
}
}

pub(crate) fn block_on_with_basic_scheduler<F: Future, P: Park>(
&self,
future: F,
spawner: &basic_scheduler::Spawner,
scheduler: Option<&basic_scheduler::BasicScheduler<P>>,
) -> F::Output {
let _enter = self.enter();

pin!(future);

// Attempt to steal the dedicated parker and block_on the future if we can there,
// otherwise, lets select on a notification that the parker is available
// or the future is complete.
loop {
if let Some(mut inner) = scheduler.and_then(basic_scheduler::BasicScheduler::take_inner)
{
return inner.block_on(future);
}

let mut enter = crate::runtime::enter(false);

let notified = spawner.notify().notified();
pin!(notified);

if let Some(out) = enter
.block_on(poll_fn(|cx| {
if notified.as_mut().poll(cx).is_ready() {
return Ready(None);
}

if let Ready(out) = future.as_mut().poll(cx) {
return Ready(Some(out));
}

Pending
}))
.expect("Failed to `Enter::block_on`")
{
return out;
}
}
}
}

/// Error returned by `try_current` when no Runtime has been started
Expand Down
11 changes: 7 additions & 4 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,15 @@ cfg_rt! {
///
/// [handle]: fn@Handle::block_on
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
let _enter = self.enter();

match &self.kind {
Kind::CurrentThread(exec) => exec.block_on(future),
Kind::CurrentThread(exec) => {
// Attempt to steal the dedicated parker and block_on the future if we can there,
// othwerwise, lets select on a notification that the parker is available
// or the future is complete.
self.handle.block_on_with_basic_scheduler(future, exec.spawner(), Some(exec))
}
#[cfg(feature = "rt-multi-thread")]
Kind::ThreadPool(exec) => exec.block_on(future),
Kind::ThreadPool(_) => self.handle.block_on(future),
}
}

Expand Down
12 changes: 0 additions & 12 deletions tokio/src/runtime/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,6 @@ impl ThreadPool {
pub(crate) fn spawner(&self) -> &Spawner {
&self.spawner
}

/// Blocks the current thread waiting for the future to complete.
///
/// The future will execute on the current thread, but all spawned tasks
/// will be executed on the thread pool.
pub(crate) fn block_on<F>(&self, future: F) -> F::Output
where
F: Future,
{
let mut enter = crate::runtime::enter(true);
enter.block_on(future).expect("failed to park thread")
}
}

impl fmt::Debug for ThreadPool {
Expand Down