diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
index c6e5ad878ebc..fbd40812a7d5 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
@@ -54,7 +54,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
    */
   override def genSplitInfo(
       partition: InputPartition,
-      partitionSchemas: StructType,
+      partitionSchema: StructType,
       fileFormat: ReadFileFormat): SplitInfo = {
     partition match {
       case p: GlutenMergeTreePartition =>
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
index 6b98de77cc02..80d96e27ad49 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala
@@ -59,12 +59,12 @@ class IteratorApiImpl extends IteratorApi with Logging {
    */
   override def genSplitInfo(
       partition: InputPartition,
-      partitionSchemas: StructType,
+      partitionSchema: StructType,
       fileFormat: ReadFileFormat): SplitInfo = {
     partition match {
       case f: FilePartition =>
         val (paths, starts, lengths, partitionColumns) =
-          constructSplitInfo(partitionSchemas, f.files)
+          constructSplitInfo(partitionSchema, f.files)
         val preferredLocations =
           SoftAffinity.getFilePartitionLocations(paths.asScala.toArray, f.preferredLocations())
         LocalFilesBuilder.makeLocalFiles(
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
index 0c8fa7000173..03100cfa6e0f 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/IteratorApi.scala
@@ -41,7 +41,7 @@ trait IteratorApi {
    */
   def genSplitInfo(
       partition: InputPartition,
-      partitionSchemas: StructType,
+      partitionSchema: StructType,
       fileFormat: ReadFileFormat): SplitInfo
 
   /**
diff --git a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala
index 34df6d2a932e..caef2ba50cf1 100644
--- a/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/io/glutenproject/execution/IcebergScanTransformer.scala
@@ -43,7 +43,7 @@ class IcebergScanTransformer(
 
   override def filterExprs(): Seq[Expression] = Seq.empty
 
-  override def getPartitionSchema: StructType = new StructType()
+  override def getPartitionSchema: StructType = GlutenIcebergSourceUtil.getPartitionSchema(scan)
 
   override def getDataSchema: StructType = new StructType()
 
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index cf469d8d168e..6607ff3b9539 100644
--- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -20,7 +20,9 @@ import io.glutenproject.substrait.rel.{IcebergLocalFilesBuilder, SplitInfo}
 import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.softaffinity.SoftAffinity
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.types.StructType
 
 import org.apache.iceberg.{CombinedScanTask, FileFormat, FileScanTask, ScanTask}
 
@@ -45,7 +47,7 @@ object GlutenIcebergSourceUtil {
           paths.add(task.file().path().toString)
           starts.add(task.start())
           lengths.add(task.length())
-          partitionColumns.add(new JHashMap[String, String]())
+          partitionColumns.add(getPartitionColumns(task))
           val currentFileFormat = task.file().format() match {
             case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat
             case FileFormat.ORC => ReadFileFormat.OrcReadFormat
@@ -93,6 +95,31 @@ object GlutenIcebergSourceUtil {
     throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.")
   }
 
+  def getPartitionSchema(sparkScan: Scan): StructType = sparkScan match {
+    case scan: SparkBatchQueryScan =>
+      val tasks = scan.tasks().asScala
+      asFileScanTask(tasks.toList).foreach {
+        task =>
+          val spec = task.spec()
+          if (spec.isPartitioned) {
+            var partitionSchema = new StructType()
+            val partitionFields = spec.partitionType().fields().asScala
+            partitionFields.foreach {
+              field =>
+                TypeUtil.validatePartitionColumnType(field.`type`().typeId())
+                partitionSchema = partitionSchema.add(field.name(), field.`type`().toString)
+            }
+            return partitionSchema
+          } else {
+            return new StructType()
+          }
+      }
+      throw new UnsupportedOperationException(
+        "Failed to get partition schema from iceberg SparkBatchQueryScan.")
+    case _ =>
+      throw new UnsupportedOperationException("Only support iceberg SparkBatchQueryScan.")
+  }
+
   private def asFileScanTask(tasks: List[ScanTask]): List[FileScanTask] = {
     if (tasks.forall(_.isFileScanTask)) {
       tasks.map(_.asFileScanTask())
     } else {
@@ -103,4 +130,26 @@ object GlutenIcebergSourceUtil {
         "Only support iceberg CombinedScanTask and FileScanTask.")
     }
   }
+
+  private def getPartitionColumns(task: FileScanTask): JHashMap[String, String] = {
+    val partitionColumns = new JHashMap[String, String]()
+    val spec = task.spec()
+    val partition = task.partition()
+    if (spec.isPartitioned) {
+      val partitionFields = spec.partitionType().fields().asScala
+      partitionFields.zipWithIndex.foreach {
+        case (field, index) =>
+          val partitionValue = partition.get(index, field.`type`().typeId().javaClass())
+          val partitionType = field.`type`()
+          if (partitionValue != null) {
+            partitionColumns.put(
+              field.name(),
+              TypeUtil.getPartitionValueString(partitionType, partitionValue))
+          } else {
+            partitionColumns.put(field.name(), ExternalCatalogUtils.DEFAULT_PARTITION_NAME)
+          }
+      }
+    }
+    partitionColumns
+  }
 }
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/TypeUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/TypeUtil.scala
new file mode 100644
index 000000000000..ac98658974fe
--- /dev/null
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/TypeUtil.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.spark.source
+
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
+
+import org.apache.iceberg.types.Type
+import org.apache.iceberg.types.Type.TypeID
+
+import java.lang.{Long => JLong}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.time.ZoneOffset
+
+object TypeUtil {
+  def validatePartitionColumnType(typeID: TypeID): Unit = typeID match {
+    case TypeID.BOOLEAN =>
+    case TypeID.INTEGER =>
+    case TypeID.LONG =>
+    case TypeID.FLOAT =>
+    case TypeID.DOUBLE =>
+    case TypeID.DATE =>
+    case TypeID.TIME | TypeID.TIMESTAMP =>
+    case TypeID.STRING =>
+    case TypeID.BINARY =>
+    case TypeID.DECIMAL =>
+    case _ =>
+      throw new UnsupportedOperationException(s"Unsupported partition column type $typeID")
+  }
+
+  def getPartitionValueString(partitionType: Type, partitionValue: Any): String = {
+    partitionType.typeId() match {
+      case TypeID.BINARY =>
+        new String(partitionValue.asInstanceOf[ByteBuffer].array(), StandardCharsets.UTF_8)
+      case TypeID.DATE =>
+        DateFormatter.apply().format(partitionValue.asInstanceOf[Integer])
+      case TypeID.TIMESTAMP | TypeID.TIME =>
+        TimestampFormatter
+          .getFractionFormatter(ZoneOffset.UTC)
+          .format(partitionValue.asInstanceOf[JLong])
+      case _ =>
+        partitionValue.toString
+    }
+  }
+}
diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala
index 0af3c875f69a..1c87851c311e 100644
--- a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala
+++ b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxTPCHIcebergSuite.scala
@@ -18,6 +18,8 @@ package io.glutenproject.execution
 
 import org.apache.spark.SparkConf
 
+import org.apache.iceberg.spark.SparkWriteOptions
+
 import java.io.File
 
 class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
@@ -51,12 +53,19 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
         table =>
           val tablePath = new File(resourcePath, table).getAbsolutePath
           val tableDF = spark.read.format(fileFormat).load(tablePath)
-          tableDF.write.format("iceberg").mode("append").saveAsTable(table)
+          tableDF.write.format("iceberg").mode("overwrite").saveAsTable(table)
           (table, tableDF)
       }
       .toMap
   }
 
+  override protected def afterAll(): Unit = {
+    if (TPCHTables != null) {
+      TPCHTables.keys.foreach(v => spark.sql(s"DROP TABLE IF EXISTS $v"))
+    }
+    super.afterAll()
+  }
+
   test("iceberg transformer exists") {
     runQueryAndCompare("""
                          |SELECT
@@ -84,3 +93,21 @@
     }
   }
 }
+
+class VeloxPartitionedTableTPCHIcebergSuite extends VeloxTPCHIcebergSuite {
+  override protected def createTPCHNotNullTables(): Unit = {
+    TPCHTables = TPCHTable.map {
+      table =>
+        val tablePath = new File(resourcePath, table.name).getAbsolutePath
+        val tableDF = spark.read.format(fileFormat).load(tablePath)
+
+        tableDF.write
+          .format("iceberg")
+          .partitionBy(table.partitionColumns: _*)
+          .option(SparkWriteOptions.FANOUT_ENABLED, "true")
+          .mode("overwrite")
+          .saveAsTable(table.name)
+
+        (table.name, tableDF)
+    }.toMap
+  }
+}
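
A minimal sketch (not part of the patch) of the scenario the new VeloxPartitionedTableTPCHIcebergSuite exercises: writing a partitioned Iceberg table and scanning it, which is what drives IcebergScanTransformer.getPartitionSchema and GlutenIcebergSourceUtil.getPartitionColumns with a non-empty partition spec. The object name, table name, and partition column below are illustrative only; it assumes a SparkSession already configured with an Iceberg catalog and the Gluten plugin, as in the test suites above.

import org.apache.iceberg.spark.SparkWriteOptions
import org.apache.spark.sql.SparkSession

object PartitionedIcebergScanSketch {
  // Hypothetical driver; "lineitem_iceberg" and "l_shipdate" are placeholders for illustration.
  def run(spark: SparkSession, lineitemParquetPath: String): Unit = {
    val df = spark.read.format("parquet").load(lineitemParquetPath)

    // Write a partitioned Iceberg table, mirroring createTPCHNotNullTables in the suite above.
    df.write
      .format("iceberg")
      .partitionBy("l_shipdate")
      .option(SparkWriteOptions.FANOUT_ENABLED, "true")
      .mode("overwrite")
      .saveAsTable("lineitem_iceberg")

    // Scanning the table goes through IcebergScanTransformer; with this patch the partition
    // schema and per-file partition values are propagated to the native scan instead of
    // being left empty.
    spark
      .sql("SELECT l_returnflag, count(*) FROM lineitem_iceberg GROUP BY l_returnflag")
      .show()
  }
}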