Commit
[GLUTEN-3378][VL] Iceberg support set partition columns in split info (#3987)

* iceberg support set partition columns in split info

* fix date partition type and add test case

fix

* use iceberg FANOUT_ENABLED option

* add partitioned table tpch test case
liujiayi771 authored Dec 19, 2023
1 parent 04f9b3a commit 203e715
Showing 7 changed files with 142 additions and 7 deletions.
CHIteratorApi.scala

@@ -54,7 +54,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
    */
   override def genSplitInfo(
       partition: InputPartition,
-      partitionSchemas: StructType,
+      partitionSchema: StructType,
       fileFormat: ReadFileFormat): SplitInfo = {
     partition match {
       case p: GlutenMergeTreePartition =>
IteratorApiImpl.scala

@@ -59,12 +59,12 @@ class IteratorApiImpl extends IteratorApi with Logging {
    */
   override def genSplitInfo(
       partition: InputPartition,
-      partitionSchemas: StructType,
+      partitionSchema: StructType,
       fileFormat: ReadFileFormat): SplitInfo = {
     partition match {
       case f: FilePartition =>
         val (paths, starts, lengths, partitionColumns) =
-          constructSplitInfo(partitionSchemas, f.files)
+          constructSplitInfo(partitionSchema, f.files)
         val preferredLocations =
           SoftAffinity.getFilePartitionLocations(paths.asScala.toArray, f.preferredLocations())
         LocalFilesBuilder.makeLocalFiles(
IteratorApi.scala

@@ -41,7 +41,7 @@ trait IteratorApi {
    */
   def genSplitInfo(
       partition: InputPartition,
-      partitionSchemas: StructType,
+      partitionSchema: StructType,
       fileFormat: ReadFileFormat): SplitInfo

   /**
IcebergScanTransformer.scala

@@ -43,7 +43,7 @@ class IcebergScanTransformer(

   override def filterExprs(): Seq[Expression] = Seq.empty

-  override def getPartitionSchema: StructType = new StructType()
+  override def getPartitionSchema: StructType = GlutenIcebergSourceUtil.getPartitionSchema(scan)

   override def getDataSchema: StructType = new StructType()
GlutenIcebergSourceUtil.scala

@@ -20,7 +20,9 @@ import io.glutenproject.substrait.rel.{IcebergLocalFilesBuilder, SplitInfo}
 import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat

 import org.apache.spark.softaffinity.SoftAffinity
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.types.StructType

 import org.apache.iceberg.{CombinedScanTask, FileFormat, FileScanTask, ScanTask}
@@ -45,7 +47,7 @@ object GlutenIcebergSourceUtil {
       paths.add(task.file().path().toString)
       starts.add(task.start())
       lengths.add(task.length())
-      partitionColumns.add(new JHashMap[String, String]())
+      partitionColumns.add(getPartitionColumns(task))
       val currentFileFormat = task.file().format() match {
         case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat
         case FileFormat.ORC => ReadFileFormat.OrcReadFormat
@@ -93,6 +95,31 @@ object GlutenIcebergSourceUtil {
     throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.")
   }

+  def getPartitionSchema(sparkScan: Scan): StructType = sparkScan match {
+    case scan: SparkBatchQueryScan =>
+      val tasks = scan.tasks().asScala
+      asFileScanTask(tasks.toList).foreach {
+        task =>
+          val spec = task.spec()
+          if (spec.isPartitioned) {
+            var partitionSchema = new StructType()
+            val partitionFields = spec.partitionType().fields().asScala
+            partitionFields.foreach {
+              field =>
+                TypeUtil.validatePartitionColumnType(field.`type`().typeId())
+                partitionSchema = partitionSchema.add(field.name(), field.`type`().toString)
+            }
+            return partitionSchema
+          } else {
+            return new StructType()
+          }
+      }
+      throw new UnsupportedOperationException(
+        "Failed to get partition schema from iceberg SparkBatchQueryScan.")
+    case _ =>
+      throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.")
+  }
+
   private def asFileScanTask(tasks: List[ScanTask]): List[FileScanTask] = {
     if (tasks.forall(_.isFileScanTask)) {
       tasks.map(_.asFileScanTask())
@@ -103,4 +130,26 @@ object GlutenIcebergSourceUtil {
         "Only support iceberg CombinedScanTask and FileScanTask.")
     }
   }
+
+  private def getPartitionColumns(task: FileScanTask): JHashMap[String, String] = {
+    val partitionColumns = new JHashMap[String, String]()
+    val spec = task.spec()
+    val partition = task.partition()
+    if (spec.isPartitioned) {
+      val partitionFields = spec.partitionType().fields().asScala
+      partitionFields.zipWithIndex.foreach {
+        case (field, index) =>
+          val partitionValue = partition.get(index, field.`type`().typeId().javaClass())
+          val partitionType = field.`type`()
+          if (partitionValue != null) {
+            partitionColumns.put(
+              field.name(),
+              TypeUtil.getPartitionValueString(partitionType, partitionValue))
+          } else {
+            partitionColumns.put(field.name(), ExternalCatalogUtils.DEFAULT_PARTITION_NAME)
+          }
+      }
+    }
+    partitionColumns
+  }
 }
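
For orientation (not part of the diff): a minimal sketch of the per-file map that getPartitionColumns produces, for a hypothetical table partitioned by a date column and a string column. The column names and values here are invented; __HIVE_DEFAULT_PARTITION__ is Spark's ExternalCatalogUtils.DEFAULT_PARTITION_NAME, used when a file's partition value is null.

  import java.util.{HashMap => JHashMap}

  // One map per FileScanTask goes into the split info, keyed by partition column name.
  val partitionColumns = new JHashMap[String, String]()
  // A non-null DATE value arrives as Integer epoch days and is rendered by
  // TypeUtil.getPartitionValueString via Spark's DateFormatter: 8035 => "1992-01-01".
  partitionColumns.put("l_shipdate", "1992-01-01")
  // A null partition value falls back to Spark's default partition name.
  partitionColumns.put("l_returnflag", "__HIVE_DEFAULT_PARTITION__")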
TypeUtil.scala (new file)

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.spark.source
+
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
+
+import org.apache.iceberg.types.Type
+import org.apache.iceberg.types.Type.TypeID
+
+import java.lang.{Long => JLong}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.time.ZoneOffset
+
+object TypeUtil {
+  def validatePartitionColumnType(typeID: TypeID): Unit = typeID match {
+    case TypeID.BOOLEAN =>
+    case TypeID.INTEGER =>
+    case TypeID.LONG =>
+    case TypeID.FLOAT =>
+    case TypeID.DOUBLE =>
+    case TypeID.DATE =>
+    case TypeID.TIME | TypeID.TIMESTAMP =>
+    case TypeID.STRING =>
+    case TypeID.BINARY =>
+    case TypeID.DECIMAL =>
+    case _ =>
+      throw new UnsupportedOperationException(s"Unsupported partition column type $typeID")
+  }
+
+  def getPartitionValueString(partitionType: Type, partitionValue: Any): String = {
+    partitionType.typeId() match {
+      case TypeID.BINARY =>
+        new String(partitionValue.asInstanceOf[ByteBuffer].array(), StandardCharsets.UTF_8)
+      case TypeID.DATE =>
+        DateFormatter.apply().format(partitionValue.asInstanceOf[Integer])
+      case TypeID.TIMESTAMP | TypeID.TIME =>
+        TimestampFormatter
+          .getFractionFormatter(ZoneOffset.UTC)
+          .format(partitionValue.asInstanceOf[JLong])
+      case _ =>
+        partitionValue.toString
+    }
+  }
+}
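
The two non-trivial branches above exist because Iceberg hands partition values over in its internal representation: DATE as days since the Unix epoch (Integer) and TIMESTAMP/TIME as microseconds since the epoch (Long), so a plain toString would print raw numbers. A small standalone sketch of what the formatters yield (sample values are arbitrary; assumes Spark catalyst on the classpath):

  import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
  import java.time.ZoneOffset

  // Epoch day 8035 is 1992-01-01.
  println(DateFormatter.apply().format(8035)) // "1992-01-01"
  // Epoch microseconds, formatted in UTC with optional fractional seconds.
  println(TimestampFormatter.getFractionFormatter(ZoneOffset.UTC).format(1692057600000000L))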
VeloxTPCHIcebergSuite.scala

@@ -18,6 +18,8 @@ package io.glutenproject.execution

 import org.apache.spark.SparkConf

+import org.apache.iceberg.spark.SparkWriteOptions
+
 import java.io.File

 class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
@@ -51,12 +53,19 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
       table =>
         val tablePath = new File(resourcePath, table).getAbsolutePath
         val tableDF = spark.read.format(fileFormat).load(tablePath)
-        tableDF.write.format("iceberg").mode("append").saveAsTable(table)
+        tableDF.write.format("iceberg").mode("overwrite").saveAsTable(table)
         (table, tableDF)
     }
       .toMap
   }

+  override protected def afterAll(): Unit = {
+    if (TPCHTables != null) {
+      TPCHTables.keys.foreach(v => spark.sql(s"DROP TABLE IF EXISTS $v"))
+    }
+    super.afterAll()
+  }
+
   test("iceberg transformer exists") {
     runQueryAndCompare("""
                          |SELECT
@@ -84,3 +93,21 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
     }
   }
 }
+
+class VeloxPartitionedTableTPCHIcebergSuite extends VeloxTPCHIcebergSuite {
+  override protected def createTPCHNotNullTables(): Unit = {
+    TPCHTables = TPCHTable.map {
+      table =>
+        val tablePath = new File(resourcePath, table.name).getAbsolutePath
+        val tableDF = spark.read.format(fileFormat).load(tablePath)
+
+        tableDF.write
+          .format("iceberg")
+          .partitionBy(table.partitionColumns: _*)
+          .option(SparkWriteOptions.FANOUT_ENABLED, "true")
+          .mode("overwrite")
+          .saveAsTable(table.name)
+        (table.name, tableDF)
+    }.toMap
+  }
+}
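
A note on the new suite (background, not part of the diff): Iceberg's default partitioned writer expects incoming rows to be clustered by partition key and rejects unsorted input, so the test enables the fanout writer via SparkWriteOptions.FANOUT_ENABLED (the "fanout-enabled" write option), which keeps a file open per observed partition and drops the ordering requirement at the cost of more concurrent open files. A hypothetical spot-check for the new partition-value path, assuming the suite's runQueryAndCompare/getExecutedPlan helpers and that lineitem is partitioned on l_shipdate:

  // Filtering on a partition column forces the values added by getPartitionColumns
  // through the Velox scan; the scan should still be offloaded as an
  // IcebergScanTransformer rather than falling back.
  runQueryAndCompare(
    "SELECT l_orderkey FROM lineitem WHERE l_shipdate = DATE '1996-01-02'") {
    df => assert(getExecutedPlan(df).exists(_.isInstanceOf[IcebergScanTransformer]))
  }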
