[GLUTEN-4213][CORE] Refactoring insertion process of pre/post projection #4245

Closed
wants to merge 12 commits into from

liujiayi771
Contributor

@liujiayi771 liujiayi771 commented Jan 2, 2024

What changes were proposed in this pull request?

Implement #4213.
Introduced three kinds of Rules.

  • PullOutProject. Pulling out pre-project at the LogicalPlan level. Currently, it only supports the Velox backend and can reduce the number of pre-projects when agg includes distinct.
  • ColumnarPullOutProject (ColumnarPullOutPostProject + ColumnarPullOutPreProject). Pulling out pre/post-project at the SparkPlan level. PullOutProject cannot handle all scenarios (e.g., Aggregate introduced by InjectRuntimeFilter will be executed before PullOutProject, and some Expressions will be generated in Strategy). The missing parts will be handled completely by ColumnarPullOutProject. Some information required for post-project is more easily obtained at the physical plan level, hence it is handled there.
  • GlutenPlanPullOutProject. Handling the case of constructing a Gluten transformer directly in TakeOrderedAndProjectExecTransformer.

Currently, only agg and sort have been incorporated into this framework. In the future, support for operators such as join and window that require pre/post projection will be added.
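
As an illustration of what pulling out a pre-project means, here is a minimal toy sketch in plain Scala. The `Expr` and `Plan` types, the `_pre_N` naming, and `pullOutPreProject` are hypothetical stand-ins chosen for this example; they are not Spark's or Gluten's actual classes, and the real rules handle many more cases.

```scala
// Toy model only: hypothetical stand-ins, not Spark or Gluten classes.
object PreProjectSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  sealed trait Plan { def output: Seq[Attr] }
  final case class Scan(output: Seq[Attr]) extends Plan
  // Each entry evaluates an expression and exposes it under an attribute.
  final case class Project(projectList: Seq[(Expr, Attr)], child: Plan) extends Plan {
    def output: Seq[Attr] = projectList.map(_._2)
  }
  final case class Aggregate(grouping: Seq[Expr], aggInputs: Seq[Expr], child: Plan)
    extends Plan {
    def output: Seq[Attr] = Nil // not needed for this sketch
  }

  // Attributes and literals can be consumed directly; anything else must be computed first.
  private def needsEvaluation(e: Expr): Boolean = e match {
    case _: Attr | _: Lit => false
    case _ => true
  }

  def pullOutPreProject(agg: Aggregate): Plan = {
    // distinct: an expression shared by grouping and aggregate inputs is computed once.
    val complex = (agg.grouping ++ agg.aggInputs).filter(needsEvaluation).distinct
    if (complex.isEmpty) {
      agg
    } else {
      val aliases: Map[Expr, Attr] =
        complex.zipWithIndex.map { case (e, i) => e -> Attr(s"_pre_$i") }.toMap
      val passThrough: Seq[(Expr, Attr)] = agg.child.output.map(a => (a, a))
      val computed: Seq[(Expr, Attr)] = complex.map(e => (e, aliases(e)))
      Aggregate(
        agg.grouping.map(e => aliases.getOrElse(e, e)),
        agg.aggInputs.map(e => aliases.getOrElse(e, e)),
        Project(passThrough ++ computed, agg.child))
    }
  }
}
```

In this sketch the aggregate ends up referring only to attributes, and the expression `c1 + 1` lives in a separate project node that can be validated or fall back on its own.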

Next steps:

  1. Support join/window in this framework
  2. Add a rule for row_constructor required by the Velox backend.

How was this patch tested?

Existing CI.

github-actions bot commented Jan 2, 2024

#4213

github-actions bot commented Jan 2, 2024

Run Gluten Clickhouse CI

@liujiayi771
Contributor Author

@zhztheplayer @rui-mo Could you help review? I have already validated this modification on TPCDS. Using this framework to insert pre/post projection can eliminate a significant amount of redundant code in the transformer. The previous approach required many if-else branches based on whether to insert projection and whether it was for validation. It also eliminated the need to construct projection based on an index.

@liujiayi771
Contributor Author

@waitinfuture Could you help review?

@rui-mo rui-mo requested a review from zzcclp January 2, 2024 06:46
@liujiayi771
Contributor Author

Design doc #4213 (comment)

Contributor

@rui-mo rui-mo left a comment

Thanks for the nice refactor. In the meantime, could you also check if metrics work well? Some relevant code to handle the metrics of pre/post projection could be removed.

  expr =>
    expr.filter match {
-     case None | Some(_: Attribute) | Some(_: Literal) =>
+     case None | Some(_: Attribute) =>
Contributor

Why is Literal removed here?

Contributor Author

@liujiayi771 liujiayi771 Jan 2, 2024

There are two reasons for this:

  • If the filter condition is a Literal, it can only be of boolean type. Such filters are typically removed during the optimization process in Spark. You can refer to the "EliminateAggregateFilter" Rule in Spark for more information.
  • Filter in Velox only supports FieldAccessTypedExpr, though it is possible that CH supports a Literal filter.

Contributor

Thanks. Makes sense.

@rui-mo
Contributor

rui-mo commented Jan 2, 2024

Seems there is a PR #3649 proposing a similar refactor. @ulysses-you @liujiayi771 Could you help check on that? Thanks.

@liujiayi771
Contributor Author

liujiayi771 commented Jan 2, 2024

The work done by these two PRs is essentially the same. The difference is that #3649 modifies the logical plan, while my PR modifies the physical plan and also supports post-projection for agg. For the sort operator, both pre- and post-projection can be handled in the logical plan. For agg, the pre-projection can be handled in the logical plan, but the post-projection can only be handled in the physical plan when the native output doesn't match the resultExpressions in Spark's output.

Initially, I also considered doing it in the logical plan to avoid impacting validation and AQE. I think we can combine both approaches, doing the parts that can be done in the logical plan in the logical rule, but for the sort in TakeOrderedAndProjectExecTransformer, it should be done only in the physical plan. @ulysses-you I didn't notice your PR before. I searched for issues related to project but did not check the pull requests. I would like to hear your opinion, as our approaches are fairly similar.

@ulysses-you
Contributor

I think the main goals of pulling out pre/post project are:

  1. make the transformer plan tree align with the native plan tree; e.g., if we have a native project then we must have a project transformer
  2. decouple pre/post project fallback from the original operator; e.g., if we support transforming an aggregate but do not support its post project, then only the post project should fall back
  3. avoid evaluating an expression multiple times; e.g., for t1 join t2 on c1 + 1 = c2 executed as a shuffled hash join, we would evaluate c1 + 1 more than once: once for the shuffle and once for the pre-project

One option is to pull out the pre-project on the logical side and the post-project on the columnar side.

@liujiayi771
Contributor Author

@ulysses-you Agree with you. I can continue to move the pre-projection part into a logical rule if you'd like, or would you prefer to continue working on #3649?

@ulysses-you
Contributor

@liujiayi771 it's fine to go ahead in this pr, thank you

@liujiayi771 liujiayi771 marked this pull request as draft January 3, 2024 07:13
@liujiayi771
Contributor Author

@ulysses-you I have identified an issue: if we modify the logical plan, the rule we inject through extendedOperatorOptimizationRules runs before DecimalAggregates. DecimalAggregates converts sum/avg(decimal attr) into sum/avg(unscaledValue(decimal attr)), but the unscaledValue cannot be seen by our rule. As a result, the required pre-project is not added.

Maybe we should use ExperimentalMethods.extraOptimizations or postHocOptimizationBatches? I currently do not know how to use postHocOptimizationBatches.
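
The ordering problem described above can be shown with a small toy sketch in plain Scala. The `Expr` types and the two functions are hypothetical simplifications for this example only: the real DecimalAggregates rule in Spark does more than this (for instance, it also converts the result back to a decimal), and the real pull-out rule works on plans rather than single expressions.

```scala
// Toy model only: hypothetical stand-ins for Catalyst expressions and rules.
object RuleOrderSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class UnscaledValue(child: Expr) extends Expr
  final case class Sum(child: Expr) extends Expr

  // Stand-in for the rewrite discussed above: sum(attr) becomes sum(unscaledValue(attr)).
  def decimalAggregates(e: Expr): Expr = e match {
    case Sum(a: Attr) => Sum(UnscaledValue(a))
    case other => other
  }

  // Stand-in for the pull-out rule: returns aggregate inputs that need a pre-project.
  def inputsNeedingPreProject(e: Expr): Seq[Expr] = e match {
    case Sum(_: Attr) => Nil
    case Sum(child) => Seq(child)
    case _ => Nil
  }
}
```

If the pull-out check runs on `Sum(Attr("d"))` before the rewrite, it finds nothing to pull out; if it runs on the rewritten expression, it finds `UnscaledValue(Attr("d"))`. That is why the rule has to be placed after the rewrite.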

@ulysses-you
Contributor

sparkSession.experimental.extraOptimizations = sparkSession.experimental.extraOptimizations ++ Seq(yourRule)
Does it work?

@liujiayi771
Contributor Author

sparkSession.experimental.extraOptimizations ++

Yes, this approach will work, but do you think it is reasonable to use ExperimentalMethods? Will Spark remove this class in the future? However, there is no other way to add a rule at the end. Currently, there seems to be no mechanism in Gluten for modifying the extraOptimizations of the Spark session right after it is launched.

@liujiayi771
Contributor Author

liujiayi771 commented Jan 3, 2024

@ulysses-you One method I can think of is to add a rule through injectCheckRule, before the optimization step. This rule would only perform the modifications on the sparkSession. However, this approach might be considered as a hack.

case class AddExtraOptimizations(sparkSession: SparkSession) extends (LogicalPlan => Unit) {

  override def apply(plan: LogicalPlan): Unit = {
    sparkSession.experimental.extraOptimizations = sparkSession.experimental.extraOptimizations ++
      Seq(InsertPreProject)
  }
}

@ulysses-you
Contributor

I think it's OK; Spark generally does not remove public interfaces. We can argue against it if someone creates a PR to remove it.

Member

@zhztheplayer zhztheplayer left a comment

Thanks!

And would you like to update the PR description to add some words to summarize the newly added rules? This would help others understand the changes.

(probably including)

  1. ColumnarPullOutProject (ColumnarPullOutPostProject + ColumnarPullOutPreProject)
  2. GlutenPlanPullOutProject
  3. PullOutProject

Comment on lines +50 to +64
/**
 * Merge the results of two ValidationResult objects, including combining the reasons message for
 * invalid ValidationResult.
 *   - valid merge valid = valid
 *   - invalid merge valid = invalid
 *   - invalid merge invalid = invalid
 */
def merge(first: ValidationResult, second: ValidationResult): ValidationResult = {
  if (first.isValid && second.isValid) {
    ok
  } else {
    val reasonStr = first.reason.getOrElse("") + second.reason.getOrElse("")
    notOk(reasonStr)
  }
}
Member

I didn't find any usage of this method. Am I missing something?

Contributor Author

Yes, we can remove it now.

Comment on lines +772 to +773
val pulledOutSortExec =
ColumnarPullOutProject.getPulledOutPlanLocally[SortExec](sortExec)
Member

Needing this kind of statement means that the rule AddTransformHintRule is coupled with ColumnarPullOutProject.

So is there a chance to make ColumnarPullOutProject more independent? I think our design allows a rule to rely on tags generated by AddTransformHintRule, but we'd better avoid having AddTransformHintRule depend on other rules.

Contributor Author

ColumnarPullOutProject will pull out ProjectExec and need to verify if ProjectExec can be converted to native plan. If we want to decouple ColumnarPullOutProject from TransformHintRule, we can place it before TransformHintRule, which is also feasible. Initially, I implemented it this way, but I encountered an issue where ClickHouse's custom agg #3629 (comment) throws an exception when determining if post-project is needed. It may require ClickHouse's assistance to redesign the API for custom agg and not rely on throwing exceptions in getAttrsIndexForExtensionAggregateExpr for fallback, but instead trigger the fallback logic in doValidationInternal.

We can proceed with the modifications step by step. For now, let's place the validation of ProjectExec within the rule itself. ColumnarPullOutProject will only validate ProjectExec. I understand that there are similar codes in other places in Gluten that tag hints, and we can handle them together later.

Member

@zhztheplayer zhztheplayer Jan 22, 2024

I understand that there are similar codes in other places in Gluten that tag hints, and we can handle them together later.

Actually, I think that for now we can allow rules other than AddTransformHintRule to add validation tags to the original physical plan, as if multiple rules were doing validation together. However, I am not sure whether the code you mentioned above is a good case for that, so it might still need to be optimized. I'll take a look as well.

I encountered an issue where ClickHouse's custom agg #3629 (comment) throws an exception when determining if post-project is needed.

This is interesting... My feeling is that we may have to rethink how to handle backend-specific pre/post project creation code in this refactor. Say backend A has some specialized conditions for deciding whether a project should be pulled out from a plan node; then we'd provide extensibility to have it customized?

Also, in the patch, the code for the new feature is currently spread over several places, including logical optimization, validation (transform hint), and physical optimization (the actual pulling logic). So I feel the complexity added to Gluten is a little higher than expected. Is there a chance to reduce it? At the same time, I am more worried about the coupling of the rules in this PR. Do you think we can add some new methods to the backend API to deal with the CH Agg issue you mentioned?

Contributor Author

The part about "custom agg" may not have been expressed clearly. Currently, both CH and Velox require a "post project" process. The logic is the same, which is to convert the output on the native side to a consistent output for Spark. What I mean is that CH throws an exception when retrieving the native output, which requires validation before pulling it out. It doesn't mean that CH "post project" process is different from Velox's and has custom requirements. In fact, it is about the getAttrForAggregateExprs method that retrieves the actual output of the aggregation. Based on this output, a "post-project" is constructed. Velox also has validation logic that throws exceptions. I also hope to include this logic in the doValidationInternal method like this. CH has custom aggregation requirements, and validation is also performed when retrieving the output for custom agg. For example, the CustomSum only supports Final code. Since I am not familiar with the specific logic of other custom agg in CH, I cannot move the validation logic into doValidationInternal like in Velox. It may require CH's developers to redesign this part. However, this part is not essential and can be improved in future development.
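
The idea of reporting an unsupported case through validation instead of letting an exception escape can be sketched as follows. This is a toy in plain Scala under stated assumptions: `ValidationResult`, `ok`, and `notOk` only mirror the names that appear in the diff in this thread, `nativeOutputFor` is a hypothetical stand-in for a backend call that throws, and the mode check is invented for illustration.

```scala
import scala.util.{Failure, Success, Try}

// Toy model only: hypothetical stand-ins, not Gluten's actual validation API.
object ValidationSketch {
  final case class ValidationResult(isValid: Boolean, reason: Option[String])
  val ok: ValidationResult = ValidationResult(isValid = true, reason = None)
  def notOk(reason: String): ValidationResult =
    ValidationResult(isValid = false, reason = Some(reason))

  // Hypothetical backend call that throws when the aggregate mode is unsupported.
  def nativeOutputFor(aggMode: String): Seq[String] =
    if (aggMode == "Final") Seq("sum")
    else throw new UnsupportedOperationException(s"custom sum does not support mode $aggMode")

  // Validation wraps the throwing call, so the caller sees a result it can use for fallback.
  def validate(aggMode: String): ValidationResult =
    Try(nativeOutputFor(aggMode)) match {
      case Success(_) => ok
      case Failure(e) => notOk(e.getMessage)
    }
}
```

With this shape, a rule that runs after validation could call `nativeOutputFor` only for nodes that validated successfully, so it would not need its own exception handling.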

Contributor

Initially, I implemented it this way, but I encountered an issue where ClickHouse's custom agg #3629 (comment) throws an exception when determining if post-project is needed.

@liujiayi771 Maybe we could try to call getAttrForAggregateExprs method in doValidationInternal for CH backend. With this issue solved, can we make ColumnarPullOutProject independent?

@liujiayi771
Contributor Author

@JkSelf Could you also help to take a look?

@@ -52,123 +51,36 @@ case class SortExecTransformer(

  override def output: Seq[Attribute] = child.output

- override def outputOrdering: Seq[SortOrder] = sortOrder
+ override def outputOrdering: Seq[SortOrder] = child match {
+   case project: ProjectExecTransformer if ProjectTypeHint.isPreProject(project) =>
Contributor Author

@liujiayi771 liujiayi771 Jan 22, 2024

@JkSelf The outputOrdering issue encountered earlier is currently resolved this way.

true
case _ => false
}.isDefined)
case Sort(order, _, _) =>
Contributor

@liujiayi771 We have added a sort check in the needPreProject method. However, it appears that the logic for handling sort operators has not been added here.

Contributor Author

I used to pull out the project for Sort in the LogicalPlan rule, but that way the outputOrdering issue cannot be solved easily. So I moved this logic to the SparkPlan rule, which does not have performance issues like agg does. We can remove the Sort case here.

Contributor

Could you provide more information about the outputOrdering issue you mentioned? Maybe I missed some context. Thanks.

Contributor Author

@JkSelf You can check this discussion.

// post-projection is needed.
true
}
case _ => false
Contributor

Do we need the sort check here?

Contributor Author

Sort is different from Agg: if it has a pre-project, it always needs a post-project, so I pull out the pre- and post-project together in ColumnarPullOutPreProject for SortExec.
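
Why a sort with a pre-project also needs a post-project can be seen in a small data-level sketch. This is plain Scala over in-memory rows, with hypothetical helper names and a hypothetical `_pre_0` column; it is not how SortExecTransformer is implemented. The pre-project appends the computed sort key as an extra column, and the post-project drops it again so that the output schema matches the original child output.

```scala
// Toy model only: rows are maps from column name to value.
object SortProjectSketch {
  type Row = Map[String, Int]

  // Pre-project: evaluate the sort expression once and append it as a new column.
  def preProject(rows: Seq[Row], name: String, expr: Row => Int): Seq[Row] =
    rows.map(r => r + (name -> expr(r)))

  // The sort itself only ever looks at a plain column.
  def sortByColumn(rows: Seq[Row], column: String): Seq[Row] =
    rows.sortBy(r => r(column))

  // Post-project: keep only the original columns, removing the temporary one.
  def postProject(rows: Seq[Row], columns: Seq[String]): Seq[Row] =
    rows.map(r => columns.map(c => c -> r(c)).toMap)

  def sortWithProjects(rows: Seq[Row], columns: Seq[String], expr: Row => Int): Seq[Row] = {
    val tmp = "_pre_0" // hypothetical name for the pulled-out sort key
    postProject(sortByColumn(preProject(rows, tmp, expr), tmp), columns)
  }
}
```

Without the final step, every row would carry the extra `_pre_0` column, and the operator's output would no longer match what its parent expects.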

Contributor

@rui-mo rui-mo left a comment

Thanks for your work. Added several comments.

* This rule will insert a pre-project in the child of operators such as Aggregate, Sort, Join,
* etc., when they involve expressions that need to be evaluated in advance.
*/
case class PullOutProject(session: SparkSession)
Contributor

Why is this class named PullOutProject if it aims to insert a pre-project? I also feel we are lacking some key information in the class descriptions, e.g. that PullOutProject works at the logical plan level, which cases are covered by this rule, and what the steps to insert a project are.

}
}

object ColumnarPullOutProject extends Rule[SparkPlan] {
Contributor

For the newly introduced rules, maybe we can provide more information about their functionality and usage. For this one, especially the difference with PullOutPreProject.

override def apply(plan: SparkPlan): SparkPlan = applyPullOutColumnarPreRules(plan)
}

case class ColumnarPullOutPostProject(validation: Boolean = false)
Contributor

Same.

child = preProject
)
newSort.copyTagsFrom(sort)
ProjectExecTransformer(sort.child.output, newSort).fallbackIfInvalid
Contributor

Should we throw for other plans?

}

/** This rule only used for situation that directly create GlutenPlan. */
object GlutenPlanPullOutProject extends Rule[SparkPlan] with PullOutProjectHelper {
Contributor

What does directly create GlutenPlan mean? Better to clarify a bit. It seems only Sort is covered in this rule, can we add the reason?

@@ -234,7 +236,19 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch

override protected def withNewChildInternal(newChild: SparkPlan): ProjectExecTransformer =
copy(child = newChild)

def fallbackIfInvalid: SparkPlan = {
Contributor

If validation fails, fallback to vanilla Spark and add NotTransformable tag.

The same functionality should be covered by existing rules. Is it possible to remove the duplicate check here?

.isDefined && plan.getTagValue(TAG).get.isInstanceOf[PRE_PROJECT]
}

def tagPostProject(plan: SparkPlan): Unit = {
Contributor

@rui-mo rui-mo Jan 24, 2024

What kind of project is regarded as post-project? Maybe add a clear definition here. Same for pre-project.

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the stale label Mar 10, 2024

This PR was auto-closed because it has been stalled for 10 days with no activity. Please feel free to reopen if it is still valid. Thanks.

@github-actions github-actions bot closed this Mar 21, 2024