diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index 6fc142431d959..f08dd4c418162 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -1051,17 +1051,6 @@ String DateTimeUtil::convertTimeZone(const String & time_zone) return res; } -UInt64 MemoryUtil::getCurrentMemoryUsage(size_t depth) -{ - Int64 current_memory_usage = 0; - auto * current_mem_tracker = DB::CurrentThread::getMemoryTracker(); - for (size_t i = 0; i < depth && current_mem_tracker; ++i) - current_mem_tracker = current_mem_tracker->getParent(); - if (current_mem_tracker) - current_memory_usage = current_mem_tracker->get(); - return current_memory_usage < 0 ? 0 : current_memory_usage; -} - UInt64 MemoryUtil::getMemoryRSS() { long rss = 0L; diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h index d27acf4604d0e..b9be3af227d3b 100644 --- a/cpp-ch/local-engine/Common/CHUtil.h +++ b/cpp-ch/local-engine/Common/CHUtil.h @@ -249,7 +249,6 @@ class DateTimeUtil class MemoryUtil { public: - static UInt64 getCurrentMemoryUsage(size_t depth = 1); static UInt64 getMemoryRSS(); }; diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h b/cpp-ch/local-engine/Common/GlutenConfig.h index 093f2d1607837..782df7f5413d4 100644 --- a/cpp-ch/local-engine/Common/GlutenConfig.h +++ b/cpp-ch/local-engine/Common/GlutenConfig.h @@ -55,7 +55,7 @@ struct GraceMergingAggregateConfig bool throw_on_overflow_grace_aggregate_merging_buckets = false; size_t aggregated_keys_before_extend_grace_aggregate_merging_buckets = 8192; size_t max_pending_flush_blocks_per_grace_aggregate_merging_bucket = 1_MiB; - size_t max_allowed_memory_usage_ratio_for_aggregate_merging = 0.9; + double max_allowed_memory_usage_ratio_for_aggregate_merging = 0.9; static GraceMergingAggregateConfig loadFromContext(DB::ContextPtr context) { diff --git a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp index c615a9b8d851b..82b498e58ff90 100644 --- a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp +++ b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp @@ -23,6 +23,7 @@ #include #include #include +#include namespace DB { @@ -162,7 +163,7 @@ GraceMergingAggregatedTransform::Status GraceMergingAggregatedTransform::prepare "Output one chunk. rows: {}, bytes: {}, current memory usage: {}", output_chunk.getNumRows(), ReadableSize(output_chunk.bytes()), - ReadableSize(MemoryUtil::getCurrentMemoryUsage())); + ReadableSize(currentThreadGroupMemoryUsage())); total_output_rows += output_chunk.getNumRows(); total_output_blocks++; output.push(std::move(output_chunk)); @@ -191,7 +192,7 @@ GraceMergingAggregatedTransform::Status GraceMergingAggregatedTransform::prepare "Input one new chunk. rows: {}, bytes: {}, current memory usage: {}", input_chunk.getNumRows(), ReadableSize(input_chunk.bytes()), - ReadableSize(MemoryUtil::getCurrentMemoryUsage())); + ReadableSize(currentThreadGroupMemoryUsage())); total_input_rows += input_chunk.getNumRows(); total_input_blocks++; has_input = true; @@ -279,7 +280,7 @@ bool GraceMergingAggregatedTransform::extendBuckets() void GraceMergingAggregatedTransform::rehashDataVariants() { - auto before_memoery_usage = MemoryUtil::getCurrentMemoryUsage(); + auto before_memoery_usage = currentThreadGroupMemoryUsage(); auto converter = currentDataVariantToBlockConverter(false); checkAndSetupCurrentDataVariants(); @@ -320,7 +321,7 @@ void GraceMergingAggregatedTransform::rehashDataVariants() current_bucket_index, getBucketsNum(), ReadableSize(before_memoery_usage), - ReadableSize(MemoryUtil::getCurrentMemoryUsage())); + ReadableSize(currentThreadGroupMemoryUsage())); }; DB::Blocks GraceMergingAggregatedTransform::scatterBlock(const DB::Block & block) @@ -541,7 +542,7 @@ void GraceMergingAggregatedTransform::mergeOneBlock(const DB::Block &block, bool block.info.bucket_num, current_bucket_index, getBucketsNum(), - ReadableSize(MemoryUtil::getCurrentMemoryUsage())); + ReadableSize(currentThreadGroupMemoryUsage())); /// the block could be one read from disk. block.info.bucket_num stores the number of buckets when it was scattered. /// so if the buckets number is not changed since it was scattered, we don't need to scatter it again. @@ -592,11 +593,13 @@ bool GraceMergingAggregatedTransform::isMemoryOverflow() /// More greedy memory usage strategy. if (!current_data_variants) return false; - if (!context->getSettingsRef().max_memory_usage) + + auto memory_soft_limit = DB::CurrentThread::getGroup()->memory_tracker.getSoftLimit(); + if (!memory_soft_limit) return false; - auto max_mem_used = static_cast(context->getSettingsRef().max_memory_usage * max_allowed_memory_usage_ratio); + auto max_mem_used = static_cast(memory_soft_limit * max_allowed_memory_usage_ratio); auto current_result_rows = current_data_variants->size(); - auto current_mem_used = MemoryUtil::getCurrentMemoryUsage(); + auto current_mem_used = currentThreadGroupMemoryUsage(); if (per_key_memory_usage > 0) { if (current_mem_used + per_key_memory_usage * current_result_rows >= max_mem_used) diff --git a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp index f0db6324f0e12..2235f4cbe45f5 100644 --- a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp +++ b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp @@ -19,9 +19,9 @@ #include #include #include -#include #include #include +#include #include namespace DB @@ -62,7 +62,7 @@ StreamingAggregatingTransform::~StreamingAggregatingTransform() total_clear_data_variants_num, total_aggregate_time, total_convert_data_variants_time, - ReadableSize(MemoryUtil::getCurrentMemoryUsage())); + ReadableSize(currentThreadGroupMemoryUsage())); } StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare() @@ -84,7 +84,7 @@ StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare() "Output one chunk. rows: {}, bytes: {}, current memory usage: {}", output_chunk.getNumRows(), ReadableSize(output_chunk.bytes()), - ReadableSize(MemoryUtil::getCurrentMemoryUsage())); + ReadableSize(currentThreadGroupMemoryUsage())); total_output_rows += output_chunk.getNumRows(); total_output_blocks++; if (!output_chunk.getNumRows()) @@ -127,7 +127,7 @@ StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare() "Input one new chunk. rows: {}, bytes: {}, current memory usage: {}", input_chunk.getNumRows(), ReadableSize(input_chunk.bytes()), - ReadableSize(MemoryUtil::getCurrentMemoryUsage())); + ReadableSize(currentThreadGroupMemoryUsage())); total_input_rows += input_chunk.getNumRows(); total_input_blocks++; has_input = true; @@ -138,10 +138,10 @@ bool StreamingAggregatingTransform::needEvict() { if (input_finished) return true; - if (!context->getSettingsRef().max_memory_usage) + auto memory_soft_limit = DB::CurrentThread::getGroup()->memory_tracker.getSoftLimit(); + if (!memory_soft_limit) return false; - - auto max_mem_used = static_cast(context->getSettingsRef().max_memory_usage * max_allowed_memory_usage_ratio); + auto max_mem_used = static_cast(memory_soft_limit * max_allowed_memory_usage_ratio); auto current_result_rows = data_variants->size(); /// avoid evict empty or too small aggregated results. if (current_result_rows < aggregated_keys_before_evict) @@ -152,7 +152,7 @@ bool StreamingAggregatingTransform::needEvict() if (static_cast(total_output_rows)/total_input_rows > high_cardinality_threshold) return true; - auto current_mem_used = MemoryUtil::getCurrentMemoryUsage(); + auto current_mem_used = currentThreadGroupMemoryUsage(); if (per_key_memory_usage > 0) { /// When we know each key memory usage, we can take a more greedy memory usage strategy diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index 6582e2eadbc29..a2ef0888aeff5 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -68,7 +68,6 @@ int64_t searchLastPartitionIdIndex(ColumnPtr column, size_t start, size_t partit bool PartitionWriter::worthToSpill(size_t cache_size) const { - // std::cerr << std::format("mem: {}, usage ratio : {}, spill_mem_ratio: {}, cache size: {}, spill_threashold: {}\n", currentThreadGroupMemoryUsage(), currentThreadGroupMemoryUsageRatio(), settings.spill_mem_ratio, cache_size, options->spill_threshold); return (options->spill_threshold > 0 && cache_size >= options->spill_threshold) || currentThreadGroupMemoryUsageRatio() > settings.spill_mem_ratio; } diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 739ea130fea3e..1dd3ad1156882 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -1251,7 +1251,6 @@ JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_createThreadGroup(JN JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_threadGroupPeakMemory(JNIEnv * env, jclass, jlong id) { LOCAL_ENGINE_JNI_METHOD_START - std::cerr << std::format("CHThreadGroup_threadGroupPeakMemory {}", id); return local_engine::QueryContextManager::instance().currentPeakMemory(id); LOCAL_ENGINE_JNI_METHOD_END(env, 0l) }