diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index aeb01504c19..281bdc92c8a 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,4 +1,3 @@ -use crate::future::poll_fn; use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; @@ -11,7 +10,7 @@ use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::sync::Arc; -use std::task::Poll::{Pending, Ready}; +use std::task::Poll::Ready; use std::time::Duration; /// Executes tasks on the current thread @@ -20,10 +19,6 @@ pub(crate) struct BasicScheduler { /// between all `block_on` calls. inner: Mutex>>, - /// Notifier for waking up other threads to steal the - /// parker. - notify: Notify, - /// Sendable task spawner spawner: Spawner, } @@ -70,6 +65,10 @@ struct Shared { /// Unpark the blocked thread unpark: Box, + + /// Notifier for waking up other threads to steal the + /// parker. + notify: Notify, } /// Thread-local context. @@ -101,6 +100,7 @@ impl BasicScheduler

{ shared: Arc::new(Shared { queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), unpark: unpark as Box, + notify: Notify::new(), }), }; @@ -114,53 +114,14 @@ impl BasicScheduler

{ park, })); - BasicScheduler { - inner, - notify: Notify::new(), - spawner, - } + BasicScheduler { inner, spawner } } pub(crate) fn spawner(&self) -> &Spawner { &self.spawner } - pub(crate) fn block_on(&self, future: F) -> F::Output { - pin!(future); - - // Attempt to steal the dedicated parker and block_on the future if we can there, - // otherwise, lets select on a notification that the parker is available - // or the future is complete. - loop { - if let Some(inner) = &mut self.take_inner() { - return inner.block_on(future); - } else { - let mut enter = crate::runtime::enter(false); - - let notified = self.notify.notified(); - pin!(notified); - - if let Some(out) = enter - .block_on(poll_fn(|cx| { - if notified.as_mut().poll(cx).is_ready() { - return Ready(None); - } - - if let Ready(out) = future.as_mut().poll(cx) { - return Ready(Some(out)); - } - - Pending - })) - .expect("Failed to `Enter::block_on`") - { - return out; - } - } - } - } - - fn take_inner(&self) -> Option> { + pub(crate) fn take_inner(&self) -> Option> { let inner = self.inner.lock().take()?; Some(InnerGuard { @@ -324,6 +285,10 @@ impl Spawner { handle } + pub(crate) fn notify(&self) -> &Notify { + &self.shared.notify + } + fn pop(&self) -> Option>> { self.shared.queue.lock().pop_front() } @@ -393,13 +358,13 @@ impl Wake for Shared { /// Used to ensure we always place the Inner value /// back into its slot in `BasicScheduler`, even if the /// future panics. -struct InnerGuard<'a, P: Park> { +pub(crate) struct InnerGuard<'a, P: Park> { inner: Option>, basic_scheduler: &'a BasicScheduler

, } impl InnerGuard<'_, P> { - fn block_on(&mut self, future: F) -> F::Output { + pub(crate) fn block_on(&mut self, future: F) -> F::Output { // The only time inner gets set to `None` is if we have dropped // already so this unwrap is safe. self.inner.as_mut().unwrap().block_on(future) @@ -417,7 +382,7 @@ impl Drop for InnerGuard<'_, P> { // Wake up other possible threads that could steal // the dedicated parker P. - self.basic_scheduler.notify.notify_one() + self.basic_scheduler.spawner.shared.notify.notify_one() } } } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 76b28f24e36..fdacc412044 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,9 +1,12 @@ +use crate::future::poll_fn; +use crate::park::Park; use crate::runtime::blocking::task::BlockingTask; use crate::runtime::task::{self, JoinHandle}; -use crate::runtime::{blocking, context, driver, Spawner}; +use crate::runtime::{basic_scheduler, blocking, context, driver, Spawner}; use crate::util::error::CONTEXT_MISSING_ERROR; use std::future::Future; +use std::task::Poll::{Pending, Ready}; use std::{error, fmt}; /// Handle to the runtime. @@ -201,6 +204,105 @@ impl Handle { let _ = self.blocking_spawner.spawn(task, &self); handle } + + /// Run a future to completion on the Tokio runtime. This is the + /// runtime's entry point. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers + /// which the future spawns internally will be executed on the runtime. + /// + /// # Multi thread scheduler + /// + /// When the multi thread scheduler is used this will allow futures + /// to run within the io driver and timer context of the overall runtime. + /// + /// # Current thread scheduler + /// + /// When the current thread scheduler is enabled `block_on` + /// can be called concurrently from multiple threads. The first call + /// will take ownership of the io and timer drivers. This means + /// other threads which do not own the drivers will hook into that one. + /// When the first `block_on` completes, other threads will be able to + /// "steal" the driver to allow continued execution of their futures. + /// + /// # Panics + /// + /// This function panics if the provided future panics, or if called within an + /// asynchronous execution context. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::runtime::Runtime; + /// + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// // Get a handle to rhe runtime + /// let handle = rt.handle(); + /// + /// // Execute the future, blocking the current thread until completion using the handle + /// handle.block_on(async { + /// println!("hello"); + /// }); + /// ``` + pub fn block_on(&self, future: F) -> F::Output { + match &self.spawner { + Spawner::Basic(exec) => self + .block_on_with_basic_scheduler::( + future, exec, None, + ), + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(_) => { + let _enter = self.enter(); + let mut enter = crate::runtime::enter(true); + enter.block_on(future).expect("failed to park thread") + } + } + } + + pub(crate) fn block_on_with_basic_scheduler( + &self, + future: F, + spawner: &basic_scheduler::Spawner, + scheduler: Option<&basic_scheduler::BasicScheduler

>, + ) -> F::Output { + let _enter = self.enter(); + + pin!(future); + + // Attempt to steal the dedicated parker and block_on the future if we can there, + // otherwise, lets select on a notification that the parker is available + // or the future is complete. + loop { + if let Some(mut inner) = scheduler.and_then(basic_scheduler::BasicScheduler::take_inner) + { + return inner.block_on(future); + } + + let mut enter = crate::runtime::enter(false); + + let notified = spawner.notify().notified(); + pin!(notified); + + if let Some(out) = enter + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } + + if let Ready(out) = future.as_mut().poll(cx) { + return Ready(Some(out)); + } + + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; + } + } + } } /// Error returned by `try_current` when no Runtime has been started diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index b138a66455c..eb1d0ab019e 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -444,12 +444,15 @@ cfg_rt! { /// /// [handle]: fn@Handle::block_on pub fn block_on(&self, future: F) -> F::Output { - let _enter = self.enter(); - match &self.kind { - Kind::CurrentThread(exec) => exec.block_on(future), + Kind::CurrentThread(exec) => { + // Attempt to steal the dedicated parker and block_on the future if we can there, + // othwerwise, lets select on a notification that the parker is available + // or the future is complete. + self.handle.block_on_with_basic_scheduler(future, exec.spawner(), Some(exec)) + } #[cfg(feature = "rt-multi-thread")] - Kind::ThreadPool(exec) => exec.block_on(future), + Kind::ThreadPool(_) => self.handle.block_on(future), } } diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 47f8ee3454f..8b4cfa53ef9 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -58,18 +58,6 @@ impl ThreadPool { pub(crate) fn spawner(&self) -> &Spawner { &self.spawner } - - /// Blocks the current thread waiting for the future to complete. - /// - /// The future will execute on the current thread, but all spawned tasks - /// will be executed on the thread pool. - pub(crate) fn block_on(&self, future: F) -> F::Output - where - F: Future, - { - let mut enter = crate::runtime::enter(true); - enter.block_on(future).expect("failed to park thread") - } } impl fmt::Debug for ThreadPool {