Skip to content

Commit

Permalink
[GLUTEN-4451] [CH] fix header maybe changed by FilterTransform (#6166)
Browse files Browse the repository at this point in the history
What changes were proposed in this pull request?
Rollback header if changed in FilterTransform

(Fixes: #4451)

How was this patch tested?
This patch was tested by integration tests.
  • Loading branch information
shuai-xu authored Jun 21, 2024
1 parent 812e976 commit e25ab2e
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2638,5 +2638,55 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
spark.sql("drop table test_tbl_5910_0")
spark.sql("drop table test_tbl_5910_1")
}

test("GLUTEN-4451: Fix schema may be changed by filter") {
val create_sql =
"""
|create table if not exists test_tbl_4451(
| month_day string,
| month_dif int,
| is_month_new string,
| country string,
| os string,
| mr bigint
|) using parquet
|PARTITIONED BY (
| day string,
| app_name string)
|""".stripMargin
val insert_sql1 =
"INSERT into test_tbl_4451 partition (day='2024-06-01', app_name='abc') " +
"values('2024-06-01', 0, '1', 'CN', 'iOS', 100)"
val insert_sql2 =
"INSERT into test_tbl_4451 partition (day='2024-06-01', app_name='abc') " +
"values('2024-06-01', 0, '1', 'CN', 'iOS', 50)"
val insert_sql3 =
"INSERT into test_tbl_4451 partition (day='2024-06-01', app_name='abc') " +
"values('2024-06-01', 1, '1', 'CN', 'iOS', 80)"
spark.sql(create_sql)
spark.sql(insert_sql1)
spark.sql(insert_sql2)
spark.sql(insert_sql3)
val select_sql =
"""
|SELECT * FROM (
| SELECT
| month_day,
| country,
| if(os = 'ALite','Android',os) AS os,
| is_month_new,
| nvl(sum(if(month_dif = 0, mr, 0)),0) AS `month0_n`,
| nvl(sum(if(month_dif = 1, mr, 0)) / sum(if(month_dif = 0, mr, 0)),0) AS `month1_rate`,
| '2024-06-18' as day,
| app_name
| FROM test_tbl_4451
| GROUP BY month_day,country,if(os = 'ALite','Android',os),is_month_new,app_name
|) tt
|WHERE month0_n > 0 AND month1_rate <= 1 AND os IN ('all','Android','iOS')
| AND app_name IS NOT NULL
|""".stripMargin
compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
spark.sql("drop table test_tbl_4451")
}
}
// scalastyle:on line.size.limit
7 changes: 6 additions & 1 deletion cpp-ch/local-engine/Parser/FilterRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ DB::QueryPlanPtr FilterRelParser::parse(DB::QueryPlanPtr query_plan, const subst
filter_step->setStepDescription("WHERE");
steps.emplace_back(filter_step.get());
query_plan->addStep(std::move(filter_step));


// header maybe changed, need to rollback it
if (!blocksHaveEqualStructure(input_header, query_plan->getCurrentDataStream().header)) {
steps.emplace_back(getPlanParser()->addRollbackFilterHeaderStep(query_plan, input_header));
}

// remove nullable
auto * remove_null_step = getPlanParser()->addRemoveNullableStep(*query_plan, non_nullable_columns);
if (remove_null_step)
Expand Down
13 changes: 13 additions & 0 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,19 @@ IQueryPlanStep * SerializedPlanParser::addRemoveNullableStep(QueryPlan & plan, c
return step_ptr;
}

IQueryPlanStep * SerializedPlanParser::addRollbackFilterHeaderStep(QueryPlanPtr & query_plan, const Block & input_header)
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan->getCurrentDataStream().header.getColumnsWithTypeAndName(),
input_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto expression_step = std::make_unique<ExpressionStep>(query_plan->getCurrentDataStream(), convert_actions_dag);
expression_step->setStepDescription("Generator for rollback filter");
auto * step_ptr = expression_step.get();
query_plan->addStep(std::move(expression_step));
return step_ptr;
}

DataTypePtr wrapNullableType(substrait::Type_Nullability nullable, DataTypePtr nested_type)
{
return wrapNullableType(nullable == substrait::Type_Nullability_NULLABILITY_NULLABLE, nested_type);
Expand Down
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ class SerializedPlanParser
static std::string getFunctionName(const std::string & function_sig, const substrait::Expression_ScalarFunction & function);

IQueryPlanStep * addRemoveNullableStep(QueryPlan & plan, const std::set<String> & columns);
IQueryPlanStep * addRollbackFilterHeaderStep(QueryPlanPtr & query_plan, const Block & input_header);

static ContextMutablePtr global_context;
static Context::ConfigurationPtr config;
Expand Down

0 comments on commit e25ab2e

Please sign in to comment.