fix date partition type and add test case
liujiayi771 committed Dec 11, 2023
1 parent 4bb08c1 commit ff79ef8
Showing 3 changed files with 77 additions and 1 deletion.
@@ -140,8 +140,11 @@ object GlutenIcebergSourceUtil {
     partitionFields.zipWithIndex.foreach {
       case (field, index) =>
         val partitionValue = partition.get(index, field.`type`().typeId().javaClass())
+        val partitionType = field.`type`()
         if (partitionValue != null) {
-          partitionColumns.put(field.name(), partitionValue.toString)
+          partitionColumns.put(
+            field.name(),
+            TypeUtil.convertPartitionColumnDataType(partitionType, partitionValue))
         } else {
           partitionColumns.put(field.name(), ExternalCatalogUtils.DEFAULT_PARTITION_NAME)
         }
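
Note: Iceberg stores DATE partition values internally as days since the Unix epoch, so the old partitionValue.toString surfaced an epoch-day number rather than a date string. A minimal sketch of the difference, assuming a Spark version whose catalyst DateFormatter has the no-argument apply() used in the diff below (the object name is made up for illustration):

import org.apache.spark.sql.catalyst.util.DateFormatter

object DatePartitionSketch extends App {
  // Iceberg's internal DATE representation: days since 1970-01-01.
  val daysSinceEpoch: Int = 8766 // corresponds to 1994-01-01
  println(daysSinceEpoch.toString) // old behavior: "8766"
  println(DateFormatter.apply().format(daysSinceEpoch)) // fixed: "1994-01-01"
}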
@@ -16,8 +16,16 @@
  */
 package org.apache.iceberg.spark.source
 
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
+
+import org.apache.iceberg.types.Type
 import org.apache.iceberg.types.Type.TypeID
 
+import java.lang.{Long => JLong}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.time.ZoneOffset
+
 object TypeUtil {
   def validatePartitionColumnType(typeID: TypeID): Unit = typeID match {
     case TypeID.BOOLEAN =>
@@ -33,4 +41,19 @@ object TypeUtil {
     case _ =>
       throw new UnsupportedOperationException(s"Unsupported partition column type $typeID")
   }
+
+  def convertPartitionColumnDataType(icebergType: Type, partitionValue: Any): String = {
+    icebergType.typeId() match {
+      case TypeID.BINARY =>
+        new String(partitionValue.asInstanceOf[ByteBuffer].array(), StandardCharsets.UTF_8)
+      case TypeID.DATE =>
+        DateFormatter.apply().format(partitionValue.asInstanceOf[Integer])
+      case TypeID.TIMESTAMP | TypeID.TIME =>
+        TimestampFormatter
+          .getFractionFormatter(ZoneOffset.UTC)
+          .format(partitionValue.asInstanceOf[JLong])
+      case _ =>
+        partitionValue.toString
+    }
+  }
 }
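
For reference, a hypothetical usage sketch of the new helper, assuming it receives Iceberg's internal partition representations (epoch days for DATE, epoch microseconds for TIMESTAMP), which is what GlutenIcebergSourceUtil passes in the first hunk:

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import org.apache.iceberg.types.Types

// DATE: epoch days in, ISO date string out.
TypeUtil.convertPartitionColumnDataType(Types.DateType.get(), Integer.valueOf(8766))
// => "1994-01-01"

// TIMESTAMP: epoch microseconds in, UTC-formatted timestamp out.
TypeUtil.convertPartitionColumnDataType(
  Types.TimestampType.withoutZone(),
  java.lang.Long.valueOf(757382400000000L))
// => "1994-01-01 00:00:00"

// BINARY: UTF-8 bytes decoded to a string.
TypeUtil.convertPartitionColumnDataType(
  Types.BinaryType.get(),
  ByteBuffer.wrap("abc".getBytes(StandardCharsets.UTF_8)))
// => "abc"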
@@ -80,4 +80,54 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
       }
     }
   }
+
+  test("iceberg partition table") {
+    withTable("lineitem_p") {
+      spark.sql("""
+                  |CREATE TABLE lineitem_p (
+                  | l_orderkey BIGINT,
+                  | l_partkey BIGINT,
+                  | l_suppkey BIGINT,
+                  | l_linenumber INT,
+                  | l_quantity DECIMAL(12,2),
+                  | l_extendedprice DECIMAL(12,2),
+                  | l_discount DECIMAL(12,2),
+                  | l_tax DECIMAL(12,2),
+                  | l_returnflag STRING,
+                  | l_linestatus STRING,
+                  | l_commitdate DATE,
+                  | l_receiptdate DATE,
+                  | l_shipinstruct STRING,
+                  | l_shipmode STRING,
+                  | l_comment STRING,
+                  | l_shipdate DATE)
+                  |USING iceberg
+                  |PARTITIONED BY (l_shipdate);
+                  |""".stripMargin)
+      val tablePath = new File(resourcePath, "lineitem").getAbsolutePath
+      val tableDF = spark.read.format(fileFormat).load(tablePath)
+      tableDF.write.format("iceberg").mode("append").saveAsTable("lineitem_p")
+      runQueryAndCompare("""
+                          |SELECT
+                          | sum(l_extendedprice * l_discount) AS revenue
+                          |FROM
+                          | lineitem_p
+                          |WHERE
+                          | l_shipdate >= '1994-01-01'
+                          | AND l_shipdate < '1995-01-01'
+                          | AND l_discount BETWEEN .06 - 0.01
+                          | AND .06 + 0.01
+                          | AND l_quantity < 24;
+                          |""".stripMargin) {
+        df =>
+          {
+            assert(
+              getExecutedPlan(df).count(
+                plan => {
+                  plan.isInstanceOf[IcebergScanTransformer]
+                }) == 1)
+          }
+      }
+    }
+  }
 }
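
The test exercises TPC-H Q6 against a table partitioned by l_shipdate, so it fails if the partition value reaches the offloaded scan as raw epoch days; the closure also asserts that exactly one IcebergScanTransformer appears in the executed plan, i.e. the scan was actually offloaded. A one-line sanity check of the date arithmetic involved (illustrative only):

// 1994-01-01 is epoch day 8766 -- the value the old code would have
// surfaced as the l_shipdate partition string.
val epochDays = java.time.LocalDate.parse("1994-01-01").toEpochDay // 8766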
