From ec8e5eaa1fab4f7f55e9bdb66137d97ed757106d Mon Sep 17 00:00:00 2001 From: zhaokuo Date: Sat, 20 Jul 2024 11:42:17 +0800 Subject: [PATCH] [GLUTEN-6501][VL] Fix the missing fileReadProperties when constructing a LocalFilesNode (#6503) --- .../gluten/backendsapi/clickhouse/CHIteratorApi.scala | 7 ++++--- .../gluten/backendsapi/velox/VeloxIteratorApi.scala | 7 +++++-- .../apache/gluten/substrait/rel/LocalFilesBuilder.java | 6 ++++-- .../org/apache/gluten/substrait/rel/LocalFilesNode.java | 8 +++----- .../scala/org/apache/gluten/backendsapi/IteratorApi.scala | 3 ++- .../gluten/execution/BasicScanExecTransformer.scala | 7 ++++++- .../gluten/substrait/rel/IcebergLocalFilesNode.java | 4 +++- 7 files changed, 27 insertions(+), 15 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 6c86583f4c7c..cd829d04f2cf 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -122,7 +122,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { partition: InputPartition, partitionSchema: StructType, fileFormat: ReadFileFormat, - metadataColumnNames: Seq[String]): SplitInfo = { + metadataColumnNames: Seq[String], + properties: Map[String, String]): SplitInfo = { partition match { case p: GlutenMergeTreePartition => val partLists = new JArrayList[String]() @@ -183,7 +184,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { partitionColumns, new JArrayList[JMap[String, String]](), fileFormat, - preferredLocations.toList.asJava + preferredLocations.toList.asJava, + mapAsJavaMap(properties) ) case _ => throw new UnsupportedOperationException(s"Unsupported input partition: $partition.") @@ -209,7 +211,6 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { split match { case filesNode: LocalFilesNode => setFileSchemaForLocalFiles(filesNode, scans(i)) - filesNode.setFileReadProperties(mapAsJavaMap(scans(i).getProperties)) filesNode.getPaths.forEach(f => files += f) filesNode.toProtobuf.toByteArray case extensionTableNode: ExtensionTableNode => diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 1c7f913481ec..d8355e1c419f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -54,7 +54,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { partition: InputPartition, partitionSchema: StructType, fileFormat: ReadFileFormat, - metadataColumnNames: Seq[String]): SplitInfo = { + metadataColumnNames: Seq[String], + properties: Map[String, String]): SplitInfo = { partition match { case f: FilePartition => val ( @@ -78,7 +79,9 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionColumns, metadataColumns, fileFormat, - preferredLocations.toList.asJava) + preferredLocations.toList.asJava, + mapAsJavaMap(properties) + ) case _ => throw new UnsupportedOperationException(s"Unsupported input partition.") } diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java index 7e085f81f4e6..a58f5e043503 100644 --- a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java +++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesBuilder.java @@ -32,7 +32,8 @@ public static LocalFilesNode makeLocalFiles( List> partitionColumns, List> metadataColumns, LocalFilesNode.ReadFileFormat fileFormat, - List preferredLocations) { + List preferredLocations, + Map properties) { return new LocalFilesNode( index, paths, @@ -43,7 +44,8 @@ public static LocalFilesNode makeLocalFiles( partitionColumns, metadataColumns, fileFormat, - preferredLocations); + preferredLocations, + properties); } public static LocalFilesNode makeLocalFiles(String iterPath) { diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java index fa9f3d51612b..172a6e8cca69 100644 --- a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java +++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java @@ -67,7 +67,8 @@ public enum ReadFileFormat { List> partitionColumns, List> metadataColumns, ReadFileFormat fileFormat, - List preferredLocations) { + List preferredLocations, + Map properties) { this.index = index; this.paths.addAll(paths); this.starts.addAll(starts); @@ -78,6 +79,7 @@ public enum ReadFileFormat { this.partitionColumns.addAll(partitionColumns); this.metadataColumns.addAll(metadataColumns); this.preferredLocations.addAll(preferredLocations); + this.fileReadProperties = properties; } LocalFilesNode(String iterPath) { @@ -109,10 +111,6 @@ private NamedStruct buildNamedStruct() { return namedStructBuilder.build(); } - public void setFileReadProperties(Map fileReadProperties) { - this.fileReadProperties = fileReadProperties; - } - @Override public List preferredLocations() { return this.preferredLocations; diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index 495b91c50757..b78064973123 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -36,7 +36,8 @@ trait IteratorApi { partition: InputPartition, partitionSchema: StructType, fileFormat: ReadFileFormat, - metadataColumnNames: Seq[String]): SplitInfo + metadataColumnNames: Seq[String], + properties: Map[String, String]): SplitInfo /** Generate native row partition. */ def genPartitions( diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index 99f145eeab1c..04697280d799 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -70,7 +70,12 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = { partitions.map( BackendsApiManager.getIteratorApiInstance - .genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name))) + .genSplitInfo( + _, + getPartitionSchema, + fileFormat, + getMetadataColumns.map(_.name), + getProperties)) } override protected def doValidateInternal(): ValidationResult = { diff --git a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java index ba6b0ac4a029..398bdbdb5f03 100644 --- a/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java +++ b/gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java @@ -22,6 +22,7 @@ import org.apache.iceberg.DeleteFile; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,7 +48,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode { partitionColumns, new ArrayList<>(), fileFormat, - preferredLocations); + preferredLocations, + new HashMap<>()); this.deleteFilesList = deleteFilesList; }