From d3cf0534f3ce10aea27dbbbf6f5a5055c5eb5945 Mon Sep 17 00:00:00 2001 From: Mingliang Zhu Date: Mon, 9 Sep 2024 09:47:15 +0800 Subject: [PATCH] [GLUTEN-7144][VL] RAS: Spark input file function support (#7146) Closes #7144 --- .../backendsapi/velox/VeloxRuleApi.scala | 2 + .../ScalarFunctionsValidateSuite.scala | 77 ++++++++----------- .../enumerated/RasOffloadProject.scala | 1 - 3 files changed, 34 insertions(+), 46 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index b278224b2a82..6204ab092a39 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -92,6 +92,7 @@ private object VeloxRuleApi { def injectRas(injector: RasInjector): Unit = { // Gluten RAS: Pre rules. injector.inject(_ => RemoveTransitions) + injector.inject(_ => PushDownInputFileExpression.PreOffload) injector.inject(c => FallbackOnANSIMode.apply(c.session)) injector.inject(c => PlanOneRowRelation.apply(c.session)) injector.inject(_ => FallbackEmptySchemaRelation()) @@ -106,6 +107,7 @@ private object VeloxRuleApi { injector.inject(_ => RemoveTransitions) injector.inject(_ => RemoveNativeWriteFilesSortAndProject()) injector.inject(c => RewriteTransformer.apply(c.session)) + injector.inject(_ => PushDownInputFileExpression.PostOffload) injector.inject(_ => EnsureLocalSortRequirements) injector.inject(_ => EliminateLocalSort) injector.inject(_ => CollapseProjectExecTransformer) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala index a376fd488dd7..57f194f4bea3 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala @@ -28,38 +28,6 @@ class ScalarFunctionsValidateSuiteRasOff extends ScalarFunctionsValidateSuite { super.sparkConf .set("spark.gluten.ras.enabled", "false") } - import testImplicits._ - // Since https://github.com/apache/incubator-gluten/pull/6200. - test("Test input_file_name function") { - runQueryAndCompare("""SELECT input_file_name(), l_orderkey - | from lineitem limit 100""".stripMargin) { - checkGlutenOperatorMatch[ProjectExecTransformer] - } - - runQueryAndCompare("""SELECT input_file_name(), l_orderkey - | from - | (select l_orderkey from lineitem - | union all - | select o_orderkey as l_orderkey from orders) - | limit 100""".stripMargin) { - checkGlutenOperatorMatch[ProjectExecTransformer] - } - withTempPath { - path => - Seq(1, 2, 3).toDF("a").write.json(path.getCanonicalPath) - spark.read.json(path.getCanonicalPath).createOrReplaceTempView("json_table") - val sql = - """ - |SELECT input_file_name(), a - |FROM - |(SELECT a FROM json_table - |UNION ALL - |SELECT l_orderkey as a FROM lineitem) - |LIMIT 100 - |""".stripMargin - compareResultsAgainstVanillaSpark(sql, true, { _ => }) - } - } } class ScalarFunctionsValidateSuiteRasOn extends ScalarFunctionsValidateSuite { @@ -67,19 +35,6 @@ class ScalarFunctionsValidateSuiteRasOn extends ScalarFunctionsValidateSuite { super.sparkConf .set("spark.gluten.ras.enabled", "true") } - - // TODO: input_file_name is not yet supported in RAS - ignore("Test input_file_name function") { - runQueryAndCompare("""SELECT input_file_name(), l_orderkey - | from lineitem limit 100""".stripMargin) { _ => } - - runQueryAndCompare("""SELECT input_file_name(), l_orderkey - | from - | (select l_orderkey from lineitem - | union all - | select o_orderkey as l_orderkey from orders) - | limit 100""".stripMargin) { _ => } - } } abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { @@ -1381,6 +1336,38 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite { } } + test("Test input_file_name function") { + runQueryAndCompare("""SELECT input_file_name(), l_orderkey + | from lineitem limit 100""".stripMargin) { + checkGlutenOperatorMatch[ProjectExecTransformer] + } + + runQueryAndCompare("""SELECT input_file_name(), l_orderkey + | from + | (select l_orderkey from lineitem + | union all + | select o_orderkey as l_orderkey from orders) + | limit 100""".stripMargin) { + checkGlutenOperatorMatch[ProjectExecTransformer] + } + + withTempPath { + path => + Seq(1, 2, 3).toDF("a").write.json(path.getCanonicalPath) + spark.read.json(path.getCanonicalPath).createOrReplaceTempView("json_table") + val sql = + """ + |SELECT input_file_name(), a + |FROM + |(SELECT a FROM json_table + |UNION ALL + |SELECT l_orderkey as a FROM lineitem) + |LIMIT 100 + |""".stripMargin + compareResultsAgainstVanillaSpark(sql, true, { _ => }) + } + } + testWithSpecifiedSparkVersion("array insert", Some("3.4")) { withTempPath { path => diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffloadProject.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffloadProject.scala index 4b55fff0ada1..0bbf57499b73 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffloadProject.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffloadProject.scala @@ -20,7 +20,6 @@ import org.apache.gluten.execution.ProjectExecTransformer import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} -/** TODO: Map [[org.apache.gluten.extension.columnar.OffloadProject]] to RAS. */ object RasOffloadProject extends RasOffload { override def offload(node: SparkPlan): SparkPlan = node match { case ProjectExec(projectList, child) =>