[VL] Result mismatch in CollectList when partial sort is involved #8184

Open
NEUpanning opened this issue Dec 9, 2024 · 3 comments
Labels: bug (Something isn't working), triage

NEUpanning (Contributor) commented Dec 9, 2024

Backend

VL (Velox)

Bug description

Describe the issue

Reproducing SQL:

CREATE OR REPLACE TEMP VIEW temp_table AS
SELECT * FROM VALUES
  (1, 'a'), (1, 'b'), (1, 'c'),
  (2, 'd'), (2, 'e'), (2, 'f'),
  (3, 'g'), (3, 'h'), (3, 'i')
AS t(id, value);

SELECT id, collect_list(value) AS values_list
FROM (
  SELECT * FROM
  (SELECT id, value
   FROM temp_table
   DISTRIBUTE BY rand())  -- Forces a shuffle
  DISTRIBUTE BY id SORT BY id, value
) t
GROUP BY id;

Results:

  • The vanilla result is deterministic and values_list is sorted by the value column:

    id   values_list
    1    ["a", "b", "c"]
    2    ["d", "e", "f"]
    3    ["g", "h", "i"]
    
  • The Gluten result is non-deterministic and values_list is not sorted, e.g.:

    id   values_list
    1    ["a", "c", "b"]
    3    ["g", "i", "h"]
    2    ["f", "e", "d"]
    

The Gluten physical plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   VeloxColumnarToRowExec
   +- ^(9) HashAggregateTransformer(keys=[id#0], functions=[velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, values_list#22])
      +- ^(9) HashAggregateTransformer(keys=[id#0], functions=[partial_velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, buffer#29])
         +- ^(9) InputIteratorTransformer[id#0, value#1]
            +- ShuffleQueryStage 1
               +- ColumnarExchange hashpartitioning(id#0, 20), REPARTITION_WITH_NUM, [id#0, value#1], [id=#1293], [id=#1293], [OUTPUT] List(id:IntegerType, value:StringType), [OUTPUT] List(id:IntegerType, value:StringType)
                  +- VeloxAppendBatches 3276
                     +- ^(8) ProjectExecTransformer [hash(id#0, 42) AS hash_partition_key#31, id#0, value#1]
                        +- ^(8) InputIteratorTransformer[id#0, value#1, _nondeterministic#24]
                           +- ShuffleQueryStage 0
                              +- ColumnarExchange hashpartitioning(_nondeterministic#24, 20), REPARTITION_WITH_NUM, [id#0, value#1, _nondeterministic#24], [id=#1239], [id=#1239], [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType), [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType)
                                 +- VeloxAppendBatches 3276
                                    +- ^(7) ProjectExecTransformer [hash(_nondeterministic#24, 42) AS hash_partition_key#30, id#0, value#1, _nondeterministic#24]
                                       +- ^(7) InputIteratorTransformer[id#0, value#1, _nondeterministic#24]
                                          +- RowToVeloxColumnar
                                             +- LocalTableScan [id#0, value#1, _nondeterministic#24]
+- == Initial Plan ==
   SortAggregate(key=[id#0], functions=[velox_collect_list(value#1)], output=[id#0, values_list#22])
   +- SortAggregate(key=[id#0], functions=[partial_velox_collect_list(value#1)], output=[id#0, buffer#29])
      +- Sort [id#0 ASC NULLS FIRST, value#1 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#0, 20), REPARTITION_WITH_NUM, [id=#1211]
            +- Project [id#0, value#1]
               +- Exchange hashpartitioning(_nondeterministic#24, 20), REPARTITION_WITH_NUM, [id=#1209]
                  +- LocalTableScan [id#0, value#1, _nondeterministic#24]

Even though collect_list is documented as a non-deterministic function, some ETL tasks in our production environment depend on this behavior of vanilla Spark.

Root cause of this issue

We can see that the Sort operator has been removed from the Gluten plan. This change appears to be due to this code snippet (code link):

class ReplaceSingleNode() extends LogLevelUtil with Logging {

  def doReplace(p: SparkPlan): SparkPlan = p match {
    // ...
    case plan: SortAggregateExec =>
      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
      // Replace the sort aggregate with a hash aggregate and drop the
      // partial sort below it via SortUtils.dropPartialSort.
      HashAggregateExecBaseTransformer.from(plan)(SortUtils.dropPartialSort)
    // ...
  }
}

object SortUtils {
  def dropPartialSort(plan: SparkPlan): SparkPlan = plan match {
    case RewrittenNodeWall(p) => RewrittenNodeWall(dropPartialSort(p))
    // A partial (non-global) sort directly below is removed.
    case PartialSortLike(child) => child
    // From pre/post project-pulling.
    case ProjectLike(PartialSortLike(ProjectLike(child))) if plan.outputSet == child.outputSet =>
      child
    case ProjectLike(PartialSortLike(child)) => plan.withNewChildren(Seq(child))
    case _ => plan
  }
}

I'm wondering why the partial sort added by the SQL SORT BY clause needs to be removed when SortAggregateExec is replaced. Would it be possible to retain the partial sort operator to resolve this issue? A sketch of what that could look like follows.
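For illustration only, here is a minimal sketch of a conditional drop, reusing the extractors from the snippet above. The preserveOrdering flag is hypothetical, not an existing Gluten API:

object SortUtils {
  // Hypothetical sketch: make sort removal conditional. When preserveOrdering
  // is true (e.g. the aggregate contains an order-sensitive function such as
  // collect_list), the plan is returned unchanged, so the partial sort added
  // by SORT BY survives the SortAggregateExec -> hash aggregate replacement.
  def dropPartialSort(plan: SparkPlan, preserveOrdering: Boolean): SparkPlan =
    if (preserveOrdering) {
      plan
    } else {
      plan match {
        case RewrittenNodeWall(p) => RewrittenNodeWall(dropPartialSort(p, preserveOrdering))
        case PartialSortLike(child) => child
        case ProjectLike(PartialSortLike(ProjectLike(child))) if plan.outputSet == child.outputSet =>
          child
        case ProjectLike(PartialSortLike(child)) => plan.withNewChildren(Seq(child))
        case _ => plan
      }
    }
}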

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response

PHILO-HE (Contributor) commented:

@NEUpanning, in Gluten a sort aggregate is replaced with a hash aggregate, which can make the preceding sort operator unnecessary. So Gluten removes the sort operator unless it is needed to satisfy an ordering requirement.

PHILO-HE changed the title from "Result mismatch in CollectList when partial sort is involved" to "[VL] Result mismatch in CollectList when partial sort is involved" on Dec 10, 2024
zhztheplayer (Member) commented Dec 11, 2024

Thank you for the details.

I believe the relevant sort-removal code is now here on the main branch. I think we could preserve the sort as long as the vanilla Spark plan keeps it alongside a hash agg. This requires a fix.
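A minimal sketch of that idea, assuming the replacement rule can branch on the vanilla aggregate type (the exact hook in Gluten may differ):

// Hypothetical sketch: drop the partial sort only when replacing a
// SortAggregateExec, whose child sort exists solely to satisfy the sort
// aggregate's ordering requirement. If the vanilla plan already pairs a
// sort with a hash-style aggregate, the sort came from somewhere else
// (e.g. SORT BY) and is preserved.
def replaceAggregate(plan: SparkPlan): SparkPlan = plan match {
  case agg: SortAggregateExec =>
    HashAggregateExecBaseTransformer.from(agg)(SortUtils.dropPartialSort)
  case agg: ObjectHashAggregateExec =>
    HashAggregateExecBaseTransformer.from(agg)(identity) // keep any sort below
}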

NEUpanning (Contributor, issue author) commented:

"I think we could preserve the sort as long as vanilla Spark plan has it with a hash agg."

@zhztheplayer I see this feature is implemented in the Gluten 1.2 branch, but the main branch doesn't include it for some reason. For this issue, the CollectList function is replaced by the VeloxCollectList function in the logical optimization phase. Here is the Spark plan:

== Parsed Logical Plan ==
Aggregate [id#0], [id#0, collect_list(value#1, 0, 0) AS values_list#13]
+- SubqueryAlias t
   +- Project [id#0, value#1]
      +- RepartitionByExpression [_nondeterministic#15], 20
         +- Project [id#0, value#1, rand(5386921442550703776) AS _nondeterministic#15]
            +- Project [id#0, value#1]
               +- SubqueryAlias temp_table
                  +- Project [id#0, value#1]
                     +- SubqueryAlias t
                        +- LocalRelation [id#0, value#1]

== Analyzed Logical Plan ==
id: int, values_list: array<string>
Aggregate [id#0], [id#0, collect_list(value#1, 0, 0) AS values_list#13]
+- SubqueryAlias t
   +- Project [id#0, value#1]
      +- RepartitionByExpression [_nondeterministic#15], 20
         +- Project [id#0, value#1, rand(5386921442550703776) AS _nondeterministic#15]
            +- Project [id#0, value#1]
               +- SubqueryAlias temp_table
                  +- Project [id#0, value#1]
                     +- SubqueryAlias t
                        +- LocalRelation [id#0, value#1]

== Optimized Logical Plan ==
Aggregate [id#0], [id#0, velox_collect_list(value#1) AS values_list#13]
+- Project [id#0, value#1]
   +- RepartitionByExpression [_nondeterministic#15], 20
      +- LocalRelation [id#0, value#1, _nondeterministic#15]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   VeloxColumnarToRowExec
   +- ^(3) HashAggregateTransformer(keys=[id#0], functions=[velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, values_list#13])
      +- ^(3) InputIteratorTransformer[id#0, buffer#20]
         +- CustomShuffleReader coalesced
            +- ShuffleQueryStage 1
               +- ColumnarExchange hashpartitioning(id#0, 20), ENSURE_REQUIREMENTS, [id#0, buffer#20], [id=#1048], [id=#1048], [OUTPUT] List(id:IntegerType, buffer:ArrayType(StringType,false)), [OUTPUT] List(id:IntegerType, buffer:ArrayType(StringType,false))
                  +- VeloxAppendBatches 3276
                     +- ^(2) ProjectExecTransformer [hash(id#0, 42) AS hash_partition_key#22, id#0, buffer#20]
                        +- ^(2) FlushableHashAggregateTransformer(keys=[id#0], functions=[partial_velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, buffer#20])
                           +- ^(2) ProjectExecTransformer [id#0, value#1]
                              +- ^(2) InputIteratorTransformer[id#0, value#1, _nondeterministic#15]
                                 +- ShuffleQueryStage 0
                                    +- ColumnarExchange hashpartitioning(_nondeterministic#15, 20), REPARTITION_WITH_NUM, [id#0, value#1, _nondeterministic#15], [id=#966], [id=#966], [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType), [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType)
                                       +- VeloxAppendBatches 3276
                                          +- ^(1) ProjectExecTransformer [hash(_nondeterministic#15, 42) AS hash_partition_key#21, id#0, value#1, _nondeterministic#15]
                                             +- ^(1) InputIteratorTransformer[id#0, value#1, _nondeterministic#15]
                                                +- RowToVeloxColumnar
                                                   +- LocalTableScan [id#0, value#1, _nondeterministic#15]
+- == Initial Plan ==
   SortAggregate(key=[id#0], functions=[velox_collect_list(value#1)], output=[id#0, values_list#13])
   +- Sort [id#0 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#0, 20), ENSURE_REQUIREMENTS, [id=#936]
         +- SortAggregate(key=[id#0], functions=[partial_velox_collect_list(value#1)], output=[id#0, buffer#20])
            +- Sort [id#0 ASC NULLS FIRST], false, 0
               +- Project [id#0, value#1]
                  +- Exchange hashpartitioning(_nondeterministic#15, 20), REPARTITION_WITH_NUM, [id=#928]
                     +- LocalTableScan [id#0, value#1, _nondeterministic#15]

This leads Spark to use SortAggregateExec instead of ObjectHashAggregateExec as the aggregate operator. So I think the sort should also be preserved when the aggregate expressions include VeloxCollectList or VeloxCollectSet; a sketch of such a check follows.
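For illustration, a minimal sketch of that check; the matches against VeloxCollectList and VeloxCollectSet are assumptions about Gluten's expression class names:

import org.apache.spark.sql.execution.aggregate.BaseAggregateExec

// Hypothetical helper: detect aggregates whose Velox result depends on
// input order, so the caller can keep the partial sort in place.
def hasOrderSensitiveVeloxAgg(agg: BaseAggregateExec): Boolean =
  agg.aggregateExpressions.exists { ae =>
    ae.aggregateFunction match {
      case _: VeloxCollectList | _: VeloxCollectSet => true
      case _ => false
    }
  }

The SortAggregateExec case in the replacement rule could then call SortUtils.dropPartialSort only when this helper returns false.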
