-
Notifications
You must be signed in to change notification settings - Fork 454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GLUTEN-3378][CORE] Move getLocalFilesNode logic to scan transformer #3650
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on Github Issues? https://github.com/oap-project/gluten/issues Then could you also rename commit message and pull request title in the following format?
See also: |
Run Gluten Clickhouse CI |
a745dee
to
aa46284
Compare
Run Gluten Clickhouse CI |
aa46284
to
852596e
Compare
Run Gluten Clickhouse CI |
852596e
to
9ac0137
Compare
Run Gluten Clickhouse CI |
9ac0137
to
ef5dd5a
Compare
Run Gluten Clickhouse CI |
ef5dd5a
to
8ece5c5
Compare
Run Gluten Clickhouse CI |
if (allScanPartitions.exists(_.size != partitionLength)) { | ||
val allScanLocalFilesNodes = basicScanExecTransformers.map(_.getLocalFilesNodes) | ||
val partitionLength = allScanLocalFilesNodes.head.size | ||
if (allScanLocalFilesNodes.exists(_.size != partitionLength)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe extract a private method to encapsulate this logic that contains getLocalFilesNodes
, the assertion of whether the number of partitions is consistent, and the transposition. And doc for this method.
@@ -54,6 +54,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport with SupportFormat { | |||
// TODO: Remove this expensive call when CH support scan custom partition location. | |||
def getInputFilePaths: Seq[String] | |||
|
|||
def getLocalFilesNodes: Seq[(java.io.Serializable, Array[String])] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we define a case class for (java.io.Serializable, Array[String])
? To make Serializable
as return type is so wired.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is because CK is using ExtensionTable
instead of LocalFilesNode
, and their common superclass is Serializable
. Now, a new interface called ReadSplit
is introduced to represent the abstraction of both classes. Additionally, preferredLocations
have been included in the class to avoid using Tuple.
|
||
def getFilePartitionLocations( | ||
filePaths: Array[String], | ||
preferredLocations: Array[String]): Array[String] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you explain this definition a little?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, the input of this method was FilePartition
, but the InputPartitions
for lake formats are custom-defined. This method primarily processes FilePartition.files.filePath
and preferredLocations
. Modifying the input type of this method would allow the lake formats to directly reuse this interface.
Run Gluten Clickhouse CI |
1 similar comment
Run Gluten Clickhouse CI |
@yma11 @zhztheplayer Could you help review these changes? |
fd2159f
to
4460e2f
Compare
Run Gluten Clickhouse CI |
@zzcclp Could you help review this PR? |
@@ -30,12 +30,13 @@ | |||
import java.util.List; | |||
import java.util.Map; | |||
|
|||
public class LocalFilesNode implements Serializable { | |||
public class LocalFilesNode implements ReadSplit, Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have a better name instead of ReadSplit
which can show the inheritance relationship here? maybe FilesNode
if there is kind of IceBergFilesNode
/ DeltaFilesNode
exist?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have also spent time thinking about the name of this interface. It mainly serves as an abstract interface for ExtensionTableNode
and LocalFilesNode
. In the future, both Iceberg and Delta will use LocalFilesNode
, and there will be no IcebergFilesNode
. We think that LocalFiles are split meant for handling by the underlying engine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Then maybe SplitInfo
is more suitable in this case and it has corresponding definition like "SplitInfo" or HiveConnectorSplit
at velox side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yma11 I think it makes sense. I will change it to SplitInfo
.
|
||
private def getReadSplitFromScanTransformer( | ||
basicScanExecTransformers: Seq[BasicScanExecTransformer]): Seq[Seq[ReadSplit]] = { | ||
// If these are two scan transformers, they must have same partitions, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When will there are multi scan transformers? Will they be against same table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can check this test VeloxTPCHV1BhjSuite
, it will trigger multi scan transformer. This situation can occur with a broadcast join.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not thinkg bhj have two scan transformers in one stage, the build side should have a broadcast exchange. Bucket table join is a valid case which contain two scan transformers in one stage.
@rui-mo can you also take a review on this PR as it has some substrait module related changes? |
import java.util.List; | ||
|
||
public interface ReadSplit { | ||
List<String> preferredLocations(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add some comments for this class and the member function preferredLocations
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some comments. Do you think the name ReadSplit
needs to be modified?
Run Gluten Clickhouse CI |
71a7480
to
cbafdc6
Compare
Run Gluten Clickhouse CI |
@@ -336,6 +330,32 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f | |||
|
|||
override protected def withNewChildInternal(newChild: SparkPlan): WholeStageTransformer = | |||
copy(child = newChild, materializeInput = materializeInput)(transformStageId) | |||
|
|||
private def getReadSplitFromScanTransformer( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@YannByron Add this private function.
@@ -30,12 +30,13 @@ | |||
import java.util.List; | |||
import java.util.Map; | |||
|
|||
public class LocalFilesNode implements Serializable { | |||
public class LocalFilesNode implements ReadSplit, Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have also spent time thinking about the name of this interface. It mainly serves as an abstract interface for ExtensionTableNode
and LocalFilesNode
. In the future, both Iceberg and Delta will use LocalFilesNode
, and there will be no IcebergFilesNode
. We think that LocalFiles are split meant for handling by the underlying engine.
|
||
private def getReadSplitFromScanTransformer( | ||
basicScanExecTransformers: Seq[BasicScanExecTransformer]): Seq[Seq[ReadSplit]] = { | ||
// If these are two scan transformers, they must have same partitions, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can check this test VeloxTPCHV1BhjSuite
, it will trigger multi scan transformer. This situation can occur with a broadcast join.
import java.util.List; | ||
|
||
public interface ReadSplit { | ||
List<String> preferredLocations(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some comments. Do you think the name ReadSplit
needs to be modified?
cbafdc6
to
5ef217e
Compare
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
GlutenPartition( | ||
index, | ||
substraitPlan.toByteArray, | ||
splitInfos.head.preferredLocations().asScala.toArray) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
splitInfos(i).preferredLocations ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ulysses-you Currently, the GlutenPartition
only has one preferredLocations
. However, within GlutenPartition
, there are multiple SplitInfo
instances, and each SplitInfo
may have a different preferredLocations
. However, the existing code only takes the first preferredLocations
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, it's for one partition but for several scans. I think the currently implementation loses some of scan preferredLocations. Shall we use splitInfos.flatMap(_.preferredLocations).distinct
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Merge all preferredLocations
is better. I will fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additional note: distinct is not need as Spark will count the locations to determine which locations are preferred.
partitionColumns, | ||
fileFormat, | ||
preferredLocations.toList.asJava) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: throw exception for default case
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
Run Gluten Clickhouse CI |
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
|
What changes were proposed in this pull request?
The Lake format has its own custom partition type, with a processing logic that is different from the existing
FilePartition
andGlutenMergeTreePartition
. In the unified design, the Lake format will have its own scan transformer. In order to ensure that the injection of the Lake format does not affect gluten-core, the process of generatingLocalFilesNode
from handling partitions needs to be moved to the scan transformer, allowing the Lake format's scan transformer to customize the processing logic of the partitions.Introduce ReadSplit interface for ExtensionTableNode and LocalFileNode, so that we can get rid of using java.io.Serializable.
How was this patch tested?
Exists CI.