From 6f74cc24c7c59490228c6895ea06f8dcd2e1e418 Mon Sep 17 00:00:00 2001 From: Winter Zhang Date: Tue, 28 May 2024 13:05:08 +0800 Subject: [PATCH] chore(executor): add some time tracking log (#15651) * chore(executor): add some time tracking log * chore(executor): add some time tracking log --- Cargo.lock | 1 + src/common/base/src/runtime/defer.rs | 25 +++++++++++ src/common/base/src/runtime/mod.rs | 2 + src/query/pipeline/core/Cargo.toml | 1 + src/query/pipeline/core/src/pipeline.rs | 45 ++++++++++++++++++- .../pipelines/executor/pipeline_executor.rs | 28 +++++++----- 6 files changed, 91 insertions(+), 11 deletions(-) create mode 100644 src/common/base/src/runtime/defer.rs diff --git a/Cargo.lock b/Cargo.lock index e848016991ec..f8cc2e3bda58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3547,6 +3547,7 @@ dependencies = [ "databend-common-exception", "databend-common-expression", "futures", + "log", "minitrace", "petgraph", "serde", diff --git a/src/common/base/src/runtime/defer.rs b/src/common/base/src/runtime/defer.rs new file mode 100644 index 000000000000..9756278b631b --- /dev/null +++ b/src/common/base/src/runtime/defer.rs @@ -0,0 +1,25 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub fn defer R, R>(f: F) -> impl Drop { + struct Defer R, R>(Option); + + impl R, R> Drop for Defer { + fn drop(&mut self) { + self.0.take().unwrap()(); + } + } + + Defer(Some(f)) +} diff --git a/src/common/base/src/runtime/mod.rs b/src/common/base/src/runtime/mod.rs index f5f1b613618a..868913d6845c 100644 --- a/src/common/base/src/runtime/mod.rs +++ b/src/common/base/src/runtime/mod.rs @@ -14,6 +14,7 @@ mod backtrace; mod catch_unwind; +mod defer; pub mod error_info; mod global_runtime; mod memory; @@ -30,6 +31,7 @@ pub use backtrace::AsyncTaskItem; pub use catch_unwind::catch_unwind; pub use catch_unwind::drop_guard; pub use catch_unwind::CatchUnwindFuture; +pub use defer::defer; pub use global_runtime::GlobalIORuntime; pub use global_runtime::GlobalQueryRuntime; pub use memory::set_alloc_error_hook; diff --git a/src/query/pipeline/core/Cargo.toml b/src/query/pipeline/core/Cargo.toml index 745a8b95082a..530e0d2877cb 100644 --- a/src/query/pipeline/core/Cargo.toml +++ b/src/query/pipeline/core/Cargo.toml @@ -19,6 +19,7 @@ databend-common-expression = { path = "../../expression" } async-backtrace = { workspace = true } async-trait = { workspace = true } futures = { workspace = true } +log = "0.4.21" minitrace = { workspace = true } petgraph = "0.6.2" serde = { workspace = true } diff --git a/src/query/pipeline/core/src/pipeline.rs b/src/query/pipeline/core/src/pipeline.rs index 5ee95abb9470..4802acaeff61 100644 --- a/src/query/pipeline/core/src/pipeline.rs +++ b/src/query/pipeline/core/src/pipeline.rs @@ -17,10 +17,13 @@ use std::fmt::Formatter; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Instant; +use databend_common_base::runtime::defer; use databend_common_base::runtime::drop_guard; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use log::info; use petgraph::matrix_graph::Zero; use crate::pipe::Pipe; @@ -440,10 +443,24 @@ impl Pipeline { } } + #[track_caller] pub fn set_on_init Result<()> + Send + Sync + 'static>(&mut self, f: F) { + let location = std::panic::Location::caller(); if let Some(old_on_init) = self.on_init.take() { self.on_init = Some(Box::new(move || { old_on_init()?; + let instants = Instant::now(); + + let _guard = defer(move || { + info!( + "OnFinished callback elapsed: {:?} while in {}:{}:{}", + instants.elapsed(), + location.file(), + location.line(), + location.column() + ); + }); + f() })); @@ -453,33 +470,59 @@ impl Pipeline { self.on_init = Some(Box::new(f)); } + #[track_caller] pub fn set_on_finished< F: FnOnce((&Vec, &Result<()>)) -> Result<()> + Send + Sync + 'static, >( &mut self, f: F, ) { + let location = std::panic::Location::caller(); if let Some(on_finished) = self.on_finished.take() { self.on_finished = Some(Box::new(move |(profiles, may_error)| { on_finished((profiles, may_error))?; + let instants = Instant::now(); + let _guard = defer(move || { + info!( + "OnFinished callback elapsed: {:?} while in {}:{}:{}", + instants.elapsed(), + location.file(), + location.line(), + location.column() + ); + }); + f((profiles, may_error)) })); - return; } self.on_finished = Some(Box::new(f)); } + #[track_caller] pub fn push_front_on_finished_callback< F: FnOnce((&Vec, &Result<()>)) -> Result<()> + Send + Sync + 'static, >( &mut self, f: F, ) { + let location = std::panic::Location::caller(); if let Some(on_finished) = self.on_finished.take() { self.on_finished = Some(Box::new(move |(profiles, may_error)| { + let instants = Instant::now(); + let guard = defer(move || { + info!( + "OnFinished callback elapsed: {:?} while in {}:{}:{}", + instants.elapsed(), + location.file(), + location.line(), + location.column() + ); + }); + f((profiles, may_error))?; + drop(guard); on_finished((profiles, may_error)) })); diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 891620708540..f34b2310dd8f 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -20,6 +20,7 @@ use std::time::Instant; use databend_common_base::base::WatchNotify; use databend_common_base::runtime::catch_unwind; +use databend_common_base::runtime::defer; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; @@ -179,6 +180,13 @@ impl PipelineExecutor { } pub fn execute(&self) -> Result<()> { + let instants = Instant::now(); + let _guard = defer(move || { + info!( + "Pipeline executor finished, elapsed: {:?}", + instants.elapsed() + ); + }); match self { PipelineExecutor::QueryPipelineExecutor(executor) => executor.execute(), PipelineExecutor::QueriesPipelineExecutor(query_wrapper) => { @@ -233,16 +241,16 @@ impl PipelineExecutor { let this_graph = Arc::downgrade(&query_wrapper.graph); let finished_notify = query_wrapper.finished_notify.clone(); GlobalIORuntime::instance().spawn(async move { - let finished_future = Box::pin(finished_notify.notified()); - let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds)); - if let Either::Left(_) = select(max_execute_future, finished_future).await { - if let Some(graph) = this_graph.upgrade() { - graph.should_finish(Err(ErrorCode::AbortedQuery( - "Aborted query, because the execution time exceeds the maximum execution time limit", - ))).expect("exceed max execute time, but cannot send error message"); - } - } - }); + let finished_future = Box::pin(finished_notify.notified()); + let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds)); + if let Either::Left(_) = select(max_execute_future, finished_future).await { + if let Some(graph) = this_graph.upgrade() { + graph.should_finish(Err(ErrorCode::AbortedQuery( + "Aborted query, because the execution time exceeds the maximum execution time limit", + ))).expect("exceed max execute time, but cannot send error message"); + } + } + }); } Ok(())