From f56e60dfa5a215ca8748794cca97f1af43ee1af3 Mon Sep 17 00:00:00 2001 From: "joey.ljy" Date: Fri, 15 Mar 2024 15:22:35 +0800 Subject: [PATCH] Use List to store delete file info --- .../glutenproject/substrait/rel/LocalFilesNode.java | 6 ++++-- .../substrait/rel/IcebergLocalFilesBuilder.java | 4 ++-- .../substrait/rel/IcebergLocalFilesNode.java | 12 +++++------- .../spark/source/GlutenIcebergSourceUtil.scala | 8 +++----- 4 files changed, 14 insertions(+), 16 deletions(-) diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java index 852d7558eb786..aaece9dbd173f 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/LocalFilesNode.java @@ -22,6 +22,7 @@ import io.substrait.proto.NamedStruct; import io.substrait.proto.ReadRel; import io.substrait.proto.Type; + import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -117,7 +118,8 @@ public List preferredLocations() { * inserting delete files information. Different lake formats should override this method to * implement their corresponding logic. */ - protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) {} + protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder, int index) { + } public ReadRel.LocalFiles toProtobuf() { ReadRel.LocalFiles.Builder localFilesBuilder = ReadRel.LocalFiles.newBuilder(); @@ -217,7 +219,7 @@ public ReadRel.LocalFiles toProtobuf() { default: break; } - processFileBuilder(fileBuilder); + processFileBuilder(fileBuilder, i); localFilesBuilder.addItems(fileBuilder.build()); } return localFilesBuilder.build(); diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java index 773f3073bbc4c..b6ee7058035cc 100644 --- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java +++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesBuilder.java @@ -30,7 +30,7 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles( List> partitionColumns, LocalFilesNode.ReadFileFormat fileFormat, List preferredLocations, - Map> deleteFilesMap) { + List> deleteFilesList) { return new IcebergLocalFilesNode( index, paths, @@ -39,6 +39,6 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles( partitionColumns, fileFormat, preferredLocations, - deleteFilesMap); + deleteFilesList); } } diff --git a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java index 903bd198a5b58..e25cd5a98942e 100644 --- a/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java +++ b/gluten-iceberg/src/main/java/io/glutenproject/substrait/rel/IcebergLocalFilesNode.java @@ -22,12 +22,11 @@ import org.apache.iceberg.DeleteFile; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; public class IcebergLocalFilesNode extends LocalFilesNode { - private final Map> deleteFilesMap; + private final List> deleteFilesList; IcebergLocalFilesNode( Integer index, @@ -37,7 +36,7 @@ public class IcebergLocalFilesNode extends LocalFilesNode { List> partitionColumns, ReadFileFormat fileFormat, List preferredLocations, - Map> deleteFilesMap) { + List> deleteFilesList) { super( index, paths, @@ -47,13 +46,12 @@ public class IcebergLocalFilesNode extends LocalFilesNode { new ArrayList<>(), fileFormat, preferredLocations); - this.deleteFilesMap = deleteFilesMap; + this.deleteFilesList = deleteFilesList; } @Override - protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) { - List deleteFiles = - deleteFilesMap.getOrDefault(fileBuilder.getUriFile(), Collections.emptyList()); + protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder, int index) { + List deleteFiles = deleteFilesList.get(index); ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.Builder icebergBuilder = ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.newBuilder(); diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala index 74add7a9a5b42..e85c9f58a7835 100644 --- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala +++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala @@ -39,7 +39,7 @@ object GlutenIcebergSourceUtil { val starts = new JArrayList[JLong]() val lengths = new JArrayList[JLong]() val partitionColumns = new JArrayList[JMap[String, String]]() - val deleteFilesMap = new JHashMap[String, JList[DeleteFile]]() + val deleteFilesList = new JArrayList[JList[DeleteFile]]() var fileFormat = ReadFileFormat.UnknownFormat val tasks = partition.taskGroup[ScanTask]().tasks().asScala @@ -50,9 +50,7 @@ object GlutenIcebergSourceUtil { starts.add(task.start()) lengths.add(task.length()) partitionColumns.add(getPartitionColumns(task)) - if (!task.deletes().isEmpty) { - deleteFilesMap.put(filePath, task.deletes()) - } + deleteFilesList.add(task.deletes()); val currentFileFormat = convertFileFormat(task.file().format()) if (fileFormat == ReadFileFormat.UnknownFormat) { fileFormat = currentFileFormat @@ -73,7 +71,7 @@ object GlutenIcebergSourceUtil { partitionColumns, fileFormat, preferredLoc.toList.asJava, - deleteFilesMap + deleteFilesList ) case _ => throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.")