Skip to content

Commit

Permalink
init (apache#12453)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms authored Sep 15, 2024
1 parent 3ac92ad commit aac10a4
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 27 deletions.
37 changes: 25 additions & 12 deletions datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

use crate::unparser::utils::unproject_agg_exprs;
use datafusion_common::{
internal_err, not_impl_err, plan_err, Column, DataFusionError, Result, TableReference,
internal_err, not_impl_err, plan_err,
tree_node::{TransformedResult, TreeNode},
Column, DataFusionError, Result, TableReference,
};
use datafusion_expr::{
expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
Expand All @@ -34,7 +36,7 @@ use super::{
rewrite::{
inject_column_aliases, normalize_union_schema,
rewrite_plan_for_sort_on_non_projected_fields,
subquery_alias_inner_query_and_columns,
subquery_alias_inner_query_and_columns, TableAliasRewriter,
},
utils::{find_agg_node_within_select, unproject_window_exprs, AggVariant},
Unparser,
Expand Down Expand Up @@ -554,13 +556,11 @@ impl Unparser<'_> {
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::TableScan(table_scan) => {
// TODO: support filters for table scan with alias. Remove this check after #12368 issue.
// see the issue: https://github.com/apache/datafusion/issues/12368
if alias.is_some() && !table_scan.filters.is_empty() {
return not_impl_err!(
"Subquery alias is not supported for table scan with pushdown filters"
);
}
let mut filter_alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
table_schema: table_scan.source.schema(),
alias_name: alias_name.clone(),
});

let mut builder = LogicalPlanBuilder::scan(
table_scan.table_name.clone(),
Expand All @@ -587,12 +587,25 @@ impl Unparser<'_> {
builder = builder.project(project_columns)?;
}

let filter_expr = table_scan
let filter_expr: Result<Option<Expr>> = table_scan
.filters
.iter()
.cloned()
.reduce(|acc, expr| acc.and(expr));
if let Some(filter) = filter_expr {
.map(|expr| {
if let Some(ref mut rewriter) = filter_alias_rewriter {
expr.rewrite(rewriter).data()
} else {
Ok(expr)
}
})
.reduce(|acc, expr_result| {
acc.and_then(|acc_expr| {
expr_result.map(|expr| acc_expr.and(expr))
})
})
.transpose();

if let Some(filter) = filter_expr? {
builder = builder.filter(filter)?;
}

Expand Down
40 changes: 38 additions & 2 deletions datafusion/sql/src/unparser/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ use std::{
sync::Arc,
};

use arrow_schema::SchemaRef;
use datafusion_common::{
tree_node::{Transformed, TransformedResult, TreeNode},
Result,
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter},
Column, Result, TableReference,
};
use datafusion_expr::{expr::Alias, tree_node::transform_sort_vec};
use datafusion_expr::{Expr, LogicalPlan, Projection, Sort, SortExpr};
Expand Down Expand Up @@ -300,3 +301,38 @@ fn find_projection(logical_plan: &LogicalPlan) -> Option<&Projection> {
_ => None,
}
}
/// A `TreeNodeRewriter` implementation that rewrites `Expr::Column` expressions by
/// replacing the column's name with an alias if the column exists in the provided schema.
///
/// This is typically used to apply table aliases in query plans, ensuring that
/// the column references in the expressions use the correct table alias.
///
/// # Fields
///
/// * `table_schema`: The schema (`SchemaRef`) representing the table structure
/// from which the columns are referenced. This is used to look up columns by their names.
/// * `alias_name`: The alias (`TableReference`) that will replace the table name
/// in the column references when applicable.
pub struct TableAliasRewriter {
pub table_schema: SchemaRef,
pub alias_name: TableReference,
}

impl TreeNodeRewriter for TableAliasRewriter {
type Node = Expr;

fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
match expr {
Expr::Column(column) => {
if let Ok(field) = self.table_schema.field_with_name(&column.name) {
let new_column =
Column::new(Some(self.alias_name.clone()), field.name().clone());
Ok(Transformed::yes(Expr::Column(new_column)))
} else {
Ok(Transformed::no(Expr::Column(column)))
}
}
_ => Ok(Transformed::no(expr)),
}
}
}
26 changes: 13 additions & 13 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,19 +705,19 @@ fn test_table_scan_pushdown() -> Result<()> {
"SELECT * FROM t1 WHERE ((t1.id > 1) AND (t1.age < 2))"
);

// TODO: support filters for table scan with alias. Enable this test after #12368 issue is fixed
// see the issue: https://github.com/apache/datafusion/issues/12368
// let table_scan_with_filter_alias = table_scan_with_filters(
// Some("t1"),
// &schema,
// None,
// vec![col("id").gt(col("age"))],
// )?.alias("ta")?.build()?;
// let table_scan_with_filter_alias = plan_to_sql(&table_scan_with_filter_alias)?;
// assert_eq!(
// format!("{}", table_scan_with_filter_alias),
// "SELECT * FROM t1 AS ta WHERE (ta.id > ta.age)"
// );
let table_scan_with_filter_alias = table_scan_with_filters(
Some("t1"),
&schema,
None,
vec![col("id").gt(col("age"))],
)?
.alias("ta")?
.build()?;
let table_scan_with_filter_alias = plan_to_sql(&table_scan_with_filter_alias)?;
assert_eq!(
format!("{}", table_scan_with_filter_alias),
"SELECT * FROM t1 AS ta WHERE (ta.id > ta.age)"
);

let table_scan_with_projection_and_filter = table_scan_with_filters(
Some("t1"),
Expand Down

0 comments on commit aac10a4

Please sign in to comment.