Skip to content

Commit

Permalink
[GLUTEN-5182] [CH] fix fail to parse post join filter (apache#5183)
Browse files Browse the repository at this point in the history
What changes were proposed in this pull request?
This PR fixes a bug where parsing a post-join filter could fail if it contains complex expressions.

(Fixes: apache#5182)

How was this patch tested?
This patch was tested by unit tests.
  • Loading branch information
shuai-xu authored Apr 7, 2024
1 parent 7dd0180 commit 9d6f95d
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 15 deletions.
30 changes: 15 additions & 15 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1927,23 +1927,23 @@ ActionsDAGPtr ASTParser::convertToActions(const NamesAndTypesList & name_and_typ
/// Converts a substrait expression into a ClickHouse AST node.
///
/// The scraped diff view interleaved the pre-fix and post-fix lines of this
/// function (duplicated declarations, orphaned early return); this is the
/// reconstructed post-fix implementation: a scalar function is parsed into an
/// AST function node, while every other expression kind (e.g. singular_or_list,
/// which appears in complex post-join filters) is delegated to
/// parseArgumentToAST instead of throwing.
ASTPtr ASTParser::parseToAST(const Names & names, const substrait::Expression & rel)
{
    LOG_DEBUG(&Poco::Logger::get("ASTParser"), "substrait plan:\n{}", rel.DebugString());
    if (rel.has_scalar_function())
    {
        const auto & scalar_function = rel.scalar_function();
        // function_mapping translates the plan-local function reference id into
        // a signature string like "name:arg_types".
        auto function_signature = function_mapping.at(std::to_string(rel.scalar_function().function_reference()));

        // Strip the argument-type suffix to obtain the bare substrait name,
        // then prefer a registered custom parser's CH name over the generic mapping.
        auto substrait_name = function_signature.substr(0, function_signature.find(':'));
        auto func_parser = FunctionParserFactory::instance().tryGet(substrait_name, plan_parser);
        String function_name = func_parser ? func_parser->getCHFunctionName(scalar_function)
                                           : SerializedPlanParser::getFunctionName(function_signature, scalar_function);

        ASTs ast_args;
        parseFunctionArgumentsToAST(names, scalar_function, ast_args);

        return makeASTFunction(function_name, ast_args);
    }
    else
        // Non-scalar-function roots (literals, selections, singular_or_list, ...)
        // are handled by the argument parser.
        return parseArgumentToAST(names, rel);
}

void ASTParser::parseFunctionArgumentsToAST(
Expand Down Expand Up @@ -2375,4 +2375,4 @@ std::string NonNullableColumnsResolver::safeGetFunctionName(
return "";
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,43 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
ignoreIfNotExists = true,
purge = false)
}

testGluten("5182: Fix failed to parse post join filters") {
  // DDL/DML that prepares the two tables joined below.
  val setupStatements = Seq(
    "DROP TABLE IF EXISTS test_5128_0;",
    "DROP TABLE IF EXISTS test_5128_1;",
    "CREATE TABLE test_5128_0 (from_uid STRING, vgift_typeid int, vm_count int, " +
      "status bigint, ts bigint, vm_typeid int) " +
      "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day` STRING);",
    "CREATE TABLE test_5128_1 (typeid int, groupid int, ss_id bigint, " +
      "ss_start_time bigint, ss_end_time bigint) " +
      "USING hive OPTIONS(fileFormat 'parquet');",
    "INSERT INTO test_5128_0 partition(day='2024-03-31') " +
      "VALUES('uid_1', 2, 10, 1, 11111111111, 2);",
    "INSERT INTO test_5128_1 VALUES(2, 1, 1, 1000000000, 2111111111);"
  )
  // Join whose post-join filter contains a complex expression
  // (if/cast/substr/between) — the case fixed by GLUTEN-5182.
  val joinQuery =
    "select ee.from_uid as uid,day, vgift_typeid, money from " +
      "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
      "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
      "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
      "(select from_uid,day,vgift_typeid,vm_count,ts from test_5128_0 " +
      "where day between '2024-03-30' and '2024-03-31' and status=1 and vm_typeid=2) t_a " +
      "left join test_5128_1 t_b on t_a.vgift_typeid=t_b.typeid " +
      "where t_b.groupid in (1,2)) ee where ss_id=1;"
  withSQLConf(
    "spark.sql.hive.convertMetastoreParquet" -> "false",
    "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
    setupStatements.foreach(sql(_))
    val result = spark.sql(joinQuery)
    checkAnswer(result, Seq(Row("uid_1", "2024-03-31", 2, 10)))
  }
  // Best-effort cleanup of both test tables.
  Seq("test_5128_0", "test_5128_1").foreach {
    table =>
      spark.sessionState.catalog.dropTable(
        TableIdentifier(table),
        ignoreIfNotExists = true,
        purge = false)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,43 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
ignoreIfNotExists = true,
purge = false)
}

testGluten("5182: Fix failed to parse post join filters") {
  // DDL/DML that prepares the two tables joined below.
  val setupStatements = Seq(
    "DROP TABLE IF EXISTS test_5128_0;",
    "DROP TABLE IF EXISTS test_5128_1;",
    "CREATE TABLE test_5128_0 (from_uid STRING, vgift_typeid int, vm_count int, " +
      "status bigint, ts bigint, vm_typeid int) " +
      "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day` STRING);",
    "CREATE TABLE test_5128_1 (typeid int, groupid int, ss_id bigint, " +
      "ss_start_time bigint, ss_end_time bigint) " +
      "USING hive OPTIONS(fileFormat 'parquet');",
    "INSERT INTO test_5128_0 partition(day='2024-03-31') " +
      "VALUES('uid_1', 2, 10, 1, 11111111111, 2);",
    "INSERT INTO test_5128_1 VALUES(2, 1, 1, 1000000000, 2111111111);"
  )
  // Join whose post-join filter contains a complex expression
  // (if/cast/substr/between) — the case fixed by GLUTEN-5182.
  val joinQuery =
    "select ee.from_uid as uid,day, vgift_typeid, money from " +
      "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
      "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
      "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
      "(select from_uid,day,vgift_typeid,vm_count,ts from test_5128_0 " +
      "where day between '2024-03-30' and '2024-03-31' and status=1 and vm_typeid=2) t_a " +
      "left join test_5128_1 t_b on t_a.vgift_typeid=t_b.typeid " +
      "where t_b.groupid in (1,2)) ee where ss_id=1;"
  withSQLConf(
    "spark.sql.hive.convertMetastoreParquet" -> "false",
    "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
    setupStatements.foreach(sql(_))
    val result = spark.sql(joinQuery)
    checkAnswer(result, Seq(Row("uid_1", "2024-03-31", 2, 10)))
  }
  // Best-effort cleanup of both test tables.
  Seq("test_5128_0", "test_5128_1").foreach {
    table =>
      spark.sessionState.catalog.dropTable(
        TableIdentifier(table),
        ignoreIfNotExists = true,
        purge = false)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,43 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
ignoreIfNotExists = true,
purge = false)
}

testGluten("5182: Fix failed to parse post join filters") {
  // DDL/DML that prepares the two tables joined below.
  val setupStatements = Seq(
    "DROP TABLE IF EXISTS test_5128_0;",
    "DROP TABLE IF EXISTS test_5128_1;",
    "CREATE TABLE test_5128_0 (from_uid STRING, vgift_typeid int, vm_count int, " +
      "status bigint, ts bigint, vm_typeid int) " +
      "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day` STRING);",
    "CREATE TABLE test_5128_1 (typeid int, groupid int, ss_id bigint, " +
      "ss_start_time bigint, ss_end_time bigint) " +
      "USING hive OPTIONS(fileFormat 'parquet');",
    "INSERT INTO test_5128_0 partition(day='2024-03-31') " +
      "VALUES('uid_1', 2, 10, 1, 11111111111, 2);",
    "INSERT INTO test_5128_1 VALUES(2, 1, 1, 1000000000, 2111111111);"
  )
  // Join whose post-join filter contains a complex expression
  // (if/cast/substr/between) — the case fixed by GLUTEN-5182.
  val joinQuery =
    "select ee.from_uid as uid,day, vgift_typeid, money from " +
      "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
      "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
      "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
      "(select from_uid,day,vgift_typeid,vm_count,ts from test_5128_0 " +
      "where day between '2024-03-30' and '2024-03-31' and status=1 and vm_typeid=2) t_a " +
      "left join test_5128_1 t_b on t_a.vgift_typeid=t_b.typeid " +
      "where t_b.groupid in (1,2)) ee where ss_id=1;"
  withSQLConf(
    "spark.sql.hive.convertMetastoreParquet" -> "false",
    "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
    setupStatements.foreach(sql(_))
    val result = spark.sql(joinQuery)
    checkAnswer(result, Seq(Row("uid_1", "2024-03-31", 2, 10)))
  }
  // Best-effort cleanup of both test tables.
  Seq("test_5128_0", "test_5128_1").foreach {
    table =>
      spark.sessionState.catalog.dropTable(
        TableIdentifier(table),
        ignoreIfNotExists = true,
        purge = false)
  }
}
}

0 comments on commit 9d6f95d

Please sign in to comment.