Skip to content

Commit

Permalink
chore(executor): add some time tracking log (#15651)
Browse files Browse the repository at this point in the history
* chore(executor): add some time tracking log

* chore(executor): add some time tracking log
  • Loading branch information
zhang2014 authored May 28, 2024
1 parent fd5a7c2 commit 6f74cc2
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions src/common/base/src/runtime/defer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub fn defer<F: FnOnce() -> R, R>(f: F) -> impl Drop {
struct Defer<F: FnOnce() -> R, R>(Option<F>);

impl<F: FnOnce() -> R, R> Drop for Defer<F, R> {
fn drop(&mut self) {
self.0.take().unwrap()();
}
}

Defer(Some(f))
}
2 changes: 2 additions & 0 deletions src/common/base/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod backtrace;
mod catch_unwind;
mod defer;
pub mod error_info;
mod global_runtime;
mod memory;
Expand All @@ -30,6 +31,7 @@ pub use backtrace::AsyncTaskItem;
pub use catch_unwind::catch_unwind;
pub use catch_unwind::drop_guard;
pub use catch_unwind::CatchUnwindFuture;
pub use defer::defer;
pub use global_runtime::GlobalIORuntime;
pub use global_runtime::GlobalQueryRuntime;
pub use memory::set_alloc_error_hook;
Expand Down
1 change: 1 addition & 0 deletions src/query/pipeline/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ databend-common-expression = { path = "../../expression" }
async-backtrace = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
log = "0.4.21"
minitrace = { workspace = true }
petgraph = "0.6.2"
serde = { workspace = true }
Expand Down
45 changes: 44 additions & 1 deletion src/query/pipeline/core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ use std::fmt::Formatter;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;

use databend_common_base::runtime::defer;
use databend_common_base::runtime::drop_guard;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use log::info;
use petgraph::matrix_graph::Zero;

use crate::pipe::Pipe;
Expand Down Expand Up @@ -440,10 +443,24 @@ impl Pipeline {
}
}

#[track_caller]
pub fn set_on_init<F: FnOnce() -> Result<()> + Send + Sync + 'static>(&mut self, f: F) {
let location = std::panic::Location::caller();
if let Some(old_on_init) = self.on_init.take() {
self.on_init = Some(Box::new(move || {
old_on_init()?;
let instants = Instant::now();

let _guard = defer(move || {
info!(
"OnFinished callback elapsed: {:?} while in {}:{}:{}",
instants.elapsed(),
location.file(),
location.line(),
location.column()
);
});

f()
}));

Expand All @@ -453,33 +470,59 @@ impl Pipeline {
self.on_init = Some(Box::new(f));
}

#[track_caller]
pub fn set_on_finished<
F: FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static,
>(
&mut self,
f: F,
) {
let location = std::panic::Location::caller();
if let Some(on_finished) = self.on_finished.take() {
self.on_finished = Some(Box::new(move |(profiles, may_error)| {
on_finished((profiles, may_error))?;
let instants = Instant::now();
let _guard = defer(move || {
info!(
"OnFinished callback elapsed: {:?} while in {}:{}:{}",
instants.elapsed(),
location.file(),
location.line(),
location.column()
);
});

f((profiles, may_error))
}));

return;
}

self.on_finished = Some(Box::new(f));
}

#[track_caller]
pub fn push_front_on_finished_callback<
F: FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static,
>(
&mut self,
f: F,
) {
let location = std::panic::Location::caller();
if let Some(on_finished) = self.on_finished.take() {
self.on_finished = Some(Box::new(move |(profiles, may_error)| {
let instants = Instant::now();
let guard = defer(move || {
info!(
"OnFinished callback elapsed: {:?} while in {}:{}:{}",
instants.elapsed(),
location.file(),
location.line(),
location.column()
);
});

f((profiles, may_error))?;
drop(guard);
on_finished((profiles, may_error))
}));

Expand Down
28 changes: 18 additions & 10 deletions src/query/service/src/pipelines/executor/pipeline_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::time::Instant;

use databend_common_base::base::WatchNotify;
use databend_common_base::runtime::catch_unwind;
use databend_common_base::runtime::defer;
use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::TrySpawn;
Expand Down Expand Up @@ -179,6 +180,13 @@ impl PipelineExecutor {
}

pub fn execute(&self) -> Result<()> {
let instants = Instant::now();
let _guard = defer(move || {
info!(
"Pipeline executor finished, elapsed: {:?}",
instants.elapsed()
);
});
match self {
PipelineExecutor::QueryPipelineExecutor(executor) => executor.execute(),
PipelineExecutor::QueriesPipelineExecutor(query_wrapper) => {
Expand Down Expand Up @@ -233,16 +241,16 @@ impl PipelineExecutor {
let this_graph = Arc::downgrade(&query_wrapper.graph);
let finished_notify = query_wrapper.finished_notify.clone();
GlobalIORuntime::instance().spawn(async move {
let finished_future = Box::pin(finished_notify.notified());
let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds));
if let Either::Left(_) = select(max_execute_future, finished_future).await {
if let Some(graph) = this_graph.upgrade() {
graph.should_finish(Err(ErrorCode::AbortedQuery(
"Aborted query, because the execution time exceeds the maximum execution time limit",
))).expect("exceed max execute time, but cannot send error message");
}
}
});
let finished_future = Box::pin(finished_notify.notified());
let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds));
if let Either::Left(_) = select(max_execute_future, finished_future).await {
if let Some(graph) = this_graph.upgrade() {
graph.should_finish(Err(ErrorCode::AbortedQuery(
"Aborted query, because the execution time exceeds the maximum execution time limit",
))).expect("exceed max execute time, but cannot send error message");
}
}
});
}

Ok(())
Expand Down

0 comments on commit 6f74cc2

Please sign in to comment.