Skip to content

Commit

Permalink
Unify the way to obtain current memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Jul 24, 2024
1 parent 1f3ef17 commit 04c8ec6
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 31 deletions.
11 changes: 0 additions & 11 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1051,17 +1051,6 @@ String DateTimeUtil::convertTimeZone(const String & time_zone)
return res;
}

/// Reports the amount held by the memory tracker found `depth` parent links
/// above the current thread's tracker.
///
/// @param depth  number of getParent() hops to take from the tracker returned
///               by DB::CurrentThread::getMemoryTracker().
/// @return the tracker's get() value, or 0 when the walk ends on a null
///         tracker or the tracked value is negative.
UInt64 MemoryUtil::getCurrentMemoryUsage(size_t depth)
{
    auto * tracker = DB::CurrentThread::getMemoryTracker();

    // Climb towards the root; stop early if the chain is shorter than `depth`.
    size_t hops = 0;
    while (tracker && hops < depth)
    {
        tracker = tracker->getParent();
        ++hops;
    }

    if (!tracker)
        return 0;

    // A tracker may report a negative balance; clamp it so the unsigned
    // return value never wraps around.
    const Int64 tracked = tracker->get();
    if (tracked < 0)
        return 0;
    return tracked;
}

UInt64 MemoryUtil::getMemoryRSS()
{
long rss = 0L;
Expand Down
1 change: 0 additions & 1 deletion cpp-ch/local-engine/Common/CHUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ class DateTimeUtil
/// Static helpers for querying memory usage.
class MemoryUtil
{
public:
/// Returns the value held by the memory tracker reached by following `depth`
/// parent links (default 1) from the current thread's tracker; yields 0 when
/// no such tracker exists or the tracked value is negative.
static UInt64 getCurrentMemoryUsage(size_t depth = 1);
/// NOTE(review): presumably the resident set size of the process — the
/// definition is only partly visible here; confirm against CHUtil.cpp.
static UInt64 getMemoryRSS();
};

Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Common/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct GraceMergingAggregateConfig
bool throw_on_overflow_grace_aggregate_merging_buckets = false;
size_t aggregated_keys_before_extend_grace_aggregate_merging_buckets = 8192;
size_t max_pending_flush_blocks_per_grace_aggregate_merging_bucket = 1_MiB;
size_t max_allowed_memory_usage_ratio_for_aggregate_merging = 0.9;
double max_allowed_memory_usage_ratio_for_aggregate_merging = 0.9;

static GraceMergingAggregateConfig loadFromContext(DB::ContextPtr context)
{
Expand Down
19 changes: 11 additions & 8 deletions cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Common/formatReadable.h>
#include <Common/BitHelpers.h>
#include <Common/GlutenConfig.h>
#include <Common/QueryContext.h>

namespace DB
{
Expand Down Expand Up @@ -162,7 +163,7 @@ GraceMergingAggregatedTransform::Status GraceMergingAggregatedTransform::prepare
"Output one chunk. rows: {}, bytes: {}, current memory usage: {}",
output_chunk.getNumRows(),
ReadableSize(output_chunk.bytes()),
ReadableSize(MemoryUtil::getCurrentMemoryUsage()));
ReadableSize(currentThreadGroupMemoryUsage()));
total_output_rows += output_chunk.getNumRows();
total_output_blocks++;
output.push(std::move(output_chunk));
Expand Down Expand Up @@ -191,7 +192,7 @@ GraceMergingAggregatedTransform::Status GraceMergingAggregatedTransform::prepare
"Input one new chunk. rows: {}, bytes: {}, current memory usage: {}",
input_chunk.getNumRows(),
ReadableSize(input_chunk.bytes()),
ReadableSize(MemoryUtil::getCurrentMemoryUsage()));
ReadableSize(currentThreadGroupMemoryUsage()));
total_input_rows += input_chunk.getNumRows();
total_input_blocks++;
has_input = true;
Expand Down Expand Up @@ -279,7 +280,7 @@ bool GraceMergingAggregatedTransform::extendBuckets()

void GraceMergingAggregatedTransform::rehashDataVariants()
{
auto before_memoery_usage = MemoryUtil::getCurrentMemoryUsage();
auto before_memoery_usage = currentThreadGroupMemoryUsage();

auto converter = currentDataVariantToBlockConverter(false);
checkAndSetupCurrentDataVariants();
Expand Down Expand Up @@ -320,7 +321,7 @@ void GraceMergingAggregatedTransform::rehashDataVariants()
current_bucket_index,
getBucketsNum(),
ReadableSize(before_memoery_usage),
ReadableSize(MemoryUtil::getCurrentMemoryUsage()));
ReadableSize(currentThreadGroupMemoryUsage()));
};

DB::Blocks GraceMergingAggregatedTransform::scatterBlock(const DB::Block & block)
Expand Down Expand Up @@ -541,7 +542,7 @@ void GraceMergingAggregatedTransform::mergeOneBlock(const DB::Block &block, bool
block.info.bucket_num,
current_bucket_index,
getBucketsNum(),
ReadableSize(MemoryUtil::getCurrentMemoryUsage()));
ReadableSize(currentThreadGroupMemoryUsage()));

/// the block could be one read from disk. block.info.bucket_num stores the number of buckets when it was scattered.
/// so if the buckets number is not changed since it was scattered, we don't need to scatter it again.
Expand Down Expand Up @@ -592,11 +593,13 @@ bool GraceMergingAggregatedTransform::isMemoryOverflow()
/// More greedy memory usage strategy.
if (!current_data_variants)
return false;
if (!context->getSettingsRef().max_memory_usage)

auto memory_soft_limit = DB::CurrentThread::getGroup()->memory_tracker.getSoftLimit();
if (!memory_soft_limit)
return false;
auto max_mem_used = static_cast<size_t>(context->getSettingsRef().max_memory_usage * max_allowed_memory_usage_ratio);
auto max_mem_used = static_cast<size_t>(memory_soft_limit * max_allowed_memory_usage_ratio);
auto current_result_rows = current_data_variants->size();
auto current_mem_used = MemoryUtil::getCurrentMemoryUsage();
auto current_mem_used = currentThreadGroupMemoryUsage();
if (per_key_memory_usage > 0)
{
if (current_mem_used + per_key_memory_usage * current_result_rows >= max_mem_used)
Expand Down
16 changes: 8 additions & 8 deletions cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include <Processors/Transforms/AggregatingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/CHUtil.h>
#include <Common/CurrentThread.h>
#include <Common/formatReadable.h>
#include <Common/GlutenConfig.h>
#include <Common/QueryContext.h>
#include <Common/Stopwatch.h>

namespace DB
Expand Down Expand Up @@ -62,7 +62,7 @@ StreamingAggregatingTransform::~StreamingAggregatingTransform()
total_clear_data_variants_num,
total_aggregate_time,
total_convert_data_variants_time,
ReadableSize(MemoryUtil::getCurrentMemoryUsage()));
ReadableSize(currentThreadGroupMemoryUsage()));
}

StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare()
Expand All @@ -84,7 +84,7 @@ StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare()
"Output one chunk. rows: {}, bytes: {}, current memory usage: {}",
output_chunk.getNumRows(),
ReadableSize(output_chunk.bytes()),
ReadableSize(MemoryUtil::getCurrentMemoryUsage()));
ReadableSize(currentThreadGroupMemoryUsage()));
total_output_rows += output_chunk.getNumRows();
total_output_blocks++;
if (!output_chunk.getNumRows())
Expand Down Expand Up @@ -127,7 +127,7 @@ StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare()
"Input one new chunk. rows: {}, bytes: {}, current memory usage: {}",
input_chunk.getNumRows(),
ReadableSize(input_chunk.bytes()),
ReadableSize(MemoryUtil::getCurrentMemoryUsage()));
ReadableSize(currentThreadGroupMemoryUsage()));
total_input_rows += input_chunk.getNumRows();
total_input_blocks++;
has_input = true;
Expand All @@ -138,10 +138,10 @@ bool StreamingAggregatingTransform::needEvict()
{
if (input_finished)
return true;
if (!context->getSettingsRef().max_memory_usage)
auto memory_soft_limit = DB::CurrentThread::getGroup()->memory_tracker.getSoftLimit();
if (!memory_soft_limit)
return false;

auto max_mem_used = static_cast<size_t>(context->getSettingsRef().max_memory_usage * max_allowed_memory_usage_ratio);
auto max_mem_used = static_cast<size_t>(memory_soft_limit * max_allowed_memory_usage_ratio);
auto current_result_rows = data_variants->size();
/// avoid evict empty or too small aggregated results.
if (current_result_rows < aggregated_keys_before_evict)
Expand All @@ -152,7 +152,7 @@ bool StreamingAggregatingTransform::needEvict()
if (static_cast<double>(total_output_rows)/total_input_rows > high_cardinality_threshold)
return true;

auto current_mem_used = MemoryUtil::getCurrentMemoryUsage();
auto current_mem_used = currentThreadGroupMemoryUsage();
if (per_key_memory_usage > 0)
{
/// When we know each key memory usage, we can take a more greedy memory usage strategy
Expand Down
1 change: 0 additions & 1 deletion cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ int64_t searchLastPartitionIdIndex(ColumnPtr column, size_t start, size_t partit

/// Decides whether the cached data should be spilled.
///
/// @param cache_size  size of the cache, compared against options->spill_threshold.
/// @return true when a positive spill threshold is configured and `cache_size`
///         has reached it, or when currentThreadGroupMemoryUsageRatio()
///         exceeds settings.spill_mem_ratio.
bool PartitionWriter::worthToSpill(size_t cache_size) const
{
    const bool threshold_reached = options->spill_threshold > 0 && cache_size >= options->spill_threshold;
    if (threshold_reached)
        return true;

    // Only consult the memory usage ratio when the size threshold did not trigger.
    return currentThreadGroupMemoryUsageRatio() > settings.spill_mem_ratio;
}
Expand Down
1 change: 0 additions & 1 deletion cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,6 @@ JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_createThreadGroup(JN
/// JNI entry point backing org.apache.gluten.memory.CHThreadGroup.threadGroupPeakMemory.
///
/// @param env  JNI environment, handed to LOCAL_ENGINE_JNI_METHOD_END.
/// @param id   identifier forwarded to QueryContextManager::currentPeakMemory.
/// @return the value returned by QueryContextManager::currentPeakMemory(id).
///
/// NOTE(review): LOCAL_ENGINE_JNI_METHOD_START/END presumably wrap the body in
/// exception handling with `0l` as the fallback return value — confirm against
/// the macro definitions.
JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_threadGroupPeakMemory(JNIEnv * env, jclass, jlong id)
{
    LOCAL_ENGINE_JNI_METHOD_START
    // Deliberately no per-call tracing to std::cerr: the previous debug print
    // ran on every call, had no trailing newline, and wrote to the host JVM's stderr.
    return local_engine::QueryContextManager::instance().currentPeakMemory(id);
    LOCAL_ENGINE_JNI_METHOD_END(env, 0l)
}
Expand Down

0 comments on commit 04c8ec6

Please sign in to comment.