diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeExceptionSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeExceptionSuite.scala
index f7f636b0bf139..cac1a8c5b3464 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeExceptionSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeExceptionSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.utils.{ExceptionUtils, UTSystemParameters}
+import org.apache.gluten.utils.{TestExceptionUtils, UTSystemParameters}
 
 import org.apache.spark.SparkConf
 
@@ -30,7 +30,7 @@ class GlutenClickHouseNativeExceptionSuite extends GlutenClickHouseWholeStageTra
 
   test("native exception caught by jvm") {
     try {
-      ExceptionUtils.generateNativeException()
+      TestExceptionUtils.generateNativeException()
       assert(false)
     } catch {
       case e: Exception =>
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index b74c18dd14af1..6fc142431d959 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -627,7 +627,7 @@ DB::Context::ConfigurationPtr BackendInitializerUtil::initConfig(std::mapsetString(CH_TASK_MEMORY, backend_conf_map.at(GLUTEN_TASK_OFFHEAP));
+        config->setString(MemoryConfig::CH_TASK_MEMORY, backend_conf_map.at(GLUTEN_TASK_OFFHEAP));
     }
 
     const bool use_current_directory_as_tmp = config->getBool("use_current_directory_as_tmp", false);
diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h
index 7be3f86dc2303..d27acf4604d0e 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -186,7 +186,6 @@ class BackendInitializerUtil
     inline static const std::string SPARK_SESSION_TIME_ZONE = "spark.sql.session.timeZone";
 
     inline static const String GLUTEN_TASK_OFFHEAP = "spark.gluten.memory.task.offHeap.size.in.bytes";
-    inline static const String CH_TASK_MEMORY = "off_heap_per_task";
 
     /// On yarn mode, native writing on hdfs cluster takes yarn container user as the user passed to libhdfs3, which
     /// will cause permission issue because yarn container user is not the owner of the hdfs dir to be written.
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h b/cpp-ch/local-engine/Common/GlutenConfig.h
new file mode 100644
index 0000000000000..093f2d1607837
--- /dev/null
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+
+namespace local_engine
+{
+/// Memory limits and spill ratio, loaded from the context's configuration (defaults apply to missing keys).
+struct MemoryConfig
+{
+    inline static const String EXTRA_MEMORY_HARD_LIMIT = "extra_memory_hard_limit";
+    inline static const String CH_TASK_MEMORY = "off_heap_per_task";
+    inline static const String SPILL_MEM_RATIO = "spill_mem_ratio";
+
+    size_t extra_memory_hard_limit = 0;
+    size_t off_heap_per_task = 0;
+    double spill_mem_ratio = 0.9;
+
+    static MemoryConfig loadFromContext(DB::ContextPtr context)
+    {
+        MemoryConfig config;
+        config.extra_memory_hard_limit = context->getConfigRef().getUInt64(EXTRA_MEMORY_HARD_LIMIT, 0);
+        config.off_heap_per_task = context->getConfigRef().getUInt64(CH_TASK_MEMORY, 0);
+        /// Fractional value: read with getDouble(), since getUInt64() cannot represent the 0.9 default.
+        config.spill_mem_ratio = context->getConfigRef().getDouble(SPILL_MEM_RATIO, 0.9);
+        return config;
+    }
+};
+
+/// Settings consumed by GraceMergingAggregatedTransform.
+struct GraceMergingAggregateConfig
+{
+    inline static const String MAX_GRACE_AGGREGATE_MERGING_BUCKETS = "max_grace_aggregate_merging_buckets";
+    inline static const String THROW_ON_OVERFLOW_GRACE_AGGREGATE_MERGING_BUCKETS = "throw_on_overflow_grace_aggregate_merging_buckets";
+    inline static const String AGGREGATED_KEYS_BEFORE_EXTEND_GRACE_AGGREGATE_MERGING_BUCKETS = "aggregated_keys_before_extend_grace_aggregate_merging_buckets";
+    inline static const String MAX_PENDING_FLUSH_BLOCKS_PER_GRACE_AGGREGATE_MERGING_BUCKET = "max_pending_flush_blocks_per_grace_aggregate_merging_bucket";
+    inline static const String MAX_ALLOWED_MEMORY_USAGE_RATIO_FOR_AGGREGATE_MERGING = "max_allowed_memory_usage_ratio_for_aggregate_merging";
+
+    size_t max_grace_aggregate_merging_buckets = 32;
+    bool throw_on_overflow_grace_aggregate_merging_buckets = false;
+    size_t aggregated_keys_before_extend_grace_aggregate_merging_buckets = 8192;
+    size_t max_pending_flush_blocks_per_grace_aggregate_merging_bucket = 1_MiB;
+    /// A ratio, so it must be a double; an integral type would truncate 0.9 to 0.
+    double max_allowed_memory_usage_ratio_for_aggregate_merging = 0.9;
+
+    static GraceMergingAggregateConfig loadFromContext(DB::ContextPtr context)
+    {
+        GraceMergingAggregateConfig config;
+        config.max_grace_aggregate_merging_buckets = context->getConfigRef().getUInt64(MAX_GRACE_AGGREGATE_MERGING_BUCKETS, 32);
+        config.throw_on_overflow_grace_aggregate_merging_buckets = context->getConfigRef().getBool(THROW_ON_OVERFLOW_GRACE_AGGREGATE_MERGING_BUCKETS, false);
+        config.aggregated_keys_before_extend_grace_aggregate_merging_buckets = context->getConfigRef().getUInt64(AGGREGATED_KEYS_BEFORE_EXTEND_GRACE_AGGREGATE_MERGING_BUCKETS, 8192);
+        config.max_pending_flush_blocks_per_grace_aggregate_merging_bucket = context->getConfigRef().getUInt64(MAX_PENDING_FLUSH_BLOCKS_PER_GRACE_AGGREGATE_MERGING_BUCKET, 1_MiB);
+        config.max_allowed_memory_usage_ratio_for_aggregate_merging = context->getConfigRef().getDouble(MAX_ALLOWED_MEMORY_USAGE_RATIO_FOR_AGGREGATE_MERGING, 0.9);
+        return config;
+    }
+};
+
+/// Settings for streaming aggregation, consumed by StreamingAggregatingTransform and AggregateRelParser.
+struct StreamingAggregateConfig
+{
+    inline static const String AGGREGATED_KEYS_BEFORE_STREAMING_AGGREGATING_EVICT = "aggregated_keys_before_streaming_aggregating_evict";
+    inline static const String MAX_MEMORY_USAGE_RATIO_FOR_STREAMING_AGGREGATING = "max_memory_usage_ratio_for_streaming_aggregating";
+    inline static const String HIGH_CARDINALITY_THRESHOLD_FOR_STREAMING_AGGREGATING = "high_cardinality_threshold_for_streaming_aggregating";
+    inline static const String ENABLE_STREAMING_AGGREGATING = "enable_streaming_aggregating";
+
+    size_t aggregated_keys_before_streaming_aggregating_evict = 1024;
+    double max_memory_usage_ratio_for_streaming_aggregating = 0.9;
+    double high_cardinality_threshold_for_streaming_aggregating = 0.8;
+    bool enable_streaming_aggregating = true;
+
+    static StreamingAggregateConfig loadFromContext(DB::ContextPtr context)
+    {
+        StreamingAggregateConfig config;
+        config.aggregated_keys_before_streaming_aggregating_evict = context->getConfigRef().getUInt64(AGGREGATED_KEYS_BEFORE_STREAMING_AGGREGATING_EVICT, 1024);
+        config.max_memory_usage_ratio_for_streaming_aggregating = context->getConfigRef().getDouble(MAX_MEMORY_USAGE_RATIO_FOR_STREAMING_AGGREGATING, 0.9);
+        config.high_cardinality_threshold_for_streaming_aggregating = context->getConfigRef().getDouble(HIGH_CARDINALITY_THRESHOLD_FOR_STREAMING_AGGREGATING, 0.8);
+        config.enable_streaming_aggregating = context->getConfigRef().getBool(ENABLE_STREAMING_AGGREGATING, true);
+        return config;
+    }
+};
+
+/// Executor-level switches, consumed by LocalExecutor and FormatFileUtil.
+struct ExecutorConfig
+{
+    inline static const String DUMP_PIPELINE = "dump_pipeline";
+    inline static const String USE_LOCAL_FORMAT = "use_local_format";
+
+    bool dump_pipeline = false;
+    bool use_local_format = false;
+
+    static ExecutorConfig loadFromContext(DB::ContextPtr context)
+    {
+        ExecutorConfig config;
+        config.dump_pipeline = context->getConfigRef().getBool(DUMP_PIPELINE, false);
+        config.use_local_format = context->getConfigRef().getBool(USE_LOCAL_FORMAT, false);
+        return config;
+    }
+};
+
+/// Toggle for asynchronous HDFS reads, consumed by HDFSFileReadBufferBuilder.
+struct HdfsConfig
+{
+    inline static const String HDFS_ASYNC = "hdfs.enable_async_io";
+
+    bool hdfs_async = true;
+
+    static HdfsConfig loadFromContext(DB::ContextPtr context)
+    {
+        HdfsConfig config;
+        config.hdfs_async = context->getConfigRef().getBool(HDFS_ASYNC, true);
+        return config;
+    }
+};
+
+/// S3 local file-cache options, consumed by S3FileReadBufferBuilder.
+struct S3Config
+{
+    inline static const String S3_LOCAL_CACHE_ENABLE = "s3.local_cache.enabled";
+    inline static const String S3_LOCAL_CACHE_MAX_SIZE = "s3.local_cache.max_size";
+    inline static const String S3_LOCAL_CACHE_CACHE_PATH = "s3.local_cache.cache_path";
+    inline static const String S3_GCS_ISSUE_COMPOSE_REQUEST = "s3.gcs_issue_compose_request";
+
+    bool s3_local_cache_enabled = false;
+    size_t s3_local_cache_max_size = 100_GiB;
+    /// Keeps the pre-refactoring default; an empty path would be handed to fs::create_directories().
+    String s3_local_cache_cache_path = "/tmp/gluten/local_cache";
+    bool s3_gcs_issue_compose_request = false;
+
+    static S3Config loadFromContext(DB::ContextPtr context)
+    {
+        S3Config config;
+        config.s3_local_cache_enabled = context->getConfigRef().getBool(S3_LOCAL_CACHE_ENABLE, false);
+        config.s3_local_cache_max_size = context->getConfigRef().getUInt64(S3_LOCAL_CACHE_MAX_SIZE, 100_GiB);
+        config.s3_local_cache_cache_path = context->getConfigRef().getString(S3_LOCAL_CACHE_CACHE_PATH, "/tmp/gluten/local_cache");
+        config.s3_gcs_issue_compose_request = context->getConfigRef().getBool(S3_GCS_ISSUE_COMPOSE_REQUEST, false);
+        return config;
+    }
+};
+
+/// Capacities of the MergeTree metadata caches, consumed by StorageMergeTreeFactory.
+struct MergeTreeConfig
+{
+    inline static const String TABLE_PART_METADATA_CACHE_MAX_COUNT = "table_part_metadata_cache_max_count";
+    inline static const String TABLE_METADATA_CACHE_MAX_COUNT = "table_metadata_cache_max_count";
+
+    size_t table_part_metadata_cache_max_count = 1000;
+    size_t table_metadata_cache_max_count = 100;
+
+    static MergeTreeConfig loadFromContext(DB::ContextPtr context)
+    {
+        MergeTreeConfig config;
+        config.table_part_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_PART_METADATA_CACHE_MAX_COUNT, 1000);
+        config.table_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_METADATA_CACHE_MAX_COUNT, 100);
+        return config;
+    }
+};
+}
+
diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp
index 44a9247785071..6417c70ed099b 100644
--- a/cpp-ch/local-engine/Common/QueryContext.cpp
+++ b/cpp-ch/local-engine/Common/QueryContext.cpp
@@ -20,6 +20,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -40,15 +41,6 @@ thread_local std::shared_ptr thread_status;
 thread_local std::shared_ptr thread_group;
 thread_local ContextMutablePtr query_context;
 
-struct QueryContextSettings
-{
-    size_t extra_memory_hard_limit = 0;
-
-    void loadFromContext(ContextPtr context)
-    {
-        extra_memory_hard_limit = context->getConfigRef().getUInt64("extra_memory_hard_limit", 0);
-    }
-};
 
 ContextMutablePtr QueryContextManager::initializeQuery()
 {
@@ -62,15 +54,14 @@ ContextMutablePtr QueryContextManager::initializeQuery()
     // Notice:
     // this generated random query id a qualified global queryid for the spark query
     query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4()));
-    QueryContextSettings settings;
-    settings.loadFromContext(query_context);
+    auto config = MemoryConfig::loadFromContext(query_context);
     thread_status = std::make_shared();
     thread_group = std::make_shared(query_context);
     CurrentThread::attachToGroup(thread_group);
-    auto memory_limit = query_context->getConfigRef().getUInt64(BackendInitializerUtil::CH_TASK_MEMORY);
+    auto memory_limit = config.off_heap_per_task;
 
     thread_group->memory_tracker.setSoftLimit(memory_limit);
-    thread_group->memory_tracker.setHardLimit(memory_limit + settings.extra_memory_hard_limit);
+    thread_group->memory_tracker.setHardLimit(memory_limit + config.extra_memory_hard_limit);
     return query_context;
 }
 
diff --git a/cpp-ch/local-engine/Common/QueryContext.h b/cpp-ch/local-engine/Common/QueryContext.h
index ef39f921713d9..4d572ee088be3 100644
--- a/cpp-ch/local-engine/Common/QueryContext.h
+++ b/cpp-ch/local-engine/Common/QueryContext.h
@@ -34,7 +34,9 @@ class QueryContextManager
     size_t currentPeakMemory();
     void finalizeQuery();
 
+
 private:
+    QueryContextManager() = default;
     LoggerPtr logger = getLogger("QueryContextManager");
 };
 
diff --git a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
index a9a2df276a594..c615a9b8d851b 100644
--- a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
+++ b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
@@ -22,6 +22,7 @@
 #include
 #include
 #include
+#include
 
 namespace DB
 {
@@ -114,12 +115,13 @@ GraceMergingAggregatedTransform::GraceMergingAggregatedTransform(const DB::Block
     , no_pre_aggregated(no_pre_aggregated_)
     , tmp_data_disk(std::make_unique(context_->getTempDataOnDisk()))
 {
-    max_buckets = context->getConfigRef().getUInt64("max_grace_aggregate_merging_buckets", 32);
-    throw_on_overflow_buckets = context->getConfigRef().getBool("throw_on_overflow_grace_aggregate_merging_buckets", false);
-    aggregated_keys_before_extend_buckets = context->getConfigRef().getUInt64("aggregated_keys_before_extend_grace_aggregate_merging_buckets", 8196);
+    auto config = GraceMergingAggregateConfig::loadFromContext(context);
+    max_buckets = config.max_grace_aggregate_merging_buckets;
+    throw_on_overflow_buckets = config.throw_on_overflow_grace_aggregate_merging_buckets;
+    aggregated_keys_before_extend_buckets = config.aggregated_keys_before_extend_grace_aggregate_merging_buckets;
     aggregated_keys_before_extend_buckets = PODArrayUtil::adjustMemoryEfficientSize(aggregated_keys_before_extend_buckets);
-    max_pending_flush_blocks_per_bucket = context->getConfigRef().getUInt64("max_pending_flush_blocks_per_grace_aggregate_merging_bucket", 1024 * 1024);
-    max_allowed_memory_usage_ratio = context->getConfigRef().getDouble("max_allowed_memory_usage_ratio_for_aggregate_merging", 0.9);
+    max_pending_flush_blocks_per_bucket = config.max_pending_flush_blocks_per_grace_aggregate_merging_bucket;
+    max_allowed_memory_usage_ratio = config.max_allowed_memory_usage_ratio_for_aggregate_merging;
     // bucket 0 is for in-memory data, it's just a placeholder.
     buckets.emplace(0, BufferFileStream());
 
diff --git a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
index 65d77f8e968f0..f0db6324f0e12 100644
--- a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
+++ b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
@@ -21,6 +21,7 @@
 #include
 #include
 #include
+#include
 #include
 
 namespace DB
@@ -41,10 +42,11 @@ StreamingAggregatingTransform::StreamingAggregatingTransform(DB::ContextPtr cont
     , aggregate_columns(params_->params.aggregates_size)
     , params(params_)
 {
-    aggregated_keys_before_evict = context->getConfigRef().getUInt64("aggregated_keys_before_streaming_aggregating_evict", 1024);
+    auto config = StreamingAggregateConfig::loadFromContext(context);
+    aggregated_keys_before_evict = config.aggregated_keys_before_streaming_aggregating_evict;
     aggregated_keys_before_evict = PODArrayUtil::adjustMemoryEfficientSize(aggregated_keys_before_evict);
-    max_allowed_memory_usage_ratio = context->getConfigRef().getDouble("max_memory_usage_ratio_for_streaming_aggregating", 0.9);
-    high_cardinality_threshold = context->getConfigRef().getDouble("high_cardinality_threshold_for_streaming_aggregating", 0.8);
+    max_allowed_memory_usage_ratio = config.max_memory_usage_ratio_for_streaming_aggregating;
+    high_cardinality_threshold = config.high_cardinality_threshold_for_streaming_aggregating;
 }
 
 StreamingAggregatingTransform::~StreamingAggregatingTransform()
diff --git a/cpp-ch/local-engine/Parser/AggregateRelParser.cpp b/cpp-ch/local-engine/Parser/AggregateRelParser.cpp
index 0857995571d45..532b4114b8f07 100644
--- a/cpp-ch/local-engine/Parser/AggregateRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/AggregateRelParser.cpp
@@ -29,6 +29,7 @@
 #include
 #include
 #include
+#include
 
 namespace DB
 {
@@ -287,8 +288,8 @@ void AggregateRelParser::addMergingAggregatedStep()
         settings.max_threads,
         PODArrayUtil::adjustMemoryEfficientSize(settings.max_block_size),
         settings.min_hit_rate_to_use_consecutive_keys_optimization);
-    bool enable_streaming_aggregating = getContext()->getConfigRef().getBool("enable_streaming_aggregating", true);
-    if (enable_streaming_aggregating)
+    auto config = StreamingAggregateConfig::loadFromContext(getContext());
+    if (config.enable_streaming_aggregating)
     {
         params.group_by_two_level_threshold = settings.group_by_two_level_threshold;
         auto merging_step = std::make_unique(getContext(), plan->getCurrentDataStream(), params, false);
@@ -319,8 +320,8 @@ void AggregateRelParser::addCompleteModeAggregatedStep()
     AggregateDescriptions aggregate_descriptions;
     buildAggregateDescriptions(aggregate_descriptions);
     const auto & settings = getContext()->getSettingsRef();
-    bool enable_streaming_aggregating = getContext()->getConfigRef().getBool("enable_streaming_aggregating", true);
-    if (enable_streaming_aggregating)
+    auto config = StreamingAggregateConfig::loadFromContext(getContext());
+    if (config.enable_streaming_aggregating)
     {
         Aggregator::Params params(
             grouping_keys,
@@ -397,9 +398,9 @@ void AggregateRelParser::addAggregatingStep()
     AggregateDescriptions aggregate_descriptions;
     buildAggregateDescriptions(aggregate_descriptions);
     const auto & settings = getContext()->getSettingsRef();
-    bool enable_streaming_aggregating = getContext()->getConfigRef().getBool("enable_streaming_aggregating", true);
 
-    if (enable_streaming_aggregating)
+    auto config = StreamingAggregateConfig::loadFromContext(getContext());
+    if (config.enable_streaming_aggregating)
     {
         // Disable spilling to disk.
         // If group_by_two_level_threshold_bytes != 0, `Aggregator` will use memory usage as a condition to convert
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 50431abd9fdba..68f3a82933de6 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -78,6 +78,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -1689,7 +1690,8 @@ SharedContextHolder SerializedPlanParser::shared_context;
 
 LocalExecutor::~LocalExecutor()
 {
-    if (context->getConfigRef().getBool("dump_pipeline", false))
+    auto config = ExecutorConfig::loadFromContext(context);
+    if (config.dump_pipeline)
         LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Dump pipeline:\n{}", dumpPipeline());
 
     if (spark_buffer)
diff --git a/cpp-ch/local-engine/Parser/SortRelParser.cpp b/cpp-ch/local-engine/Parser/SortRelParser.cpp
index 0371aff4200c7..8fb97d6da5dd3 100644
--- a/cpp-ch/local-engine/Parser/SortRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/SortRelParser.cpp
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 #include "SortRelParser.h"
+
+#include
 #include
 #include
 #include
@@ -41,7 +43,8 @@ SortRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, st
     const auto & sort_rel = rel.sort();
     auto sort_descr = parseSortDescription(sort_rel.sorts(), query_plan->getCurrentDataStream().header);
     SortingStep::Settings settings(*getContext());
-    double spill_mem_ratio = getContext()->getConfigRef().getDouble("spill_mem_ratio", 0.9);
+    auto config = MemoryConfig::loadFromContext(getContext());
+    double spill_mem_ratio = config.spill_mem_ratio;
     settings.worth_external_sort = [spill_mem_ratio]() -> bool
     {
         return currentThreadGroupMemoryUsageRatio() > spill_mem_ratio;
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index 8ee44a0f78f36..6582e2eadbc29 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -312,11 +312,6 @@ void LocalPartitionWriter::stop()
     shuffle_writer->split_result.partition_lengths = offsets;
 }
 
-void PartitionWriterSettings::loadFromContext(DB::ContextPtr context)
-{
-    spill_mem_ratio = context->getConfigRef().getDouble("spill_mem_ratio", 0.9);
-}
-
 PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_, LoggerPtr logger_)
     : shuffle_writer(shuffle_writer_)
     , options(&shuffle_writer->options)
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h
index a648026b6d55c..0c3c0be50f2d6 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h
@@ -18,6 +18,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -59,13 +60,6 @@ class Partition
     size_t cached_bytes = 0;
 };
 
-struct PartitionWriterSettings
-{
-    double spill_mem_ratio = 0.9;
-
-    void loadFromContext(DB::ContextPtr context);
-};
-
 class CachedShuffleWriter;
 using PartitionPtr = std::shared_ptr;
 class PartitionWriter : boost::noncopyable
@@ -95,7 +89,7 @@ class PartitionWriter : boost::noncopyable
 
     CachedShuffleWriter * shuffle_writer;
     const SplitOptions * options;
-    PartitionWriterSettings settings;
+    MemoryConfig settings;
 
     std::vector partition_block_buffer;
     std::vector partition_buffer;
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleCommon.h b/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
index 0d44d598956eb..d398362aa4b64 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
@@ -45,7 +45,7 @@ struct SplitOptions
     std::string out_exprs;
     std::string compress_method = "zstd";
     int compress_level;
-    size_t spill_threshold = 500 * 1024 * 1024;
+    size_t spill_threshold = 300 * 1024 * 1024;
     std::string hash_algorithm;
     size_t max_sort_buffer_size = 1_GiB;
     bool force_memory_sort = false;
diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
index 0731ac92cd078..c59d6ddb4bd41 100644
--- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
+++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
@@ -16,6 +16,8 @@
  */
 #include "StorageMergeTreeFactory.h"
 
+#include
+
 namespace local_engine
 {
 
@@ -67,14 +69,12 @@ DataPartsVector StorageMergeTreeFactory::getDataPartsByNames(const StorageID & i
 {
     DataPartsVector res;
     auto table_name = getTableName(id, snapshot_id);
-
+    auto config = MergeTreeConfig::loadFromContext(SerializedPlanParser::global_context);
     std::lock_guard lock(datapart_mutex);
     std::unordered_set missing_names;
     if (!datapart_map->has(table_name)) [[unlikely]]
     {
-        auto cache = std::make_shared>(
-            SerializedPlanParser::global_context->getConfigRef().getInt64("table_part_metadata_cache_max_count", 1000000)
-        );
+        auto cache = std::make_shared>(config.table_part_metadata_cache_max_count);
         datapart_map->add(table_name, cache);
     }
 
diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
index d7bcb93c07d7d..f372175bb02ce 100644
--- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
+++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 #pragma once
+#include
 #include
 #include
 #include
@@ -34,11 +35,11 @@ class StorageMergeTreeFactory
     static DataPartsVector getDataPartsByNames(const StorageID & id, const String & snapshot_id, std::unordered_set part_name);
     static void init_cache_map()
     {
+        auto config = MergeTreeConfig::loadFromContext(SerializedPlanParser::global_context);
         auto & storage_map_v = storage_map;
         if (!storage_map_v)
         {
-            storage_map_v = std::make_unique>(
-                SerializedPlanParser::global_context->getConfigRef().getInt64("table_metadata_cache_max_count", 100));
+            storage_map_v = std::make_unique>(config.table_metadata_cache_max_count);
         }
         else
         {
@@ -48,7 +49,7 @@ class StorageMergeTreeFactory
         if (!datapart_map_v)
         {
             datapart_map_v = std::make_unique>>>(
-                SerializedPlanParser::global_context->getConfigRef().getInt64("table_metadata_cache_max_count", 100));
+                config.table_metadata_cache_max_count);
         }
         else
         {
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
index 0221afd885141..00acaf58398cc 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
@@ -36,6 +36,7 @@
 #include
 #endif
 
+#include
 #include
 
 namespace DB
@@ -81,8 +82,8 @@ FormatFilePtr FormatFileUtil::createFile(
 #if USE_PARQUET
     if (file.has_parquet())
     {
-        bool useLocalFormat = context->getConfigRef().getBool("use_local_format", false);
-        return std::make_shared(context, file, read_buffer_builder, useLocalFormat);
+        auto config = ExecutorConfig::loadFromContext(context);
+        return std::make_shared(context, file, read_buffer_builder, config.use_local_format);
     }
 #endif
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index e73ca8ecee2bb..da15890070b09 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -48,6 +48,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -211,7 +212,7 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder
     std::unique_ptr
     build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override
     {
-        bool enable_async_io = context->getConfigRef().getBool("hdfs.enable_async_io", true);
+        auto config = HdfsConfig::loadFromContext(context);
         Poco::URI file_uri(file_info.uri_file());
         std::string uri_path = "hdfs://" + file_uri.getHost();
         if (file_uri.getPort())
@@ -233,7 +234,7 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder
 
             auto read_buffer_impl = std::make_unique(
                 uri_path, file_uri.getPath(), context->getConfigRef(), read_settings, start_end_pos.second, true);
-            if (enable_async_io)
+            if (config.hdfs_async)
             {
                 auto & pool_reader = context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
                 read_buffer = std::make_unique(pool_reader, read_settings, std::move(read_buffer_impl));
@@ -249,7 +250,7 @@ class HDFSFileReadBufferBuilder : public ReadBufferBuilder
         {
             auto read_buffer_impl
                 = std::make_unique(uri_path, file_uri.getPath(), context->getConfigRef(), read_settings, 0, true);
-            if (enable_async_io)
+            if (config.hdfs_async)
             {
                 read_buffer = std::make_unique(
                     context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER),
@@ -380,14 +381,15 @@ class S3FileReadBufferBuilder : public ReadBufferBuilder
 public:
     explicit S3FileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_)
     {
+        auto config = S3Config::loadFromContext(context);
         new_settings = context->getReadSettings();
-        new_settings.enable_filesystem_cache = context->getConfigRef().getBool("s3.local_cache.enabled", false);
+        new_settings.enable_filesystem_cache = config.s3_local_cache_enabled;
 
         if (new_settings.enable_filesystem_cache)
         {
             DB::FileCacheSettings file_cache_settings;
-            file_cache_settings.max_size = static_cast(context->getConfigRef().getUInt64("s3.local_cache.max_size", 100L << 30));
-            auto cache_base_path = context->getConfigRef().getString("s3.local_cache.cache_path", "/tmp/gluten/local_cache");
+            file_cache_settings.max_size = config.s3_local_cache_max_size;
+            auto cache_base_path = config.s3_local_cache_cache_path;
 
             if (!fs::exists(cache_base_path))
                 fs::create_directories(cache_base_path);