Skip to content

Commit

Permalink
add partitioned table tpch test case
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiayi771 committed Dec 15, 2023
1 parent 5ae51fe commit 9d5e9b8
Showing 1 changed file with 24 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,19 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
table =>
val tablePath = new File(resourcePath, table).getAbsolutePath
val tableDF = spark.read.format(fileFormat).load(tablePath)
tableDF.write.format("iceberg").mode("append").saveAsTable(table)
tableDF.write.format("iceberg").mode("overwrite").saveAsTable(table)
(table, tableDF)
}
.toMap
}

/**
 * Test-suite teardown: drops every Iceberg table registered in `TPCHTables`
 * (if any were created) before delegating to the parent suite's cleanup.
 */
override protected def afterAll(): Unit = {
  // `Option(...)` maps a null TPCHTables to None, so the drop loop is skipped
  // when table creation never ran — same guard as an explicit null check.
  Option(TPCHTables).foreach {
    tables => tables.keys.foreach(name => spark.sql(s"DROP TABLE IF EXISTS $name"))
  }
  super.afterAll()
}

test("iceberg transformer exists") {
runQueryAndCompare("""
|SELECT
Expand All @@ -85,37 +92,22 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
}
}
}
}

test("iceberg partition table") {
withTable("lineitem_p") {
val tablePath = new File(resourcePath, "lineitem").getAbsolutePath
val tableDF = spark.read.format(fileFormat).load(tablePath)
tableDF.write
.format("iceberg")
.option(SparkWriteOptions.FANOUT_ENABLED, "true")
.mode("append")
.saveAsTable("lineitem_p")
runQueryAndCompare("""
|SELECT
| sum(l_extendedprice * l_discount) AS revenue
|FROM
| lineitem_p
|WHERE
| l_shipdate >= '1994-01-01'
| AND l_shipdate < '1995-01-01'
| AND l_discount BETWEEN.06 - 0.01
| AND.06 + 0.01
| AND l_quantity < 24;
|""".stripMargin) {
df =>
{
assert(
getExecutedPlan(df).count(
plan => {
plan.isInstanceOf[IcebergScanTransformer]
}) == 1)
}
}
}
/**
 * Variant of [[VeloxTPCHIcebergSuite]] that materializes each TPC-H table as a
 * partitioned Iceberg table, so the inherited TPC-H queries run against
 * partitioned data.
 */
class VeloxPartitionedTableTPCHIcebergSuite extends VeloxTPCHIcebergSuite {
  override protected def createTPCHNotNullTables(): Unit = {
    TPCHTables = TPCHTable.map {
      tpchTable =>
        val sourcePath = new File(resourcePath, tpchTable.name).getAbsolutePath
        val sourceDF = spark.read.format(fileFormat).load(sourcePath)

        // Write as an Iceberg table partitioned on the table's declared
        // partition columns; fanout writes avoid requiring pre-sorted input,
        // and `overwrite` keeps reruns idempotent.
        sourceDF.write
          .format("iceberg")
          .partitionBy(tpchTable.partitionColumns: _*)
          .option(SparkWriteOptions.FANOUT_ENABLED, "true")
          .mode("overwrite")
          .saveAsTable(tpchTable.name)

        tpchTable.name -> sourceDF
    }.toMap
  }
}

0 comments on commit 9d5e9b8

Please sign in to comment.