Skip to content

Commit

Permalink
Use List to store delete file info
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiayi771 committed Mar 19, 2024
1 parent a1e0367 commit f56e60d
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.substrait.proto.NamedStruct;
import io.substrait.proto.ReadRel;
import io.substrait.proto.Type;

import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

Expand Down Expand Up @@ -117,7 +118,8 @@ public List<String> preferredLocations() {
* inserting delete files information. Different lake formats should override this method to
* implement their corresponding logic.
*/
protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) {}
protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder, int index) {
}

public ReadRel.LocalFiles toProtobuf() {
ReadRel.LocalFiles.Builder localFilesBuilder = ReadRel.LocalFiles.newBuilder();
Expand Down Expand Up @@ -217,7 +219,7 @@ public ReadRel.LocalFiles toProtobuf() {
default:
break;
}
processFileBuilder(fileBuilder);
processFileBuilder(fileBuilder, i);
localFilesBuilder.addItems(fileBuilder.build());
}
return localFilesBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles(
List<Map<String, String>> partitionColumns,
LocalFilesNode.ReadFileFormat fileFormat,
List<String> preferredLocations,
Map<String, List<DeleteFile>> deleteFilesMap) {
List<List<DeleteFile>> deleteFilesList) {
return new IcebergLocalFilesNode(
index,
paths,
Expand All @@ -39,6 +39,6 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles(
partitionColumns,
fileFormat,
preferredLocations,
deleteFilesMap);
deleteFilesList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import org.apache.iceberg.DeleteFile;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class IcebergLocalFilesNode extends LocalFilesNode {
private final Map<String, List<DeleteFile>> deleteFilesMap;
private final List<List<DeleteFile>> deleteFilesList;

IcebergLocalFilesNode(
Integer index,
Expand All @@ -37,7 +36,7 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
List<Map<String, String>> partitionColumns,
ReadFileFormat fileFormat,
List<String> preferredLocations,
Map<String, List<DeleteFile>> deleteFilesMap) {
List<List<DeleteFile>> deleteFilesList) {
super(
index,
paths,
Expand All @@ -47,13 +46,12 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
new ArrayList<>(),
fileFormat,
preferredLocations);
this.deleteFilesMap = deleteFilesMap;
this.deleteFilesList = deleteFilesList;
}

@Override
protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder) {
List<DeleteFile> deleteFiles =
deleteFilesMap.getOrDefault(fileBuilder.getUriFile(), Collections.emptyList());
protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder, int index) {
List<DeleteFile> deleteFiles = deleteFilesList.get(index);
ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.Builder icebergBuilder =
ReadRel.LocalFiles.FileOrFiles.IcebergReadOptions.newBuilder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object GlutenIcebergSourceUtil {
val starts = new JArrayList[JLong]()
val lengths = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]()
val deleteFilesMap = new JHashMap[String, JList[DeleteFile]]()
val deleteFilesList = new JArrayList[JList[DeleteFile]]()
var fileFormat = ReadFileFormat.UnknownFormat

val tasks = partition.taskGroup[ScanTask]().tasks().asScala
Expand All @@ -50,9 +50,7 @@ object GlutenIcebergSourceUtil {
starts.add(task.start())
lengths.add(task.length())
partitionColumns.add(getPartitionColumns(task))
if (!task.deletes().isEmpty) {
deleteFilesMap.put(filePath, task.deletes())
}
deleteFilesList.add(task.deletes());
val currentFileFormat = convertFileFormat(task.file().format())
if (fileFormat == ReadFileFormat.UnknownFormat) {
fileFormat = currentFileFormat
Expand All @@ -73,7 +71,7 @@ object GlutenIcebergSourceUtil {
partitionColumns,
fileFormat,
preferredLoc.toList.asJava,
deleteFilesMap
deleteFilesList
)
case _ =>
throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.")
Expand Down

0 comments on commit f56e60d

Please sign in to comment.