Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cubesql): Remove bottom-up extraction completely #9183

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions .github/workflows/rust-cubesql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,7 @@ jobs:
# We use host instead of cross container, because it's much faster
runs-on: ubuntu-24.04
timeout-minutes: 60
name: Unit (Rewrite Engine) (CUBESQL_TOP_DOWN_EXTRACTOR=${{ matrix.top-down-extractor }})
strategy:
matrix:
top-down-extractor: ['true', 'false']
fail-fast: false
name: Unit (Rewrite Engine)

steps:
- name: Checkout
Expand Down Expand Up @@ -94,7 +90,6 @@ jobs:
CUBESQL_TESTING_CUBE_TOKEN: ${{ secrets.CUBESQL_TESTING_CUBE_TOKEN }}
CUBESQL_TESTING_CUBE_URL: ${{ secrets.CUBESQL_TESTING_CUBE_URL }}
CUBESQL_SQL_PUSH_DOWN: true
CUBESQL_TOP_DOWN_EXTRACTOR: ${{ matrix.top-down-extractor }}
CUBESQL_REWRITE_CACHE: true
CUBESQL_REWRITE_TIMEOUT: 60
run: |
Expand Down
22 changes: 5 additions & 17 deletions rust/cubesql/cubesql/src/compile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13735,11 +13735,7 @@ ORDER BY "source"."str0" ASC
// CAST(CAST(ta_1.order_date AS Date32) - CAST(CAST(Utf8("1970-01-01") AS Date32) AS Date32) + Int64(3) AS Decimal(38, 10))
if Rewriter::sql_push_down_enabled() {
let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql;
if Rewriter::top_down_extractor_enabled() {
assert!(sql.contains("LIMIT 1000"));
} else {
assert!(sql.contains("\"limit\": 1000"));
}
assert!(sql.contains("LIMIT 1000"));
assert!(sql.contains("% 7"));

let physical_plan = query_plan.as_physical_plan().await.unwrap();
Expand Down Expand Up @@ -15832,18 +15828,10 @@ LIMIT {{ limit }}{% endif %}"#.to_string(),
time_dimensions: Some(vec![V1LoadRequestQueryTimeDimension {
dimension: "KibanaSampleDataEcommerce.order_date".to_string(),
granularity: Some("month".to_string()),
date_range: if Rewriter::top_down_extractor_enabled() {
Some(json!(vec![
"2019-01-01T00:00:00.000Z".to_string(),
"2019-01-31T23:59:59.999Z".to_string()
]))
} else {
// Non-optimal variant with top down extractor disabled
Some(json!(vec![
"2019-01-01 00:00:00.000".to_string(),
"2019-01-31 23:59:59.999".to_string()
]))
}
date_range: Some(json!(vec![
"2019-01-01T00:00:00.000Z".to_string(),
"2019-01-31T23:59:59.999Z".to_string()
]))
}]),
order: Some(vec![]),
..Default::default()
Expand Down
1 change: 0 additions & 1 deletion rust/cubesql/cubesql/src/compile/query_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ pub trait QueryEngine {
state.auth_context().unwrap(),
qtrace,
span_id.clone(),
self.config_ref().top_down_extractor(),
)
.await
.map_err(|e| match e.cause {
Expand Down
101 changes: 6 additions & 95 deletions rust/cubesql/cubesql/src/compile/rewrite/cost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
},
transport::{MetaContext, V1CubeMetaDimensionExt},
};
use egg::{Analysis, CostFunction, EGraph, Id, Language, RecExpr};
use egg::{Analysis, EGraph, Id, Language, RecExpr};
use indexmap::IndexSet;

#[derive(Debug)]
Expand All @@ -21,7 +21,7 @@ impl BestCubePlan {
Self { meta_context }
}

pub fn initial_cost(&self, enode: &LogicalPlanLanguage, top_down: bool) -> CubePlanCost {
pub fn initial_cost(&self, enode: &LogicalPlanLanguage) -> CubePlanCost {
let table_scans = match enode {
LogicalPlanLanguage::TableScan(_) => 1,
_ => 0,
Expand All @@ -48,8 +48,7 @@ impl BestCubePlan {
};

let non_pushed_down_limit_sort = match enode {
LogicalPlanLanguage::Limit(_) if !top_down => 1,
LogicalPlanLanguage::Sort(_) if top_down => 1,
LogicalPlanLanguage::Sort(_) => 1,
_ => 0,
};

Expand Down Expand Up @@ -315,45 +314,6 @@ impl SortState {
}
}

/// Cost of a candidate plan bundled with the state needed to finalize it.
///
/// Equality is derived over all three fields, whereas the `PartialOrd`/`Ord`
/// impls for this type compare the `cost` field only.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CubePlanCostAndState {
/// Accumulated cost; the only field consulted when ordering two values.
pub cost: CubePlanCost,
/// Plan state, combined with children via `CubePlanState::add_child`.
pub state: CubePlanState,
/// Sort state, combined with children via `SortState::add_child`.
pub sort_state: SortState,
}

impl PartialOrd for CubePlanCostAndState {
    /// Orders two values by their `cost` field alone; `state` and
    /// `sort_state` do not take part, so the result is always `Some`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let by_cost = Ord::cmp(&self.cost, &other.cost);
        Some(by_cost)
    }
}

impl Ord for CubePlanCostAndState {
    /// Total order defined by the `cost` field alone; `state` and
    /// `sort_state` are ignored.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.cost, &other.cost)
    }
}

impl CubePlanCostAndState {
    /// Combines this value with a child's value by delegating to the
    /// `add_child` method of each of the three components.
    pub fn add_child(&self, other: &Self) -> Self {
        let cost = self.cost.add_child(&other.cost);
        let state = self.state.add_child(&other.state);
        let sort_state = self.sort_state.add_child(&other.sort_state);

        Self {
            cost,
            state,
            sort_state,
        }
    }

    /// Returns a copy whose cost has been finalized for `enode`.
    ///
    /// The cost is finalized against the current `state` and `sort_state`
    /// with the trailing flag fixed to `false`; both states are carried over
    /// unchanged.
    pub fn finalize(&self, enode: &LogicalPlanLanguage) -> Self {
        let finalized_cost = self
            .cost
            .finalize(&self.state, &self.sort_state, enode, false);

        Self {
            cost: finalized_cost,
            state: self.state.clone(),
            sort_state: self.sort_state.clone(),
        }
    }
}

impl CubePlanCost {
pub fn add_child(&self, other: &Self) -> Self {
Self {
Expand Down Expand Up @@ -407,7 +367,6 @@ impl CubePlanCost {
state: &CubePlanState,
sort_state: &SortState,
enode: &LogicalPlanLanguage,
top_down: bool,
) -> Self {
Self {
replacers: self.replacers,
Expand All @@ -428,7 +387,7 @@ impl CubePlanCost {
},
non_pushed_down_limit_sort: match sort_state {
SortState::DirectChild => self.non_pushed_down_limit_sort,
SortState::Current if top_down => self.non_pushed_down_limit_sort,
SortState::Current => self.non_pushed_down_limit_sort,
_ => 0,
},
// Don't track state here: we want representation that have fewer wrappers with zero members _in total_
Expand Down Expand Up @@ -482,54 +441,6 @@ impl CubePlanCost {
}
}

impl CostFunction<LogicalPlanLanguage> for BestCubePlan {
    type Cost = CubePlanCostAndState;

    /// Computes the cost of `enode` for egg's `CostFunction` interface.
    ///
    /// The node's own cost and state are computed first, then every child's
    /// cost (obtained through `costs`) is merged in child order, and finally
    /// the accumulated value is finalized against `enode`.
    fn cost<C>(&mut self, enode: &LogicalPlanLanguage, mut costs: C) -> Self::Cost
    where
        C: FnMut(Id) -> Self::Cost,
    {
        // `false`: this path does not use the top-down cost variant.
        let cost = self.initial_cost(enode, false);

        // Plan-level nodes count as one unit of AST size while unwrapped.
        let ast_size_outside_wrapper = match enode {
            LogicalPlanLanguage::Aggregate(_)
            | LogicalPlanLanguage::Projection(_)
            | LogicalPlanLanguage::Limit(_)
            | LogicalPlanLanguage::Sort(_)
            | LogicalPlanLanguage::Filter(_)
            | LogicalPlanLanguage::Join(_)
            | LogicalPlanLanguage::CrossJoin(_)
            | LogicalPlanLanguage::Union(_)
            | LogicalPlanLanguage::Window(_)
            | LogicalPlanLanguage::Subquery(_)
            | LogicalPlanLanguage::Distinct(_) => 1,
            _ => 0,
        };

        let state = match enode {
            LogicalPlanLanguage::CubeScanWrapped(CubeScanWrapped(true)) => CubePlanState::Wrapped,
            LogicalPlanLanguage::CubeScanWrapper(_) => CubePlanState::Wrapper,
            _ => CubePlanState::Unwrapped(ast_size_outside_wrapper),
        };

        let sort_state = if matches!(enode, LogicalPlanLanguage::Sort(_)) {
            SortState::Current
        } else {
            SortState::None
        };

        let mut total = CubePlanCostAndState {
            cost,
            state,
            sort_state,
        };
        for child_id in enode.children() {
            let child_cost = costs(*child_id);
            total = total.add_child(&child_cost);
        }

        total.finalize(enode)
    }
}

pub trait TopDownCost: Clone + Debug + PartialOrd {
fn add(&self, other: &Self) -> Self;
}
Expand Down Expand Up @@ -858,7 +769,7 @@ impl TopDownState<LogicalPlanLanguage> for CubePlanTopDownState {

impl TopDownCostFunction<LogicalPlanLanguage, CubePlanTopDownState, CubePlanCost> for BestCubePlan {
fn cost(&self, node: &LogicalPlanLanguage) -> CubePlanCost {
self.initial_cost(node, true)
self.initial_cost(node)
}

fn finalize(
Expand All @@ -867,6 +778,6 @@ impl TopDownCostFunction<LogicalPlanLanguage, CubePlanTopDownState, CubePlanCost
node: &LogicalPlanLanguage,
state: &CubePlanTopDownState,
) -> CubePlanCost {
CubePlanCost::finalize(&cost, &state.wrapped, &state.limit, node, true)
CubePlanCost::finalize(&cost, &state.wrapped, &state.limit, node)
}
}
37 changes: 10 additions & 27 deletions rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
use datafusion::{
logical_plan::LogicalPlan, physical_plan::planner::DefaultPhysicalPlanner, scalar::ScalarValue,
};
use egg::{EGraph, Extractor, Id, IterationData, Language, Rewrite, Runner, StopReason};
use egg::{EGraph, Id, IterationData, Language, Rewrite, Runner, StopReason};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -329,7 +329,6 @@
auth_context: AuthContextRef,
qtrace: &mut Option<Qtrace>,
span_id: Option<Arc<SpanId>>,
top_down_extractor: bool,
) -> Result<LogicalPlan, CubeError> {
let cube_context = self.cube_context.clone();
let egraph = self.graph.clone();
Expand All @@ -349,26 +348,16 @@
let (runner, qtrace_egraph_iterations) =
Self::run_rewrites(&cube_context, egraph, rules, "final")?;

let best = if top_down_extractor {
let mut extractor = TopDownExtractor::new(
&runner.egraph,
BestCubePlan::new(cube_context.meta.clone()),
CubePlanTopDownState::new(),
);
let Some((best_cost, best)) = extractor.find_best(root) else {
return Err(CubeError::internal("Unable to find best plan".to_string()));
};
log::debug!("Best cost: {:#?}", best_cost);
best
} else {
let extractor = Extractor::new(
&runner.egraph,
BestCubePlan::new(cube_context.meta.clone()),
);
let (best_cost, best) = extractor.find_best(root);
log::debug!("Best cost: {:#?}", best_cost);
best
let mut extractor = TopDownExtractor::new(
&runner.egraph,
BestCubePlan::new(cube_context.meta.clone()),
CubePlanTopDownState::new(),
);
let Some((best_cost, best)) = extractor.find_best(root) else {
return Err(CubeError::internal("Unable to find best plan".to_string()));

Check warning on line 357 in rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs (Codecov / codecov/patch; view check run for this annotation): added line #L357 was not covered by tests.
};
log::debug!("Best cost: {:#?}", best_cost);

let qtrace_best_graph = if Qtrace::is_enabled() {
best.as_ref().iter().cloned().collect()
} else {
Expand Down Expand Up @@ -461,12 +450,6 @@
.unwrap_or(true)
}

/// Reports whether the `CUBESQL_TOP_DOWN_EXTRACTOR` environment variable
/// leaves the top-down extractor enabled.
///
/// Returns `false` only when the variable is set to `false` (compared
/// case-insensitively). Any other value, or a variable that cannot be read,
/// yields `true`.
pub fn top_down_extractor_enabled() -> bool {
    match env::var("CUBESQL_TOP_DOWN_EXTRACTOR") {
        Ok(raw) => raw.to_lowercase() != "false",
        Err(_) => true,
    }
}

pub fn rewrite_rules(
meta_context: Arc<MetaContext>,
config_obj: Arc<dyn ConfigObj>,
Expand Down
9 changes: 0 additions & 9 deletions rust/cubesql/cubesql/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ pub trait ConfigObj: DIService + Debug {
fn max_sessions(&self) -> usize;

fn no_implicit_order(&self) -> bool;

fn top_down_extractor(&self) -> bool;
}

#[derive(Debug, Clone)]
Expand All @@ -137,7 +135,6 @@ pub struct ConfigObjImpl {
pub non_streaming_query_max_row_limit: i32,
pub max_sessions: usize,
pub no_implicit_order: bool,
pub top_down_extractor: bool,
}

impl ConfigObjImpl {
Expand Down Expand Up @@ -175,7 +172,6 @@ impl ConfigObjImpl {
non_streaming_query_max_row_limit: env_parse("CUBEJS_DB_QUERY_LIMIT", 50000),
max_sessions: env_parse("CUBEJS_MAX_SESSIONS", 1024),
no_implicit_order: env_parse("CUBESQL_SQL_NO_IMPLICIT_ORDER", true),
top_down_extractor: env_parse("CUBESQL_TOP_DOWN_EXTRACTOR", true),
}
}
}
Expand Down Expand Up @@ -242,10 +238,6 @@ impl ConfigObj for ConfigObjImpl {
fn max_sessions(&self) -> usize {
self.max_sessions
}

fn top_down_extractor(&self) -> bool {
self.top_down_extractor
}
}

impl Config {
Expand Down Expand Up @@ -278,7 +270,6 @@ impl Config {
non_streaming_query_max_row_limit: 50000,
max_sessions: 1024,
no_implicit_order: true,
top_down_extractor: true,
}),
}
}
Expand Down
Loading