Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohahaha committed Aug 15, 2024
1 parent 5897d34 commit 7f2f19b
Showing 1 changed file with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,21 @@
*/
package org.apache.spark.sql.sources

import org.apache.gluten.execution.SortExecTransformer
import org.apache.gluten.execution.{ProjectExecTransformer, SortExecTransformer}
import org.apache.gluten.extension.GlutenPlan

import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
import org.apache.spark.SparkConf
import org.apache.spark.executor.OutputMetrics
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, QueryExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, QueryExecution}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.QueryExecutionListener

import org.apache.hadoop.fs.{Path, RawLocalFileSystem}

import java.io.{File, IOException}

class GlutenInsertSuite
Expand Down Expand Up @@ -147,6 +145,33 @@ class GlutenInsertSuite
assert(parts == expectedPartitionNames)
}

// Verifies that when the write itself is not executed by ColumnarWriteFilesExec, the
// empty2null projection feeding the write is still present in a ProjectExecTransformer,
// and that empty partition values end up written as null.
testGluten("offload empty2null when v1writes fallback") {
  // NOTE(review): maxRecordsPerFile is presumably the setting that makes the write fall back
  // in this scenario; confirm against Gluten's write-offload validation.
  withSQLConf((SQLConf.MAX_RECORDS_PER_FILE.key, "1000")) {
    withTable("pt") {
      spark.sql("CREATE TABLE pt (c1 int) USING PARQUET PARTITIONED BY(p string)")

      // NOTE(review): `source` is not created inside this test; it is assumed to be
      // registered by the suite's setup.
      val df = spark.sql(s"""
                            |INSERT OVERWRITE TABLE pt PARTITION(p)
                            |SELECT c1, c2 as p FROM source
                            |""".stripMargin)

      val writeFiles = stripAQEPlan(
        df.queryExecution.executedPlan
          .asInstanceOf[CommandResultExec]
          .commandPhysicalPlan).children.head

      // The write node itself must not be the columnar implementation.
      assert(!writeFiles.isInstanceOf[ColumnarWriteFilesExec])

      // Locate the first (pre-order) ProjectExecTransformer below the write node; fail with a
      // readable message, including the plan, if there is none.
      val projectExecTransformer = writeFiles
        .collectFirst { case project: ProjectExecTransformer => project }
        .getOrElse(fail(s"No ProjectExecTransformer found in plan:\n$writeFiles"))

      // The result of this lookup was previously discarded, so the test could not fail even
      // if empty2null was missing from the projection. Assert it explicitly.
      assert(
        projectExecTransformer.projectList.exists(_.toString().contains("empty2null")),
        s"Expected an empty2null expression in: ${projectExecTransformer.projectList}"
      )

      // The partition column should never be empty
      checkAnswer(
        spark.sql("SELECT * FROM pt"),
        spark.sql("SELECT c1, if(c2 = '', null, c2) FROM source"))
    }
  }
}

testGluten("remove v1writes sort and project") {
// Only string type has empty2null expression
withTable("pt") {
Expand Down

0 comments on commit 7f2f19b

Please sign in to comment.