From d43812d2783a3575cc59d763603ec043c557c5f7 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Thu, 5 Nov 2020 09:58:41 +0100 Subject: [PATCH 1/6] rt: move block_on to handle Signed-off-by: Marc-Antoine Perennou --- tokio/src/runtime/basic_scheduler.rs | 60 +++++--------------- tokio/src/runtime/handle.rs | 84 ++++++++++++++++++++++++++++ tokio/src/runtime/mod.rs | 15 +++-- tokio/src/runtime/thread_pool/mod.rs | 12 ---- 4 files changed, 107 insertions(+), 64 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 860eab408a9..42e53eec0a6 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,4 +1,3 @@ -use crate::future::poll_fn; use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; @@ -11,7 +10,7 @@ use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::sync::Arc; -use std::task::Poll::{Pending, Ready}; +use std::task::Poll::Ready; use std::time::Duration; /// Executes tasks on the current thread @@ -20,10 +19,6 @@ pub(crate) struct BasicScheduler { /// between all `block_on` calls. inner: Mutex>>, - /// Notifier for waking up other threads to steal the - /// parker. - notify: Notify, - /// Sendable task spawner spawner: Spawner, } @@ -70,6 +65,10 @@ struct Shared { /// Unpark the blocked thread unpark: Box, + + /// Notifier for waking up other threads to steal the + /// parker. + notify: Notify, } /// Thread-local context. @@ -101,6 +100,7 @@ impl BasicScheduler

{ shared: Arc::new(Shared { queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), unpark: unpark as Box, + notify: Notify::new(), }), }; @@ -116,7 +116,6 @@ impl BasicScheduler

{ BasicScheduler { inner, - notify: Notify::new(), spawner, } } @@ -125,42 +124,7 @@ impl BasicScheduler

{ &self.spawner } - pub(crate) fn block_on(&self, future: F) -> F::Output { - pin!(future); - - // Attempt to steal the dedicated parker and block_on the future if we can there, - // othwerwise, lets select on a notification that the parker is available - // or the future is complete. - loop { - if let Some(inner) = &mut self.take_inner() { - return inner.block_on(future); - } else { - let mut enter = crate::runtime::enter(false); - - let notified = self.notify.notified(); - pin!(notified); - - if let Some(out) = enter - .block_on(poll_fn(|cx| { - if notified.as_mut().poll(cx).is_ready() { - return Ready(None); - } - - if let Ready(out) = future.as_mut().poll(cx) { - return Ready(Some(out)); - } - - Pending - })) - .expect("Failed to `Enter::block_on`") - { - return out; - } - } - } - } - - fn take_inner(&self) -> Option> { + pub(crate) fn take_inner(&self) -> Option> { let inner = self.inner.lock().take()?; Some(InnerGuard { @@ -324,6 +288,10 @@ impl Spawner { handle } + pub(crate) fn notify(&self) -> &Notify { + &self.shared.notify + } + fn pop(&self) -> Option>> { self.shared.queue.lock().pop_front() } @@ -393,13 +361,13 @@ impl Wake for Shared { /// Used to ensure we always place the Inner value /// back into its slot in `BasicScheduler`, even if the /// future panics. -struct InnerGuard<'a, P: Park> { +pub(crate) struct InnerGuard<'a, P: Park> { inner: Option>, basic_scheduler: &'a BasicScheduler

, } impl InnerGuard<'_, P> { - fn block_on(&mut self, future: F) -> F::Output { + pub(crate) fn block_on(&mut self, future: F) -> F::Output { // The only time inner gets set to `None` is if we have dropped // already so this unwrap is safe. self.inner.as_mut().unwrap().block_on(future) @@ -417,7 +385,7 @@ impl Drop for InnerGuard<'_, P> { // Wake up other possible threads that could steal // the dedicated parker P. - self.basic_scheduler.notify.notify_one() + self.basic_scheduler.spawner.shared.notify.notify_one() } } } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 138d13b25b0..4790d0403ac 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,8 +1,10 @@ +use crate::future::poll_fn; use crate::runtime::blocking::task::BlockingTask; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{blocking, context, driver, Spawner}; use std::future::Future; +use std::task::Poll::{Pending, Ready}; use std::{error, fmt}; /// Handle to the runtime. @@ -200,6 +202,88 @@ impl Handle { let _ = self.blocking_spawner.spawn(task, &self); handle } + + /// Run a future to completion on the Tokio runtime. This is the + /// runtime's entry point. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers + /// which the future spawns internally will be executed on the runtime. + /// + /// # Multi thread scheduler + /// + /// When the multi thread scheduler is used this will allow futures + /// to run within the io driver and timer context of the overall runtime. + /// + /// # Current thread scheduler + /// + /// When the current thread scheduler is enabled `block_on` + /// can be called concurrently from multiple threads. The first call + /// will take ownership of the io and timer drivers. This means + /// other threads which do not own the drivers will hook into that one. + /// When the first `block_on` completes, other threads will be able to + /// "steal" the driver to allow continued execution of their futures. + /// + /// # Panics + /// + /// This function panics if the provided future panics, or if called within an + /// asynchronous execution context. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::runtime::Runtime; + /// + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// // Get a handle to rhe runtime + /// let handle = rt.handle(); + /// + /// // Execute the future, blocking the current thread until completion using the handle + /// handle.block_on(async { + /// println!("hello"); + /// }); + /// ``` + pub fn block_on(&self, future: F) -> F::Output { + let _enter = self.enter(); + + match &self.spawner { + Spawner::Basic(exec) => { + pin!(future); + + // Lets select on a notification that the parker is available or + // the future is complete. + loop { + let mut enter = crate::runtime::enter(false); + + let notified = exec.notify().notified(); + pin!(notified); + + if let Some(out) = enter + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } + + if let Ready(out) = future.as_mut().poll(cx) { + return Ready(Some(out)); + } + + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; + } + } + }, + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(_) => { + let mut enter = crate::runtime::enter(true); + enter.block_on(future).expect("failed to park thread") + }, + } + } } /// Error returned by `try_current` when no Runtime has been started diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index d7f068ec1fe..a854e979193 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -444,13 +444,16 @@ cfg_rt! { /// /// [handle]: fn@Handle::block_on pub fn block_on(&self, future: F) -> F::Output { - let _enter = self.enter(); - - match &self.kind { - Kind::CurrentThread(exec) => exec.block_on(future), - #[cfg(feature = "rt-multi-thread")] - Kind::ThreadPool(exec) => exec.block_on(future), + if let Kind::CurrentThread(exec) = &self.kind { + // Attempt to steal the dedicated parker and block_on the future if we can there, + // othwerwise, lets select on a notification that the parker is available + // or the future is complete. + if let Some(mut inner) = exec.take_inner() { + let _enter = self.enter(); + return inner.block_on(future); + } } + self.handle.block_on(future) } /// Enter the runtime context. diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 47f8ee3454f..8b4cfa53ef9 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -58,18 +58,6 @@ impl ThreadPool { pub(crate) fn spawner(&self) -> &Spawner { &self.spawner } - - /// Blocks the current thread waiting for the future to complete. - /// - /// The future will execute on the current thread, but all spawned tasks - /// will be executed on the thread pool. - pub(crate) fn block_on(&self, future: F) -> F::Output - where - F: Future, - { - let mut enter = crate::runtime::enter(true); - enter.block_on(future).expect("failed to park thread") - } } impl fmt::Debug for ThreadPool { From 7bb036efecbc636df2ae746165dd51d08607f9f3 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Thu, 5 Nov 2020 14:23:35 +0100 Subject: [PATCH 2/6] rt: fix formatting Signed-off-by: Marc-Antoine Perennou --- tokio/src/runtime/basic_scheduler.rs | 5 +---- tokio/src/runtime/handle.rs | 6 +++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 42e53eec0a6..281bdc92c8a 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -114,10 +114,7 @@ impl BasicScheduler

{ park, })); - BasicScheduler { - inner, - spawner, - } + BasicScheduler { inner, spawner } } pub(crate) fn spawner(&self) -> &Spawner { diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 4790d0403ac..269b9ce5a42 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -271,17 +271,17 @@ impl Handle { Pending })) - .expect("Failed to `Enter::block_on`") + .expect("Failed to `Enter::block_on`") { return out; } } - }, + } #[cfg(feature = "rt-multi-thread")] Spawner::ThreadPool(_) => { let mut enter = crate::runtime::enter(true); enter.block_on(future).expect("failed to park thread") - }, + } } } } From 43094fbad00a4622672a6a79778860c012c159fa Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Thu, 5 Nov 2020 14:29:02 +0100 Subject: [PATCH 3/6] rt: avoid irrefutable if let Signed-off-by: Marc-Antoine Perennou --- tokio/src/runtime/mod.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index a854e979193..019072ec1f3 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -444,16 +444,21 @@ cfg_rt! { /// /// [handle]: fn@Handle::block_on pub fn block_on(&self, future: F) -> F::Output { - if let Kind::CurrentThread(exec) = &self.kind { - // Attempt to steal the dedicated parker and block_on the future if we can there, - // othwerwise, lets select on a notification that the parker is available - // or the future is complete. - if let Some(mut inner) = exec.take_inner() { - let _enter = self.enter(); - return inner.block_on(future); + match &self.kind { + Kind::CurrentThread(exec) => { + // Attempt to steal the dedicated parker and block_on the future if we can there, + // othwerwise, lets select on a notification that the parker is available + // or the future is complete. + if let Some(mut inner) = exec.take_inner() { + let _enter = self.enter(); + inner.block_on(future) + } else { + self.handle.block_on(future) + } } + #[cfg(feature = "rt-multi-thread")] + Kind::ThreadPool(_) => self.handle.block_on(future), } - self.handle.block_on(future) } /// Enter the runtime context. From e4727fff6b9ee773f1932edfe9a21ecae5771d76 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Fri, 6 Nov 2020 09:42:00 +0100 Subject: [PATCH 4/6] rt: naive attempt at restoring previous Runtime::block_on behaviour Signed-off-by: Marc-Antoine Perennou --- tokio/src/runtime/handle.rs | 69 +++++++++++++++++++++---------------- tokio/src/runtime/mod.rs | 7 +--- 2 files changed, 41 insertions(+), 35 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 269b9ce5a42..d9aa2522dc8 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,7 +1,8 @@ use crate::future::poll_fn; +use crate::park::Park; use crate::runtime::blocking::task::BlockingTask; use crate::runtime::task::{self, JoinHandle}; -use crate::runtime::{blocking, context, driver, Spawner}; +use crate::runtime::{basic_scheduler, blocking, context, driver, Spawner}; use std::future::Future; use std::task::Poll::{Pending, Ready}; @@ -245,42 +246,52 @@ impl Handle { /// }); /// ``` pub fn block_on(&self, future: F) -> F::Output { - let _enter = self.enter(); - match &self.spawner { Spawner::Basic(exec) => { - pin!(future); + self.block_on_with_basic_scheduler::(future, exec, None) + } + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(_) => { + let _enter = self.enter(); + let mut enter = crate::runtime::enter(true); + enter.block_on(future).expect("failed to park thread") + } + } + } - // Lets select on a notification that the parker is available or - // the future is complete. - loop { - let mut enter = crate::runtime::enter(false); + pub(crate) fn block_on_with_basic_scheduler(&self, future: F, spawner: &basic_scheduler::Spawner, scheduler: Option<&basic_scheduler::BasicScheduler

>) -> F::Output { + let _enter = self.enter(); - let notified = exec.notify().notified(); - pin!(notified); + pin!(future); - if let Some(out) = enter - .block_on(poll_fn(|cx| { - if notified.as_mut().poll(cx).is_ready() { - return Ready(None); - } + // Attempt to steal the dedicated parker and block_on the future if we can there, + // othwerwise, lets select on a notification that the parker is available + // or the future is complete. + loop { + if let Some(mut inner) = scheduler.and_then(basic_scheduler::BasicScheduler::take_inner) { + return inner.block_on(future) + } + + let mut enter = crate::runtime::enter(false); - if let Ready(out) = future.as_mut().poll(cx) { - return Ready(Some(out)); - } + let notified = spawner.notify().notified(); + pin!(notified); - Pending - })) - .expect("Failed to `Enter::block_on`") - { - return out; + if let Some(out) = enter + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); } - } - } - #[cfg(feature = "rt-multi-thread")] - Spawner::ThreadPool(_) => { - let mut enter = crate::runtime::enter(true); - enter.block_on(future).expect("failed to park thread") + + if let Ready(out) = future.as_mut().poll(cx) { + return Ready(Some(out)); + } + + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; } } } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 019072ec1f3..9eaf1485859 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -449,12 +449,7 @@ cfg_rt! { // Attempt to steal the dedicated parker and block_on the future if we can there, // othwerwise, lets select on a notification that the parker is available // or the future is complete. - if let Some(mut inner) = exec.take_inner() { - let _enter = self.enter(); - inner.block_on(future) - } else { - self.handle.block_on(future) - } + self.handle.block_on_with_basic_scheduler(future, exec.spawner(), Some(exec)) } #[cfg(feature = "rt-multi-thread")] Kind::ThreadPool(_) => self.handle.block_on(future), From 14ee0c2a12e82dad98fa1827e76c3f824a48cd39 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Fri, 6 Nov 2020 09:45:07 +0100 Subject: [PATCH 5/6] rt: formatting fix Signed-off-by: Marc-Antoine Perennou --- tokio/src/runtime/handle.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index d9aa2522dc8..62351ce76d5 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -259,7 +259,12 @@ impl Handle { } } - pub(crate) fn block_on_with_basic_scheduler(&self, future: F, spawner: &basic_scheduler::Spawner, scheduler: Option<&basic_scheduler::BasicScheduler

>) -> F::Output { + pub(crate) fn block_on_with_basic_scheduler( + &self, + future: F, + spawner: &basic_scheduler::Spawner, + scheduler: Option<&basic_scheduler::BasicScheduler

>, + ) -> F::Output { let _enter = self.enter(); pin!(future); @@ -268,8 +273,9 @@ impl Handle { // othwerwise, lets select on a notification that the parker is available // or the future is complete. loop { - if let Some(mut inner) = scheduler.and_then(basic_scheduler::BasicScheduler::take_inner) { - return inner.block_on(future) + if let Some(mut inner) = scheduler.and_then(basic_scheduler::BasicScheduler::take_inner) + { + return inner.block_on(future); } let mut enter = crate::runtime::enter(false); From 54fc4fbfe9c79534ae44f72a10f94bd5228047c2 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Fri, 6 Nov 2020 10:01:09 +0100 Subject: [PATCH 6/6] rt: another formatting fix Signed-off-by: Marc-Antoine Perennou --- tokio/src/runtime/handle.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 62351ce76d5..61e364c416f 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -247,9 +247,10 @@ impl Handle { /// ``` pub fn block_on(&self, future: F) -> F::Output { match &self.spawner { - Spawner::Basic(exec) => { - self.block_on_with_basic_scheduler::(future, exec, None) - } + Spawner::Basic(exec) => self + .block_on_with_basic_scheduler::( + future, exec, None, + ), #[cfg(feature = "rt-multi-thread")] Spawner::ThreadPool(_) => { let _enter = self.enter();