Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cubesql): Fix SELECT DISTINCT on pushdown #9144

Merged
merged 14 commits into from
Feb 12, 2025
184 changes: 182 additions & 2 deletions rust/cubesql/cubesql/src/compile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8205,6 +8205,184 @@ ORDER BY "source"."str0" ASC
)
}

#[tokio::test]
async fn test_select_distinct_dimensions() {
if !Rewriter::sql_push_down_enabled() {
return;
}
init_testing_logger();

let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT customer_gender FROM KibanaSampleDataEcommerce".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec![
"KibanaSampleDataEcommerce.customer_gender".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
..Default::default()
}
);

let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT customer_gender FROM KibanaSampleDataEcommerce LIMIT 100".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec![
"KibanaSampleDataEcommerce.customer_gender".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
limit: Some(100),
..Default::default()
}
);

let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT * FROM (SELECT customer_gender FROM KibanaSampleDataEcommerce LIMIT 100) q_0".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec![
"KibanaSampleDataEcommerce.customer_gender".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
limit: Some(100),
ungrouped: Some(true),
..Default::default()
}
);

let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT customer_gender, order_date FROM KibanaSampleDataEcommerce"
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![]),
dimensions: Some(vec![
"KibanaSampleDataEcommerce.customer_gender".to_string(),
"KibanaSampleDataEcommerce.order_date".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
..Default::default()
}
);

let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT MAX(maxPrice) FROM KibanaSampleDataEcommerce".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec!["KibanaSampleDataEcommerce.maxPrice".to_string(),]),
dimensions: Some(vec![]),
segments: Some(vec![]),
order: Some(vec![]),
..Default::default()
}
);

let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT * FROM (SELECT customer_gender, MAX(maxPrice) FROM KibanaSampleDataEcommerce GROUP BY 1) q_0".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec!["KibanaSampleDataEcommerce.maxPrice".to_string(),]),
dimensions: Some(vec![
"KibanaSampleDataEcommerce.customer_gender".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
..Default::default()
}
);

let logical_plan = convert_select_to_query_plan(
"SELECT DISTINCT * FROM KibanaSampleDataEcommerce".to_string(),
DatabaseProtocol::PostgreSQL,
)
.await
.as_logical_plan();

println!("logical_plan: {:?}", logical_plan);

assert_eq!(
logical_plan.find_cube_scan().request,
V1LoadRequestQuery {
measures: Some(vec![
"KibanaSampleDataEcommerce.count".to_string(),
"KibanaSampleDataEcommerce.maxPrice".to_string(),
"KibanaSampleDataEcommerce.sumPrice".to_string(),
"KibanaSampleDataEcommerce.minPrice".to_string(),
"KibanaSampleDataEcommerce.avgPrice".to_string(),
"KibanaSampleDataEcommerce.countDistinct".to_string(),
]),
dimensions: Some(vec![
"KibanaSampleDataEcommerce.order_date".to_string(),
"KibanaSampleDataEcommerce.last_mod".to_string(),
"KibanaSampleDataEcommerce.customer_gender".to_string(),
"KibanaSampleDataEcommerce.notes".to_string(),
"KibanaSampleDataEcommerce.taxful_total_price".to_string(),
"KibanaSampleDataEcommerce.has_subscription".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
ungrouped: Some(true),
..Default::default()
}
)
}

#[tokio::test]
async fn test_sort_relations() -> Result<(), CubeError> {
init_testing_logger();
Expand Down Expand Up @@ -15665,8 +15843,10 @@ LIMIT {{ limit }}{% endif %}"#.to_string(),
"KibanaSampleDataEcommerce.customer_gender".to_string(),
]),
segments: Some(vec![]),
order: Some(vec![]),
ungrouped: Some(true),
order: Some(vec![vec![
"KibanaSampleDataEcommerce.customer_gender".to_string(),
"asc".to_string()
],]),
..Default::default()
}
)
Expand Down
103 changes: 100 additions & 3 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::{
compile::rewrite::{
agg_fun_expr, aggregate, alias_expr, all_members,
analysis::{ConstantFolding, LogicalPlanData, MemberNamesToExpr, OriginalExpr},
analysis::{ConstantFolding, LogicalPlanData, Member, MemberNamesToExpr, OriginalExpr},
binary_expr, cast_expr, change_user_expr, column_expr, cross_join, cube_scan,
cube_scan_filters_empty_tail, cube_scan_members, cube_scan_members_empty_tail,
cube_scan_order_empty_tail, dimension_expr, expr_column_name, fun_expr, join, like_expr,
limit, list_concat_pushdown_replacer, list_concat_pushup_replacer, literal_expr,
cube_scan_order_empty_tail, dimension_expr, distinct, expr_column_name, fun_expr, join,
like_expr, limit, list_concat_pushdown_replacer, list_concat_pushup_replacer, literal_expr,
literal_member, measure_expr, member_pushdown_replacer, member_replacer,
merged_members_replacer, original_expr_name, projection, referenced_columns, rewrite,
rewriter::{CubeEGraph, CubeRewrite, RewriteRules},
Expand Down Expand Up @@ -262,6 +262,39 @@
),
self.push_down_limit("?skip", "?fetch", "?new_skip", "?new_fetch"),
),
transforming_rewrite(
"select-distinct-dimensions",
distinct(cube_scan(
"?alias_to_cube",
"?members",
"?filters",
"?orders",
"CubeScanLimit:None",
"CubeScanOffset:None",
"?split",
"?can_pushdown_join",
"CubeScanWrapped:false",
"?left_ungrouped",
)),
cube_scan(
"?alias_to_cube",
"?members",
"?filters",
"?orders",
"CubeScanLimit:None",
"CubeScanOffset:None",
"?split",
"?can_pushdown_join",
"CubeScanWrapped:false",
"CubeScanUngrouped:false",
),
self.select_distinct_dimensions(
"?alias_to_cube",
"?members",
"?filters",
"?left_ungrouped",
),
),
// MOD function to binary expr
transforming_rewrite_with_root(
"mod-fun-to-binary-expr",
Expand Down Expand Up @@ -1478,6 +1511,70 @@
)
}

fn select_distinct_dimensions(
&self,
alias_to_cube_var: &'static str,
members_var: &'static str,
filters_var: &'static str,
left_ungrouped_var: &'static str,
) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool {
let alias_to_cube_var = var!(alias_to_cube_var);
let members_var = var!(members_var);
let filters_var = var!(filters_var);
let left_ungrouped_var = var!(left_ungrouped_var);
let meta_context = self.meta_context.clone();

move |egraph, subst| {
let empty_filters = &egraph[subst[filters_var]]
.data
.is_empty_list
.unwrap_or(true);
let ungrouped =
var_iter!(egraph[subst[left_ungrouped_var]], CubeScanUngrouped).any(|v| *v);

if !empty_filters && ungrouped {
return false;

Check warning on line 1536 in rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs#L1536

Added line #L1536 was not covered by tests
}

let res = match egraph
.index(subst[members_var])
.data
.member_name_to_expr
.as_ref()
{
Some(names_to_expr) => {
names_to_expr.list.iter().all(|(_, member, _)| {
// we should allow transform for queries with dimensions only,
// as it doesn't make sense for measures
match member {
Member::Dimension { .. } => true,
Member::VirtualField { .. } => true,
Member::LiteralMember { .. } => true,

Check warning on line 1552 in rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs#L1551-L1552

Added lines #L1551 - L1552 were not covered by tests
_ => false,
}
})
}
None => {
// this might be the case of `SELECT DISTINCT *`
KSDaemon marked this conversation as resolved.
Show resolved Hide resolved
// we need to check that there are only dimensions defined in the referenced cube(s)
var_iter!(egraph[subst[alias_to_cube_var]], CubeScanAliasToCube)
.cloned()
.all(|alias_to_cube| {
alias_to_cube.iter().all(|(_, cube_name)| {
if let Some(cube) = meta_context.find_cube_with_name(&cube_name) {
cube.measures.len() == 0 && cube.segments.len() == 0
} else {
false

Check warning on line 1567 in rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs#L1567

Added line #L1567 was not covered by tests
}
})
})
}
};

res
}
}

fn push_down_non_empty_aggregate(
&self,
alias_to_cube_var: &'static str,
Expand Down
Loading