From 76242120f81fac7e92cbe85a6a5505d0e30b4689 Mon Sep 17 00:00:00 2001 From: Joey Date: Fri, 15 Mar 2024 11:48:11 +0800 Subject: [PATCH] [GLUTEN-3378][VL] Feat: Support read iceberg mor table for Velox backend (#4779) Velox add iceberg mor table read support in facebookincubator/velox#7847. This PR supports read iceberg mor table for Velox backend. --- cpp/velox/CMakeLists.txt | 1 + cpp/velox/compute/VeloxPlanConverter.cc | 4 + cpp/velox/compute/WholeStageResultIterator.cc | 44 ++++-- cpp/velox/compute/WholeStageResultIterator.h | 2 + .../compute/iceberg/IcebergPlanConverter.cc | 84 +++++++++++ .../compute/iceberg/IcebergPlanConverter.h | 42 ++++++ cpp/velox/substrait/SubstraitToVeloxPlan.h | 3 + .../substrait/rel/LocalFilesNode.java | 10 +- .../substrait/proto/substrait/algebra.proto | 29 +++- .../rel/IcebergLocalFilesBuilder.java | 17 ++- .../substrait/rel/IcebergLocalFilesNode.java | 110 ++++++++++---- .../source/GlutenIcebergSourceUtil.scala | 28 ++-- .../execution/VeloxIcebergSuite.scala | 140 +++++++++++++++++- 13 files changed, 444 insertions(+), 70 deletions(-) create mode 100644 cpp/velox/compute/iceberg/IcebergPlanConverter.cc create mode 100644 cpp/velox/compute/iceberg/IcebergPlanConverter.h diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index b15fd395ff235..35d05d4426dc0 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -296,6 +296,7 @@ set(VELOX_SRCS compute/VeloxRuntime.cc compute/WholeStageResultIterator.cc compute/VeloxPlanConverter.cc + compute/iceberg/IcebergPlanConverter.cc jni/VeloxJniWrapper.cc jni/JniFileSystem.cc jni/JniUdf.cc diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 8ca9f85cd870f..370655c3b8574 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -20,6 +20,7 @@ #include "compute/ResultIterator.h" #include "config/GlutenConfig.h" +#include "iceberg/IcebergPlanConverter.h" #include "operators/plannodes/RowVectorStream.h" #include "velox/common/file/FileSystems.h" @@ -93,6 +94,9 @@ std::shared_ptr parseScanSplitInfo( case SubstraitFileFormatCase::kText: splitInfo->format = dwio::common::FileFormat::TEXT; break; + case SubstraitFileFormatCase::kIceberg: + splitInfo = IcebergPlanConverter::parseIcebergSplitInfo(file, std::move(splitInfo)); + break; default: splitInfo->format = dwio::common::FileFormat::UNKNOWN; break; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 86431819ba948..f645661b7fbbe 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -154,19 +154,37 @@ WholeStageResultIterator::WholeStageResultIterator( auto metadataColumn = metadataColumns[idx]; std::unordered_map> partitionKeys; constructPartitionColumns(partitionKeys, partitionColumn); - auto split = std::make_shared( - kHiveConnectorId, - paths[idx], - format, - starts[idx], - lengths[idx], - partitionKeys, - std::nullopt, - std::unordered_map(), - nullptr, - std::unordered_map(), - 0, - metadataColumn); + std::shared_ptr split; + if (auto icebergSplitInfo = std::dynamic_pointer_cast(scanInfo)) { + // Set Iceberg split. + std::unordered_map customSplitInfo{{"table_format", "hive-iceberg"}}; + auto deleteFiles = icebergSplitInfo->deleteFilesVec[idx]; + split = std::make_shared( + kHiveConnectorId, + paths[idx], + format, + starts[idx], + lengths[idx], + partitionKeys, + std::nullopt, + customSplitInfo, + nullptr, + deleteFiles); + } else { + split = std::make_shared( + kHiveConnectorId, + paths[idx], + format, + starts[idx], + lengths[idx], + partitionKeys, + std::nullopt, + std::unordered_map(), + nullptr, + std::unordered_map(), + 0, + metadataColumn); + } connectorSplits.emplace_back(split); } diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 082cc6397d738..10c1937b78efd 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -17,11 +17,13 @@ #pragma once #include "compute/Runtime.h" +#include "iceberg/IcebergPlanConverter.h" #include "memory/ColumnarBatchIterator.h" #include "memory/VeloxColumnarBatch.h" #include "substrait/SubstraitToVeloxPlan.h" #include "substrait/plan.pb.h" #include "utils/metrics.h" +#include "velox/connectors/hive/iceberg/IcebergSplit.h" #include "velox/core/Config.h" #include "velox/core/PlanNode.h" #include "velox/exec/Task.h" diff --git a/cpp/velox/compute/iceberg/IcebergPlanConverter.cc b/cpp/velox/compute/iceberg/IcebergPlanConverter.cc new file mode 100644 index 0000000000000..07c40e6e1c7b2 --- /dev/null +++ b/cpp/velox/compute/iceberg/IcebergPlanConverter.cc @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "IcebergPlanConverter.h" + +namespace gluten { + +std::shared_ptr IcebergPlanConverter::parseIcebergSplitInfo( + substrait::ReadRel_LocalFiles_FileOrFiles file, + std::shared_ptr splitInfo) { + using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::IcebergReadOptions::FileFormatCase; + using SubstraitDeleteFileFormatCase = + ::substrait::ReadRel_LocalFiles_FileOrFiles::IcebergReadOptions::DeleteFile::FileFormatCase; + auto icebergSplitInfo = std::dynamic_pointer_cast(splitInfo) + ? std::dynamic_pointer_cast(splitInfo) + : std::make_shared(*splitInfo); + auto icebergReadOption = file.iceberg(); + switch (icebergReadOption.file_format_case()) { + case SubstraitFileFormatCase::kParquet: + icebergSplitInfo->format = dwio::common::FileFormat::PARQUET; + break; + case SubstraitFileFormatCase::kOrc: + icebergSplitInfo->format = dwio::common::FileFormat::ORC; + break; + default: + icebergSplitInfo->format = dwio::common::FileFormat::UNKNOWN; + break; + } + if (icebergReadOption.delete_files_size() > 0) { + auto deleteFiles = icebergReadOption.delete_files(); + std::vector deletes; + deletes.reserve(icebergReadOption.delete_files_size()); + for (auto i = 0; i < icebergReadOption.delete_files_size(); i++) { + auto deleteFile = icebergReadOption.delete_files().Get(i); + dwio::common::FileFormat format; + FileContent fileContent; + switch (deleteFile.file_format_case()) { + case SubstraitDeleteFileFormatCase::kParquet: + format = dwio::common::FileFormat::PARQUET; + break; + case SubstraitDeleteFileFormatCase::kOrc: + format = dwio::common::FileFormat::ORC; + break; + default: + format = dwio::common::FileFormat::UNKNOWN; + } + switch (deleteFile.filecontent()) { + case ::substrait::ReadRel_LocalFiles_FileOrFiles_IcebergReadOptions_FileContent_POSITION_DELETES: + fileContent = FileContent::kPositionalDeletes; + break; + case ::substrait::ReadRel_LocalFiles_FileOrFiles_IcebergReadOptions_FileContent_EQUALITY_DELETES: + fileContent = FileContent::kEqualityDeletes; + break; + default: + fileContent = FileContent::kData; + break; + } + deletes.emplace_back(IcebergDeleteFile( + fileContent, deleteFile.filepath(), format, deleteFile.recordcount(), deleteFile.filesize())); + } + icebergSplitInfo->deleteFilesVec.emplace_back(deletes); + } else { + // Add an empty delete files vector to indicate that this data file has no delete file. + icebergSplitInfo->deleteFilesVec.emplace_back(std::vector{}); + } + + return icebergSplitInfo; +} + +} // namespace gluten diff --git a/cpp/velox/compute/iceberg/IcebergPlanConverter.h b/cpp/velox/compute/iceberg/IcebergPlanConverter.h new file mode 100644 index 0000000000000..d634a861fe3a7 --- /dev/null +++ b/cpp/velox/compute/iceberg/IcebergPlanConverter.h @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "substrait/SubstraitToVeloxPlan.h" +#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" + +using namespace facebook::velox::connector::hive::iceberg; + +namespace gluten { +struct IcebergSplitInfo : SplitInfo { + std::vector> deleteFilesVec; + + IcebergSplitInfo(const SplitInfo& splitInfo) : SplitInfo(splitInfo) { + // Reserve the actual size of the deleteFilesVec. + deleteFilesVec.reserve(splitInfo.paths.capacity()); + } +}; + +class IcebergPlanConverter { + public: + static std::shared_ptr parseIcebergSplitInfo( + substrait::ReadRel_LocalFiles_FileOrFiles file, + std::shared_ptr splitInfo); +}; + +} // namespace gluten diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 895c1d24e7687..59d3312cbb610 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -50,6 +50,9 @@ struct SplitInfo { /// The file format of the files to be scanned. dwio::common::FileFormat format; + + /// Make SplitInfo polymorphic + virtual ~SplitInfo() = default; }; /// This class is used to convert the Substrait plan into Velox plan. diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java index e0700ded29c78..852d7558eb786 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java @@ -50,7 +50,7 @@ public enum ReadFileFormat { UnknownFormat() } - private ReadFileFormat fileFormat = ReadFileFormat.UnknownFormat; + protected ReadFileFormat fileFormat = ReadFileFormat.UnknownFormat; private Boolean iterAsInput = false; private StructType fileSchema; private Map fileReadProperties; @@ -112,6 +112,13 @@ public List preferredLocations() { return this.preferredLocations; } + /** + * Data Lake formats require some additional processing to be done on the FileBuilder, such as + * inserting delete files information. Different lake formats should override this method to + * implement their corresponding logic. + */ + protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) {} + public ReadRel.LocalFiles toProtobuf() { ReadRel.LocalFiles.Builder localFilesBuilder = ReadRel.LocalFiles.newBuilder(); // The input is iterator, and the path is in the format of: Iterator:index. @@ -210,6 +217,7 @@ public ReadRel.LocalFiles toProtobuf() { default: break; } + processFileBuilder(fileBuilder); localFilesBuilder.addItems(fileBuilder.build()); } return localFilesBuilder.build(); diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto index 63a0f36ea66b8..e9ed0f5ef4e26 100644 --- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto +++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto @@ -149,6 +149,28 @@ message ReadRel { uint64 max_block_size = 1; NamedStruct schema = 2 [deprecated=true]; } + message IcebergReadOptions { + enum FileContent { + DATA = 0; + POSITION_DELETES = 1; + EQUALITY_DELETES = 2; + } + message DeleteFile { + FileContent fileContent = 1; + string filePath = 2; + uint64 fileSize = 3; + uint64 recordCount = 4; + oneof file_format { + ParquetReadOptions parquet = 5; + OrcReadOptions orc = 6; + } + } + oneof file_format { + ParquetReadOptions parquet = 1; + OrcReadOptions orc = 2; + } + repeated DeleteFile delete_files = 3; + } // File reading options oneof file_format { @@ -159,22 +181,23 @@ message ReadRel { DwrfReadOptions dwrf = 13; TextReadOptions text = 14; JsonReadOptions json = 15; + IcebergReadOptions iceberg = 16; } message partitionColumn { string key = 1; string value = 2; } - repeated partitionColumn partition_columns = 16; + repeated partitionColumn partition_columns = 17; /// File schema - NamedStruct schema = 17; + NamedStruct schema = 18; message metadataColumn { string key = 1; string value = 2; } - repeated metadataColumn metadata_columns = 18; + repeated metadataColumn metadata_columns = 19; } } } diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java index 3452836cfd833..773f3073bbc4c 100644 --- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java +++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java @@ -16,13 +16,12 @@ */ package io.glutenproject.substrait.rel; +import org.apache.iceberg.DeleteFile; + import java.util.List; import java.util.Map; public class IcebergLocalFilesBuilder { - - // TODO: Add makeIcebergLocalFiles for MOR iceberg table - public static IcebergLocalFilesNode makeIcebergLocalFiles( Integer index, List paths, @@ -30,8 +29,16 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles( List lengths, List> partitionColumns, LocalFilesNode.ReadFileFormat fileFormat, - List preferredLocations) { + List preferredLocations, + Map> deleteFilesMap) { return new IcebergLocalFilesNode( - index, paths, starts, lengths, partitionColumns, fileFormat, preferredLocations); + index, + paths, + starts, + lengths, + partitionColumns, + fileFormat, + preferredLocations, + deleteFilesMap); } } diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java index 98cc0d90e8fef..903bd198a5b58 100644 --- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java +++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java @@ -16,40 +16,18 @@ */ package io.glutenproject.substrait.rel; +import io.glutenproject.GlutenConfig; + +import io.substrait.proto.ReadRel; +import org.apache.iceberg.DeleteFile; + import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; public class IcebergLocalFilesNode extends LocalFilesNode { - - class DeleteFile { - private final String path; - private final Integer fileContent; - private final ReadFileFormat fileFormat; - private final Long fileSize; - private final Long recordCount; - private final Map lowerBounds; - private final Map upperBounds; - - DeleteFile( - String path, - Integer fileContent, - ReadFileFormat fileFormat, - Long fileSize, - Long recordCount, - Map lowerBounds, - Map upperBounds) { - this.path = path; - this.fileContent = fileContent; - this.fileFormat = fileFormat; - this.fileSize = fileSize; - this.recordCount = recordCount; - this.lowerBounds = lowerBounds; - this.upperBounds = upperBounds; - } - } - - // TODO: Add delete file support for MOR iceberg table + private final Map> deleteFilesMap; IcebergLocalFilesNode( Integer index, @@ -58,7 +36,8 @@ class DeleteFile { List lengths, List> partitionColumns, ReadFileFormat fileFormat, - List preferredLocations) { + List preferredLocations, + Map> deleteFilesMap) { super( index, paths, @@ -68,5 +47,76 @@ class DeleteFile { new ArrayList<>(), fileFormat, preferredLocations); + this.deleteFilesMap = deleteFilesMap; + } + + @Override + protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) { + List deleteFiles = + deleteFilesMap.getOrDefault(fileBuilder.getUriFile(), Collections.emptyList()); + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.Builder icebergBuilder = + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.newBuilder(); + + switch (fileFormat) { + case ParquetReadFormat: + ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions = + ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder() + .setEnableRowGroupMaxminIndex( + GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex()) + .build(); + icebergBuilder.setParquet(parquetReadOptions); + break; + case OrcReadFormat: + ReadRel.LocalFiles.FileOrFiles.OrcReadOptions orcReadOptions = + ReadRel.LocalFiles.FileOrFiles.OrcReadOptions.newBuilder().build(); + icebergBuilder.setOrc(orcReadOptions); + break; + default: + throw new UnsupportedOperationException( + "Unsupported file format " + fileFormat.name() + " for iceberg data file."); + } + + for (DeleteFile delete : deleteFiles) { + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.DeleteFile.Builder deleteFileBuilder = + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.DeleteFile.newBuilder(); + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent fileContent; + switch (delete.content()) { + case EQUALITY_DELETES: + fileContent = + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent.EQUALITY_DELETES; + break; + case POSITION_DELETES: + fileContent = + ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.FileContent.POSITION_DELETES; + break; + default: + throw new UnsupportedOperationException( + "Unsupported FileCount " + delete.content().name() + " for delete file."); + } + deleteFileBuilder.setFileContent(fileContent); + deleteFileBuilder.setFilePath(delete.path().toString()); + deleteFileBuilder.setFileSize(delete.fileSizeInBytes()); + deleteFileBuilder.setRecordCount(delete.recordCount()); + switch (delete.format()) { + case PARQUET: + ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions parquetReadOptions = + ReadRel.LocalFiles.FileOrFiles.ParquetReadOptions.newBuilder() + .setEnableRowGroupMaxminIndex( + GlutenConfig.getConf().enableParquetRowGroupMaxMinIndex()) + .build(); + deleteFileBuilder.setParquet(parquetReadOptions); + break; + case ORC: + ReadRel.LocalFiles.FileOrFiles.OrcReadOptions orcReadOptions = + ReadRel.LocalFiles.FileOrFiles.OrcReadOptions.newBuilder().build(); + deleteFileBuilder.setOrc(orcReadOptions); + break; + default: + throw new UnsupportedOperationException( + "Unsupported format " + delete.format().name() + " for delete file."); + } + icebergBuilder.addDeleteFiles(deleteFileBuilder); + } + fileBuilder.setIceberg(icebergBuilder); } } diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala index 6607ff3b9539f..74add7a9a5b42 100644 --- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala +++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala @@ -24,10 +24,10 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.connector.read.{InputPartition, Scan} import org.apache.spark.sql.types.StructType -import org.apache.iceberg.{CombinedScanTask, FileFormat, FileScanTask, ScanTask} +import org.apache.iceberg.{CombinedScanTask, DeleteFile, FileFormat, FileScanTask, ScanTask} import java.lang.{Long => JLong} -import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap} +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap} import scala.collection.JavaConverters._ @@ -39,22 +39,21 @@ object GlutenIcebergSourceUtil { val starts = new JArrayList[JLong]() val lengths = new JArrayList[JLong]() val partitionColumns = new JArrayList[JMap[String, String]]() + val deleteFilesMap = new JHashMap[String, JList[DeleteFile]]() var fileFormat = ReadFileFormat.UnknownFormat val tasks = partition.taskGroup[ScanTask]().tasks().asScala asFileScanTask(tasks.toList).foreach { task => - paths.add(task.file().path().toString) + val filePath = task.file().path().toString + paths.add(filePath) starts.add(task.start()) lengths.add(task.length()) partitionColumns.add(getPartitionColumns(task)) - val currentFileFormat = task.file().format() match { - case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat - case FileFormat.ORC => ReadFileFormat.OrcReadFormat - case _ => - throw new UnsupportedOperationException( - "Iceberg Only support parquet and orc file format.") + if (!task.deletes().isEmpty) { + deleteFilesMap.put(filePath, task.deletes()) } + val currentFileFormat = convertFileFormat(task.file().format()) if (fileFormat == ReadFileFormat.UnknownFormat) { fileFormat = currentFileFormat } else if (fileFormat != currentFileFormat) { @@ -73,7 +72,8 @@ object GlutenIcebergSourceUtil { lengths, partitionColumns, fileFormat, - preferredLoc.toList.asJava + preferredLoc.toList.asJava, + deleteFilesMap ) case _ => throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.") @@ -152,4 +152,12 @@ object GlutenIcebergSourceUtil { } partitionColumns } + + def convertFileFormat(icebergFileFormat: FileFormat): ReadFileFormat = + icebergFileFormat match { + case FileFormat.PARQUET => ReadFileFormat.ParquetReadFormat + case FileFormat.ORC => ReadFileFormat.OrcReadFormat + case _ => + throw new UnsupportedOperationException("Iceberg Only support parquet and orc file format.") + } } diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala index 019c602957fe8..6b332641eb8bc 100644 --- a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala @@ -59,15 +59,17 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { } test("iceberg transformer exists") { - spark.sql(""" - |create table iceberg_tb using iceberg as - |(select 1 as col1, 2 as col2, 3 as col3) - |""".stripMargin) + withTable("iceberg_tb") { + spark.sql(""" + |create table iceberg_tb using iceberg as + |(select 1 as col1, 2 as col2, 3 as col3) + |""".stripMargin) - runQueryAndCompare(""" - |select * from iceberg_tb; - |""".stripMargin) { - checkOperatorMatch[IcebergScanTransformer] + runQueryAndCompare(""" + |select * from iceberg_tb; + |""".stripMargin) { + checkOperatorMatch[IcebergScanTransformer] + } } } @@ -314,4 +316,126 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite { } } } + + test("iceberg read mor table - delete and update") { + withTable("iceberg_mor_tb") { + withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") { + spark.sql(""" + |create table iceberg_mor_tb ( + | id int, + | name string, + | p string + |) using iceberg + |tblproperties ( + | 'format-version' = '2', + | 'write.delete.mode' = 'merge-on-read', + | 'write.update.mode' = 'merge-on-read', + | 'write.merge.mode' = 'merge-on-read' + |) + |partitioned by (p); + |""".stripMargin) + + // Insert some test rows. + spark.sql(""" + |insert into table iceberg_mor_tb + |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'), + | (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1'); + |""".stripMargin) + + // Delete row. + spark.sql( + """ + |delete from iceberg_mor_tb where name = 'a1'; + |""".stripMargin + ) + // Update row. + spark.sql( + """ + |update iceberg_mor_tb set name = 'new_a2' where id = 'a2'; + |""".stripMargin + ) + // Delete row again. + spark.sql( + """ + |delete from iceberg_mor_tb where id = 6; + |""".stripMargin + ) + } + runQueryAndCompare(""" + |select * from iceberg_mor_tb; + |""".stripMargin) { + checkOperatorMatch[IcebergScanTransformer] + } + } + } + + test("iceberg read mor table - merge into") { + withTable("iceberg_mor_tb", "merge_into_source_tb") { + withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") { + spark.sql(""" + |create table iceberg_mor_tb ( + | id int, + | name string, + | p string + |) using iceberg + |tblproperties ( + | 'format-version' = '2', + | 'write.delete.mode' = 'merge-on-read', + | 'write.update.mode' = 'merge-on-read', + | 'write.merge.mode' = 'merge-on-read' + |) + |partitioned by (p); + |""".stripMargin) + spark.sql(""" + |create table merge_into_source_tb ( + | id int, + | name string, + | p string + |) using iceberg; + |""".stripMargin) + + // Insert some test rows. + spark.sql(""" + |insert into table iceberg_mor_tb + |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'); + |""".stripMargin) + spark.sql(""" + |insert into table merge_into_source_tb + |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'), + | (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2'); + |""".stripMargin) + + // Delete row. + spark.sql( + """ + |delete from iceberg_mor_tb where name = 'a1'; + |""".stripMargin + ) + // Update row. + spark.sql( + """ + |update iceberg_mor_tb set name = 'new_a2' where id = 'a2'; + |""".stripMargin + ) + + // Merge into. + spark.sql( + """ + |merge into iceberg_mor_tb t + |using (select * from merge_into_source_tb) s + |on t.id = s.id + |when matched then + | update set t.name = s.name, t.p = s.p + |when not matched then + | insert (id, name, p) values (s.id, s.name, s.p); + |""".stripMargin + ) + } + runQueryAndCompare(""" + |select * from iceberg_mor_tb; + |""".stripMargin) { + checkOperatorMatch[IcebergScanTransformer] + } + } + } }