Skip to content

Commit

Permalink
[GLUTEN-3841][CH] Support spill in 2nd aggregate stage
Browse files Browse the repository at this point in the history
What changes were proposed in this pull request?
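Supports spilling in the second aggregation stage of the ClickHouse backend. As visible in this diff: `CHTransformerApi` now defaults the native `max_memory_usage` setting to the off-heap size when it is not already configured; `HashAggregateMetricsUpdater` additionally recognizes `StreamingAggregatingTransform` and `GraceMergingAggregatedTransform`; and the TPCH metrics tests are updated for the new `GraceMergingAggregatedStep` plan step.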

Fixes: #3841

How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

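Unit tests: updated metric assertions in `GlutenClickHouseTPCHColumnarShuffleParquetAQESuite` and `GlutenClickHouseTPCHMetricsSuite`.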

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
  • Loading branch information
lgbo-ustc authored Dec 11, 2023
1 parent 22638f5 commit 71a29be
Show file tree
Hide file tree
Showing 11 changed files with 1,083 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ class CHTransformerApi extends TransformerApi with Logging {
nativeConfMap.put(groupBySpillKey, groupBySpillValue.toLong.toString)
}

val maxMemoryUsageKey = settingPrefix + "max_memory_usage";
if (!nativeConfMap.containsKey(maxMemoryUsageKey)) {
val maxMemoryUsageValue = offHeapSize
nativeConfMap.put(maxMemoryUsageKey, maxMemoryUsageValue.toLong.toString)
}

// Only set default max_bytes_before_external_join for CH when join_algorithm is grace_hash
val joinAlgorithmKey = settingPrefix + "join_algorithm";
if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ class HashAggregateMetricsUpdater(val metrics: Map[String, SQLMetric])
}

/**
 * Name lists used by the hash-aggregate metrics updater.
 *
 * Each list is defined exactly once and contains the streaming and grace-merging transform
 * names alongside the original `AggregatingTransform` / `MergingAggregatedTransform` pair.
 *
 * NOTE(review): presumably these are matched against the processor / plan-node names reported
 * in the native metrics -- confirm against the call sites.
 */
object HashAggregateMetricsUpdater {
  // The two arrays are intentionally separate instances: arrays are mutable, so sharing a
  // single instance between the two members could let a change to one leak into the other.
  val INCLUDING_PROCESSORS: Array[String] = Array(
    "AggregatingTransform",
    "StreamingAggregatingTransform",
    "MergingAggregatedTransform",
    "GraceMergingAggregatedTransform")

  val CH_PLAN_NODE_NAME: Array[String] = Array(
    "AggregatingTransform",
    "StreamingAggregatingTransform",
    "MergingAggregatedTransform",
    "GraceMergingAggregatedTransform")
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
assert(plans(2).metrics("outputRows").value === 600572)

assert(plans(1).metrics("inputRows").value === 591673)
assert(plans(1).metrics("resizeInputRows").value === 4)
assert(plans(1).metrics("resizeOutputRows").value === 4)
assert(plans(1).metrics("outputRows").value === 4)
assert(plans(1).metrics("outputVectors").value === 1)

Expand All @@ -93,8 +91,6 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
assert(plans(2).metrics("filesSize").value === 17777735)

assert(plans(1).metrics("inputRows").value === 591673)
assert(plans(1).metrics("resizeInputRows").value === 4)
assert(plans(1).metrics("resizeOutputRows").value === 4)
assert(plans(1).metrics("outputRows").value === 4)
assert(plans(1).metrics("outputVectors").value === 1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,14 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
.get(0)
.getProcessors
.get(0)
.getInputRows == 591677)

.getInputRows == 591673)
assert(
nativeMetricsData.metricsDataList
.get(4)
.getSteps
.get(0)
.getProcessors
.get(1)
.get(0)
.getOutputRows == 4)

assert(
Expand Down Expand Up @@ -356,7 +355,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
.getSteps
.get(0)
.getName
.equals("MergingAggregated"))
.equals("GraceMergingAggregatedStep"))
assert(
nativeMetricsDataFinal.metricsDataList.get(1).getSteps.get(1).getName.equals("Expression"))
assert(nativeMetricsDataFinal.metricsDataList.get(2).getName.equals("kProject"))
Expand Down
Loading

0 comments on commit 71a29be

Please sign in to comment.