Skip to content

Commit

Permalink
[GLUTEN-7116] [CH] support outer explode (#7207)
Browse files Browse the repository at this point in the history
What changes were proposed in this pull request?
Support `LATERAL VIEW OUTER EXPLODE`: when the generator input is null or an empty array, emit the input row with NULL for the generated column instead of dropping it.

(Fixes: #7116)

How was this patch tested?
This patch was tested by unit tests (a `lateral view outer explode` case added to the Gluten Hive SQL query suites for each supported Spark version).
  • Loading branch information
shuai-xu authored Sep 14, 2024
1 parent 7797243 commit cd180f0
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -921,4 +921,12 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
limit,
mode,
child)

/**
 * ClickHouse backend override for Spark's `split` (StringSplit) expression.
 *
 * Routes the expression through [[CHStringSplitTransformer]] instead of the generic
 * transformer so the CH-specific result nullability (Array(Nullable(String))) is declared.
 *
 * @param substraitExprName Substrait function name for the split expression
 * @param srcExpr           transformer for the source string operand
 * @param regexExpr         transformer for the delimiter/regex operand
 * @param limitExpr         transformer for the split-limit operand
 * @param original          the original Spark StringSplit expression
 */
override def genStringSplitTransformer(
    substraitExprName: String,
    srcExpr: ExpressionTransformer,
    regexExpr: ExpressionTransformer,
    limitExpr: ExpressionTransformer,
    original: StringSplit): ExpressionTransformer = {
  // Operand order must stay (src, regex, limit) to match the Substrait signature.
  val operands = Seq(srcExpr, regexExpr, limitExpr)
  CHStringSplitTransformer(substraitExprName, operands, original)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ case class CHGenerateExecTransformer(
inputRel,
generatorNode,
requiredChildOutputNodes.asJava,
outer,
context,
context.nextOperatorId(this.nodeName))
} else {
Expand All @@ -84,6 +85,7 @@ case class CHGenerateExecTransformer(
generatorNode,
requiredChildOutputNodes.asJava,
getExtensionNodeForValidation,
outer,
context,
context.nextOperatorId(this.nodeName))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,12 @@ case class GetArrayItemTransformer(
ConverterUtils.getTypeNode(getArrayItem.dataType, getArrayItem.nullable))
}
}
/**
 * ClickHouse-specific expression transformer for Spark's `split` (StringSplit).
 *
 * Exists solely to override the declared result type: the two engines disagree on
 * element nullability, so `dataType` defaults to ArrayType(StringType, containsNull = true)
 * to match what ClickHouse actually produces.
 *
 * @param substraitExprName Substrait function name for the split expression
 * @param children          transformers for (source, regex, limit) operands, in that order
 * @param original          the original Spark expression being transformed
 * @param dataType          declared result type; element marked nullable for CH compatibility
 */
case class CHStringSplitTransformer(
substraitExprName: String,
children: Seq[ExpressionTransformer],
original: Expression,
override val dataType: DataType = ArrayType(StringType, containsNull = true))
extends ExpressionTransformer {
// In Spark: split return Array(String), while Array is nullable
// In CH: splitByXXX return Array(Nullable(String))
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ case class GenerateExecTransformer(
generatorNode,
requiredChildOutputNodes.asJava,
getExtensionNode(validation),
outer,
context,
operatorId)
}
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ ProjectRelParser::parseGenerate(DB::QueryPlanPtr query_plan, const substrait::Re

/// ARRAY JOIN
NameSet array_joined_columns{findArrayJoinNode(splitted_actions_dags.array_join)->result_name};
auto array_join_action = std::make_shared<ArrayJoinAction>(array_joined_columns, false, getContext());
auto array_join_action = std::make_shared<ArrayJoinAction>(array_joined_columns, generate_rel.outer(), getContext());
auto array_join_step = std::make_unique<ArrayJoinStep>(query_plan->getCurrentDataStream(), array_join_action);
array_join_step->setStepDescription("ARRAY JOIN In Generate");
steps.emplace_back(array_join_step.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@ public class GenerateRelNode implements RelNode, Serializable {
private final ExpressionNode generator;
private final List<ExpressionNode> childOutput;
private final AdvancedExtensionNode extensionNode;
private final boolean outer;

GenerateRelNode(RelNode input, ExpressionNode generator, List<ExpressionNode> childOutput) {
this(input, generator, childOutput, null);
GenerateRelNode(
RelNode input, ExpressionNode generator, List<ExpressionNode> childOutput, boolean outer) {
this(input, generator, childOutput, null, outer);
}

GenerateRelNode(
RelNode input,
ExpressionNode generator,
List<ExpressionNode> childOutput,
AdvancedExtensionNode extensionNode) {
AdvancedExtensionNode extensionNode,
boolean outer) {
this.input = input;
this.generator = generator;
this.childOutput = childOutput;
this.extensionNode = extensionNode;
this.outer = outer;
}

@Override
Expand All @@ -67,6 +71,8 @@ public Rel toProtobuf() {
generateRelBuilder.addChildOutput(node.toProtobuf());
}

generateRelBuilder.setOuter(outer);

if (extensionNode != null) {
generateRelBuilder.setAdvancedExtension(extensionNode.toProtobuf());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,20 +299,22 @@ public static RelNode makeGenerateRel(
RelNode input,
ExpressionNode generator,
List<ExpressionNode> childOutput,
boolean outer,
SubstraitContext context,
Long operatorId) {
context.registerRelToOperator(operatorId);
return new GenerateRelNode(input, generator, childOutput);
return new GenerateRelNode(input, generator, childOutput, outer);
}

public static RelNode makeGenerateRel(
RelNode input,
ExpressionNode generator,
List<ExpressionNode> childOutput,
AdvancedExtensionNode extensionNode,
boolean outer,
SubstraitContext context,
Long operatorId) {
context.registerRelToOperator(operatorId);
return new GenerateRelNode(input, generator, childOutput, extensionNode);
return new GenerateRelNode(input, generator, childOutput, extensionNode, outer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -682,4 +682,12 @@ trait SparkPlanExecApi {
attributeSeq: Seq[Attribute]): ExpressionTransformer = {
HiveUDFTransformer.replaceWithExpressionTransformer(expr, attributeSeq)
}

/**
 * Builds the transformer for Spark's `split` (StringSplit) expression.
 *
 * Default implementation delegates to the generic transformer; backends (e.g. ClickHouse)
 * may override this to supply an engine-specific transformer.
 *
 * @param substraitExprName Substrait function name for the split expression
 * @param srcExpr           transformer for the source string operand
 * @param regexExpr         transformer for the delimiter/regex operand
 * @param limitExpr         transformer for the split-limit operand
 * @param original          the original Spark StringSplit expression
 */
def genStringSplitTransformer(
    substraitExprName: String,
    srcExpr: ExpressionTransformer,
    regexExpr: ExpressionTransformer,
    limitExpr: ExpressionTransformer,
    original: StringSplit): ExpressionTransformer = {
  // Keep operand order (src, regex, limit) — it mirrors StringSplit's child order.
  val operands = Seq(srcExpr, regexExpr, limitExpr)
  GenericExpressionTransformer(substraitExprName, operands, original)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,14 @@ object ExpressionConverter extends SQLConfHelper with Logging {
timeAdd.children,
timeAdd
)
case ss: StringSplit =>
BackendsApiManager.getSparkPlanExecApiInstance.genStringSplitTransformer(
substraitExprName,
replaceWithExpressionTransformer0(ss.str, attributeSeq, expressionsMap),
replaceWithExpressionTransformer0(ss.regex, attributeSeq, expressionsMap),
replaceWithExpressionTransformer0(ss.limit, attributeSeq, expressionsMap),
ss
)
case expr =>
GenericExpressionTransformer(
substraitExprName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,21 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
purge = false)
}

testGluten("GLUTEN-7116: Support outer explode") {
  // LATERAL VIEW OUTER explode must emit a NULL generated column for null/empty
  // input arrays instead of dropping the row (rows with id 2 below).
  sql("create table if not exists test_7116 (id int, name string)")
  try {
    sql("insert into test_7116 values (1, 'a,b'), (2, null), (null, 'c,d'), (3, '')")
    val query =
      """
        |select id, col_name
        |from test_7116 lateral view outer explode(split(name, ',')) as col_name
        |""".stripMargin
    val df = sql(query)
    checkAnswer(
      df,
      Seq(Row(1, "a"), Row(1, "b"), Row(2, null), Row(null, "c"), Row(null, "d"), Row(3, "")))
  } finally {
    // Drop in a finally so a failed assertion cannot leave the table behind:
    // creation above is IF NOT EXISTS, so stale rows would duplicate on rerun.
    spark.sessionState.catalog.dropTable(
      TableIdentifier("test_7116"),
      ignoreIfNotExists = true,
      purge = false)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,21 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
purge = false)
}

testGluten("GLUTEN-7116: Support outer explode") {
  // LATERAL VIEW OUTER explode must emit a NULL generated column for null/empty
  // input arrays instead of dropping the row (rows with id 2 below).
  sql("create table if not exists test_7116 (id int, name string)")
  try {
    sql("insert into test_7116 values (1, 'a,b'), (2, null), (null, 'c,d'), (3, '')")
    val query =
      """
        |select id, col_name
        |from test_7116 lateral view outer explode(split(name, ',')) as col_name
        |""".stripMargin
    val df = sql(query)
    checkAnswer(
      df,
      Seq(Row(1, "a"), Row(1, "b"), Row(2, null), Row(null, "c"), Row(null, "d"), Row(3, "")))
  } finally {
    // Drop in a finally so a failed assertion cannot leave the table behind:
    // creation above is IF NOT EXISTS, so stale rows would duplicate on rerun.
    spark.sessionState.catalog.dropTable(
      TableIdentifier("test_7116"),
      ignoreIfNotExists = true,
      purge = false)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,21 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
purge = false)
}

testGluten("GLUTEN-7116: Support outer explode") {
  // LATERAL VIEW OUTER explode must emit a NULL generated column for null/empty
  // input arrays instead of dropping the row (rows with id 2 below).
  sql("create table if not exists test_7116 (id int, name string)")
  try {
    sql("insert into test_7116 values (1, 'a,b'), (2, null), (null, 'c,d'), (3, '')")
    val query =
      """
        |select id, col_name
        |from test_7116 lateral view outer explode(split(name, ',')) as col_name
        |""".stripMargin
    val df = sql(query)
    checkAnswer(
      df,
      Seq(Row(1, "a"), Row(1, "b"), Row(2, null), Row(null, "c"), Row(null, "d"), Row(3, "")))
  } finally {
    // Drop in a finally so a failed assertion cannot leave the table behind:
    // creation above is IF NOT EXISTS, so stale rows would duplicate on rerun.
    spark.sessionState.catalog.dropTable(
      TableIdentifier("test_7116"),
      ignoreIfNotExists = true,
      purge = false)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,21 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
purge = false)
}

testGluten("GLUTEN-7116: Support outer explode") {
  // LATERAL VIEW OUTER explode must emit a NULL generated column for null/empty
  // input arrays instead of dropping the row (rows with id 2 below).
  sql("create table if not exists test_7116 (id int, name string)")
  try {
    sql("insert into test_7116 values (1, 'a,b'), (2, null), (null, 'c,d'), (3, '')")
    val query =
      """
        |select id, col_name
        |from test_7116 lateral view outer explode(split(name, ',')) as col_name
        |""".stripMargin
    val df = sql(query)
    checkAnswer(
      df,
      Seq(Row(1, "a"), Row(1, "b"), Row(2, null), Row(null, "c"), Row(null, "d"), Row(3, "")))
  } finally {
    // Drop in a finally so a failed assertion cannot leave the table behind:
    // creation above is IF NOT EXISTS, so stale rows would duplicate on rerun.
    spark.sessionState.catalog.dropTable(
      TableIdentifier("test_7116"),
      ignoreIfNotExists = true,
      purge = false)
  }
}
}

0 comments on commit cd180f0

Please sign in to comment.