-
Notifications
You must be signed in to change notification settings - Fork 165
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf: Apply DataFusion's projection pushdown rule #907
base: main
Are you sure you want to change the base?
Changes from 5 commits
8dff659
f2dca63
c7fdb89
913eb15
a70cff6
c51ced6
bdc8144
4c5a3ef
14c32c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,11 +17,14 @@ | |
|
||
//! Define JNI APIs which can be called from Java/Scala. | ||
|
||
use super::{serde, utils::SparkArrowConvert, CometMemoryPool}; | ||
use arrow::{ | ||
datatypes::DataType as ArrowDataType, | ||
ffi::{FFI_ArrowArray, FFI_ArrowSchema}, | ||
}; | ||
use arrow_array::RecordBatch; | ||
use datafusion::execution::session_state::SessionStateBuilder; | ||
use datafusion::physical_optimizer::projection_pushdown::ProjectionPushdown; | ||
use datafusion::{ | ||
execution::{ | ||
disk_manager::DiskManagerConfig, | ||
|
@@ -42,8 +45,6 @@ use jni::{ | |
}; | ||
use std::{collections::HashMap, sync::Arc, task::Poll}; | ||
|
||
use super::{serde, utils::SparkArrowConvert, CometMemoryPool}; | ||
|
||
use crate::{ | ||
errors::{try_unwrap_or_throw, CometError, CometResult}, | ||
execution::{ | ||
|
@@ -92,6 +93,8 @@ struct ExecutionContext { | |
pub debug_native: bool, | ||
/// Whether to write native plans with metrics to stdout | ||
pub explain_native: bool, | ||
/// Whether to enable physical optimizer | ||
pub enable_optimizer: bool, | ||
} | ||
|
||
/// Accept serialized query plan and return the address of the native query plan. | ||
|
@@ -132,6 +135,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( | |
// Whether we've enabled additional debugging on the native side | ||
let debug_native = parse_bool(&configs, "debug_native")?; | ||
let explain_native = parse_bool(&configs, "explain_native")?; | ||
let enable_optimizer = parse_bool(&configs, "native_optimizer")?; | ||
|
||
let worker_threads = configs | ||
.get("worker_threads") | ||
|
@@ -184,6 +188,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( | |
session_ctx: Arc::new(session), | ||
debug_native, | ||
explain_native, | ||
enable_optimizer, | ||
}); | ||
|
||
Ok(Box::into_raw(exec_context) as i64) | ||
|
@@ -249,11 +254,14 @@ fn prepare_datafusion_session_context( | |
|
||
let runtime = RuntimeEnv::new(rt_config).unwrap(); | ||
|
||
let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime)); | ||
let state = SessionStateBuilder::new() | ||
.with_config(session_config) | ||
.with_runtime_env(Arc::new(runtime)) | ||
.with_default_features() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. default features needed to use the planner? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me see if I can remove that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removing this causes test failures. We were previously calling the following method to create the context and this also enabled default features. pub fn new_with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
let state = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(runtime)
.with_default_features()
.build();
Self::new_with_state(state)
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we using a planner only perhaps this should be enough
? |
||
.with_physical_optimizer_rules(vec![Arc::new(ProjectionPushdown::new())]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if other rules can be considered? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Possibly. We are starting out with a physical plan that is already optimized by Spark though, so many optimizations have already been applied. We also are running queries in single partitions within a distributed cluster so we cannot leverage anything that uses |
||
.build(); | ||
|
||
datafusion_functions_nested::register_all(&mut session_ctx)?; | ||
|
||
Ok(session_ctx) | ||
Ok(SessionContext::new_with_state(state)) | ||
} | ||
|
||
fn parse_bool(conf: &HashMap<String, String>, name: &str) -> CometResult<bool> { | ||
|
@@ -355,6 +363,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( | |
&mut exec_context.input_sources.clone(), | ||
)?; | ||
|
||
// optimize the physical plan | ||
let root_op = if exec_context.enable_optimizer { | ||
planner.optimize_plan(root_op)? | ||
} else { | ||
root_op | ||
}; | ||
|
||
exec_context.root_op = Some(Arc::clone(&root_op)); | ||
exec_context.scans = scans; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -116,10 +116,9 @@ impl ExecutionPlan for CopyExec { | |
self: Arc<Self>, | ||
children: Vec<Arc<dyn ExecutionPlan>>, | ||
) -> DataFusionResult<Arc<dyn ExecutionPlan>> { | ||
let input = Arc::clone(&self.input); | ||
let new_input = input.with_new_children(children)?; | ||
assert!(children.len() == 1); | ||
Ok(Arc::new(CopyExec { | ||
input: new_input, | ||
input: Arc::clone(&children[0]), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding the optimizer highlighted that the previous implementation of |
||
schema: Arc::clone(&self.schema), | ||
cache: self.cache.clone(), | ||
metrics: self.metrics.clone(), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code was only used in tests so I moved it into the test module