Skip to content

Commit

Permalink
[VL] Add VeloxPartitionedTableTPCHSuite for testing partitioned tables (
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiayi771 authored Dec 14, 2023
1 parent 4c6f2c4 commit 211fee4
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package io.glutenproject.execution
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, TestUtils}

import java.io.File

abstract class VeloxTPCHTableSupport extends VeloxWholeStageTransformerSuite {
protected val rootPath: String = getClass.getResource("/").getPath
override protected val backend: String = "velox"
Expand Down Expand Up @@ -239,3 +241,28 @@ class VeloxTPCHV2BhjSuite extends VeloxTPCHSuite {
.set("spark.sql.autoBroadcastJoinThreshold", "30M")
}
}

class VeloxPartitionedTableTPCHSuite extends VeloxTPCHSuite {
override protected def createTPCHNotNullTables(): Unit = {
TPCHTables = TPCHTable.map {
table =>
val tableDir = getClass.getResource(resourcePath).getFile
val tablePath = new File(tableDir, table.name).getAbsolutePath
val tableDF = spark.read.format(fileFormat).load(tablePath)

tableDF.write
.format(fileFormat)
.partitionBy(table.partitionColumns: _*)
.mode("append")
.saveAsTable(table.name)
(table.name, tableDF)
}.toMap
}

override protected def afterAll(): Unit = {
if (TPCHTables != null) {
TPCHTables.keys.foreach(v => spark.sql(s"DROP TABLE IF EXISTS $v"))
}
super.afterAll()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,25 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.io.Source
import scala.reflect.ClassTag

case class Table(name: String, partitionColumns: Seq[String])

abstract class WholeStageTransformerSuite extends GlutenQueryTest with SharedSparkSession {

protected val backend: String
protected val resourcePath: String
protected val fileFormat: String
protected val logLevel: String = "WARN"

protected val TPCHTableNames: Seq[String] =
Seq("customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier")
protected val TPCHTable = Seq(
Table("part", partitionColumns = "p_brand" :: Nil),
Table("supplier", partitionColumns = Nil),
Table("partsupp", partitionColumns = Nil),
Table("customer", partitionColumns = "c_mktsegment" :: Nil),
Table("orders", partitionColumns = "o_orderdate" :: Nil),
Table("lineitem", partitionColumns = "l_shipdate" :: Nil),
Table("nation", partitionColumns = Nil),
Table("region", partitionColumns = Nil)
)

protected var TPCHTables: Map[String, DataFrame] = _

Expand All @@ -64,14 +74,17 @@ abstract class WholeStageTransformerSuite extends GlutenQueryTest with SharedSpa
}

protected def createTPCHNotNullTables(): Unit = {
TPCHTables = TPCHTableNames.map {
table =>
val tableDir = getClass.getResource(resourcePath).getFile
val tablePath = new File(tableDir, table).getAbsolutePath
val tableDF = spark.read.format(fileFormat).load(tablePath)
tableDF.createOrReplaceTempView(table)
(table, tableDF)
}.toMap
TPCHTables = TPCHTable
.map(_.name)
.map {
table =>
val tableDir = getClass.getResource(resourcePath).getFile
val tablePath = new File(tableDir, table).getAbsolutePath
val tableDF = spark.read.format(fileFormat).load(tablePath)
tableDF.createOrReplaceTempView(table)
(table, tableDF)
}
.toMap
}

override protected def sparkConf: SparkConf = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ class VeloxTPCHDeltaSuite extends VeloxTPCHSuite {
}

override protected def createTPCHNotNullTables(): Unit = {
TPCHTables = TPCHTableNames.map {
table =>
val tablePath = new File(resourcePath, table).getAbsolutePath
val tableDF = spark.read.format(fileFormat).load(tablePath)
tableDF.write.format("delta").mode("append").saveAsTable(table)
(table, tableDF)
}.toMap
TPCHTables = TPCHTable
.map(_.name)
.map {
table =>
val tablePath = new File(resourcePath, table).getAbsolutePath
val tableDF = spark.read.format(fileFormat).load(tablePath)
tableDF.write.format("delta").mode("append").saveAsTable(table)
(table, tableDF)
}
.toMap
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
}

override protected def createTPCHNotNullTables(): Unit = {
TPCHTables = TPCHTableNames.map {
table =>
val tablePath = new File(resourcePath, table).getAbsolutePath
val tableDF = spark.read.format(fileFormat).load(tablePath)
tableDF.write.format("iceberg").mode("append").saveAsTable(table)
(table, tableDF)
}.toMap
TPCHTables = TPCHTable
.map(_.name)
.map {
table =>
val tablePath = new File(resourcePath, table).getAbsolutePath
val tableDF = spark.read.format(fileFormat).load(tablePath)
tableDF.write.format("iceberg").mode("append").saveAsTable(table)
(table, tableDF)
}
.toMap
}

test("iceberg transformer exists") {
Expand Down

0 comments on commit 211fee4

Please sign in to comment.