Skip to content

Commit

Permalink
Rename QueryContextManager => QueryContext (apache#7147)
Browse files Browse the repository at this point in the history
Move SerializedPlanParser::global_context to  QueryContext:Data
Move SerializedPlanParser::shared_context to QueryContext::Data
Remove SerializedPlanParser config
Cleanup #include <Parser/SerializedPlanParser.h>
  • Loading branch information
baibaichen authored and hengzhen.sq committed Sep 11, 2024
1 parent 42a8c5c commit ac8273d
Show file tree
Hide file tree
Showing 52 changed files with 322 additions and 345 deletions.
28 changes: 8 additions & 20 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/registerFunctions.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/SharedThreadPools.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Parser/RelParser.h>
Expand All @@ -73,6 +72,7 @@
#include <Common/CurrentThread.h>
#include <Common/GlutenSignalHandler.h>
#include <Common/LoggerExtend.h>
#include <Common/QueryContext.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>

Expand Down Expand Up @@ -815,14 +815,9 @@ void BackendInitializerUtil::initSettings(std::map<std::string, std::string> & b
void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
{
/// Make sure global_context and shared_context are constructed only once.
auto & shared_context = SerializedPlanParser::shared_context;
if (!shared_context.get())
shared_context = SharedContextHolder(Context::createShared());

auto & global_context = SerializedPlanParser::global_context;
if (!global_context)
if (auto global_context = QueryContext::globalMutableContext(); !global_context)
{
global_context = Context::createGlobal(shared_context.get());
global_context = QueryContext::createGlobal();
global_context->makeGlobalContext();
global_context->setConfig(config);

Expand Down Expand Up @@ -878,9 +873,9 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
}
}

void BackendInitializerUtil::applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr config, DB::Settings & settings)
void BackendInitializerUtil::applyGlobalConfigAndSettings(const DB::Context::ConfigurationPtr & config, const DB::Settings & settings)
{
auto & global_context = SerializedPlanParser::global_context;
const auto global_context = QueryContext::globalMutableContext();
global_context->setConfig(config);
global_context->setSettings(settings);
}
Expand Down Expand Up @@ -974,8 +969,8 @@ void BackendInitializerUtil::init(const std::string_view plan)
// Init the table metadata cache map
StorageMergeTreeFactory::init_cache_map();

JobScheduler::initialize(SerializedPlanParser::global_context);
CacheManager::initialize(SerializedPlanParser::global_context);
JobScheduler::initialize(QueryContext::globalContext());
CacheManager::initialize(QueryContext::globalMutableContext());

std::call_once(
init_flag,
Expand Down Expand Up @@ -1025,14 +1020,7 @@ void BackendFinalizerUtil::finalizeGlobally()
// Make sure client caches release before ClientCacheRegistry
ReadBufferBuilderFactory::instance().clean();
StorageMergeTreeFactory::clear();
auto & global_context = SerializedPlanParser::global_context;
auto & shared_context = SerializedPlanParser::shared_context;
if (global_context)
{
global_context->shutdown();
global_context.reset();
shared_context.reset();
}
QueryContext::resetGlobal();
std::lock_guard lock(paths_mutex);
std::ranges::for_each(paths_need_to_clean, [](const auto & path)
{
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Common/CHUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class BackendInitializerUtil
static void initContexts(DB::Context::ConfigurationPtr config);
static void initCompiledExpressionCache(DB::Context::ConfigurationPtr config);
static void registerAllFactories();
static void applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr, DB::Settings &);
static void applyGlobalConfigAndSettings(const DB::Context::ConfigurationPtr & config, const DB::Settings & settings);
static void updateNewSettings(const DB::ContextMutablePtr &, const DB::Settings &);
static std::vector<String>
wrapDiskPathConfig(const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config);
Expand Down
78 changes: 51 additions & 27 deletions cpp-ch/local-engine/Common/QueryContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <iomanip>
#include <sstream>
#include <Interpreters/Context.h>
#include <Parser/SerializedPlanParser.h>
#include <base/unit.h>
#include <Common/CHUtil.h>
#include <Common/ConcurrentMap.h>
Expand All @@ -35,21 +34,59 @@ extern const int LOGICAL_ERROR;
}
}

using namespace DB;

namespace local_engine
{
using namespace DB;

struct QueryContext
struct QueryContext::Data
{
std::shared_ptr<ThreadStatus> thread_status;
std::shared_ptr<ThreadGroup> thread_group;
ContextMutablePtr query_context;

static DB::ContextMutablePtr global_context;
static SharedContextHolder shared_context;
};

int64_t QueryContextManager::initializeQuery()
ContextMutablePtr QueryContext::Data::global_context{};
SharedContextHolder QueryContext::Data::shared_context{};

DB::ContextMutablePtr QueryContext::globalMutableContext()
{
return Data::global_context;
}
void QueryContext::resetGlobal()
{
if (Data::global_context)
{
Data::global_context->shutdown();
Data::global_context.reset();
}
Data::shared_context.reset();
}

DB::ContextMutablePtr QueryContext::createGlobal()
{
assert(Data::shared_context.get() == nullptr);

if (!Data::shared_context.get())
Data::shared_context = SharedContextHolder(Context::createShared());

assert(Data::global_context == nullptr);
Data::global_context = Context::createGlobal(Data::shared_context.get());
return globalMutableContext();
}

DB::ContextPtr QueryContext::globalContext()
{
return Data::global_context;
}

int64_t QueryContext::initializeQuery()
{
std::shared_ptr<QueryContext> query_context = std::make_shared<QueryContext>();
query_context->query_context = Context::createCopy(SerializedPlanParser::global_context);
std::shared_ptr<Data> query_context = std::make_shared<Data>();
query_context->query_context = Context::createCopy(globalContext());
query_context->query_context->makeQueryContext();

// empty input will trigger random query id to be set
Expand All @@ -72,27 +109,25 @@ int64_t QueryContextManager::initializeQuery()
return id;
}

DB::ContextMutablePtr QueryContextManager::currentQueryContext()
DB::ContextMutablePtr QueryContext::currentQueryContext()
{
auto thread_group = currentThreadGroup();
const int64_t id = reinterpret_cast<int64_t>(CurrentThread::getGroup().get());
return query_map_.get(id)->query_context;
}

std::shared_ptr<DB::ThreadGroup> QueryContextManager::currentThreadGroup()
std::shared_ptr<DB::ThreadGroup> QueryContext::currentThreadGroup()
{
if (auto thread_group = CurrentThread::getGroup())
return thread_group;

throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found.");
}

void QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters & counters) const
void QueryContext::logCurrentPerformanceCounters(ProfileEvents::Counters & counters) const
{
if (!CurrentThread::getGroup())
{
return;
}
if (logger_->information())
{
std::ostringstream msg;
Expand All @@ -104,44 +139,37 @@ void QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters
auto & count = counters[event];
if (count == 0)
continue;
msg << std::setw(50) << std::setfill(' ') << std::left << name << "|"
<< std::setw(20) << std::setfill(' ') << std::left << count.load()
<< " | (" << doc << ")\n";
msg << std::setw(50) << std::setfill(' ') << std::left << name << "|" << std::setw(20) << std::setfill(' ') << std::left
<< count.load() << " | (" << doc << ")\n";
}
LOG_INFO(logger_, "{}", msg.str());
}
}

size_t QueryContextManager::currentPeakMemory(int64_t id)
size_t QueryContext::currentPeakMemory(int64_t id)
{
if (!query_map_.contains(id))
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "context released {}", id);
return query_map_.get(id)->thread_group->memory_tracker.getPeak();
}

void QueryContextManager::finalizeQuery(int64_t id)
void QueryContext::finalizeQuery(int64_t id)
{
if (!CurrentThread::getGroup())
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found.");
}
std::shared_ptr<QueryContext> context;
std::shared_ptr<Data> context;
{
context = query_map_.get(id);
}
auto query_context = context->thread_status->getQueryContext();
if (!query_context)
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "query context not found");
}
context->thread_status->flushUntrackedMemory();
context->thread_status->finalizePerformanceCounters();
LOG_INFO(logger_, "Task finished, peak memory usage: {} bytes", currentPeakMemory(id));

if (currentThreadGroupMemoryUsage() > 1_MiB)
{
LOG_WARNING(logger_, "{} bytes memory didn't release, There may be a memory leak!", currentThreadGroupMemoryUsage());
}
logCurrentPerformanceCounters(context->thread_group->performance_counters);
context->thread_status->detachFromGroup();
context->thread_group.reset();
Expand All @@ -155,18 +183,14 @@ void QueryContextManager::finalizeQuery(int64_t id)
size_t currentThreadGroupMemoryUsage()
{
if (!CurrentThread::getGroup())
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found, please call initializeQuery first.");
}
return CurrentThread::getGroup()->memory_tracker.get();
}

double currentThreadGroupMemoryUsageRatio()
{
if (!CurrentThread::getGroup())
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found, please call initializeQuery first.");
}
return static_cast<double>(CurrentThread::getGroup()->memory_tracker.get()) / CurrentThread::getGroup()->memory_tracker.getSoftLimit();
}
}
21 changes: 15 additions & 6 deletions cpp-ch/local-engine/Common/QueryContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,25 @@
#include <Common/ConcurrentMap.h>
#include <Common/ThreadStatus.h>

namespace DB
{
struct ContextSharedPart;
}
namespace local_engine
{
struct QueryContext;

class QueryContextManager
class QueryContext
{
struct Data;

public:
static QueryContextManager & instance()
static DB::ContextMutablePtr createGlobal();
static void resetGlobal();
static DB::ContextMutablePtr globalMutableContext();
static DB::ContextPtr globalContext();
static QueryContext & instance()
{
static QueryContextManager instance;
static QueryContext instance;
return instance;
}
int64_t initializeQuery();
Expand All @@ -39,9 +48,9 @@ class QueryContextManager
void finalizeQuery(int64_t id);

private:
QueryContextManager() = default;
QueryContext() = default;
LoggerPtr logger_ = getLogger("QueryContextManager");
ConcurrentMap<int64_t, std::shared_ptr<QueryContext>> query_map_{};
ConcurrentMap<int64_t, std::shared_ptr<Data>> query_map_{};
};

size_t currentThreadGroupMemoryUsage();
Expand Down
9 changes: 4 additions & 5 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@

#include "GlutenDiskHDFS.h"
#include <ranges>

#include <Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h>
#include <Common/QueryContext.h>
#include <Common/Throttler.h>
#include <Parser/SerializedPlanParser.h>

#include "CompactObjectStorageDiskTransaction.h"
#if USE_HDFS

namespace local_engine
Expand All @@ -30,7 +29,7 @@ using namespace DB;

DiskTransactionPtr GlutenDiskHDFS::createTransaction()
{
return std::make_shared<CompactObjectStorageDiskTransaction>(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk());
return std::make_shared<CompactObjectStorageDiskTransaction>(*this, QueryContext::globalContext()->getTempDataOnDisk()->getVolume()->getDisk());
}

void GlutenDiskHDFS::createDirectory(const String & path)
Expand Down Expand Up @@ -78,7 +77,7 @@ DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
object_key_prefix,
getMetadataStorage(),
getObjectStorage(),
SerializedPlanParser::global_context->getConfigRef(),
QueryContext::globalContext()->getConfigRef(),
config_prefix,
object_storage_creator);
}
Expand Down
14 changes: 8 additions & 6 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@

#pragma once


#include "GlutenDiskS3.h"
#include <Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Parser/SerializedPlanParser.h>
#include "CompactObjectStorageDiskTransaction.h"
#include <Interpreters/Context.h>
#include <Common/QueryContext.h>

using namespace DB;

#if USE_AWS_S3
namespace local_engine
{

DB::DiskTransactionPtr GlutenDiskS3::createTransaction()
{
return std::make_shared<CompactObjectStorageDiskTransaction>(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk());
return std::make_shared<CompactObjectStorageDiskTransaction>(*this, QueryContext::globalContext()->getTempDataOnDisk()->getVolume()->getDisk());
}

std::unique_ptr<ReadBufferFromFileBase> GlutenDiskS3::readFile(
std::unique_ptr<DB::ReadBufferFromFileBase> GlutenDiskS3::readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
Expand All @@ -52,7 +54,7 @@ namespace local_engine
object_key_prefix,
getMetadataStorage(),
getObjectStorage(),
SerializedPlanParser::global_context->getConfigRef(),
QueryContext::globalContext()->getConfigRef(),
config_prefix,
object_storage_creator);
}
Expand Down
13 changes: 5 additions & 8 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@


#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Parser/SerializedPlanParser.h>
#include "CompactObjectStorageDiskTransaction.h"


#if USE_AWS_S3
namespace local_engine
Expand All @@ -41,13 +40,11 @@ class GlutenDiskS3 : public DB::DiskObjectStorage

DB::DiskTransactionPtr createTransaction() override;

std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const override;
std::unique_ptr<DB::ReadBufferFromFileBase>
readFile(const String & path, const DB::ReadSettings & settings, std::optional<size_t> read_hint, std::optional<size_t> file_size)
const override;

DiskObjectStoragePtr createDiskObjectStorage() override;
DB::DiskObjectStoragePtr createDiskObjectStorage() override;

private:
std::function<DB::ObjectStoragePtr(const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> object_storage_creator;
Expand Down
Loading

0 comments on commit ac8273d

Please sign in to comment.