Skip to content

Commit

Permalink
Make query_map_ as QueryContextManager member
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Aug 28, 2024
1 parent 2934a6f commit d579ff4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 23 deletions.
39 changes: 18 additions & 21 deletions cpp-ch/local-engine/Common/QueryContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@
*/
#include "QueryContext.h"

#include <iomanip>
#include <sstream>
#include <Interpreters/Context.h>
#include <Parser/SerializedPlanParser.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadStatus.h>
#include <base/unit.h>
#include <Common/CHUtil.h>
#include <Common/GlutenConfig.h>
#include <Common/ConcurrentMap.h>
#include <base/unit.h>
#include <sstream>
#include <iomanip>

#include <Common/CurrentThread.h>
#include <Common/GlutenConfig.h>
#include <Common/ThreadStatus.h>

namespace DB
{
Expand All @@ -47,8 +46,6 @@ struct QueryContext
ContextMutablePtr query_context;
};

ConcurrentMap<int64_t, std::shared_ptr<QueryContext>> query_map;

int64_t QueryContextManager::initializeQuery()
{
std::shared_ptr<QueryContext> query_context = std::make_shared<QueryContext>();
Expand All @@ -71,15 +68,15 @@ int64_t QueryContextManager::initializeQuery()
query_context->thread_group->memory_tracker.setSoftLimit(memory_limit);
query_context->thread_group->memory_tracker.setHardLimit(memory_limit + config.extra_memory_hard_limit);
int64_t id = reinterpret_cast<int64_t>(query_context->thread_group.get());
query_map.insert(id, query_context);
query_map_.insert(id, query_context);
return id;
}

DB::ContextMutablePtr QueryContextManager::currentQueryContext()
{
auto thread_group = currentThreadGroup();
int64_t id = reinterpret_cast<int64_t>(CurrentThread::getGroup().get());
return query_map.get(id)->query_context;
const int64_t id = reinterpret_cast<int64_t>(CurrentThread::getGroup().get());
return query_map_.get(id)->query_context;
}

std::shared_ptr<DB::ThreadGroup> QueryContextManager::currentThreadGroup()
Expand All @@ -90,13 +87,13 @@ std::shared_ptr<DB::ThreadGroup> QueryContextManager::currentThreadGroup()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found.");
}

void QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters & counters)
void QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters & counters) const
{
if (!CurrentThread::getGroup())
{
return;
}
if (logger->information())
if (logger_->information())
{
std::ostringstream msg;
msg << "\n---------------------Task Performance Counters-----------------------------\n";
Expand All @@ -111,15 +108,15 @@ void QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters
<< std::setw(20) << std::setfill(' ') << std::left << count.load()
<< " | (" << doc << ")\n";
}
LOG_INFO(logger, "{}", msg.str());
LOG_INFO(logger_, "{}", msg.str());
}
}

size_t QueryContextManager::currentPeakMemory(int64_t id)
{
if (!query_map.contains(id))
if (!query_map_.contains(id))
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "context released {}", id);
return query_map.get(id)->thread_group->memory_tracker.getPeak();
return query_map_.get(id)->thread_group->memory_tracker.getPeak();
}

void QueryContextManager::finalizeQuery(int64_t id)
Expand All @@ -130,7 +127,7 @@ void QueryContextManager::finalizeQuery(int64_t id)
}
std::shared_ptr<QueryContext> context;
{
context = query_map.get(id);
context = query_map_.get(id);
}
auto query_context = context->thread_status->getQueryContext();
if (!query_context)
Expand All @@ -139,19 +136,19 @@ void QueryContextManager::finalizeQuery(int64_t id)
}
context->thread_status->flushUntrackedMemory();
context->thread_status->finalizePerformanceCounters();
LOG_INFO(logger, "Task finished, peak memory usage: {} bytes", currentPeakMemory(id));
LOG_INFO(logger_, "Task finished, peak memory usage: {} bytes", currentPeakMemory(id));

if (currentThreadGroupMemoryUsage() > 1_MiB)
{
LOG_WARNING(logger, "{} bytes memory didn't release, There may be a memory leak!", currentThreadGroupMemoryUsage());
LOG_WARNING(logger_, "{} bytes memory didn't release, There may be a memory leak!", currentThreadGroupMemoryUsage());
}
logCurrentPerformanceCounters(context->thread_group->performance_counters);
context->thread_status->detachFromGroup();
context->thread_group.reset();
context->thread_status.reset();
query_context.reset();
{
query_map.erase(id);
query_map_.erase(id);
}
}

Expand Down
8 changes: 6 additions & 2 deletions cpp-ch/local-engine/Common/QueryContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
*/
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Common/ConcurrentMap.h>
#include <Common/ThreadStatus.h>

namespace local_engine
{
struct QueryContext;

class QueryContextManager
{
public:
Expand All @@ -31,13 +34,14 @@ class QueryContextManager
int64_t initializeQuery();
DB::ContextMutablePtr currentQueryContext();
static std::shared_ptr<DB::ThreadGroup> currentThreadGroup();
void logCurrentPerformanceCounters(ProfileEvents::Counters& counters);
void logCurrentPerformanceCounters(ProfileEvents::Counters & counters) const;
size_t currentPeakMemory(int64_t id);
void finalizeQuery(int64_t id);

private:
QueryContextManager() = default;
LoggerPtr logger = getLogger("QueryContextManager");
LoggerPtr logger_ = getLogger("QueryContextManager");
ConcurrentMap<int64_t, std::shared_ptr<QueryContext>> query_map_{};
};

size_t currentThreadGroupMemoryUsage();
Expand Down

0 comments on commit d579ff4

Please sign in to comment.