From 39b0c83fafa415d8236e568cbbdd83f30aefa7b8 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 17 Nov 2023 15:43:51 +0100 Subject: [PATCH] test(subscriber): add test for tasks being kept open In the Tokio instrumentation, a tracing span is created for each task which is spawned. Since the new span is created within the context of where `tokio::spawn()` (or similar) is called from, it gets a contextual parent attached. In tracing, when a span has a child span (either because the child was created in the context of the parent, or because the parent was set explicitly) then that span will not be closed until the child has closed. The result in the console subscriber is that a task which spawns another task won't have a `dropped_at` time set until the spawned task exits, even if the parent task exits much earlier. This causes Tokio Console to show an incorrect lost waker warning (#345). It also affects other spans that are entered when a task is spawned (#412). The solution is to modify the instrumentation in Tokio so that task spans are explicit roots (`parent: None`). This will be done as part of enriching the Tokio instrumentation (tokio-rs/tokio#5792). This change adds functionality to the test framework within `console-subscriber` so that the state of a task can be set as an expectation. The state is calculated based on 4 values: * `console_api::tasks::Stats::dropped_at` * `console_api::tasks::Stats::last_wake` * `console_api::PollStats::last_poll_started` * `console_api::PollStats::last_poll_ended` It can then be tested that a task that spawns another task and then ends actually goes to the `Completed` state, even if the spawned task is still running. As of Tokio 1.34.0, this test fails, but the PR tokio-rs/tokio#6158 fixes this and the test should pass from Tokio 1.35 onwards. --- console-subscriber/tests/framework.rs | 50 ++++++++++++- console-subscriber/tests/spawn.rs | 27 ++++++- console-subscriber/tests/support/mod.rs | 2 + console-subscriber/tests/support/task.rs | 92 +++++++++++++++++++++++- 4 files changed, 168 insertions(+), 3 deletions(-) diff --git a/console-subscriber/tests/framework.rs b/console-subscriber/tests/framework.rs index 777d49c06..c5e10c3f7 100644 --- a/console-subscriber/tests/framework.rs +++ b/console-subscriber/tests/framework.rs @@ -11,7 +11,7 @@ use futures::future; use tokio::{task, time::sleep}; mod support; -use support::{assert_task, assert_tasks, ExpectedTask}; +use support::{assert_task, assert_tasks, ExpectedTask, TaskState}; #[test] fn expect_present() { @@ -198,6 +198,54 @@ fn fail_polls() { assert_task(expected_task, future); } +#[test] +fn main_task_completes() { + let expected_task = ExpectedTask::default() + .match_default_name() + .expect_state(TaskState::Completed); + + let future = async {}; + + assert_task(expected_task, future); +} + +#[test] +#[should_panic(expected = "Test failed: Task validation failed: + - Task { name=task }: expected `state` to be Idle, but actual was Completed")] +fn fail_completed_task_is_idle() { + let expected_task = ExpectedTask::default() + .match_name("task".into()) + .expect_state(TaskState::Idle); + + let future = async { + _ = task::Builder::new() + .name("task") + .spawn(futures::future::ready(())) + .unwrap() + .await; + }; + + assert_task(expected_task, future); +} + +#[test] +#[should_panic(expected = "Test failed: Task validation failed: + - Task { name=task }: expected `state` to be Completed, but actual was Idle")] +fn fail_idle_task_is_completed() { + let expected_task = ExpectedTask::default() + .match_name("task".into()) + .expect_state(TaskState::Completed); + + let future = async { + _ = task::Builder::new() + .name("task") + .spawn(futures::future::pending::<()>()) + .unwrap(); + }; + + assert_task(expected_task, future); +} + async fn yield_to_runtime() { // There is a race condition that can occur when tests are run in parallel, // caused by tokio-rs/tracing#2743. It tends to cause test failures only diff --git a/console-subscriber/tests/spawn.rs b/console-subscriber/tests/spawn.rs index 21ecaad41..30ecb861e 100644 --- a/console-subscriber/tests/spawn.rs +++ b/console-subscriber/tests/spawn.rs @@ -3,7 +3,7 @@ use std::time::Duration; use tokio::time::sleep; mod support; -use support::{assert_tasks, spawn_named, ExpectedTask}; +use support::{assert_tasks, spawn_named, ExpectedTask, TaskState}; /// This test asserts the behavior that was fixed in #440. Before that fix, /// the polls of a child were also counted towards the parent (the task which @@ -34,3 +34,28 @@ fn child_polls_dont_count_towards_parent_polls() { assert_tasks(expected_tasks, future); } + +/// This test asserts that the lifetime of a task is not affected by the +/// lifetimes of tasks that it spawns. The test will pass when #345 is +/// fixed. +#[test] +fn spawner_task_with_running_children_completes() { + let expected_tasks = vec![ + ExpectedTask::default() + .match_name("parent".into()) + .expect_state(TaskState::Completed), + ExpectedTask::default() + .match_name("child".into()) + .expect_state(TaskState::Idle), + ]; + + let future = async { + spawn_named("parent", async { + spawn_named("child", futures::future::pending::<()>()); + }) + .await + .expect("joining parent failed"); + }; + + assert_tasks(expected_tasks, future); +} diff --git a/console-subscriber/tests/support/mod.rs b/console-subscriber/tests/support/mod.rs index 3a42583a2..090dd824f 100644 --- a/console-subscriber/tests/support/mod.rs +++ b/console-subscriber/tests/support/mod.rs @@ -8,6 +8,8 @@ use subscriber::run_test; pub(crate) use subscriber::MAIN_TASK_NAME; pub(crate) use task::ExpectedTask; +#[allow(unused_imports)] +pub(crate) use task::TaskState; use tokio::task::JoinHandle; /// Assert that an `expected_task` is recorded by a console-subscriber diff --git a/console-subscriber/tests/support/task.rs b/console-subscriber/tests/support/task.rs index 593d45f97..24adef7c2 100644 --- a/console-subscriber/tests/support/task.rs +++ b/console-subscriber/tests/support/task.rs @@ -1,6 +1,7 @@ -use std::{error, fmt}; +use std::{error, fmt, time::SystemTime}; use console_api::tasks; +use prost_types::Timestamp; use super::MAIN_TASK_NAME; @@ -13,6 +14,7 @@ use super::MAIN_TASK_NAME; pub(super) struct ActualTask { pub(super) id: u64, pub(super) name: Option, + pub(super) state: Option, pub(super) wakes: u64, pub(super) self_wakes: u64, pub(super) polls: u64, @@ -23,6 +25,7 @@ impl ActualTask { Self { id, name: None, + state: None, wakes: 0, self_wakes: 0, polls: 0, @@ -35,6 +38,59 @@ impl ActualTask { if let Some(poll_stats) = &stats.poll_stats { self.polls = poll_stats.polls; } + + self.state = calculate_task_state(stats); + } +} + +/// The state of a task. +/// +/// The task state is an amalgamation of a various fields. It is presented in +/// this way to make testing more straight forward. +#[derive(Clone, Debug, Eq, PartialEq)] +pub(crate) enum TaskState { + /// Task has completed. + /// + /// Indicates that [`dropped_at`] has some value. + /// + /// [`dropped_at`]: fn@tasks::Stats::dropped_at + Completed, + /// Task is being polled. + /// + /// Indicates that the task is not [`Completed`] and the + /// [`last_poll_started`] time is later than [`last_poll_ended`] (or + /// [`last_poll_ended`] has not been set). + Running, + /// Task has been scheduled. + /// + /// Indicates that the task is not [`Completed`] and the [`last_wake`] time + /// is later than [`last_poll_started`]. + Scheduled, + /// Task is idle. + /// + /// Indicates that the task is between polls. + Idle, +} + +fn calculate_task_state(stats: &tasks::Stats) -> Option { + if stats.dropped_at.is_some() { + return Some(TaskState::Completed); + } + + fn convert(ts: &Option) -> Option { + ts.as_ref().map(|v| v.clone().try_into().unwrap()) + } + let poll_stats = stats.poll_stats.as_ref()?; + let last_poll_started = convert(&poll_stats.last_poll_started); + let last_poll_ended = convert(&poll_stats.last_poll_ended); + let last_wake = convert(&stats.last_wake); + + if last_poll_started > last_poll_ended { + Some(TaskState::Running) + } else if last_wake > last_poll_started { + Some(TaskState::Scheduled) + } else { + Some(TaskState::Idle) } } @@ -88,6 +144,7 @@ impl fmt::Debug for TaskValidationFailure { pub(crate) struct ExpectedTask { match_name: Option, expect_present: Option, + expect_state: Option, expect_wakes: Option, expect_self_wakes: Option, expect_polls: Option, @@ -98,6 +155,7 @@ impl Default for ExpectedTask { Self { match_name: None, expect_present: None, + expect_state: None, expect_wakes: None, expect_self_wakes: None, expect_polls: None, @@ -147,6 +205,28 @@ impl ExpectedTask { no_expectations = false; } + if let Some(expected_state) = &self.expect_state { + no_expectations = false; + if let Some(actual_state) = &actual_task.state { + if expected_state != actual_state { + return Err(TaskValidationFailure { + expected: self.clone(), + actual: Some(actual_task.clone()), + failure: format!( + "{self}: expected `state` to be \ + {expected_state:?}, but actual was \ + {actual_state}", + actual_state = actual_task + .state + .as_ref() + .map(|s| format!("{:?}", s)) + .unwrap_or("None".into()), + ), + }); + } + } + } + if let Some(expected_wakes) = self.expect_wakes { no_expectations = false; if expected_wakes != actual_task.wakes { @@ -239,6 +319,16 @@ impl ExpectedTask { self } + /// Expects that a task has a specific [`TaskState`]. + /// + /// To validate, the actual task must be in this state at the time + /// the test ends and the validation is performed. + #[allow(dead_code)] + pub(crate) fn expect_state(mut self, state: TaskState) -> Self { + self.expect_state = Some(state); + self + } + /// Expects that a task has a specific value for `wakes`. /// /// To validate, the actual task matching this expected task must have