Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-7116] [CH] support outer explode #7207

Merged
merged 8 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -925,4 +925,12 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
limit,
mode,
child)

/**
 * Builds the ClickHouse-specific transformer for Spark's `split` expression.
 *
 * Delegates to [[CHStringSplitTransformer]], which accounts for the nullability
 * difference between Spark's and ClickHouse's split results.
 */
override def genStringSplitTransformer(
    substraitExprName: String,
    srcExpr: ExpressionTransformer,
    regexExpr: ExpressionTransformer,
    limitExpr: ExpressionTransformer,
    original: StringSplit): ExpressionTransformer = {
  val splitArgs = Seq(srcExpr, regexExpr, limitExpr)
  CHStringSplitTransformer(substraitExprName, splitArgs, original)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ case class CHGenerateExecTransformer(
inputRel,
generatorNode,
requiredChildOutputNodes.asJava,
outer,
context,
context.nextOperatorId(this.nodeName))
} else {
Expand All @@ -84,6 +85,7 @@ case class CHGenerateExecTransformer(
generatorNode,
requiredChildOutputNodes.asJava,
getExtensionNodeForValidation,
outer,
context,
context.nextOperatorId(this.nodeName))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,12 @@ case class GetArrayItemTransformer(
ConverterUtils.getTypeNode(getArrayItem.dataType, getArrayItem.nullable))
}
}
/**
 * ClickHouse-specific expression transformer for Spark's `split` function.
 *
 * The `dataType` is fixed to `ArrayType(StringType, containsNull = true)` to
 * reconcile a nullability mismatch between the two engines (see note below).
 */
case class CHStringSplitTransformer(
substraitExprName: String,
children: Seq[ExpressionTransformer],
original: Expression,
override val dataType: DataType = ArrayType(StringType, containsNull = true))
extends ExpressionTransformer {
// In Spark, `split` returns Array(String) where the array itself is nullable;
// in ClickHouse, the splitByXXX functions return Array(Nullable(String)), i.e.
// the elements are nullable — hence containsNull = true in the declared type.
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ case class GenerateExecTransformer(
generatorNode,
requiredChildOutputNodes.asJava,
getExtensionNode(validation),
outer,
context,
operatorId)
}
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ ProjectRelParser::parseGenerate(DB::QueryPlanPtr query_plan, const substrait::Re

/// ARRAY JOIN
NameSet array_joined_columns{findArrayJoinNode(splitted_actions_dags.array_join)->result_name};
auto array_join_action = std::make_shared<ArrayJoinAction>(array_joined_columns, false, getContext());
auto array_join_action = std::make_shared<ArrayJoinAction>(array_joined_columns, generate_rel.outer(), getContext());
auto array_join_step = std::make_unique<ArrayJoinStep>(query_plan->getCurrentDataStream(), array_join_action);
array_join_step->setStepDescription("ARRAY JOIN In Generate");
steps.emplace_back(array_join_step.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@ public class GenerateRelNode implements RelNode, Serializable {
private final ExpressionNode generator;
private final List<ExpressionNode> childOutput;
private final AdvancedExtensionNode extensionNode;
private final boolean outer;

GenerateRelNode(RelNode input, ExpressionNode generator, List<ExpressionNode> childOutput) {
this(input, generator, childOutput, null);
GenerateRelNode(
RelNode input, ExpressionNode generator, List<ExpressionNode> childOutput, boolean outer) {
this(input, generator, childOutput, null, outer);
}

GenerateRelNode(
RelNode input,
ExpressionNode generator,
List<ExpressionNode> childOutput,
AdvancedExtensionNode extensionNode) {
AdvancedExtensionNode extensionNode,
boolean outer) {
this.input = input;
this.generator = generator;
this.childOutput = childOutput;
this.extensionNode = extensionNode;
this.outer = outer;
}

@Override
Expand All @@ -67,6 +71,8 @@ public Rel toProtobuf() {
generateRelBuilder.addChildOutput(node.toProtobuf());
}

generateRelBuilder.setOuter(outer);

if (extensionNode != null) {
generateRelBuilder.setAdvancedExtension(extensionNode.toProtobuf());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,20 +299,22 @@ public static RelNode makeGenerateRel(
RelNode input,
ExpressionNode generator,
List<ExpressionNode> childOutput,
boolean outer,
SubstraitContext context,
Long operatorId) {
context.registerRelToOperator(operatorId);
return new GenerateRelNode(input, generator, childOutput);
return new GenerateRelNode(input, generator, childOutput, outer);
}

public static RelNode makeGenerateRel(
RelNode input,
ExpressionNode generator,
List<ExpressionNode> childOutput,
AdvancedExtensionNode extensionNode,
boolean outer,
SubstraitContext context,
Long operatorId) {
context.registerRelToOperator(operatorId);
return new GenerateRelNode(input, generator, childOutput, extensionNode);
return new GenerateRelNode(input, generator, childOutput, extensionNode, outer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -693,4 +693,12 @@ trait SparkPlanExecApi {
attributeSeq: Seq[Attribute]): ExpressionTransformer = {
HiveUDFTransformer.replaceWithExpressionTransformer(expr, attributeSeq)
}

/**
 * Default transformer for Spark's `split` expression; backend APIs may
 * override this to supply an engine-specific transformer.
 */
def genStringSplitTransformer(
    substraitExprName: String,
    srcExpr: ExpressionTransformer,
    regexExpr: ExpressionTransformer,
    limitExpr: ExpressionTransformer,
    original: StringSplit): ExpressionTransformer = {
  val operands = Seq(srcExpr, regexExpr, limitExpr)
  GenericExpressionTransformer(substraitExprName, operands, original)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,14 @@ object ExpressionConverter extends SQLConfHelper with Logging {
timeAdd.children,
timeAdd
)
case ss: StringSplit =>
BackendsApiManager.getSparkPlanExecApiInstance.genStringSplitTransformer(
substraitExprName,
replaceWithExpressionTransformer0(ss.str, attributeSeq, expressionsMap),
replaceWithExpressionTransformer0(ss.regex, attributeSeq, expressionsMap),
replaceWithExpressionTransformer0(ss.limit, attributeSeq, expressionsMap),
ss
)
case expr =>
GenericExpressionTransformer(
substraitExprName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,21 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
purge = false)
}

testGluten("GLUTEN-7116: Support outer explode") {
  // `lateral view outer explode` must keep rows whose array is null/empty,
  // emitting a null for the generated column (rows with id = 2 here).
  sql("create table if not exists test_7116 (id int, name string)")
  try {
    sql("insert into test_7116 values (1, 'a,b'), (2, null), (null, 'c,d'), (3, '')")
    val query =
      """
        |select id, col_name
        |from test_7116 lateral view outer explode(split(name, ',')) as col_name
        |""".stripMargin
    val df = sql(query)
    checkAnswer(
      df,
      Seq(Row(1, "a"), Row(1, "b"), Row(2, null), Row(null, "c"), Row(null, "d"), Row(3, "")))
  } finally {
    // Drop in a finally block so a failed assertion does not leak the table;
    // otherwise a rerun would re-insert and duplicate rows (create is IF NOT EXISTS).
    spark.sessionState.catalog.dropTable(
      TableIdentifier("test_7116"),
      ignoreIfNotExists = true,
      purge = false)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,21 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
purge = false)
}

testGluten("GLUTEN-7116: Support outer explode") {
  // `lateral view outer explode` must keep rows whose array is null/empty,
  // emitting a null for the generated column (rows with id = 2 here).
  sql("create table if not exists test_7116 (id int, name string)")
  try {
    sql("insert into test_7116 values (1, 'a,b'), (2, null), (null, 'c,d'), (3, '')")
    val query =
      """
        |select id, col_name
        |from test_7116 lateral view outer explode(split(name, ',')) as col_name
        |""".stripMargin
    val df = sql(query)
    checkAnswer(
      df,
      Seq(Row(1, "a"), Row(1, "b"), Row(2, null), Row(null, "c"), Row(null, "d"), Row(3, "")))
  } finally {
    // Drop in a finally block so a failed assertion does not leak the table;
    // otherwise a rerun would re-insert and duplicate rows (create is IF NOT EXISTS).
    spark.sessionState.catalog.dropTable(
      TableIdentifier("test_7116"),
      ignoreIfNotExists = true,
      purge = false)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,21 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
purge = false)
}

testGluten("GLUTEN-7116: Support outer explode") {
  // `lateral view outer explode` must keep rows whose array is null/empty,
  // emitting a null for the generated column (rows with id = 2 here).
  sql("create table if not exists test_7116 (id int, name string)")
  try {
    sql("insert into test_7116 values (1, 'a,b'), (2, null), (null, 'c,d'), (3, '')")
    val query =
      """
        |select id, col_name
        |from test_7116 lateral view outer explode(split(name, ',')) as col_name
        |""".stripMargin
    val df = sql(query)
    checkAnswer(
      df,
      Seq(Row(1, "a"), Row(1, "b"), Row(2, null), Row(null, "c"), Row(null, "d"), Row(3, "")))
  } finally {
    // Drop in a finally block so a failed assertion does not leak the table;
    // otherwise a rerun would re-insert and duplicate rows (create is IF NOT EXISTS).
    spark.sessionState.catalog.dropTable(
      TableIdentifier("test_7116"),
      ignoreIfNotExists = true,
      purge = false)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,21 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
purge = false)
}

testGluten("GLUTEN-7116: Support outer explode") {
  // `lateral view outer explode` must keep rows whose array is null/empty,
  // emitting a null for the generated column (rows with id = 2 here).
  sql("create table if not exists test_7116 (id int, name string)")
  try {
    sql("insert into test_7116 values (1, 'a,b'), (2, null), (null, 'c,d'), (3, '')")
    val query =
      """
        |select id, col_name
        |from test_7116 lateral view outer explode(split(name, ',')) as col_name
        |""".stripMargin
    val df = sql(query)
    checkAnswer(
      df,
      Seq(Row(1, "a"), Row(1, "b"), Row(2, null), Row(null, "c"), Row(null, "d"), Row(3, "")))
  } finally {
    // Drop in a finally block so a failed assertion does not leak the table;
    // otherwise a rerun would re-insert and duplicate rows (create is IF NOT EXISTS).
    spark.sessionState.catalog.dropTable(
      TableIdentifier("test_7116"),
      ignoreIfNotExists = true,
      purge = false)
  }
}
}
Loading