Skip to content

Commit

Permalink
[GLUTEN-6501][VL] Fix the missing fileReadProperties when constructin…
Browse files Browse the repository at this point in the history
…g a LocalFilesNode (apache#6503)
  • Loading branch information
kecookier authored and weiting-chen committed Jul 25, 2024
1 parent 25f40d4 commit ad97dea
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
partition: InputPartition,
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String]): SplitInfo = {
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
partition match {
case p: GlutenMergeTreePartition =>
val partLists = new JArrayList[String]()
Expand Down Expand Up @@ -183,7 +184,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
partitionColumns,
new JArrayList[JMap[String, String]](),
fileFormat,
preferredLocations.toList.asJava
preferredLocations.toList.asJava,
mapAsJavaMap(properties)
)
case _ =>
throw new UnsupportedOperationException(s"Unsupported input partition: $partition.")
Expand All @@ -209,7 +211,6 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
split match {
case filesNode: LocalFilesNode =>
setFileSchemaForLocalFiles(filesNode, scans(i))
filesNode.setFileReadProperties(mapAsJavaMap(scans(i).getProperties))
filesNode.getPaths.forEach(f => files += f)
filesNode.toProtobuf.toByteArray
case extensionTableNode: ExtensionTableNode =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partition: InputPartition,
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String]): SplitInfo = {
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
partition match {
case f: FilePartition =>
val (
Expand All @@ -78,7 +79,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partitionColumns,
metadataColumns,
fileFormat,
preferredLocations.toList.asJava)
preferredLocations.toList.asJava,
mapAsJavaMap(properties)
)
case _ =>
throw new UnsupportedOperationException(s"Unsupported input partition.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public static LocalFilesNode makeLocalFiles(
List<Map<String, String>> partitionColumns,
List<Map<String, String>> metadataColumns,
LocalFilesNode.ReadFileFormat fileFormat,
List<String> preferredLocations) {
List<String> preferredLocations,
Map<String, String> properties) {
return new LocalFilesNode(
index,
paths,
Expand All @@ -43,7 +44,8 @@ public static LocalFilesNode makeLocalFiles(
partitionColumns,
metadataColumns,
fileFormat,
preferredLocations);
preferredLocations,
properties);
}

public static LocalFilesNode makeLocalFiles(String iterPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public enum ReadFileFormat {
List<Map<String, String>> partitionColumns,
List<Map<String, String>> metadataColumns,
ReadFileFormat fileFormat,
List<String> preferredLocations) {
List<String> preferredLocations,
Map<String, String> properties) {
this.index = index;
this.paths.addAll(paths);
this.starts.addAll(starts);
Expand All @@ -78,6 +79,7 @@ public enum ReadFileFormat {
this.partitionColumns.addAll(partitionColumns);
this.metadataColumns.addAll(metadataColumns);
this.preferredLocations.addAll(preferredLocations);
this.fileReadProperties = properties;
}

LocalFilesNode(String iterPath) {
Expand Down Expand Up @@ -109,10 +111,6 @@ private NamedStruct buildNamedStruct() {
return namedStructBuilder.build();
}

public void setFileReadProperties(Map<String, String> fileReadProperties) {
this.fileReadProperties = fileReadProperties;
}

@Override
public List<String> preferredLocations() {
return this.preferredLocations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ trait IteratorApi {
partition: InputPartition,
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String]): SplitInfo
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo

/** Generate native row partition. */
def genPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
partitions.map(
BackendsApiManager.getIteratorApiInstance
.genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name)))
.genSplitInfo(
_,
getPartitionSchema,
fileFormat,
getMetadataColumns.map(_.name),
getProperties))
}

override protected def doValidateInternal(): ValidationResult = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iceberg.DeleteFile;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -47,7 +48,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
partitionColumns,
new ArrayList<>(),
fileFormat,
preferredLocations);
preferredLocations,
new HashMap<>());
this.deleteFilesList = deleteFilesList;
}

Expand Down

0 comments on commit ad97dea

Please sign in to comment.