Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-3378][VL][FOLLOWUP] Use List to store Iceberg delete files #4971

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public List<String> preferredLocations() {
* inserting delete files information. Different lake formats should override this method to
* implement their corresponding logic.
*/
protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) {}
protected void processFileBuilder(
ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder, int index) {}

public ReadRel.LocalFiles toProtobuf() {
ReadRel.LocalFiles.Builder localFilesBuilder = ReadRel.LocalFiles.newBuilder();
Expand Down Expand Up @@ -217,7 +218,7 @@ public ReadRel.LocalFiles toProtobuf() {
default:
break;
}
processFileBuilder(fileBuilder);
processFileBuilder(fileBuilder, i);
localFilesBuilder.addItems(fileBuilder.build());
}
return localFilesBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles(
List<Map<String, String>> partitionColumns,
LocalFilesNode.ReadFileFormat fileFormat,
List<String> preferredLocations,
Map<String, List<DeleteFile>> deleteFilesMap) {
List<List<DeleteFile>> deleteFilesList) {
return new IcebergLocalFilesNode(
index,
paths,
Expand All @@ -39,6 +39,6 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles(
partitionColumns,
fileFormat,
preferredLocations,
deleteFilesMap);
deleteFilesList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import org.apache.iceberg.DeleteFile;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class IcebergLocalFilesNode extends LocalFilesNode {
private final Map<String, List<DeleteFile>> deleteFilesMap;
private final List<List<DeleteFile>> deleteFilesList;

IcebergLocalFilesNode(
Integer index,
Expand All @@ -37,7 +36,7 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
List<Map<String, String>> partitionColumns,
ReadFileFormat fileFormat,
List<String> preferredLocations,
Map<String, List<DeleteFile>> deleteFilesMap) {
List<List<DeleteFile>> deleteFilesList) {
super(
index,
paths,
Expand All @@ -47,13 +46,12 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
new ArrayList<>(),
fileFormat,
preferredLocations);
this.deleteFilesMap = deleteFilesMap;
this.deleteFilesList = deleteFilesList;
}

@Override
protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) {
List<DeleteFile> deleteFiles =
deleteFilesMap.getOrDefault(fileBuilder.getUriFile(), Collections.emptyList());
protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder, int index) {
List<DeleteFile> deleteFiles = deleteFilesList.get(index);
ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.Builder icebergBuilder =
ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.newBuilder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object GlutenIcebergSourceUtil {
val starts = new JArrayList[JLong]()
val lengths = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]()
val deleteFilesMap = new JHashMap[String, JList[DeleteFile]]()
val deleteFilesList = new JArrayList[JList[DeleteFile]]()
var fileFormat = ReadFileFormat.UnknownFormat

val tasks = partition.taskGroup[ScanTask]().tasks().asScala
Expand All @@ -50,9 +50,7 @@ object GlutenIcebergSourceUtil {
starts.add(task.start())
lengths.add(task.length())
partitionColumns.add(getPartitionColumns(task))
if (!task.deletes().isEmpty) {
deleteFilesMap.put(filePath, task.deletes())
}
deleteFilesList.add(task.deletes());
val currentFileFormat = convertFileFormat(task.file().format())
if (fileFormat == ReadFileFormat.UnknownFormat) {
fileFormat = currentFileFormat
Expand All @@ -73,7 +71,7 @@ object GlutenIcebergSourceUtil {
partitionColumns,
fileFormat,
preferredLoc.toList.asJava,
deleteFilesMap
deleteFilesList
)
case _ =>
throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.")
Expand Down
Loading