diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
index d562ba3a807b8..c99abac1557ec 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
@@ -54,7 +54,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
    */
   override def genSplitInfo(
       partition: InputPartition,
-      partitionSchemas: StructType,
+      partitionSchema: StructType,
       fileFormat: ReadFileFormat): SplitInfo = {
     partition match {
       case p: GlutenMergeTreePartition =>
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
index 0033fcc9fbd19..91863c10ea179 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
@@ -59,12 +59,12 @@ class IteratorApiImpl extends IteratorApi with Logging {
    */
   override def genSplitInfo(
       partition: InputPartition,
-      partitionSchemas: StructType,
+      partitionSchema: StructType,
       fileFormat: ReadFileFormat): SplitInfo = {
     partition match {
       case f: FilePartition =>
         val (paths, starts, lengths, partitionColumns) =
-          constructSplitInfo(partitionSchemas, f.files)
+          constructSplitInfo(partitionSchema, f.files)
         val preferredLocations =
           SoftAffinityUtil.getFilePartitionLocations(paths.asScala.toArray, f.preferredLocations())
         LocalFilesBuilder.makeLocalFiles(
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
index 0c8fa70001734..03100cfa6e0f0 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
@@ -41,7 +41,7 @@ trait IteratorApi {
    */
   def genSplitInfo(
       partition: InputPartition,
-      partitionSchemas: StructType,
+      partitionSchema: StructType,
       fileFormat: ReadFileFormat): SplitInfo
 
   /**
diff --git a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala
index ee4835ddc20e4..7871bc6a27dad 100644
--- a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala
@@ -43,7 +43,7 @@ class IcebergScanTransformer(
 
   override def filterExprs(): Seq[Expression] = Seq.empty
 
-  override def getPartitionSchema: StructType = new StructType()
+  override def getPartitionSchema: StructType = GlutenIcebergSourceUtil.getPartitionSchema(scan)
 
   override def getDataSchema: StructType = new StructType()
 
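The hunks above rename `partitionSchemas` to `partitionSchema` across the backend APIs (each split carries exactly one partition schema), and `IcebergScanTransformer` stops hard-coding an empty schema, delegating to `GlutenIcebergSourceUtil` instead. A minimal sketch of the observable difference, assuming a hypothetical table partitioned by identity on a string column `category` (not part of the patch):

    import org.apache.spark.sql.types.StructType

    // Previously getPartitionSchema always returned an empty schema, so the
    // backends never received partition columns for Iceberg splits.
    val before: StructType = new StructType()
    // With this change the schema mirrors the table's partition spec, e.g. a
    // hypothetical table with a single identity partition on "category":
    val after: StructType = new StructType().add("category", "string")
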
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index 2dcfe832e1575..b865aa68adb68 100644
--- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -20,7 +20,9 @@ import io.glutenproject.substrait.rel.{IcebergLocalFilesBuilder, SplitInfo}
 import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.softaffinity.SoftAffinityUtil
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.types.StructType
 
 import org.apache.iceberg.{CombinedScanTask, FileFormat, FileScanTask, ScanTask}
@@ -45,7 +47,7 @@ object GlutenIcebergSourceUtil {
           paths.add(task.file().path().toString)
           starts.add(task.start())
           lengths.add(task.length())
-          partitionColumns.add(new JHashMap[String, String]())
+          partitionColumns.add(getPartitionColumns(task))
           val currentFileFormat = task.file().format() match {
             case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat
             case FileFormat.ORC => ReadFileFormat.OrcReadFormat
@@ -93,6 +95,31 @@ object GlutenIcebergSourceUtil {
     throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.")
   }
 
+  def getPartitionSchema(sparkScan: Scan): StructType = sparkScan match {
+    case scan: SparkBatchQueryScan =>
+      val tasks = scan.tasks().asScala
+      asFileScanTask(tasks.toList).foreach {
+        task =>
+          val spec = task.spec()
+          if (spec.isPartitioned) {
+            var partitionSchema = new StructType()
+            val partitionFields = spec.partitionType().fields().asScala
+            partitionFields.foreach {
+              field =>
+                TypeUtil.validatePartitionColumnType(field.`type`().typeId())
+                partitionSchema = partitionSchema.add(field.name(), field.`type`().toString)
+            }
+            return partitionSchema
+          } else {
+            return new StructType()
+          }
+      }
+      throw new UnsupportedOperationException(
+        "Failed to get partition schema from iceberg SparkBatchQueryScan.")
+    case _ =>
+      throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.")
+  }
+
   private def asFileScanTask(tasks: List[ScanTask]): List[FileScanTask] = {
     if (tasks.forall(_.isFileScanTask)) {
       tasks.map(_.asFileScanTask())
@@ -103,4 +130,23 @@ object GlutenIcebergSourceUtil {
         "Only support iceberg CombinedScanTask and FileScanTask.")
     }
   }
+
+  private def getPartitionColumns(task: FileScanTask): JHashMap[String, String] = {
+    val partitionColumns = new JHashMap[String, String]()
+    val spec = task.spec()
+    val partition = task.partition()
+    if (spec.isPartitioned) {
+      val partitionFields = spec.partitionType().fields().asScala
+      partitionFields.zipWithIndex.foreach {
+        case (field, index) =>
+          val partitionValue = partition.get(index, field.`type`().typeId().javaClass())
+          if (partitionValue != null) {
+            partitionColumns.put(field.name(), partitionValue.toString)
+          } else {
+            partitionColumns.put(field.name(), ExternalCatalogUtils.DEFAULT_PARTITION_NAME)
+          }
+      }
+    }
+    partitionColumns
+  }
 }
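Note that `getPartitionSchema` walks `spec.partitionType()`, so field names and types follow Iceberg's partition-field conventions: a days transform on `event_time` surfaces as `event_time_day: date`, not as the source timestamp column. For files whose partition value is null, `getPartitionColumns` substitutes `ExternalCatalogUtils.DEFAULT_PARTITION_NAME` (`__HIVE_DEFAULT_PARTITION__`), matching what Spark writes for null values in Hive-style partitioned file sources. A minimal sketch of those name/type pairs, built with Iceberg's public API against a hypothetical table layout (not part of the patch):

    import scala.collection.JavaConverters._

    import org.apache.iceberg.{PartitionSpec, Schema}
    import org.apache.iceberg.types.Types

    // Hypothetical table partitioned by identity(category) and days(event_time).
    val schema = new Schema(
      Types.NestedField.required(1, "category", Types.StringType.get()),
      Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()))
    val spec = PartitionSpec
      .builderFor(schema)
      .identity("category")
      .day("event_time")
      .build()

    // Prints "category: string" and "event_time_day: date", the same
    // name/type pairs getPartitionSchema folds into a StructType.
    spec.partitionType().fields().asScala.foreach(f => println(s"${f.name()}: ${f.`type`()}"))
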
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/TypeUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/TypeUtil.scala
new file mode 100644
index 0000000000000..2d9a8e4d09a26
--- /dev/null
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/TypeUtil.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.spark.source
+
+import org.apache.iceberg.types.Type.TypeID
+
+object TypeUtil {
+  def validatePartitionColumnType(typeID: TypeID): Unit = typeID match {
+    case TypeID.BOOLEAN =>
+    case TypeID.INTEGER =>
+    case TypeID.LONG =>
+    case TypeID.FLOAT =>
+    case TypeID.DOUBLE =>
+    case TypeID.DATE =>
+    case TypeID.TIME | TypeID.TIMESTAMP =>
+    case TypeID.STRING =>
+    case TypeID.BINARY =>
+    case TypeID.DECIMAL =>
+    case _ =>
+      throw new UnsupportedOperationException(s"Unsupported partition column type $typeID")
+  }
+}
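`validatePartitionColumnType` is a whitelist with deliberately empty case bodies: supported TypeIDs fall through as a no-op, while everything else (STRUCT, LIST, MAP, UUID, FIXED) hits the catch-all and fails fast at planning time rather than producing incorrect splits at runtime. A short usage sketch:

    import org.apache.iceberg.spark.source.TypeUtil
    import org.apache.iceberg.types.Type.TypeID

    TypeUtil.validatePartitionColumnType(TypeID.STRING) // supported: no-op
    // Unsupported: throws UnsupportedOperationException
    // with message "Unsupported partition column type UUID".
    TypeUtil.validatePartitionColumnType(TypeID.UUID)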