From 7f2f19b0e96928ef996dea8ded3ef6743d95e601 Mon Sep 17 00:00:00 2001
From: Yang Zhang
Date: Thu, 15 Aug 2024 11:48:47 +0800
Subject: [PATCH 1/2] add test

---
 .../spark/sql/sources/GlutenInsertSuite.scala | 35 ++++++++++++++++---
 1 file changed, 30 insertions(+), 5 deletions(-)

diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index 5c60115c5e1d..65d681263bcf 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -16,23 +16,21 @@
  */
 package org.apache.spark.sql.sources
 
-import org.apache.gluten.execution.SortExecTransformer
+import org.apache.gluten.execution.{ProjectExecTransformer, SortExecTransformer}
 import org.apache.gluten.extension.GlutenPlan
-
+import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, QueryExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, QueryExecution}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.QueryExecutionListener
 
-import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
-
 import java.io.{File, IOException}
 
 class GlutenInsertSuite
@@ -147,6 +145,33 @@ class GlutenInsertSuite
     assert(parts == expectedPartitionNames)
   }
 
+  testGluten("offload empty2null when v1writes fallback") {
+    withSQLConf((SQLConf.MAX_RECORDS_PER_FILE.key, "1000")) {
+      withTable("pt") {
+        spark.sql("CREATE TABLE pt (c1 int) USING PARQUET PARTITIONED BY(p string)")
+
+        val df = spark.sql(s"""
+                              |INSERT OVERWRITE TABLE pt PARTITION(p)
+                              |SELECT c1, c2 as p FROM source
+                              |""".stripMargin)
+
+        val writeFiles = stripAQEPlan(
+          df.queryExecution.executedPlan
+            .asInstanceOf[CommandResultExec]
+            .commandPhysicalPlan).children.head
+        assert(!writeFiles.isInstanceOf[ColumnarWriteFilesExec])
+        assert(writeFiles.exists(_.isInstanceOf[ProjectExecTransformer]))
+        val projectExecTransformer = writeFiles.find(_.isInstanceOf[ProjectExecTransformer]).get.asInstanceOf[ProjectExecTransformer]
+        projectExecTransformer.projectList.find(_.toString().contains("empty2null"))
+
+        // The partition column should never be empty
+        checkAnswer(
+          spark.sql("SELECT * FROM pt"),
+          spark.sql("SELECT c1, if(c2 = '', null, c2) FROM source"))
+      }
+    }
+  }
+
   testGluten("remove v1writes sort and project") {
     // Only string type has empty2null expression
     withTable("pt") {

From 492cd71f0e86074f244743a27943618ee4e06824 Mon Sep 17 00:00:00 2001
From: Yang Zhang
Date: Thu, 15 Aug 2024 11:49:59 +0800
Subject: [PATCH 2/2] fix format

---
 .../apache/spark/sql/sources/GlutenInsertSuite.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index 65d681263bcf..ca0ada39ceec 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -18,19 +18,21 @@ package org.apache.spark.sql.sources
 
 import org.apache.gluten.execution.{ProjectExecTransformer, SortExecTransformer}
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
+
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, QueryExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, QueryExecution}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.QueryExecutionListener
 
+import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
+
 import java.io.{File, IOException}
 
 class GlutenInsertSuite
@@ -161,7 +163,10 @@ class GlutenInsertSuite
             .commandPhysicalPlan).children.head
         assert(!writeFiles.isInstanceOf[ColumnarWriteFilesExec])
         assert(writeFiles.exists(_.isInstanceOf[ProjectExecTransformer]))
-        val projectExecTransformer = writeFiles.find(_.isInstanceOf[ProjectExecTransformer]).get.asInstanceOf[ProjectExecTransformer]
+        val projectExecTransformer = writeFiles
+          .find(_.isInstanceOf[ProjectExecTransformer])
+          .get
+          .asInstanceOf[ProjectExecTransformer]
         projectExecTransformer.projectList.find(_.toString().contains("empty2null"))
 
         // The partition column should never be empty
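Note (illustration only, not part of the patch): the new test asserts that the empty2null rewrite still runs when the V1 write falls back from ColumnarWriteFilesExec, so an empty-string partition value never becomes a partition directory name. The standalone sketch below shows the behavior the test checks; it assumes a local SparkSession and reuses the test's placeholder names pt and source. A row written with an empty string in the partition column reads back with a null partition value, matching the test's `if(c2 = '', null, c2)` comparison.

// Illustration only; not part of the patch. Assumes a local SparkSession and
// the placeholder names pt and source from the test.
import org.apache.spark.sql.SparkSession

object Empty2NullSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("empty2null-sketch").getOrCreate()
    import spark.implicits._

    // A source with one empty-string and one non-empty partition value.
    Seq((1, ""), (2, "a")).toDF("c1", "c2").createOrReplaceTempView("source")

    spark.sql("CREATE TABLE pt (c1 int) USING PARQUET PARTITIONED BY(p string)")
    spark.sql("INSERT OVERWRITE TABLE pt PARTITION(p) SELECT c1, c2 AS p FROM source")

    // The row written with p = '' reads back with p = NULL, which is what the
    // test's `if(c2 = '', null, c2)` comparison expects.
    spark.sql("SELECT * FROM pt ORDER BY c1").show()

    spark.stop()
  }
}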