Commit ee3ecb5

support memory sort shuffle

liuneng1994 committed May 28, 2024
1 parent e6427b6 commit ee3ecb5
Showing 13 changed files with 421 additions and 80 deletions.
@@ -72,7 +72,10 @@ public long makeForRSS(
       String hashAlgorithm,
       Object pusher,
       boolean throwIfMemoryExceed,
-      boolean flushBlockBufferBeforeEvict) {
+      boolean flushBlockBufferBeforeEvict,
+      boolean forceExternalSort,
+      boolean forceMemorySort
+  ) {
     return nativeMakeForRSS(
         part.getShortName(),
         part.getNumPartitions(),
@@ -86,7 +89,9 @@ public long makeForRSS(
         hashAlgorithm,
         pusher,
         throwIfMemoryExceed,
-        flushBlockBufferBeforeEvict);
+        flushBlockBufferBeforeEvict,
+        forceExternalSort,
+        forceMemorySort);
   }

public native long nativeMake(
@@ -124,7 +129,9 @@ public native long nativeMakeForRSS(
       String hashAlgorithm,
       Object pusher,
       boolean throwIfMemoryExceed,
-      boolean flushBlockBufferBeforeEvict);
+      boolean flushBlockBufferBeforeEvict,
+      boolean forceSort,
+      boolean forceMemorySort);

   public native void split(long splitterId, long block);

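The two new booleans choose the shuffle strategy at the native layer: forceExternalSort selects the spill-to-disk sort path, forceMemorySort the new in-memory sort path. A minimal sketch of how a caller might derive them from the session confs introduced by the test suites in this commit; the actual wiring inside Gluten's shuffle writer is not part of this diff, so treat this as illustrative:

import org.apache.spark.SparkConf

// Illustrative only: map the suite-level toggles onto the two new
// makeForRSS/nativeMakeForRSS parameters. The remaining splitter arguments
// are omitted because this diff truncates the full signature.
val conf = new SparkConf()
val chPrefix = "spark.gluten.sql.columnar.backend.ch."
val forceExternalSort = conf.getBoolean(chPrefix + "forceExternalSortShuffle", false)
val forceMemorySort = conf.getBoolean(chPrefix + "forceMemorySortShuffle", false)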
@@ -91,12 +91,6 @@ class CHTransformerApi extends TransformerApi with Logging {
     val offHeapSize =
       nativeConfMap.getOrDefault("spark.gluten.memory.offHeap.size.in.bytes", "0").toLong
     if (offHeapSize > 0) {
-      // Only set default max_bytes_before_external_sort for CH when it is not set explicitly.
-      val sortSpillKey = settingPrefix + "max_bytes_before_external_sort";
-      if (!nativeConfMap.containsKey(sortSpillKey)) {
-        val sortSpillValue = offHeapSize * 0.5
-        nativeConfMap.put(sortSpillKey, sortSpillValue.toLong.toString)
-      }

       // Only set default max_bytes_before_external_group_by for CH when it is not set explicitly.
       val groupBySpillKey = settingPrefix + "max_bytes_before_external_group_by";
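The Scala-side default of 0.5 × executor off-heap for max_bytes_before_external_sort is removed; the default now comes from the native side (see the CHUtil.cpp hunk below, which uses 0.8 × the per-task off-heap size). An explicit setting still takes precedence over the native default. A sketch, assuming settingPrefix resolves to the CH backend's usual runtime-settings conf prefix:

import org.apache.spark.SparkConf

// Assumed conf key: settingPrefix + "max_bytes_before_external_sort", with
// settingPrefix taken to be the CH runtime_settings prefix shown here.
val conf = new SparkConf()
  .set(
    "spark.gluten.sql.columnar.backend.ch.runtime_settings.max_bytes_before_external_sort",
    (4L * 1024 * 1024 * 1024).toString) // e.g. pin the spill threshold at 4 GiB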
@@ -17,10 +17,9 @@
 package org.apache.gluten.execution

 import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.CoalescedPartitionSpec
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

-class GlutenClickHouseColumnarSortShuffleAQESuite
+class GlutenClickHouseColumnarExternalSortShuffleSuite
   extends GlutenClickHouseTPCHAbstractSuite
   with AdaptiveSparkPlanHelper {

@@ -36,29 +35,11 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
-      .set("spark.gluten.sql.columnar.backend.ch.forceSortShuffle", "true")
+      .set("spark.gluten.sql.columnar.backend.ch.forceExternalSortShuffle", "true")
   }

   test("TPCH Q1") {
-    runTPCHQuery(1) {
-      df =>
-        assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-
-        val colCustomShuffleReaderExecs = collect(df.queryExecution.executedPlan) {
-          case csr: AQEShuffleReadExec => csr
-        }
-        assert(colCustomShuffleReaderExecs.size == 2)
-        val coalescedPartitionSpec0 = colCustomShuffleReaderExecs(0)
-          .partitionSpecs(0)
-          .asInstanceOf[CoalescedPartitionSpec]
-        assert(coalescedPartitionSpec0.startReducerIndex == 0)
-        assert(coalescedPartitionSpec0.endReducerIndex == 5)
-        val coalescedPartitionSpec1 = colCustomShuffleReaderExecs(1)
-          .partitionSpecs(0)
-          .asInstanceOf[CoalescedPartitionSpec]
-        assert(coalescedPartitionSpec1.startReducerIndex == 0)
-        assert(coalescedPartitionSpec1.endReducerIndex == 5)
-    }
+    runTPCHQuery(1) { df => }
   }

   test("TPCH Q2") {
@@ -98,14 +79,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
   }

   test("TPCH Q11") {
-    runTPCHQuery(11) {
-      df =>
-        assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-        val adaptiveSparkPlanExec = collectWithSubqueries(df.queryExecution.executedPlan) {
-          case adaptive: AdaptiveSparkPlanExec => adaptive
-        }
-        assert(adaptiveSparkPlanExec.size == 2)
-    }
+    runTPCHQuery(11) { df => }
   }

   test("TPCH Q12") {
@@ -121,14 +95,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
   }

   test("TPCH Q15") {
-    runTPCHQuery(15) {
-      df =>
-        assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-        val adaptiveSparkPlanExec = collectWithSubqueries(df.queryExecution.executedPlan) {
-          case adaptive: AdaptiveSparkPlanExec => adaptive
-        }
-        assert(adaptiveSparkPlanExec.size == 2)
-    }
+    runTPCHQuery(15) { df => }
   }

   test("TPCH Q16") {
@@ -140,13 +107,7 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
   }

   test("TPCH Q18") {
-    runTPCHQuery(18) {
-      df =>
-        val hashAggregates = collect(df.queryExecution.executedPlan) {
-          case hash: HashAggregateExecBaseTransformer => hash
-        }
-        assert(hashAggregates.size == 3)
-    }
+    runTPCHQuery(18) { df => }
   }

   test("TPCH Q19") {
@@ -162,14 +123,6 @@ class GlutenClickHouseColumnarSortShuffleAQESuite
   }

   test("TPCH Q22") {
-    runTPCHQuery(22) {
-      df =>
-        assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-        val adaptiveSparkPlanExec = collectWithSubqueries(df.queryExecution.executedPlan) {
-          case adaptive: AdaptiveSparkPlanExec => adaptive
-        }
-        assert(adaptiveSparkPlanExec.size == 3)
-        assert(adaptiveSparkPlanExec(1) == adaptiveSparkPlanExec(2))
-    }
+    runTPCHQuery(22) { df => }
   }
 }
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

class GlutenClickHouseColumnarMemorySortShuffleSuite
extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {

override protected val tablesPath: String = basePath + "/tpch-data-ch"
override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
override protected val queriesResults: String = rootPath + "mergetree-queries-output"

/** Run Gluten + ClickHouse Backend with ColumnarShuffleManager */
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.forceMemorySortShuffle", "true")
}

test("TPCH Q1") {
runTPCHQuery(1) { df => }
}

test("TPCH Q2") {
runTPCHQuery(2) { df => }
}

test("TPCH Q3") {
runTPCHQuery(3) { df => }
}

test("TPCH Q4") {
runTPCHQuery(4) { df => }
}

test("TPCH Q5") {
runTPCHQuery(5) { df => }
}

test("TPCH Q6") {
runTPCHQuery(6) { df => }
}

test("TPCH Q7") {
runTPCHQuery(7) { df => }
}

test("TPCH Q8") {
runTPCHQuery(8) { df => }
}

test("TPCH Q9") {
runTPCHQuery(9) { df => }
}

test("TPCH Q10") {
runTPCHQuery(10) { df => }
}

test("TPCH Q11") {
runTPCHQuery(11) { df => }
}

test("TPCH Q12") {
runTPCHQuery(12) { df => }
}

test("TPCH Q13") {
runTPCHQuery(13) { df => }
}

test("TPCH Q14") {
runTPCHQuery(14) { df => }
}

test("TPCH Q15") {
runTPCHQuery(15) { df => }
}

test("TPCH Q16") {
runTPCHQuery(16, noFallBack = false) { df => }
}

test("TPCH Q17") {
runTPCHQuery(17) { df => }
}

test("TPCH Q18") {
runTPCHQuery(18) { df => }
}

test("TPCH Q19") {
runTPCHQuery(19) { df => }
}

test("TPCH Q20") {
runTPCHQuery(20) { df => }
}

test("TPCH Q21") {
runTPCHQuery(21, noFallBack = false) { df => }
}

test("TPCH Q22") {
runTPCHQuery(22) { df => }
}
}
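Beyond the test harness, the same toggle should apply to any session running the CH backend with the columnar shuffle manager. A minimal sketch mirroring the suite configuration above (standard Spark session setup; only the two .config lines are specific to this commit):

import org.apache.spark.sql.SparkSession

// Sketch: enable in-memory sort shuffle for a regular session, using the
// same keys the suite sets above.
val spark = SparkSession
  .builder()
  .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
  .config("spark.gluten.sql.columnar.backend.ch.forceMemorySortShuffle", "true")
  .getOrCreate()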
10 changes: 3 additions & 7 deletions cpp-ch/local-engine/Common/CHUtil.cpp
@@ -681,15 +681,11 @@ void BackendInitializerUtil::initSettings(std::map<std::string, std::string> & backend_conf_map)
     if (backend_conf_map.contains(GLUTEN_TASK_OFFHEAP))
     {
         auto task_memory = std::stoull(backend_conf_map.at(GLUTEN_TASK_OFFHEAP));
-        if (!backend_conf_map.contains(CH_RUNTIME_SETTINGS + "max_bytes_before_external_sort"))
+        if (!backend_conf_map.contains(CH_RUNTIME_SETTINGS_PREFIX + "max_bytes_before_external_sort"))
         {
-            double mem_size_gb = static_cast<double>(task_memory) / 1024 / 1024 / 1024;
-            // settings.max_bytes_before_external_sort = static_cast<size_t>(std::min(1 / (4.1 * mem_size_gb - 1.5) + 0.42, 0.8) *
-            // task_memory);
-            settings.max_bytes_before_external_sort = static_cast<size_t>(0.8 *
-            task_memory);
+            settings.max_bytes_before_external_sort = static_cast<size_t>(0.8 * task_memory);
         }
-        if (!backend_conf_map.contains(CH_RUNTIME_SETTINGS + "prefer_external_sort_block_bytes"))
+        if (!backend_conf_map.contains(CH_RUNTIME_SETTINGS_PREFIX + "prefer_external_sort_block_bytes"))
         {
             auto mem_gb = task_memory / static_cast<double>(1_GiB);
             // 2.8x+5, Heuristics calculate the block size of external sort, [8,16]
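The hunk is truncated before the assignment, but the "2.8x+5 ... [8,16]" comment reads as: the preferred external-sort block size in MiB grows linearly with the task off-heap size in GiB and is clamped to [8, 16]. A sketch of that reading, in Scala for illustration; the clamp and the MiB unit are inferred from the comment, not shown in this diff:

// Inferred heuristic: blockMiB = clamp(2.8 * memGiB + 5, 8, 16).
def preferExternalSortBlockBytes(taskMemoryBytes: Long): Long = {
  val memGiB = taskMemoryBytes.toDouble / (1L << 30)
  val blockMiB = math.min(math.max(2.8 * memGiB + 5, 8.0), 16.0)
  (blockMiB * (1L << 20)).toLong
}

// Example: a 2 GiB task off-heap gives 2.8 * 2 + 5 = 10.6 MiB blocks.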
6 changes: 4 additions & 2 deletions cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
@@ -75,7 +75,7 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, const SplitOptions & options, jobject rss_pusher)
         jmethodID celeborn_push_partition_data_method =
             GetMethodID(env, celeborn_partition_pusher_class, "pushPartitionData", "(I[BI)I");
         CLEAN_JNIENV
-        auto celeborn_client = std::make_unique<CelebornClient>(rss_pusher, celeborn_push_partition_data_method);
+        celeborn_client = std::make_unique<CelebornClient>(rss_pusher, celeborn_push_partition_data_method);
     }

@@ -141,8 +141,10 @@ void CachedShuffleWriter::lazyInitPartitionWriter(Block & input_sample)
     sort_shuffle = use_memory_sort_shuffle || use_external_sort_shuffle;
     if (celeborn_client)
     {
-        if (sort_shuffle)
+        if (use_external_sort_shuffle)
             partition_writer = std::make_unique<ExternalSortCelebornPartitionWriter>(this, std::move(celeborn_client));
+        else if (use_memory_sort_shuffle)
+            partition_writer = std::make_unique<MemorySortCelebornPartitionWriter>(this, std::move(celeborn_client));
         else
             partition_writer = std::make_unique<CelebornPartitionWriter>(this, std::move(celeborn_client));
     }
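Two separate fixes land in this file. The constructor change drops a stray auto, which had declared a local unique_ptr shadowing the celeborn_client member, so the client was destroyed at end of scope instead of being stored. The Celeborn branch now distinguishes the two sort modes, with external sort taking precedence when both force flags are set; schematically, in Scala for illustration only (the real logic is the C++ above, and the writer names match the classes in this commit):

// Illustration of the Celeborn writer precedence only.
def pickCelebornWriter(useExternalSort: Boolean, useMemorySort: Boolean): String =
  if (useExternalSort) "ExternalSortCelebornPartitionWriter"
  else if (useMemorySort) "MemorySortCelebornPartitionWriter"
  else "CelebornPartitionWriter"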
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
@@ -37,6 +37,7 @@ class CachedShuffleWriter : public ShuffleWriterBase
     friend class CelebornPartitionWriter;
     friend class SortBasedPartitionWriter;
     friend class MemorySortLocalPartitionWriter;
+    friend class MemorySortCelebornPartitionWriter;
     friend class ExternalSortLocalPartitionWriter;
     friend class ExternalSortCelebornPartitionWriter;
     friend class Spillable;