Skip to content

Commit

Permalink
Unified configuration entry
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Jul 24, 2024
1 parent 2f69cba commit 04bf0cd
Show file tree
Hide file tree
Showing 18 changed files with 223 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.utils.{ExceptionUtils, UTSystemParameters}
import org.apache.gluten.utils.{TestExceptionUtils, UTSystemParameters}

import org.apache.spark.SparkConf

Expand All @@ -30,7 +30,7 @@ class GlutenClickHouseNativeExceptionSuite extends GlutenClickHouseWholeStageTra

test("native exception caught by jvm") {
try {
ExceptionUtils.generateNativeException()
TestExceptionUtils.generateNativeException()
assert(false)
} catch {
case e: Exception =>
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ DB::Context::ConfigurationPtr BackendInitializerUtil::initConfig(std::map<std::s

if (backend_conf_map.contains(GLUTEN_TASK_OFFHEAP))
{
config->setString(CH_TASK_MEMORY, backend_conf_map.at(GLUTEN_TASK_OFFHEAP));
config->setString(MemoryConfig::CH_TASK_MEMORY, backend_conf_map.at(GLUTEN_TASK_OFFHEAP));
}

const bool use_current_directory_as_tmp = config->getBool("use_current_directory_as_tmp", false);
Expand Down
1 change: 0 additions & 1 deletion cpp-ch/local-engine/Common/CHUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ class BackendInitializerUtil
inline static const std::string SPARK_SESSION_TIME_ZONE = "spark.sql.session.timeZone";

inline static const String GLUTEN_TASK_OFFHEAP = "spark.gluten.memory.task.offHeap.size.in.bytes";
inline static const String CH_TASK_MEMORY = "off_heap_per_task";

/// On yarn mode, native writing on hdfs cluster takes yarn container user as the user passed to libhdfs3, which
/// will cause permission issue because yarn container user is not the owner of the hdfs dir to be written.
Expand Down
166 changes: 166 additions & 0 deletions cpp-ch/local-engine/Common/GlutenConfig.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <base/unit.h>
#include <base/types.h>
#include <Interpreters/Context.h>

namespace local_engine
{
struct MemoryConfig
{
inline static const String EXTRA_MEMORY_HARD_LIMIT = "extra_memory_hard_limit";
inline static const String CH_TASK_MEMORY = "off_heap_per_task";
inline static const String SPILL_MEM_RATIO = "spill_mem_ratio";

size_t extra_memory_hard_limit = 0;
size_t off_heap_per_task = 0;
double spill_mem_ratio = 0.9;

static MemoryConfig loadFromContext(DB::ContextPtr context)
{
MemoryConfig config;
config.extra_memory_hard_limit = context->getConfigRef().getUInt64(EXTRA_MEMORY_HARD_LIMIT, 0);
config.off_heap_per_task = context->getConfigRef().getUInt64(CH_TASK_MEMORY, 0);
config.spill_mem_ratio = context->getConfigRef().getUInt64(SPILL_MEM_RATIO, 0.9);
return config;
}
};

struct GraceMergingAggregateConfig
{
inline static const String MAX_GRACE_AGGREGATE_MERGING_BUCKETS = "max_grace_aggregate_merging_buckets";
inline static const String THROW_ON_OVERFLOW_GRACE_AGGREGATE_MERGING_BUCKETS = "throw_on_overflow_grace_aggregate_merging_buckets";
inline static const String AGGREGATED_KEYS_BEFORE_EXTEND_GRACE_AGGREGATE_MERGING_BUCKETS = "aggregated_keys_before_extend_grace_aggregate_merging_buckets";
inline static const String MAX_PENDING_FLUSH_BLOCKS_PER_GRACE_AGGREGATE_MERGING_BUCKET = "max_pending_flush_blocks_per_grace_aggregate_merging_bucket";
inline static const String MAX_ALLOWED_MEMORY_USAGE_RATIO_FOR_AGGREGATE_MERGING = "max_allowed_memory_usage_ratio_for_aggregate_merging";

size_t max_grace_aggregate_merging_buckets = 32;
bool throw_on_overflow_grace_aggregate_merging_buckets = false;
size_t aggregated_keys_before_extend_grace_aggregate_merging_buckets = 8192;
size_t max_pending_flush_blocks_per_grace_aggregate_merging_bucket = 1_MiB;
size_t max_allowed_memory_usage_ratio_for_aggregate_merging = 0.9;

static GraceMergingAggregateConfig loadFromContext(DB::ContextPtr context)
{
GraceMergingAggregateConfig config;
config.max_grace_aggregate_merging_buckets = context->getConfigRef().getUInt64(MAX_GRACE_AGGREGATE_MERGING_BUCKETS, 32);
config.throw_on_overflow_grace_aggregate_merging_buckets = context->getConfigRef().getBool(THROW_ON_OVERFLOW_GRACE_AGGREGATE_MERGING_BUCKETS, false);
config.aggregated_keys_before_extend_grace_aggregate_merging_buckets = context->getConfigRef().getUInt64(AGGREGATED_KEYS_BEFORE_EXTEND_GRACE_AGGREGATE_MERGING_BUCKETS, 8192);
config.max_pending_flush_blocks_per_grace_aggregate_merging_bucket = context->getConfigRef().getUInt64(MAX_PENDING_FLUSH_BLOCKS_PER_GRACE_AGGREGATE_MERGING_BUCKET, 1_MiB);
config.max_allowed_memory_usage_ratio_for_aggregate_merging = context->getConfigRef().getDouble(MAX_ALLOWED_MEMORY_USAGE_RATIO_FOR_AGGREGATE_MERGING, 0.9);
return config;
}
};

struct StreamingAggregateConfig
{
inline static const String AGGREGATED_KEYS_BEFORE_STREAMING_AGGREGATING_EVICT = "aggregated_keys_before_streaming_aggregating_evict";
inline static const String MAX_MEMORY_USAGE_RATIO_FOR_STREAMING_AGGREGATING = "max_memory_usage_ratio_for_streaming_aggregating";
inline static const String HIGH_CARDINALITY_THRESHOLD_FOR_STREAMING_AGGREGATING = "high_cardinality_threshold_for_streaming_aggregating";
inline static const String ENABLE_STREAMING_AGGREGATING = "enable_streaming_aggregating";

size_t aggregated_keys_before_streaming_aggregating_evict = 1024;
double max_memory_usage_ratio_for_streaming_aggregating = 0.9;
double high_cardinality_threshold_for_streaming_aggregating = 0.8;
bool enable_streaming_aggregating = true;

static StreamingAggregateConfig loadFromContext(DB::ContextPtr context)
{
StreamingAggregateConfig config;
config.aggregated_keys_before_streaming_aggregating_evict = context->getConfigRef().getUInt64(AGGREGATED_KEYS_BEFORE_STREAMING_AGGREGATING_EVICT, 1024);
config.max_memory_usage_ratio_for_streaming_aggregating = context->getConfigRef().getDouble(MAX_MEMORY_USAGE_RATIO_FOR_STREAMING_AGGREGATING, 0.9);
config.high_cardinality_threshold_for_streaming_aggregating = context->getConfigRef().getDouble(HIGH_CARDINALITY_THRESHOLD_FOR_STREAMING_AGGREGATING, 0.8);
config.enable_streaming_aggregating = context->getConfigRef().getBool(ENABLE_STREAMING_AGGREGATING, true);
return config;
}
};

struct ExecutorConfig
{
inline static const String DUMP_PIPELINE = "dump_pipeline";
inline static const String USE_LOCAL_FORMAT = "use_local_format";

bool dump_pipeline = false;
bool use_local_format = false;

static ExecutorConfig loadFromContext(DB::ContextPtr context)
{
ExecutorConfig config;
config.dump_pipeline = context->getConfigRef().getBool(DUMP_PIPELINE, false);
config.use_local_format = context->getConfigRef().getBool(USE_LOCAL_FORMAT, false);
return config;
}
};

struct HdfsConfig
{
inline static const String HDFS_ASYNC = "hdfs.enable_async_io";

bool hdfs_async = true;

static HdfsConfig loadFromContext(DB::ContextPtr context)
{
HdfsConfig config;
config.hdfs_async = context->getConfigRef().getBool(HDFS_ASYNC, true);
return config;
}
};

struct S3Config
{
inline static const String S3_LOCAL_CACHE_ENABLE = "s3.local_cache.enabled";
inline static const String S3_LOCAL_CACHE_MAX_SIZE = "s3.local_cache.max_size";
inline static const String S3_LOCAL_CACHE_CACHE_PATH = "s3.local_cache.cache_path";
inline static const String S3_GCS_ISSUE_COMPOSE_REQUEST = "s3.gcs_issue_compose_request";

bool s3_local_cache_enabled = false;
size_t s3_local_cache_max_size = 100_GiB;
String s3_local_cache_cache_path = "";
bool s3_gcs_issue_compose_request = false;

static S3Config loadFromContext(DB::ContextPtr context)
{
S3Config config;
config.s3_local_cache_enabled = context->getConfigRef().getBool(S3_LOCAL_CACHE_ENABLE, false);
config.s3_local_cache_max_size = context->getConfigRef().getUInt64(S3_LOCAL_CACHE_MAX_SIZE, 100_GiB);
config.s3_local_cache_cache_path = context->getConfigRef().getString(S3_LOCAL_CACHE_CACHE_PATH, "");
config.s3_gcs_issue_compose_request = context->getConfigRef().getBool(S3_GCS_ISSUE_COMPOSE_REQUEST, false);
return config;
}
};

struct MergeTreeConfig
{
inline static const String TABLE_PART_METADATA_CACHE_MAX_COUNT = "table_part_metadata_cache_max_count";
inline static const String TABLE_METADATA_CACHE_MAX_COUNT = "table_metadata_cache_max_count";

size_t table_part_metadata_cache_max_count = 1000;
size_t table_metadata_cache_max_count = 100;

static MergeTreeConfig loadFromContext(DB::ContextPtr context)
{
MergeTreeConfig config;
config.table_part_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_PART_METADATA_CACHE_MAX_COUNT, 1000);
config.table_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_METADATA_CACHE_MAX_COUNT, 100);
return config;
}
};
}

17 changes: 4 additions & 13 deletions cpp-ch/local-engine/Common/QueryContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Common/CurrentThread.h>
#include <Common/ThreadStatus.h>
#include <Common/CHUtil.h>
#include <Common/GlutenConfig.h>
#include <base/unit.h>
#include <sstream>
#include <iomanip>
Expand All @@ -40,15 +41,6 @@ thread_local std::shared_ptr<ThreadStatus> thread_status;
thread_local std::shared_ptr<ThreadGroup> thread_group;
thread_local ContextMutablePtr query_context;

struct QueryContextSettings
{
size_t extra_memory_hard_limit = 0;

void loadFromContext(ContextPtr context)
{
extra_memory_hard_limit = context->getConfigRef().getUInt64("extra_memory_hard_limit", 0);
}
};

ContextMutablePtr QueryContextManager::initializeQuery()
{
Expand All @@ -62,15 +54,14 @@ ContextMutablePtr QueryContextManager::initializeQuery()
// Notice:
// this generated random query id a qualified global queryid for the spark query
query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4()));
QueryContextSettings settings;
settings.loadFromContext(query_context);
auto config = MemoryConfig::loadFromContext(query_context);
thread_status = std::make_shared<ThreadStatus>();
thread_group = std::make_shared<ThreadGroup>(query_context);
CurrentThread::attachToGroup(thread_group);
auto memory_limit = query_context->getConfigRef().getUInt64(BackendInitializerUtil::CH_TASK_MEMORY);
auto memory_limit = config.off_heap_per_task;

thread_group->memory_tracker.setSoftLimit(memory_limit);
thread_group->memory_tracker.setHardLimit(memory_limit + settings.extra_memory_hard_limit);
thread_group->memory_tracker.setHardLimit(memory_limit + config.extra_memory_hard_limit);
return query_context;
}

Expand Down
2 changes: 2 additions & 0 deletions cpp-ch/local-engine/Common/QueryContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ class QueryContextManager
size_t currentPeakMemory();
void finalizeQuery();


private:
QueryContextManager() = default;
LoggerPtr logger = getLogger("QueryContextManager");
};

Expand Down
12 changes: 7 additions & 5 deletions cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <Common/CurrentThread.h>
#include <Common/formatReadable.h>
#include <Common/BitHelpers.h>
#include <Common/GlutenConfig.h>

namespace DB
{
Expand Down Expand Up @@ -114,12 +115,13 @@ GraceMergingAggregatedTransform::GraceMergingAggregatedTransform(const DB::Block
, no_pre_aggregated(no_pre_aggregated_)
, tmp_data_disk(std::make_unique<DB::TemporaryDataOnDisk>(context_->getTempDataOnDisk()))
{
max_buckets = context->getConfigRef().getUInt64("max_grace_aggregate_merging_buckets", 32);
throw_on_overflow_buckets = context->getConfigRef().getBool("throw_on_overflow_grace_aggregate_merging_buckets", false);
aggregated_keys_before_extend_buckets = context->getConfigRef().getUInt64("aggregated_keys_before_extend_grace_aggregate_merging_buckets", 8196);
auto config = GraceMergingAggregateConfig::loadFromContext(context);
max_buckets = config.max_grace_aggregate_merging_buckets;
throw_on_overflow_buckets = config.throw_on_overflow_grace_aggregate_merging_buckets;
aggregated_keys_before_extend_buckets = config.aggregated_keys_before_extend_grace_aggregate_merging_buckets;
aggregated_keys_before_extend_buckets = PODArrayUtil::adjustMemoryEfficientSize(aggregated_keys_before_extend_buckets);
max_pending_flush_blocks_per_bucket = context->getConfigRef().getUInt64("max_pending_flush_blocks_per_grace_aggregate_merging_bucket", 1024 * 1024);
max_allowed_memory_usage_ratio = context->getConfigRef().getDouble("max_allowed_memory_usage_ratio_for_aggregate_merging", 0.9);
max_pending_flush_blocks_per_bucket = config.max_pending_flush_blocks_per_grace_aggregate_merging_bucket;
max_allowed_memory_usage_ratio = config.max_allowed_memory_usage_ratio_for_aggregate_merging;
// bucket 0 is for in-memory data, it's just a placeholder.
buckets.emplace(0, BufferFileStream());

Expand Down
8 changes: 5 additions & 3 deletions cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Common/CHUtil.h>
#include <Common/CurrentThread.h>
#include <Common/formatReadable.h>
#include <Common/GlutenConfig.h>
#include <Common/Stopwatch.h>

namespace DB
Expand All @@ -41,10 +42,11 @@ StreamingAggregatingTransform::StreamingAggregatingTransform(DB::ContextPtr cont
, aggregate_columns(params_->params.aggregates_size)
, params(params_)
{
aggregated_keys_before_evict = context->getConfigRef().getUInt64("aggregated_keys_before_streaming_aggregating_evict", 1024);
auto config = StreamingAggregateConfig::loadFromContext(context);
aggregated_keys_before_evict = config.aggregated_keys_before_streaming_aggregating_evict;
aggregated_keys_before_evict = PODArrayUtil::adjustMemoryEfficientSize(aggregated_keys_before_evict);
max_allowed_memory_usage_ratio = context->getConfigRef().getDouble("max_memory_usage_ratio_for_streaming_aggregating", 0.9);
high_cardinality_threshold = context->getConfigRef().getDouble("high_cardinality_threshold_for_streaming_aggregating", 0.8);
max_allowed_memory_usage_ratio = config.max_memory_usage_ratio_for_streaming_aggregating;
high_cardinality_threshold = config.high_cardinality_threshold_for_streaming_aggregating;
}

StreamingAggregatingTransform::~StreamingAggregatingTransform()
Expand Down
13 changes: 7 additions & 6 deletions cpp-ch/local-engine/Parser/AggregateRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Common/CHUtil.h>
#include <Common/GlutenConfig.h>

namespace DB
{
Expand Down Expand Up @@ -287,8 +288,8 @@ void AggregateRelParser::addMergingAggregatedStep()
settings.max_threads,
PODArrayUtil::adjustMemoryEfficientSize(settings.max_block_size),
settings.min_hit_rate_to_use_consecutive_keys_optimization);
bool enable_streaming_aggregating = getContext()->getConfigRef().getBool("enable_streaming_aggregating", true);
if (enable_streaming_aggregating)
auto config = StreamingAggregateConfig::loadFromContext(getContext());
if (config.enable_streaming_aggregating)
{
params.group_by_two_level_threshold = settings.group_by_two_level_threshold;
auto merging_step = std::make_unique<GraceMergingAggregatedStep>(getContext(), plan->getCurrentDataStream(), params, false);
Expand Down Expand Up @@ -319,8 +320,8 @@ void AggregateRelParser::addCompleteModeAggregatedStep()
AggregateDescriptions aggregate_descriptions;
buildAggregateDescriptions(aggregate_descriptions);
const auto & settings = getContext()->getSettingsRef();
bool enable_streaming_aggregating = getContext()->getConfigRef().getBool("enable_streaming_aggregating", true);
if (enable_streaming_aggregating)
auto config = StreamingAggregateConfig::loadFromContext(getContext());
if (config.enable_streaming_aggregating)
{
Aggregator::Params params(
grouping_keys,
Expand Down Expand Up @@ -397,9 +398,9 @@ void AggregateRelParser::addAggregatingStep()
AggregateDescriptions aggregate_descriptions;
buildAggregateDescriptions(aggregate_descriptions);
const auto & settings = getContext()->getSettingsRef();
bool enable_streaming_aggregating = getContext()->getConfigRef().getBool("enable_streaming_aggregating", true);

if (enable_streaming_aggregating)
auto config = StreamingAggregateConfig::loadFromContext(getContext());
if (config.enable_streaming_aggregating)
{
// Disable spilling to disk.
// If group_by_two_level_threshold_bytes != 0, `Aggregator` will use memory usage as a condition to convert
Expand Down
4 changes: 3 additions & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
#include <Poco/Util/MapConfiguration.h>
#include <Common/CHUtil.h>
#include <Common/Exception.h>
#include <Common/GlutenConfig.h>
#include <Common/JNIUtils.h>
#include <Common/MergeTreeTool.h>
#include <Common/logger_useful.h>
Expand Down Expand Up @@ -1689,7 +1690,8 @@ SharedContextHolder SerializedPlanParser::shared_context;

LocalExecutor::~LocalExecutor()
{
if (context->getConfigRef().getBool("dump_pipeline", false))
auto config = ExecutorConfig::loadFromContext(context);
if (config.dump_pipeline)
LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Dump pipeline:\n{}", dumpPipeline());

if (spark_buffer)
Expand Down
5 changes: 4 additions & 1 deletion cpp-ch/local-engine/Parser/SortRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
* limitations under the License.
*/
#include "SortRelParser.h"

#include <Common/GlutenConfig.h>
#include <Parser/RelParser.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Common/logger_useful.h>
Expand All @@ -41,7 +43,8 @@ SortRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, st
const auto & sort_rel = rel.sort();
auto sort_descr = parseSortDescription(sort_rel.sorts(), query_plan->getCurrentDataStream().header);
SortingStep::Settings settings(*getContext());
double spill_mem_ratio = getContext()->getConfigRef().getDouble("spill_mem_ratio", 0.9);
auto config = MemoryConfig::loadFromContext(getContext());
double spill_mem_ratio = config.spill_mem_ratio;
settings.worth_external_sort = [spill_mem_ratio]() -> bool
{
return currentThreadGroupMemoryUsageRatio() > spill_mem_ratio;
Expand Down
5 changes: 0 additions & 5 deletions cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,11 +312,6 @@ void LocalPartitionWriter::stop()
shuffle_writer->split_result.partition_lengths = offsets;
}

void PartitionWriterSettings::loadFromContext(DB::ContextPtr context)
{
spill_mem_ratio = context->getConfigRef().getDouble("spill_mem_ratio", 0.9);
}

PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_, LoggerPtr logger_)
: shuffle_writer(shuffle_writer_)
, options(&shuffle_writer->options)
Expand Down
Loading

0 comments on commit 04bf0cd

Please sign in to comment.