Skip to content

Commit

Permalink
iceberg support set partition columns in split info
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiayi771 committed Dec 9, 2023
1 parent a935707 commit 4bb08c1
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
*/
override def genSplitInfo(
partition: InputPartition,
partitionSchemas: StructType,
partitionSchema: StructType,
fileFormat: ReadFileFormat): SplitInfo = {
partition match {
case p: GlutenMergeTreePartition =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ class IteratorApiImpl extends IteratorApi with Logging {
*/
override def genSplitInfo(
partition: InputPartition,
partitionSchemas: StructType,
partitionSchema: StructType,
fileFormat: ReadFileFormat): SplitInfo = {
partition match {
case f: FilePartition =>
val (paths, starts, lengths, partitionColumns) =
constructSplitInfo(partitionSchemas, f.files)
constructSplitInfo(partitionSchema, f.files)
val preferredLocations =
SoftAffinityUtil.getFilePartitionLocations(paths.asScala.toArray, f.preferredLocations())
LocalFilesBuilder.makeLocalFiles(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ trait IteratorApi {
*/
def genSplitInfo(
partition: InputPartition,
partitionSchemas: StructType,
partitionSchema: StructType,
fileFormat: ReadFileFormat): SplitInfo

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class IcebergScanTransformer(

override def filterExprs(): Seq[Expression] = Seq.empty

override def getPartitionSchema: StructType = new StructType()
override def getPartitionSchema: StructType = GlutenIcebergSourceUtil.getPartitionSchema(scan)

override def getDataSchema: StructType = new StructType()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import io.glutenproject.substrait.rel.{IcebergLocalFilesBuilder, SplitInfo}
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.softaffinity.SoftAffinityUtil
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.types.StructType

import org.apache.iceberg.{CombinedScanTask, FileFormat, FileScanTask, ScanTask}

Expand All @@ -45,7 +47,7 @@ object GlutenIcebergSourceUtil {
paths.add(task.file().path().toString)
starts.add(task.start())
lengths.add(task.length())
partitionColumns.add(new JHashMap[String, String]())
partitionColumns.add(getPartitionColumns(task))
val currentFileFormat = task.file().format() match {
case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat
case FileFormat.ORC => ReadFileFormat.OrcReadFormat
Expand Down Expand Up @@ -93,6 +95,31 @@ object GlutenIcebergSourceUtil {
throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.")
}

def getPartitionSchema(sparkScan: Scan): StructType = sparkScan match {
case scan: SparkBatchQueryScan =>
val tasks = scan.tasks().asScala
asFileScanTask(tasks.toList).foreach {
task =>
val spec = task.spec()
if (spec.isPartitioned) {
var partitionSchema = new StructType()
val partitionFields = spec.partitionType().fields().asScala
partitionFields.foreach {
field =>
TypeUtil.validatePartitionColumnType(field.`type`().typeId())
partitionSchema = partitionSchema.add(field.name(), field.`type`().toString)
}
return partitionSchema
} else {
return new StructType()
}
}
throw new UnsupportedOperationException(
"Failed to get partition schema from iceberg SparkBatchQueryScan.")
case _ =>
throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.")
}

private def asFileScanTask(tasks: List[ScanTask]): List[FileScanTask] = {
if (tasks.forall(_.isFileScanTask)) {
tasks.map(_.asFileScanTask())
Expand All @@ -103,4 +130,23 @@ object GlutenIcebergSourceUtil {
"Only support iceberg CombinedScanTask and FileScanTask.")
}
}

private def getPartitionColumns(task: FileScanTask): JHashMap[String, String] = {
val partitionColumns = new JHashMap[String, String]()
val spec = task.spec()
val partition = task.partition()
if (spec.isPartitioned) {
val partitionFields = spec.partitionType().fields().asScala
partitionFields.zipWithIndex.foreach {
case (field, index) =>
val partitionValue = partition.get(index, field.`type`().typeId().javaClass())
if (partitionValue != null) {
partitionColumns.put(field.name(), partitionValue.toString)
} else {
partitionColumns.put(field.name(), ExternalCatalogUtils.DEFAULT_PARTITION_NAME)
}
}
}
partitionColumns
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iceberg.spark.source

import org.apache.iceberg.types.Type.TypeID

object TypeUtil {
def validatePartitionColumnType(typeID: TypeID): Unit = typeID match {
case TypeID.BOOLEAN =>
case TypeID.INTEGER =>
case TypeID.LONG =>
case TypeID.FLOAT =>
case TypeID.DOUBLE =>
case TypeID.DATE =>
case TypeID.TIME | TypeID.TIMESTAMP =>
case TypeID.STRING =>
case TypeID.BINARY =>
case TypeID.DECIMAL =>
case _ =>
throw new UnsupportedOperationException(s"Unsupported partition column type $typeID")
}
}

0 comments on commit 4bb08c1

Please sign in to comment.