From 5915c9b6bca2e3ca2412f3ed29a5f5b418a21602 Mon Sep 17 00:00:00 2001 From: Chang chen Date: Mon, 9 Sep 2024 10:20:21 +0800 Subject: [PATCH] Rename QueryContextManager => QueryContext (#7147) Move SerializedPlanParser::global_context to QueryContext::Data Move SerializedPlanParser::shared_context to QueryContext::Data Remove SerializedPlanParser config Cleanup #include --- cpp-ch/local-engine/Common/CHUtil.cpp | 28 ++----- cpp-ch/local-engine/Common/CHUtil.h | 2 +- cpp-ch/local-engine/Common/QueryContext.cpp | 78 ++++++++++++------- cpp-ch/local-engine/Common/QueryContext.h | 21 +++-- .../Disks/ObjectStorages/GlutenDiskHDFS.cpp | 9 +-- .../Disks/ObjectStorages/GlutenDiskS3.cpp | 14 ++-- .../Disks/ObjectStorages/GlutenDiskS3.h | 13 ++-- cpp-ch/local-engine/Parser/CrossRelParser.cpp | 17 ++-- .../local-engine/Parser/ExpandRelParser.cpp | 2 - cpp-ch/local-engine/Parser/ExpandRelParser.h | 3 +- .../Parser/MergeTreeRelParser.cpp | 4 +- .../local-engine/Parser/MergeTreeRelParser.h | 7 +- cpp-ch/local-engine/Parser/RelMetric.cpp | 6 +- .../Parser/SerializedPlanParser.cpp | 6 -- .../Parser/SerializedPlanParser.h | 22 ++---- cpp-ch/local-engine/Parser/TypeParser.cpp | 5 +- cpp-ch/local-engine/Parser/TypeParser.h | 2 +- cpp-ch/local-engine/Parser/WriteRelParser.cpp | 1 + .../Parser/example_udf/tests/gtest_my_add.cpp | 5 +- .../Parser/example_udf/tests/gtest_my_md5.cpp | 5 +- cpp-ch/local-engine/Rewriter/RelRewriter.h | 21 ++--- .../local-engine/Shuffle/PartitionWriter.cpp | 10 +-- cpp-ch/local-engine/Shuffle/PartitionWriter.h | 17 ++-- .../local-engine/Shuffle/SelectorBuilder.cpp | 6 +- .../Storages/Cache/CacheManager.cpp | 3 +- .../Storages/Cache/CacheManager.h | 2 +- .../Storages/Cache/JobScheduler.cpp | 4 +- .../Storages/Cache/JobScheduler.h | 6 +- .../Storages/MergeTree/MetaDataHelper.cpp | 4 +- .../MergeTree/StorageMergeTreeFactory.cpp | 2 +- .../MergeTree/StorageMergeTreeFactory.h | 5 +- .../Storages/Output/FileWriterWrappers.cpp | 1 + 
.../Storages/Output/FileWriterWrappers.h | 51 ++++++------ cpp-ch/local-engine/local_engine_jni.cpp | 22 +++--- .../tests/benchmark_cast_float_function.cpp | 19 +++-- .../tests/benchmark_local_engine.cpp | 24 ++---- .../tests/benchmark_parquet_read.cpp | 17 ++-- .../tests/benchmark_spark_divide_function.cpp | 5 +- .../tests/benchmark_spark_floor_function.cpp | 16 ++-- .../tests/benchmark_spark_row.cpp | 5 +- .../tests/benchmark_to_datetime_function.cpp | 18 ++--- .../benchmark_unix_timestamp_function.cpp | 32 ++++---- .../local-engine/tests/gluten_test_util.cpp | 6 +- cpp-ch/local-engine/tests/gluten_test_util.h | 5 +- .../local-engine/tests/gtest_ch_functions.cpp | 20 ++--- cpp-ch/local-engine/tests/gtest_ch_join.cpp | 20 ++--- .../local-engine/tests/gtest_ch_storages.cpp | 16 ++-- .../tests/gtest_clickhouse_pr_verify.cpp | 7 +- .../tests/gtest_parquet_columnindex.cpp | 18 ++--- .../tests/gtest_parquet_columnindex_bug.cpp | 5 +- cpp-ch/local-engine/tests/gtest_parser.cpp | 5 +- .../tests/gtest_write_pipeline.cpp | 25 +++--- 52 files changed, 322 insertions(+), 345 deletions(-) diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index b787888f5c17..68ed66c45f6b 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -46,7 +46,6 @@ #include #include #include -#include #include #include #include @@ -73,6 +72,7 @@ #include #include #include +#include #include #include @@ -815,14 +815,9 @@ void BackendInitializerUtil::initSettings(std::map & b void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config) { /// Make sure global_context and shared_context are constructed only once. 
- auto & shared_context = SerializedPlanParser::shared_context; - if (!shared_context.get()) - shared_context = SharedContextHolder(Context::createShared()); - - auto & global_context = SerializedPlanParser::global_context; - if (!global_context) + if (auto global_context = QueryContext::globalMutableContext(); !global_context) { - global_context = Context::createGlobal(shared_context.get()); + global_context = QueryContext::createGlobal(); global_context->makeGlobalContext(); global_context->setConfig(config); @@ -878,9 +873,9 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config) } } -void BackendInitializerUtil::applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr config, DB::Settings & settings) +void BackendInitializerUtil::applyGlobalConfigAndSettings(const DB::Context::ConfigurationPtr & config, const DB::Settings & settings) { - auto & global_context = SerializedPlanParser::global_context; + const auto global_context = QueryContext::globalMutableContext(); global_context->setConfig(config); global_context->setSettings(settings); } @@ -974,8 +969,8 @@ void BackendInitializerUtil::init(const std::string_view plan) // Init the table metadata cache map StorageMergeTreeFactory::init_cache_map(); - JobScheduler::initialize(SerializedPlanParser::global_context); - CacheManager::initialize(SerializedPlanParser::global_context); + JobScheduler::initialize(QueryContext::globalContext()); + CacheManager::initialize(QueryContext::globalMutableContext()); std::call_once( init_flag, @@ -1025,14 +1020,7 @@ void BackendFinalizerUtil::finalizeGlobally() // Make sure client caches release before ClientCacheRegistry ReadBufferBuilderFactory::instance().clean(); StorageMergeTreeFactory::clear(); - auto & global_context = SerializedPlanParser::global_context; - auto & shared_context = SerializedPlanParser::shared_context; - if (global_context) - { - global_context->shutdown(); - global_context.reset(); - shared_context.reset(); - } + 
QueryContext::resetGlobal(); std::lock_guard lock(paths_mutex); std::ranges::for_each(paths_need_to_clean, [](const auto & path) { diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h index 3c741c7ffa22..2e0b7266cd94 100644 --- a/cpp-ch/local-engine/Common/CHUtil.h +++ b/cpp-ch/local-engine/Common/CHUtil.h @@ -213,7 +213,7 @@ class BackendInitializerUtil static void initContexts(DB::Context::ConfigurationPtr config); static void initCompiledExpressionCache(DB::Context::ConfigurationPtr config); static void registerAllFactories(); - static void applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr, DB::Settings &); + static void applyGlobalConfigAndSettings(const DB::Context::ConfigurationPtr & config, const DB::Settings & settings); static void updateNewSettings(const DB::ContextMutablePtr &, const DB::Settings &); static std::vector wrapDiskPathConfig(const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config); diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp index 7cd96f4b1c07..142738aa3d01 100644 --- a/cpp-ch/local-engine/Common/QueryContext.cpp +++ b/cpp-ch/local-engine/Common/QueryContext.cpp @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include @@ -35,21 +34,59 @@ extern const int LOGICAL_ERROR; } } +using namespace DB; + namespace local_engine { -using namespace DB; -struct QueryContext +struct QueryContext::Data { std::shared_ptr thread_status; std::shared_ptr thread_group; ContextMutablePtr query_context; + + static DB::ContextMutablePtr global_context; + static SharedContextHolder shared_context; }; -int64_t QueryContextManager::initializeQuery() +ContextMutablePtr QueryContext::Data::global_context{}; +SharedContextHolder QueryContext::Data::shared_context{}; + +DB::ContextMutablePtr QueryContext::globalMutableContext() +{ + return Data::global_context; +} +void QueryContext::resetGlobal() +{ + 
if (Data::global_context) + { + Data::global_context->shutdown(); + Data::global_context.reset(); + } + Data::shared_context.reset(); +} + +DB::ContextMutablePtr QueryContext::createGlobal() +{ + assert(Data::shared_context.get() == nullptr); + + if (!Data::shared_context.get()) + Data::shared_context = SharedContextHolder(Context::createShared()); + + assert(Data::global_context == nullptr); + Data::global_context = Context::createGlobal(Data::shared_context.get()); + return globalMutableContext(); +} + +DB::ContextPtr QueryContext::globalContext() +{ + return Data::global_context; +} + +int64_t QueryContext::initializeQuery() { - std::shared_ptr query_context = std::make_shared(); - query_context->query_context = Context::createCopy(SerializedPlanParser::global_context); + std::shared_ptr query_context = std::make_shared(); + query_context->query_context = Context::createCopy(globalContext()); query_context->query_context->makeQueryContext(); // empty input will trigger random query id to be set @@ -72,14 +109,14 @@ int64_t QueryContextManager::initializeQuery() return id; } -DB::ContextMutablePtr QueryContextManager::currentQueryContext() +DB::ContextMutablePtr QueryContext::currentQueryContext() { auto thread_group = currentThreadGroup(); const int64_t id = reinterpret_cast(CurrentThread::getGroup().get()); return query_map_.get(id)->query_context; } -std::shared_ptr QueryContextManager::currentThreadGroup() +std::shared_ptr QueryContext::currentThreadGroup() { if (auto thread_group = CurrentThread::getGroup()) return thread_group; @@ -87,12 +124,10 @@ std::shared_ptr QueryContextManager::currentThreadGroup() throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found."); } -void QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters & counters) const +void QueryContext::logCurrentPerformanceCounters(ProfileEvents::Counters & counters) const { if (!CurrentThread::getGroup()) - { return; - } if (logger_->information()) { 
std::ostringstream msg; @@ -104,44 +139,37 @@ void QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters auto & count = counters[event]; if (count == 0) continue; - msg << std::setw(50) << std::setfill(' ') << std::left << name << "|" - << std::setw(20) << std::setfill(' ') << std::left << count.load() - << " | (" << doc << ")\n"; + msg << std::setw(50) << std::setfill(' ') << std::left << name << "|" << std::setw(20) << std::setfill(' ') << std::left + << count.load() << " | (" << doc << ")\n"; } LOG_INFO(logger_, "{}", msg.str()); } } -size_t QueryContextManager::currentPeakMemory(int64_t id) +size_t QueryContext::currentPeakMemory(int64_t id) { if (!query_map_.contains(id)) throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "context released {}", id); return query_map_.get(id)->thread_group->memory_tracker.getPeak(); } -void QueryContextManager::finalizeQuery(int64_t id) +void QueryContext::finalizeQuery(int64_t id) { if (!CurrentThread::getGroup()) - { throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found."); - } - std::shared_ptr context; + std::shared_ptr context; { context = query_map_.get(id); } auto query_context = context->thread_status->getQueryContext(); if (!query_context) - { throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "query context not found"); - } context->thread_status->flushUntrackedMemory(); context->thread_status->finalizePerformanceCounters(); LOG_INFO(logger_, "Task finished, peak memory usage: {} bytes", currentPeakMemory(id)); if (currentThreadGroupMemoryUsage() > 1_MiB) - { LOG_WARNING(logger_, "{} bytes memory didn't release, There may be a memory leak!", currentThreadGroupMemoryUsage()); - } logCurrentPerformanceCounters(context->thread_group->performance_counters); context->thread_status->detachFromGroup(); context->thread_group.reset(); @@ -155,18 +183,14 @@ void QueryContextManager::finalizeQuery(int64_t id) size_t currentThreadGroupMemoryUsage() { if (!CurrentThread::getGroup()) - { throw 
DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found, please call initializeQuery first."); - } return CurrentThread::getGroup()->memory_tracker.get(); } double currentThreadGroupMemoryUsageRatio() { if (!CurrentThread::getGroup()) - { throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found, please call initializeQuery first."); - } return static_cast(CurrentThread::getGroup()->memory_tracker.get()) / CurrentThread::getGroup()->memory_tracker.getSoftLimit(); } } diff --git a/cpp-ch/local-engine/Common/QueryContext.h b/cpp-ch/local-engine/Common/QueryContext.h index 5079589f46b0..821144f5fcb6 100644 --- a/cpp-ch/local-engine/Common/QueryContext.h +++ b/cpp-ch/local-engine/Common/QueryContext.h @@ -19,16 +19,25 @@ #include #include +namespace DB +{ +struct ContextSharedPart; +} namespace local_engine { -struct QueryContext; -class QueryContextManager +class QueryContext { + struct Data; + public: - static QueryContextManager & instance() + static DB::ContextMutablePtr createGlobal(); + static void resetGlobal(); + static DB::ContextMutablePtr globalMutableContext(); + static DB::ContextPtr globalContext(); + static QueryContext & instance() { - static QueryContextManager instance; + static QueryContext instance; return instance; } int64_t initializeQuery(); @@ -39,9 +48,9 @@ class QueryContextManager void finalizeQuery(int64_t id); private: - QueryContextManager() = default; + QueryContext() = default; LoggerPtr logger_ = getLogger("QueryContextManager"); - ConcurrentMap> query_map_{}; + ConcurrentMap> query_map_{}; }; size_t currentThreadGroupMemoryUsage(); diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp index 9c4b390ea8b0..bd005132b9b9 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp @@ -17,11 +17,10 @@ #include "GlutenDiskHDFS.h" #include - +#include +#include 
#include -#include -#include "CompactObjectStorageDiskTransaction.h" #if USE_HDFS namespace local_engine @@ -30,7 +29,7 @@ using namespace DB; DiskTransactionPtr GlutenDiskHDFS::createTransaction() { - return std::make_shared(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk()); + return std::make_shared(*this, QueryContext::globalContext()->getTempDataOnDisk()->getVolume()->getDisk()); } void GlutenDiskHDFS::createDirectory(const String & path) @@ -78,7 +77,7 @@ DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage() object_key_prefix, getMetadataStorage(), getObjectStorage(), - SerializedPlanParser::global_context->getConfigRef(), + QueryContext::globalContext()->getConfigRef(), config_prefix, object_storage_creator); } diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp index 4a73c5a4998b..b2a6bb523d3a 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp @@ -17,11 +17,13 @@ #pragma once - #include "GlutenDiskS3.h" +#include #include -#include -#include "CompactObjectStorageDiskTransaction.h" +#include +#include + +using namespace DB; #if USE_AWS_S3 namespace local_engine @@ -29,10 +31,10 @@ namespace local_engine DB::DiskTransactionPtr GlutenDiskS3::createTransaction() { - return std::make_shared(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk()); + return std::make_shared(*this, QueryContext::globalContext()->getTempDataOnDisk()->getVolume()->getDisk()); } - std::unique_ptr GlutenDiskS3::readFile( + std::unique_ptr GlutenDiskS3::readFile( const String & path, const ReadSettings & settings, std::optional read_hint, @@ -52,7 +54,7 @@ namespace local_engine object_key_prefix, getMetadataStorage(), getObjectStorage(), - SerializedPlanParser::global_context->getConfigRef(), + QueryContext::globalContext()->getConfigRef(), 
config_prefix, object_storage_creator); } diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h index 4f0d7a029529..00a463b6d996 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h +++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h @@ -19,8 +19,7 @@ #include -#include -#include "CompactObjectStorageDiskTransaction.h" + #if USE_AWS_S3 namespace local_engine @@ -41,13 +40,11 @@ class GlutenDiskS3 : public DB::DiskObjectStorage DB::DiskTransactionPtr createTransaction() override; - std::unique_ptr readFile( - const String & path, - const ReadSettings & settings, - std::optional read_hint, - std::optional file_size) const override; + std::unique_ptr + readFile(const String & path, const DB::ReadSettings & settings, std::optional read_hint, std::optional file_size) + const override; - DiskObjectStoragePtr createDiskObjectStorage() override; + DB::DiskObjectStoragePtr createDiskObjectStorage() override; private: std::function object_storage_creator; diff --git a/cpp-ch/local-engine/Parser/CrossRelParser.cpp b/cpp-ch/local-engine/Parser/CrossRelParser.cpp index 454f0387f094..3cb6ff7ede61 100644 --- a/cpp-ch/local-engine/Parser/CrossRelParser.cpp +++ b/cpp-ch/local-engine/Parser/CrossRelParser.cpp @@ -16,45 +16,40 @@ */ #include "CrossRelParser.h" -#include -#include #include #include #include #include #include #include -#include #include +#include #include #include #include #include #include #include +#include #include - namespace DB { namespace ErrorCodes { - extern const int LOGICAL_ERROR; - extern const int UNKNOWN_TYPE; - extern const int BAD_ARGUMENTS; +extern const int LOGICAL_ERROR; +extern const int UNKNOWN_TYPE; +extern const int BAD_ARGUMENTS; } } using namespace DB; - - - namespace local_engine { std::shared_ptr createCrossTableJoin(substrait::CrossRel_JoinType join_type) { - auto & global_context = SerializedPlanParser::global_context; + auto global_context = 
QueryContext::globalContext(); auto table_join = std::make_shared( global_context->getSettingsRef(), global_context->getGlobalTemporaryVolume(), global_context->getTempDataOnDisk()); diff --git a/cpp-ch/local-engine/Parser/ExpandRelParser.cpp b/cpp-ch/local-engine/Parser/ExpandRelParser.cpp index 960c9eba1e3d..c621332db662 100644 --- a/cpp-ch/local-engine/Parser/ExpandRelParser.cpp +++ b/cpp-ch/local-engine/Parser/ExpandRelParser.cpp @@ -22,9 +22,7 @@ #include #include #include -#include #include -#include #include namespace DB diff --git a/cpp-ch/local-engine/Parser/ExpandRelParser.h b/cpp-ch/local-engine/Parser/ExpandRelParser.h index 449515001f52..1ca7cc8149af 100644 --- a/cpp-ch/local-engine/Parser/ExpandRelParser.h +++ b/cpp-ch/local-engine/Parser/ExpandRelParser.h @@ -16,10 +16,11 @@ */ #pragma once #include -#include + namespace local_engine { +class SerializedPlanParser; class ExpandRelParser : public RelParser { public: diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp index 20340a09907c..730a013dce4c 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp +++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp @@ -60,7 +60,7 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel( MergeTreeTableInstance merge_tree_table(extension_table); // ignore snapshot id for query merge_tree_table.snapshot_id = ""; - auto storage = merge_tree_table.restoreStorage(global_context); + auto storage = merge_tree_table.restoreStorage(QueryContext::globalMutableContext()); DB::Block input; if (rel.has_base_schema() && rel.base_schema().names_size()) @@ -318,7 +318,7 @@ String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_ MergeTreeTableInstance merge_tree_table(read_rel.advanced_extension().enhancement()); // ignore snapshot id for query merge_tree_table.snapshot_id = ""; - auto storage = merge_tree_table.restoreStorage(global_context); + auto storage = 
merge_tree_table.restoreStorage(QueryContext::globalMutableContext()); auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema()); auto names_and_types_list = input.getNamesAndTypesList(); diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h index 91c335a686a0..94b4809d3969 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h +++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h @@ -20,7 +20,6 @@ #include #include -#include namespace DB { @@ -37,8 +36,7 @@ using namespace DB; class MergeTreeRelParser : public RelParser { public: - explicit MergeTreeRelParser(SerializedPlanParser * plan_paser_, const ContextPtr & context_) - : RelParser(plan_paser_), context(context_), global_context(plan_paser_->global_context) + explicit MergeTreeRelParser(SerializedPlanParser * plan_paser_, const ContextPtr & context_) : RelParser(plan_paser_), context(context_) { } @@ -88,8 +86,7 @@ class MergeTreeRelParser : public RelParser static void collectColumns(const substrait::Expression & rel, NameSet & columns, Block & block); UInt64 getColumnsSize(const NameSet & columns); - const ContextPtr & context; - ContextMutablePtr & global_context; + ContextPtr context; }; } diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp b/cpp-ch/local-engine/Parser/RelMetric.cpp index e138642607c4..98a9b284ec6b 100644 --- a/cpp-ch/local-engine/Parser/RelMetric.cpp +++ b/cpp-ch/local-engine/Parser/RelMetric.cpp @@ -49,7 +49,7 @@ namespace local_engine static void writeCacheHits(Writer & writer) { - const auto thread_group = QueryContextManager::currentThreadGroup(); + const auto thread_group = QueryContext::currentThreadGroup(); auto & counters = thread_group->performance_counters; auto read_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheHits].load(); auto miss_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheMisses].load(); @@ -109,7 +109,7 @@ RelMetricTimes RelMetric::getTotalTime() 
const { for (const auto & processor : step->getProcessors()) { - timeMetrics.time += processor->getElapsedNs() / 1000U ; + timeMetrics.time += processor->getElapsedNs() / 1000U; timeMetrics.input_wait_elapsed_us += processor->getInputWaitElapsedNs() / 1000U; timeMetrics.output_wait_elapsed_us += processor->getInputWaitElapsedNs() / 1000U; } @@ -209,9 +209,7 @@ std::string RelMetricSerializer::serializeRelMetric(const RelMetricPtr & rel_met auto metric = metrics.top(); metrics.pop(); for (const auto & item : metric->getInputs()) - { metrics.push(item); - } metric->serialize(writer); } writer.EndArray(); diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index 718656ac8a55..8efbd97d240d 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -1324,10 +1324,6 @@ SerializedPlanParser::SerializedPlanParser(const ContextPtr & context_) : contex { } -ContextMutablePtr SerializedPlanParser::global_context = nullptr; - -Context::ConfigurationPtr SerializedPlanParser::config = nullptr; - void SerializedPlanParser::collectJoinKeys( const substrait::Expression & condition, std::vector> & join_keys, int32_t right_key_start) { @@ -1565,8 +1561,6 @@ void SerializedPlanParser::wrapNullable( } } -SharedContextHolder SerializedPlanParser::shared_context; - LocalExecutor::~LocalExecutor() { if (dump_pipeline) diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h index 85150c099ba9..112e82a8790e 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h @@ -131,9 +131,6 @@ class SerializedPlanParser static std::pair parseLiteral(const substrait::Expression_Literal & literal); - static ContextMutablePtr global_context; - static Context::ConfigurationPtr config; - static SharedContextHolder shared_context; std::vector 
extra_plan_holder; private: @@ -142,22 +139,19 @@ class SerializedPlanParser collectJoinKeys(const substrait::Expression & condition, std::vector> & join_keys, int32_t right_key_start); void parseFunctionOrExpression( - const substrait::Expression & rel, - std::string & result_name, - DB::ActionsDAG& actions_dag, - bool keep_result = false); + const substrait::Expression & rel, std::string & result_name, DB::ActionsDAG & actions_dag, bool keep_result = false); void parseJsonTuple( const substrait::Expression & rel, std::vector & result_names, - DB::ActionsDAG& actions_dag, + DB::ActionsDAG & actions_dag, bool keep_result = false, bool position = false); const ActionsDAG::Node * parseFunctionWithDAG( - const substrait::Expression & rel, std::string & result_name, DB::ActionsDAG& actions_dag, bool keep_result = false); + const substrait::Expression & rel, std::string & result_name, DB::ActionsDAG & actions_dag, bool keep_result = false); ActionsDAG::NodeRawConstPtrs parseArrayJoinWithDAG( const substrait::Expression & rel, std::vector & result_name, - DB::ActionsDAG& actions_dag, + DB::ActionsDAG & actions_dag, bool keep_result = false, bool position = false); void parseFunctionArguments( @@ -174,14 +168,14 @@ class SerializedPlanParser bool & is_map); - const DB::ActionsDAG::Node * parseExpression(DB::ActionsDAG& actions_dag, const substrait::Expression & rel); + const DB::ActionsDAG::Node * parseExpression(DB::ActionsDAG & actions_dag, const substrait::Expression & rel); const ActionsDAG::Node * - toFunctionNode(ActionsDAG& actions_dag, const String & function, const DB::ActionsDAG::NodeRawConstPtrs & args); + toFunctionNode(ActionsDAG & actions_dag, const String & function, const DB::ActionsDAG::NodeRawConstPtrs & args); // remove nullable after isNotNull void removeNullableForRequiredColumns(const std::set & require_columns, ActionsDAG & actions_dag) const; std::string getUniqueName(const std::string & name) { return name + "_" + std::to_string(name_no++); } void 
wrapNullable( - const std::vector & columns, ActionsDAG& actions_dag, std::map & nullable_measure_names); + const std::vector & columns, ActionsDAG & actions_dag, std::map & nullable_measure_names); static std::pair convertStructFieldType(const DB::DataTypePtr & type, const DB::Field & field); bool isFunction(substrait::Expression_ScalarFunction rel, String function_name); @@ -198,7 +192,7 @@ class SerializedPlanParser std::vector metrics; public: - const ActionsDAG::Node * addColumn(DB::ActionsDAG& actions_dag, const DataTypePtr & type, const Field & field); + const ActionsDAG::Node * addColumn(DB::ActionsDAG & actions_dag, const DataTypePtr & type, const Field & field); }; struct SparkBuffer diff --git a/cpp-ch/local-engine/Parser/TypeParser.cpp b/cpp-ch/local-engine/Parser/TypeParser.cpp index 269f35747552..84c9362269de 100644 --- a/cpp-ch/local-engine/Parser/TypeParser.cpp +++ b/cpp-ch/local-engine/Parser/TypeParser.cpp @@ -19,14 +19,12 @@ #include #include #include -#include #include #include #include #include #include #include -#include #include #include #include @@ -38,6 +36,7 @@ #include #include #include +#include namespace DB { @@ -274,7 +273,7 @@ DB::Block TypeParser::buildBlockFromNamedStruct(const substrait::NamedStruct & s auto args_types = tuple_type->getElements(); AggregateFunctionProperties properties; - auto tmp_ctx = DB::Context::createCopy(SerializedPlanParser::global_context); + auto tmp_ctx = DB::Context::createCopy(QueryContext::globalContext()); SerializedPlanParser tmp_plan_parser(tmp_ctx); auto function_parser = AggregateFunctionParserFactory::instance().get(name_parts[3], &tmp_plan_parser); /// This may remove elements from args_types, because some of them are used to determine CH function name, but not needed for the following diff --git a/cpp-ch/local-engine/Parser/TypeParser.h b/cpp-ch/local-engine/Parser/TypeParser.h index 57a12de0489d..2a498989e27a 100644 --- a/cpp-ch/local-engine/Parser/TypeParser.h +++ 
b/cpp-ch/local-engine/Parser/TypeParser.h @@ -16,7 +16,7 @@ */ #pragma once #include -#include + #include #include #include diff --git a/cpp-ch/local-engine/Parser/WriteRelParser.cpp b/cpp-ch/local-engine/Parser/WriteRelParser.cpp index ecae3f16d00a..c1d2ee2504ef 100644 --- a/cpp-ch/local-engine/Parser/WriteRelParser.cpp +++ b/cpp-ch/local-engine/Parser/WriteRelParser.cpp @@ -30,6 +30,7 @@ #include using namespace local_engine; +using namespace DB; DB::ProcessorPtr make_sink( const DB::ContextPtr & context, diff --git a/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_add.cpp b/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_add.cpp index 12284ea003fe..35e863e125db 100644 --- a/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_add.cpp +++ b/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_add.cpp @@ -15,17 +15,18 @@ * limitations under the License. */ #include +#include #include #include #include -#include +#include using namespace DB; using namespace local_engine; TEST(MyAdd, Common) { - auto context = local_engine::SerializedPlanParser::global_context; + auto context = local_engine::QueryContext::globalContext(); auto type = std::make_shared(std::make_shared()); FunctionExecutor executor("my_add", {type, type}, type, context); diff --git a/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_md5.cpp b/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_md5.cpp index c58262d9871a..3ff03e0b99e7 100644 --- a/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_md5.cpp +++ b/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_md5.cpp @@ -15,17 +15,18 @@ * limitations under the License. 
*/ #include +#include #include #include #include -#include +#include using namespace DB; using namespace local_engine; TEST(MyMd5, Common) { - auto context = local_engine::SerializedPlanParser::global_context; + auto context = local_engine::QueryContext::globalContext(); auto type = std::make_shared(std::make_shared()); FunctionExecutor executor("my_md5", {type}, type, context); diff --git a/cpp-ch/local-engine/Rewriter/RelRewriter.h b/cpp-ch/local-engine/Rewriter/RelRewriter.h index 719ec4aff099..2830a179bf6a 100644 --- a/cpp-ch/local-engine/Rewriter/RelRewriter.h +++ b/cpp-ch/local-engine/Rewriter/RelRewriter.h @@ -16,32 +16,27 @@ */ #pragma once -#include -#include -#include -#include -#include #include -#include -#include #include +#include +#include +#include -#include -#include namespace local_engine { class RelRewriter { public: - RelRewriter(SerializedPlanParser *parser_) : parser(parser_) {} + RelRewriter(SerializedPlanParser * parser_) : parser(parser_) { } virtual ~RelRewriter() = default; virtual void rewrite(substrait::Rel & rel) = 0; + protected: - SerializedPlanParser *parser; + SerializedPlanParser * parser; - inline DB::ContextPtr getContext() { return parser->context; } - inline std::unordered_map & getFunctionMapping() { return parser->function_mapping; } + inline DB::ContextPtr getContext() const { return parser->context; } + inline std::unordered_map & getFunctionMapping() const { return parser->function_mapping; } }; } diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index 79d640d3b2bc..e4a8f86b0b3b 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -22,18 +22,16 @@ #include #include #include +#include #include +#include #include -#include #include +#include #include #include #include #include -#include - -#include -#include namespace DB @@ -324,7 +322,7 @@ PartitionWriter::PartitionWriter(CachedShuffleWriter * 
shuffle_writer_, LoggerPt partition_block_buffer[partition_id] = std::make_shared(options->split_size); partition_buffer[partition_id] = std::make_shared(); } - settings = MemoryConfig::loadFromContext(SerializedPlanParser::global_context); + settings = MemoryConfig::loadFromContext(QueryContext::globalContext()); } size_t PartitionWriter::bytes() const diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h index 78eb845e1db1..15f8b5086681 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h @@ -18,15 +18,14 @@ #include #include #include -#include #include #include #include -#include #include #include #include - +#include +#include namespace DB { @@ -113,7 +112,7 @@ class Spillable protected: String getNextSpillFile(); - std::vector mergeSpills(CachedShuffleWriter * shuffle_writer, WriteBuffer & data_file, ExtraData extra_data = {}); + std::vector mergeSpills(CachedShuffleWriter * shuffle_writer, DB::WriteBuffer & data_file, ExtraData extra_data = {}); std::vector spill_infos; private: @@ -140,7 +139,7 @@ class SortBasedPartitionWriter : public PartitionWriter { max_merge_block_size = options->split_size; max_sort_buffer_size = options->max_sort_buffer_size; - max_merge_block_bytes = SerializedPlanParser::global_context->getSettingsRef().prefer_external_sort_block_bytes; + max_merge_block_bytes = QueryContext::globalContext()->getSettingsRef().prefer_external_sort_block_bytes; } public: String getName() const override { return "SortBasedPartitionWriter"; } @@ -161,10 +160,10 @@ class SortBasedPartitionWriter : public PartitionWriter size_t max_merge_block_bytes = 0; size_t current_accumulated_bytes = 0; size_t current_accumulated_rows = 0; - Chunks accumulated_blocks; - Block output_header; - Block sort_header; - SortDescription sort_description; + DB::Chunks accumulated_blocks; + DB::Block output_header; + DB::Block sort_header; + DB::SortDescription 
sort_description; }; class MemorySortLocalPartitionWriter : public SortBasedPartitionWriter, public Spillable diff --git a/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp b/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp index 272a6f2f6bee..755de6402926 100644 --- a/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp +++ b/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp @@ -17,7 +17,6 @@ #include "SelectorBuilder.h" #include #include -#include #include #include #include @@ -30,6 +29,7 @@ #include #include #include +#include namespace DB { @@ -101,7 +101,7 @@ PartitionInfo HashSelectorBuilder::build(DB::Block & block) if (!hash_function) [[unlikely]] { auto & factory = DB::FunctionFactory::instance(); - auto function = factory.get(hash_function_name, local_engine::SerializedPlanParser::global_context); + auto function = factory.get(hash_function_name, QueryContext::globalContext()); hash_function = function->build(args); } @@ -328,7 +328,7 @@ void RangeSelectorBuilder::initActionsDAG(const DB::Block & block) std::lock_guard lock(actions_dag_mutex); if (has_init_actions_dag) return; - SerializedPlanParser plan_parser(local_engine::SerializedPlanParser::global_context); + SerializedPlanParser plan_parser(QueryContext::globalContext()); plan_parser.parseExtensions(projection_plan_pb->extensions()); const auto & expressions = projection_plan_pb->relations().at(0).root().input().project().expressions(); diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp index a2943f0b72d4..daa3c0e30577 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -65,7 +66,7 @@ CacheManager & CacheManager::instance() return cache_manager; } -void CacheManager::initialize(DB::ContextMutablePtr context_) +void CacheManager::initialize(const DB::ContextMutablePtr & context_) { auto & 
manager = instance(); manager.context = context_; diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h b/cpp-ch/local-engine/Storages/Cache/CacheManager.h index 2c1c010432dd..b59963ec4fa7 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h @@ -39,7 +39,7 @@ class CacheManager static void initJNI(JNIEnv * env); static CacheManager & instance(); - static void initialize(DB::ContextMutablePtr context); + static void initialize(const DB::ContextMutablePtr & context); JobId cacheParts(const MergeTreeTableInstance & table, const std::unordered_set & columns); static jobject getCacheStatus(JNIEnv * env, const String & jobId); diff --git a/cpp-ch/local-engine/Storages/Cache/JobScheduler.cpp b/cpp-ch/local-engine/Storages/Cache/JobScheduler.cpp index 6a43ad644433..add3648f9134 100644 --- a/cpp-ch/local-engine/Storages/Cache/JobScheduler.cpp +++ b/cpp-ch/local-engine/Storages/Cache/JobScheduler.cpp @@ -18,9 +18,9 @@ #include "JobScheduler.h" +#include #include #include -#include #include namespace DB @@ -42,7 +42,7 @@ namespace local_engine { std::shared_ptr global_job_scheduler = nullptr; -void JobScheduler::initialize(DB::ContextPtr context) +void JobScheduler::initialize(const DB::ContextPtr & context) { auto config = GlutenJobSchedulerConfig::loadFromContext(context); instance().thread_pool = std::make_unique( diff --git a/cpp-ch/local-engine/Storages/Cache/JobScheduler.h b/cpp-ch/local-engine/Storages/Cache/JobScheduler.h index b5c2f601a92b..379d5cf5f45b 100644 --- a/cpp-ch/local-engine/Storages/Cache/JobScheduler.h +++ b/cpp-ch/local-engine/Storages/Cache/JobScheduler.h @@ -15,10 +15,10 @@ * limitations under the License. 
*/ #pragma once -#include -#include #include +#include #include +#include namespace local_engine { @@ -108,7 +108,7 @@ class JobScheduler return global_job_scheduler; } - static void initialize(DB::ContextPtr context); + static void initialize(const DB::ContextPtr & context); JobId scheduleJob(Job&& job); diff --git a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp index 5cdeaf7a0e4b..c9b56734a658 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp @@ -15,12 +15,12 @@ * limitations under the License. */ #include "MetaDataHelper.h" - #include #include #include #include #include +#include namespace CurrentMetrics { @@ -78,7 +78,7 @@ void restoreMetaData(const SparkStorageMergeTreePtr & storage, const MergeTreeTa return; // Increase the speed of metadata recovery - auto max_concurrency = std::max(10UL, SerializedPlanParser::global_context->getSettingsRef().max_threads.value); + auto max_concurrency = std::max(10UL, QueryContext::globalContext()->getSettingsRef().max_threads.value); auto max_threads = std::min(max_concurrency, not_exists_part.size()); FreeThreadPool thread_pool( CurrentMetrics::LocalThread, diff --git a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp index ed41633d3d40..55cffe04a671 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp @@ -83,7 +83,7 @@ StorageMergeTreeFactory::getDataPartsByNames(const StorageID & id, const String { DataPartsVector res; auto table_name = getTableName(id, snapshot_id); - auto config = MergeTreeConfig::loadFromContext(SerializedPlanParser::global_context); + auto config = MergeTreeConfig::loadFromContext(QueryContext::globalContext()); std::lock_guard lock(datapart_mutex); 
std::unordered_set missing_names; if (!datapart_map->has(table_name)) [[unlikely]] diff --git a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h index 9418198583ec..2c76fe51eda4 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h +++ b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h @@ -16,10 +16,11 @@ */ #pragma once #include -#include #include +#include #include #include +#include namespace local_engine { @@ -70,7 +71,7 @@ class StorageMergeTreeFactory static DataPartsVector getDataPartsByNames(const StorageID & id, const String & snapshot_id, std::unordered_set part_name); static void init_cache_map() { - auto config = MergeTreeConfig::loadFromContext(SerializedPlanParser::global_context); + auto config = MergeTreeConfig::loadFromContext(QueryContext::globalContext()); auto & storage_map_v = storage_map; if (!storage_map_v) { diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp b/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp index 46edb7f30d5b..632fb0a4530c 100644 --- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp +++ b/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp @@ -15,6 +15,7 @@ * limitations under the License. 
*/ #include "FileWriterWrappers.h" +#include namespace local_engine { diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h b/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h index 736f5a95f6bd..49383f8de42c 100644 --- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h +++ b/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h @@ -19,9 +19,9 @@ #include #include #include -#include #include #include +#include #include #include #include @@ -69,31 +69,25 @@ class NormalFileWriter : public FileWriterWrapper }; std::unique_ptr createFileWriterWrapper( - const DB::ContextPtr & context, - const std::string & file_uri, - const DB::Block & preferred_schema, - const std::string & format_hint); + const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint); OutputFormatFilePtr createOutputFormatFile( - const DB::ContextPtr & context, - const std::string & file_uri, - const DB::Block & preferred_schema, - const std::string & format_hint); + const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint); class WriteStats : public DB::ISimpleTransform { bool all_chunks_processed_ = false; /// flag to determine if we have already processed all chunks - Arena partition_keys_arena_; + DB::Arena partition_keys_arena_; std::string filename_; absl::flat_hash_map fiel_to_count_; - static Block statsHeader() + static DB::Block statsHeader() { return makeBlockHeader({{STRING(), "filename"}, {STRING(), "partition_id"}, {BIGINT(), "record_count"}}); } - Chunk final_result() const + DB::Chunk final_result() const { ///TODO: improve performance auto file_col = STRING()->createColumn(); @@ -115,7 +109,7 @@ class WriteStats : public DB::ISimpleTransform } public: - explicit WriteStats(const Block & input_header_) : ISimpleTransform(input_header_, statsHeader(), true) { } + explicit WriteStats(const DB::Block & 
input_header_) : ISimpleTransform(input_header_, statsHeader(), true) { } Status prepare() override { @@ -130,7 +124,7 @@ class WriteStats : public DB::ISimpleTransform } String getName() const override { return "WriteStats"; } - void transform(Chunk & chunk) override + void transform(DB::Chunk & chunk) override { if (all_chunks_processed_) chunk = final_result(); @@ -159,11 +153,11 @@ class WriteStats : public DB::ISimpleTransform it->second += rows; return; } - throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "File path {} not found in the stats map", file_path); + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "File path {} not found in the stats map", file_path); } }; -class SubstraitFileSink final : public SinkToStorage +class SubstraitFileSink final : public DB::SinkToStorage { const std::string partition_id_; const std::string relative_path_; @@ -187,7 +181,7 @@ class SubstraitFileSink final : public SinkToStorage const std::string & partition_id, const std::string & relative, const std::string & format_hint, - const Block & header) + const DB::Block & header) : SinkToStorage(header) , partition_id_(partition_id.empty() ? 
NO_PARTITION_ID : partition_id) , relative_path_(relative) @@ -205,7 +199,7 @@ class SubstraitFileSink final : public SinkToStorage } protected: - void consume(Chunk & chunk) override + void consume(DB::Chunk & chunk) override { const size_t row_count = chunk.getNumRows(); output_format_->output->write(materializeBlock(getHeader().cloneWithColumns(chunk.detachColumns()))); @@ -227,12 +221,12 @@ class SubstraitPartitionedFileSink final : public DB::PartitionedSink public: /// visible for UTs - static ASTPtr make_partition_expression(const DB::Names & partition_columns) + static DB::ASTPtr make_partition_expression(const DB::Names & partition_columns) { /// Parse the following expression into ASTs /// cancat('/col_name=', 'toString(col_name)') bool add_slash = false; - ASTs arguments; + DB::ASTs arguments; for (const auto & column : partition_columns) { // partition_column= @@ -243,7 +237,8 @@ class SubstraitPartitionedFileSink final : public DB::PartitionedSink // ifNull(toString(partition_column), DEFAULT_PARTITION_NAME) // FIXME if toString(partition_column) is empty auto column_ast = std::make_shared(column); - ASTs if_null_args{makeASTFunction("toString", ASTs{column_ast}), std::make_shared(DEFAULT_PARTITION_NAME)}; + DB::ASTs if_null_args{ + makeASTFunction("toString", DB::ASTs{column_ast}), std::make_shared(DEFAULT_PARTITION_NAME)}; arguments.emplace_back(makeASTFunction("ifNull", std::move(if_null_args))); } return DB::makeASTFunction("concat", std::move(arguments)); @@ -252,17 +247,17 @@ class SubstraitPartitionedFileSink final : public DB::PartitionedSink private: const std::string base_path_; const std::string filenmame_; - ContextPtr context_; - const Block sample_block_; + DB::ContextPtr context_; + const DB::Block sample_block_; const std::string format_hint_; std::shared_ptr stats_{nullptr}; public: SubstraitPartitionedFileSink( - const ContextPtr & context, - const Names & partition_by, - const Block & input_header, - const Block & sample_block, + 
const DB::ContextPtr & context, + const DB::Names & partition_by, + const DB::Block & input_header, + const DB::Block & sample_block, const std::string & base_path, const std::string & filename, const std::string & format_hint) @@ -274,7 +269,7 @@ class SubstraitPartitionedFileSink final : public DB::PartitionedSink , format_hint_(format_hint) { } - SinkPtr createSinkForPartition(const String & partition_id) override + DB::SinkPtr createSinkForPartition(const String & partition_id) override { assert(stats_); const auto partition_path = fmt::format("{}/{}", partition_id, filenmame_); diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 5e862040ca72..15c721be3489 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -211,7 +211,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_i JNIEnv * env, jclass, jbyteArray temp_path, jbyteArray filename) { LOCAL_ENGINE_JNI_METHOD_START - const auto query_context = local_engine::QueryContextManager::instance().currentQueryContext(); + const auto query_context = local_engine::QueryContext::instance().currentQueryContext(); const auto path_array = local_engine::getByteArrayElementsSafe(env, temp_path); const auto filename_array = local_engine::getByteArrayElementsSafe(env, filename); @@ -235,7 +235,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_ jboolean materialize_input) { LOCAL_ENGINE_JNI_METHOD_START - auto query_context = local_engine::QueryContextManager::instance().currentQueryContext(); + auto query_context = local_engine::QueryContext::instance().currentQueryContext(); // by task update new configs ( in case of dynamic config update ) const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, conf_plan); @@ -864,7 +864,7 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW const auto file_uri = jstring2string(env, 
file_uri_); // for HiveFileFormat, the file url may not end with .parquet, so we pass in the format as a hint const auto format_hint = jstring2string(env, format_hint_); - const auto context = local_engine::QueryContextManager::instance().currentQueryContext(); + const auto context = local_engine::QueryContext::instance().currentQueryContext(); auto * writer = local_engine::createFileWriterWrapper(context, file_uri, preferred_schema, format_hint).release(); return reinterpret_cast(writer); LOCAL_ENGINE_JNI_METHOD_END(env, 0) @@ -882,7 +882,7 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW jbyteArray conf_plan) { LOCAL_ENGINE_JNI_METHOD_START - auto query_context = local_engine::QueryContextManager::instance().currentQueryContext(); + auto query_context = local_engine::QueryContext::instance().currentQueryContext(); // by task update new configs ( in case of dynamic config update ) const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, conf_plan); const std::string::size_type conf_plan_size = conf_plan_a.length(); @@ -921,9 +921,9 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn auto read_ptr = local_engine::BinaryToMessage( {reinterpret_cast(read_a.elems()), static_cast(read_a.length())}); - local_engine::SerializedPlanParser parser(local_engine::SerializedPlanParser::global_context); + local_engine::SerializedPlanParser parser(local_engine::QueryContext::globalContext()); parser.parseExtensions(plan_ptr.extensions()); - local_engine::MergeTreeRelParser mergeTreeParser(&parser, local_engine::SerializedPlanParser::global_context); + local_engine::MergeTreeRelParser mergeTreeParser(&parser, local_engine::QueryContext::globalContext()); auto res = mergeTreeParser.filterRangesOnDriver(read_ptr.read()); return local_engine::charTojstring(env, res.c_str()); @@ -996,7 +996,7 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn 
{reinterpret_cast(split_info_a.elems()), static_cast(split_info_a.length())}); local_engine::MergeTreeTableInstance merge_tree_table(extension_table); - auto context = local_engine::QueryContextManager::instance().currentQueryContext(); + auto context = local_engine::QueryContext::instance().currentQueryContext(); // each task using its own CustomStorageMergeTree, don't reuse auto temp_storage = merge_tree_table.copyToVirtualStorage(context); // prefetch all needed parts metadata before merge @@ -1218,7 +1218,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIEnv * env, jclass, jobject input, jbyteArray plan) { LOCAL_ENGINE_JNI_METHOD_START - const auto context = DB::Context::createCopy(local_engine::SerializedPlanParser::global_context); + const auto context = DB::Context::createCopy(local_engine::QueryContext::globalContext()); local_engine::SerializedPlanParser parser(context); const jobject iter = env->NewGlobalRef(input); parser.addInputIter(iter, false); @@ -1256,21 +1256,21 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeNex JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_createThreadGroup(JNIEnv * env, jclass) { LOCAL_ENGINE_JNI_METHOD_START - return local_engine::QueryContextManager::instance().initializeQuery(); + return local_engine::QueryContext::instance().initializeQuery(); LOCAL_ENGINE_JNI_METHOD_END(env, 0l) } JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_threadGroupPeakMemory(JNIEnv * env, jclass, jlong id) { LOCAL_ENGINE_JNI_METHOD_START - return local_engine::QueryContextManager::instance().currentPeakMemory(id); + return local_engine::QueryContext::instance().currentPeakMemory(id); LOCAL_ENGINE_JNI_METHOD_END(env, 0l) } JNIEXPORT void Java_org_apache_gluten_memory_CHThreadGroup_releaseThreadGroup(JNIEnv * env, jclass, jlong id) { LOCAL_ENGINE_JNI_METHOD_START - local_engine::QueryContextManager::instance().finalizeQuery(id); + 
local_engine::QueryContext::instance().finalizeQuery(id); LOCAL_ENGINE_JNI_METHOD_END(env, ) } diff --git a/cpp-ch/local-engine/tests/benchmark_cast_float_function.cpp b/cpp-ch/local-engine/tests/benchmark_cast_float_function.cpp index 53155b7499ca..4ef9b5771af8 100644 --- a/cpp-ch/local-engine/tests/benchmark_cast_float_function.cpp +++ b/cpp-ch/local-engine/tests/benchmark_cast_float_function.cpp @@ -15,13 +15,14 @@ * limitations under the License. */ -#include #include -#include +#include #include +#include +#include #include -#include #include +#include using namespace DB; @@ -30,9 +31,7 @@ static Block createDataBlock(size_t rows) auto type = DataTypeFactory::instance().get("Float64"); auto column = type->createColumn(); for (size_t i = 0; i < rows; ++i) - { column->insert(i * 1.0f); - } Block block; block.insert(ColumnWithTypeAndName(std::move(column), type, "d")); return std::move(block); @@ -42,7 +41,7 @@ static void BM_CHCastFloatToInt(benchmark::State & state) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("CAST", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("CAST", local_engine::QueryContext::globalContext()); Block block = createDataBlock(30000000); DB::ColumnsWithTypeAndName args; args.emplace_back(block.getColumnsWithTypeAndName()[0]); @@ -52,7 +51,7 @@ static void BM_CHCastFloatToInt(benchmark::State & state) type_name_col.type = std::make_shared(); args.emplace_back(type_name_col); auto executable = function->build(args); - for (auto _ : state)[[maybe_unused]] + for (auto _ : state) [[maybe_unused]] auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); } @@ -60,11 +59,11 @@ static void BM_SparkCastFloatToInt(benchmark::State & state) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("sparkCastFloatToInt64", 
local_engine::SerializedPlanParser::global_context); + auto function = factory.get("sparkCastFloatToInt64", local_engine::QueryContext::globalContext()); Block block = createDataBlock(30000000); auto executable = function->build(block.getColumnsWithTypeAndName()); - for (auto _ : state)[[maybe_unused]] - auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); + for (auto _ : state) [[maybe_unused]] + auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); } BENCHMARK(BM_CHCastFloatToInt)->Unit(benchmark::kMillisecond)->Iterations(100); diff --git a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp index bc3e95c2dadc..a6e77e72fc49 100644 --- a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp +++ b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp @@ -23,33 +23,25 @@ #include #include #include -#include #include #include #include #include #include -#include #include #include -#include #include #include #include #include #include -#include -#include #include #include #include -#include #include #include #include -#include -#include -#include "testConfig.h" +#include #if defined(__SSE2__) #include @@ -90,7 +82,7 @@ DB::ContextMutablePtr global_context; substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format; file->mutable_parquet()->CopyFrom(parquet_format); auto builder = std::make_unique(); - builder->init(Pipe(std::make_shared(SerializedPlanParser::global_context, header, files))); + builder->init(Pipe(std::make_shared(QueryContext::globalContext(), header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto executor = PullingPipelineExecutor(pipeline); @@ -206,7 +198,7 @@ DB::ContextMutablePtr global_context; "/home/kyligence/Documents/test-dataset/intel-gazelle-test-" + std::to_string(state.range(0)) + ".snappy.parquet", 
std::move(schema)) .build(); - local_engine::SerializedPlanParser parser(SerializedPlanParser::global_context); + local_engine::SerializedPlanParser parser(QueryContext::globalContext()); auto local_executor = parser.createExecutor(*plan); state.ResumeTiming(); @@ -220,8 +212,8 @@ DB::ContextMutablePtr global_context; [[maybe_unused]] static void BM_MERGE_TREE_TPCH_Q6_FROM_TEXT(benchmark::State & state) { - SerializedPlanParser::global_context = global_context; - local_engine::SerializedPlanParser parser(SerializedPlanParser::global_context); + QueryContext::globalContext() = global_context; + local_engine::SerializedPlanParser parser(QueryContext::globalContext()); for (auto _ : state) { state.PauseTiming(); @@ -270,7 +262,7 @@ DB::ContextMutablePtr global_context; "/home/kyligence/Documents/test-dataset/intel-gazelle-test-" + std::to_string(state.range(0)) + ".snappy.parquet", std::move(schema)) .build(); - local_engine::SerializedPlanParser parser(SerializedPlanParser::global_context); + local_engine::SerializedPlanParser parser(QueryContext::globalContext()); auto local_executor = parser.createExecutor(*plan); state.ResumeTiming(); @@ -306,7 +298,7 @@ DB::ContextMutablePtr global_context; std::move(schema)) .build(); - local_engine::SerializedPlanParser parser(SerializedPlanParser::global_context); + local_engine::SerializedPlanParser parser(QueryContext::globalContext()); auto local_executor = parser.createExecutor(*plan); local_engine::SparkRowToCHColumn converter; while (local_executor->hasNext()) @@ -351,7 +343,7 @@ DB::ContextMutablePtr global_context; "/home/kyligence/Documents/test-dataset/intel-gazelle-test-" + std::to_string(state.range(0)) + ".snappy.parquet", std::move(schema)) .build(); - local_engine::SerializedPlanParser parser(SerializedPlanParser::global_context); + local_engine::SerializedPlanParser parser(QueryContext::globalContext()); auto local_executor = parser.createExecutor(*plan); local_engine::SparkRowToCHColumn converter; while 
(local_executor->hasNext()) diff --git a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp index 5cfe51389f2f..8ed7c108406d 100644 --- a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp +++ b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -31,6 +30,7 @@ #include #include #include +#include namespace { @@ -124,7 +124,7 @@ void BM_OptimizedParquetReadString(benchmark::State & state) auto builder = std::make_unique(); builder->init( - Pipe(std::make_shared(local_engine::SerializedPlanParser::global_context, header, files))); + Pipe(std::make_shared(local_engine::QueryContext::globalContext(), header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto reader = PullingPipelineExecutor(pipeline); while (reader.pull(res)) @@ -156,7 +156,7 @@ void BM_OptimizedParquetReadDate32(benchmark::State & state) auto builder = std::make_unique(); builder->init( - Pipe(std::make_shared(local_engine::SerializedPlanParser::global_context, header, files))); + Pipe(std::make_shared(local_engine::QueryContext::globalContext(), header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto reader = PullingPipelineExecutor(pipeline); while (reader.pull(res)) @@ -178,7 +178,7 @@ substrait::ReadRel::LocalFiles createLocalFiles(const std::string & filename, co auto config = Poco::AutoPtr(new Poco::Util::MapConfiguration()); config->setBool("use_local_format", use_local_format); - local_engine::SerializedPlanParser::global_context->setConfig(config); + local_engine::QueryContext::globalMutableContext()->setConfig(config); return files; } @@ -186,9 +186,8 @@ substrait::ReadRel::LocalFiles createLocalFiles(const std::string & filename, co void doRead(const substrait::ReadRel::LocalFiles & files, const std::optional & pushDown, const DB::Block & header) { const auto builder = 
std::make_unique(); - const auto source - = std::make_shared(local_engine::SerializedPlanParser::global_context, header, files); - source->setKeyCondition(pushDown, local_engine::SerializedPlanParser::global_context); + const auto source = std::make_shared(local_engine::QueryContext::globalContext(), header, files); + source->setKeyCondition(pushDown, local_engine::QueryContext::globalContext()); builder->init(DB::Pipe(source)); auto pipeline = DB::QueryPipelineBuilder::getPipeline(std::move(*builder)); auto reader = DB::PullingPipelineExecutor(pipeline); @@ -220,7 +219,7 @@ void BM_ColumnIndexRead_Filter_ReturnAllResult(benchmark::State & state) for (auto _ : state) doRead(files, pushDown, header); - local_engine::SerializedPlanParser::global_context->setConfig(Poco::AutoPtr(new Poco::Util::MapConfiguration())); + local_engine::QueryContext::globalMutableContext()->setConfig(Poco::AutoPtr(new Poco::Util::MapConfiguration())); } void BM_ColumnIndexRead_Filter_ReturnHalfResult(benchmark::State & state) @@ -237,7 +236,7 @@ void BM_ColumnIndexRead_Filter_ReturnHalfResult(benchmark::State & state) for (auto _ : state) doRead(files, pushDown, header); - local_engine::SerializedPlanParser::global_context->setConfig(Poco::AutoPtr(new Poco::Util::MapConfiguration())); + local_engine::QueryContext::globalMutableContext()->setConfig(Poco::AutoPtr(new Poco::Util::MapConfiguration())); } } diff --git a/cpp-ch/local-engine/tests/benchmark_spark_divide_function.cpp b/cpp-ch/local-engine/tests/benchmark_spark_divide_function.cpp index 2a824174bcf5..b6aac2445c8a 100644 --- a/cpp-ch/local-engine/tests/benchmark_spark_divide_function.cpp +++ b/cpp-ch/local-engine/tests/benchmark_spark_divide_function.cpp @@ -21,10 +21,11 @@ #include #include #include -#include +#include #include #include #include +#include using namespace DB; @@ -63,7 +64,7 @@ static std::string join(const ActionsDAG::NodeRawConstPtrs & v, char c) static const ActionsDAG::Node * addFunction(ActionsDAG & actions_dag, 
const String & function, const DB::ActionsDAG::NodeRawConstPtrs & args) { - auto function_builder = FunctionFactory::instance().get(function, local_engine::SerializedPlanParser::global_context); + auto function_builder = FunctionFactory::instance().get(function, local_engine::QueryContext::globalContext()); std::string args_name = join(args, ','); auto result_name = function + "(" + args_name + ")"; return &actions_dag.addFunction(function_builder, args, result_name); diff --git a/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp b/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp index 601355241d3f..ef961f21cbb6 100644 --- a/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp +++ b/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp @@ -26,8 +26,8 @@ #include #include #include -#include #include +#include #include using namespace DB; @@ -61,7 +61,7 @@ static void BM_CHFloorFunction_For_Int64(benchmark::State & state) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("floor", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("floor", local_engine::QueryContext::globalContext()); Block int64_block = createDataBlock("Nullable(Int64)", 65536); auto executable = function->build(int64_block.getColumnsWithTypeAndName()); for (auto _ : state) @@ -75,7 +75,7 @@ static void BM_CHFloorFunction_For_Float64(benchmark::State & state) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("floor", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("floor", local_engine::QueryContext::globalContext()); Block float64_block = createDataBlock("Nullable(Float64)", 65536); auto executable = function->build(float64_block.getColumnsWithTypeAndName()); for (auto _ : state) @@ -89,7 +89,7 @@ static void BM_SparkFloorFunction_For_Int64(benchmark::State & state) { using namespace DB; auto & factory 
= FunctionFactory::instance(); - auto function = factory.get("sparkFloor", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("sparkFloor", local_engine::QueryContext::globalContext()); Block int64_block = createDataBlock("Nullable(Int64)", 65536); auto executable = function->build(int64_block.getColumnsWithTypeAndName()); for (auto _ : state) @@ -103,7 +103,7 @@ static void BM_SparkFloorFunction_For_Float64(benchmark::State & state) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("sparkFloor", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("sparkFloor", local_engine::QueryContext::globalContext()); Block float64_block = createDataBlock("Nullable(Float64)", 65536); auto executable = function->build(float64_block.getColumnsWithTypeAndName()); for (auto _ : state) @@ -118,7 +118,8 @@ static void nanInfToNullAutoOpt(float * data, uint8_t * null_map, size_t size) for (size_t i = 0; i < size; ++i) { uint8_t is_nan = (data[i] != data[i]); - uint8_t is_inf = ((*reinterpret_cast(&data[i]) & 0b01111111111111111111111111111111) == 0b01111111100000000000000000000000); + uint8_t is_inf + = ((*reinterpret_cast(&data[i]) & 0b01111111111111111111111111111111) == 0b01111111100000000000000000000000); uint8_t null_flag = is_nan | is_inf; null_map[i] = null_flag; @@ -171,8 +172,7 @@ DECLARE_AVX2_SPECIFIC_CODE( mask >>= 1; } } - } -) + }) static void BMNanInfToNullAVX2(benchmark::State & state) { diff --git a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp index 3169df2e36fd..28e11a7badf9 100644 --- a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp +++ b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp @@ -14,6 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +#include +#include #include #include #include @@ -27,9 +29,6 @@ #include #include -#include -#include - using namespace DB; using namespace local_engine; diff --git a/cpp-ch/local-engine/tests/benchmark_to_datetime_function.cpp b/cpp-ch/local-engine/tests/benchmark_to_datetime_function.cpp index 9c04b7e316db..c72125163351 100644 --- a/cpp-ch/local-engine/tests/benchmark_to_datetime_function.cpp +++ b/cpp-ch/local-engine/tests/benchmark_to_datetime_function.cpp @@ -15,14 +15,14 @@ * limitations under the License. */ -#include #include -#include +#include #include +#include #include -#include #include #include +#include using namespace DB; @@ -31,9 +31,7 @@ static Block createDataBlock(size_t rows) auto type = DataTypeFactory::instance().get("String"); auto column = type->createColumn(); for (size_t i = 0; i < rows; ++i) - { column->insert("2024-01-05 12:12:12"); - } Block block; block.insert(ColumnWithTypeAndName(std::move(column), type, "d")); return std::move(block); @@ -43,10 +41,10 @@ static void BM_CHParseDateTime64(benchmark::State & state) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("toDateTime64OrNull", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("toDateTime64OrNull", local_engine::QueryContext::globalContext()); Block block = createDataBlock(30000000); auto executable = function->build(block.getColumnsWithTypeAndName()); - for (auto _ : state)[[maybe_unused]] + for (auto _ : state) [[maybe_unused]] auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); } @@ -55,11 +53,11 @@ static void BM_SparkParseDateTime64(benchmark::State & state) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("sparkToDateTime", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("sparkToDateTime", local_engine::QueryContext::globalContext()); Block 
block = createDataBlock(30000000); auto executable = function->build(block.getColumnsWithTypeAndName()); - for (auto _ : state)[[maybe_unused]] - auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); + for (auto _ : state) [[maybe_unused]] + auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); } BENCHMARK(BM_CHParseDateTime64)->Unit(benchmark::kMillisecond)->Iterations(50); diff --git a/cpp-ch/local-engine/tests/benchmark_unix_timestamp_function.cpp b/cpp-ch/local-engine/tests/benchmark_unix_timestamp_function.cpp index 3f053dcee663..e7abfda7a2b2 100644 --- a/cpp-ch/local-engine/tests/benchmark_unix_timestamp_function.cpp +++ b/cpp-ch/local-engine/tests/benchmark_unix_timestamp_function.cpp @@ -15,15 +15,15 @@ * limitations under the License. */ -#include #include -#include +#include #include +#include #include #include -#include #include #include +#include using namespace DB; @@ -32,16 +32,10 @@ static Block createDataBlock(String type_str, size_t rows) auto type = DataTypeFactory::instance().get(type_str); auto column = type->createColumn(); for (size_t i = 0; i < rows; ++i) - { if (type_str == "Date32") - { column->insert(i); - } else if (type_str == "Date") - { column->insert(i); - } - } Block block; block.insert(ColumnWithTypeAndName(std::move(column), type, "d")); return std::move(block); @@ -51,10 +45,10 @@ static void BM_CHUnixTimestamp_For_Date32(benchmark::State & state) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("toUnixTimestamp", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("toUnixTimestamp", local_engine::QueryContext::globalContext()); Block block = createDataBlock("Date32", 30000000); auto executable = function->build(block.getColumnsWithTypeAndName()); - for (auto _ : state)[[maybe_unused]] + for (auto _ : state) [[maybe_unused]] auto result 
= executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); } @@ -62,10 +56,10 @@ static void BM_CHUnixTimestamp_For_Date(benchmark::State & state) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("toUnixTimestamp", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("toUnixTimestamp", local_engine::QueryContext::globalContext()); Block block = createDataBlock("Date", 30000000); auto executable = function->build(block.getColumnsWithTypeAndName()); - for (auto _ : state)[[maybe_unused]] + for (auto _ : state) [[maybe_unused]] auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); } @@ -73,22 +67,22 @@ static void BM_SparkUnixTimestamp_For_Date32(benchmark::State & state) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("sparkDateToUnixTimestamp", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("sparkDateToUnixTimestamp", local_engine::QueryContext::globalContext()); Block block = createDataBlock("Date32", 30000000); auto executable = function->build(block.getColumnsWithTypeAndName()); - for (auto _ : state)[[maybe_unused]] - auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); + for (auto _ : state) [[maybe_unused]] + auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); } static void BM_SparkUnixTimestamp_For_Date(benchmark::State & state) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("sparkDateToUnixTimestamp", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("sparkDateToUnixTimestamp", local_engine::QueryContext::globalContext()); Block block = createDataBlock("Date", 30000000); auto executable = 
function->build(block.getColumnsWithTypeAndName()); - for (auto _ : state)[[maybe_unused]] - auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); + for (auto _ : state) [[maybe_unused]] + auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); } BENCHMARK(BM_CHUnixTimestamp_For_Date32)->Unit(benchmark::kMillisecond)->Iterations(100); diff --git a/cpp-ch/local-engine/tests/gluten_test_util.cpp b/cpp-ch/local-engine/tests/gluten_test_util.cpp index 7dbc7206dbb7..66ce81a509a6 100644 --- a/cpp-ch/local-engine/tests/gluten_test_util.cpp +++ b/cpp-ch/local-engine/tests/gluten_test_util.cpp @@ -17,7 +17,6 @@ #include "gluten_test_util.h" #include #include - #include #include #include @@ -31,6 +30,7 @@ #include #include #include +#include namespace fs = std::filesystem; @@ -60,7 +60,7 @@ std::optional parseFilter(const std::string & filter, const AnotherR const ASTPtr ast_exp = parseQuery(parser2, filter.data(), filter.data() + filter.size(), "", 0, 0, 0); const auto prepared_sets = std::make_shared(); ActionsMatcher::Data visitor_data( - SerializedPlanParser::global_context, + QueryContext::globalContext(), size_limits_for_set, static_cast(0), name_and_types, @@ -78,7 +78,7 @@ std::pair> create_plan_and_execu std::string_view json_plan, std::string_view split_template, std::string_view file, const std::optional & context) { const std::string split = replaceLocalFilesWildcards(split_template, file); - SerializedPlanParser parser(context.value_or(SerializedPlanParser::global_context)); + SerializedPlanParser parser(context.value_or(QueryContext::globalContext())); parser.addSplitInfo(local_engine::JsonStringToBinary(split)); const auto plan = local_engine::JsonStringToMessage(json_plan); return {plan, parser.createExecutor(plan)}; diff --git a/cpp-ch/local-engine/tests/gluten_test_util.h b/cpp-ch/local-engine/tests/gluten_test_util.h index 
9f7380cf5446..a9d8af37b47a 100644 --- a/cpp-ch/local-engine/tests/gluten_test_util.h +++ b/cpp-ch/local-engine/tests/gluten_test_util.h @@ -23,10 +23,13 @@ #include #include -#include #include #include +namespace substrait +{ +class Plan; +} namespace local_engine { class LocalExecutor; diff --git a/cpp-ch/local-engine/tests/gtest_ch_functions.cpp b/cpp-ch/local-engine/tests/gtest_ch_functions.cpp index 613beb9b8051..e905bc1787fa 100644 --- a/cpp-ch/local-engine/tests/gtest_ch_functions.cpp +++ b/cpp-ch/local-engine/tests/gtest_ch_functions.cpp @@ -15,18 +15,19 @@ * limitations under the License. */ #include +#include #include #include #include -#include #include #include +#include TEST(TestFuntion, Hash) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("murmurHash2_64", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("murmurHash2_64", local_engine::QueryContext::globalContext()); auto type0 = DataTypeFactory::instance().get("String"); auto column0 = type0->createColumn(); column0->insert("A"); @@ -56,7 +57,7 @@ TEST(TestFunction, In) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("in", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("in", local_engine::QueryContext::globalContext()); auto type0 = DataTypeFactory::instance().get("String"); auto type_set = std::make_shared(); @@ -83,8 +84,7 @@ TEST(TestFunction, In) auto arg = ColumnSet::create(4, future_set); ColumnsWithTypeAndName columns - = {ColumnWithTypeAndName(std::move(column1), type0, "string0"), - ColumnWithTypeAndName(std::move(arg), type_set, "__set")}; + = {ColumnWithTypeAndName(std::move(column1), type0, "string0"), ColumnWithTypeAndName(std::move(arg), type_set, "__set")}; Block block(columns); std::cerr << "input:\n"; debug::headBlock(block); @@ -100,7 +100,7 @@ TEST(TestFunction, NotIn1) { using namespace DB; auto & factory = 
FunctionFactory::instance(); - auto function = factory.get("notIn", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("notIn", local_engine::QueryContext::globalContext()); auto type0 = DataTypeFactory::instance().get("String"); auto type_set = std::make_shared(); @@ -125,7 +125,7 @@ TEST(TestFunction, NotIn1) auto future_set = std::make_shared(std::move(set)); //TODO: WHY? after https://github.com/ClickHouse/ClickHouse/pull/63723 we need pass 4 instead of 1 - auto arg = ColumnSet::create(4,future_set); + auto arg = ColumnSet::create(4, future_set); ColumnsWithTypeAndName columns = {ColumnWithTypeAndName(std::move(column1), type0, "string0"), ColumnWithTypeAndName(std::move(arg), type_set, "__set")}; @@ -143,7 +143,7 @@ TEST(TestFunction, NotIn2) { using namespace DB; auto & factory = FunctionFactory::instance(); - auto function = factory.get("in", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("in", local_engine::QueryContext::globalContext()); auto type0 = DataTypeFactory::instance().get("String"); auto type_set = std::make_shared(); @@ -168,7 +168,7 @@ TEST(TestFunction, NotIn2) auto future_set = std::make_shared(std::move(set)); //TODO: WHY? 
after https://github.com/ClickHouse/ClickHouse/pull/63723 we need pass 4 instead of 1 - auto arg = ColumnSet::create(4,future_set); + auto arg = ColumnSet::create(4, future_set); ColumnsWithTypeAndName columns = {ColumnWithTypeAndName(std::move(column1), type0, "string0"), ColumnWithTypeAndName(std::move(arg), type_set, "__set")}; @@ -178,7 +178,7 @@ TEST(TestFunction, NotIn2) auto executable = function->build(block.getColumnsWithTypeAndName()); auto result = executable->execute(block.getColumnsWithTypeAndName(), executable->getResultType(), block.rows()); - auto function_not = factory.get("not", local_engine::SerializedPlanParser::global_context); + auto function_not = factory.get("not", local_engine::QueryContext::globalContext()); auto type_bool = DataTypeFactory::instance().get("UInt8"); ColumnsWithTypeAndName columns2 = {ColumnWithTypeAndName(result, type_bool, "string0")}; Block block2(columns2); diff --git a/cpp-ch/local-engine/tests/gtest_ch_join.cpp b/cpp-ch/local-engine/tests/gtest_ch_join.cpp index 67775dd1a4a6..af661c297f15 100644 --- a/cpp-ch/local-engine/tests/gtest_ch_join.cpp +++ b/cpp-ch/local-engine/tests/gtest_ch_join.cpp @@ -14,37 +14,37 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +#include +#include #include +#include +#include #include -#include #include #include +#include #include #include #include +#include #include #include #include - #include #include #include #include - -#include -#include -#include -#include +#include using namespace DB; using namespace local_engine; TEST(TestJoin, simple) { - auto global_context = SerializedPlanParser::global_context; - local_engine::SerializedPlanParser::global_context->setSetting("join_use_nulls", true); + auto global_context = local_engine::QueryContext::globalContext(); + local_engine::QueryContext::globalMutableContext()->setSetting("join_use_nulls", true); auto & factory = DB::FunctionFactory::instance(); - auto function = factory.get("murmurHash2_64", local_engine::SerializedPlanParser::global_context); + auto function = factory.get("murmurHash2_64", global_context); auto int_type = DataTypeFactory::instance().get("Int32"); auto column0 = int_type->createColumn(); column0->insert(1); diff --git a/cpp-ch/local-engine/tests/gtest_ch_storages.cpp b/cpp-ch/local-engine/tests/gtest_ch_storages.cpp index a32e7c476dcb..a3454f9b0959 100644 --- a/cpp-ch/local-engine/tests/gtest_ch_storages.cpp +++ b/cpp-ch/local-engine/tests/gtest_ch_storages.cpp @@ -16,7 +16,6 @@ */ #include #include -#include #include #include #include @@ -26,6 +25,7 @@ #include #include #include +#include using namespace DB; using namespace local_engine; @@ -33,7 +33,7 @@ using namespace local_engine; TEST(TestBatchParquetFileSource, blob) { GTEST_SKIP(); - auto config = local_engine::SerializedPlanParser::config; + Context::ConfigurationPtr config; config->setString("blob.storage_account_url", "http://127.0.0.1:10000/devstoreaccount1"); config->setString("blob.container_name", "libch"); config->setString("blob.container_already_exists", "true"); @@ -79,7 +79,7 @@ TEST(TestBatchParquetFileSource, blob) columns.emplace_back(std::move(col)); } auto header = Block(std::move(columns)); - 
builder->init(Pipe(std::make_shared(SerializedPlanParser::global_context, header, files))); + builder->init(Pipe(std::make_shared(QueryContext::globalContext(), header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto executor = PullingPipelineExecutor(pipeline); @@ -101,7 +101,7 @@ TEST(TestBatchParquetFileSource, blob) TEST(TestBatchParquetFileSource, s3) { GTEST_SKIP(); - auto config = local_engine::SerializedPlanParser::config; + Context::ConfigurationPtr config; config->setString("s3.endpoint", "http://localhost:9000/tpch/"); config->setString("s3.region", "us-east-1"); config->setString("s3.access_key_id", "admin"); @@ -143,7 +143,7 @@ TEST(TestBatchParquetFileSource, s3) columns.emplace_back(std::move(col)); } auto header = Block(std::move(columns)); - builder->init(Pipe(std::make_shared(SerializedPlanParser::global_context, header, files))); + builder->init(Pipe(std::make_shared(QueryContext::globalContext(), header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto executor = PullingPipelineExecutor(pipeline); @@ -210,7 +210,7 @@ TEST(TestBatchParquetFileSource, local_file) columns.emplace_back(std::move(col)); } auto header = Block(std::move(columns)); - builder->init(Pipe(std::make_shared(SerializedPlanParser::global_context, header, files))); + builder->init(Pipe(std::make_shared(QueryContext::globalContext(), header, files))); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto executor = PullingPipelineExecutor(pipeline); @@ -260,11 +260,11 @@ TEST(TestPrewhere, OptimizePrewhereCondition) } Block block(std::move(columns)); - ContextPtr context = SerializedPlanParser::global_context; + ContextPtr context = QueryContext::globalContext(); SerializedPlanParser * parser = new SerializedPlanParser(context); parser->parseExtensions(plan_ptr->extensions()); - MergeTreeRelParser mergeTreeParser(parser, SerializedPlanParser::global_context); + MergeTreeRelParser 
mergeTreeParser(parser, QueryContext::globalContext()); mergeTreeParser.column_sizes["l_discount"] = 0; mergeTreeParser.column_sizes["l_quantity"] = 1; diff --git a/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp b/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp index 1e2525b33a5b..012b4ebdddfc 100644 --- a/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp +++ b/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp @@ -17,11 +17,12 @@ #include #include #include +#include #include #include #include #include - +#include using namespace local_engine; @@ -31,7 +32,7 @@ using namespace DB; INCBIN(_pr_54881_, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_54881.json"); TEST(Clickhouse, PR54881) { - const auto context1 = DB::Context::createCopy(SerializedPlanParser::global_context); + const auto context1 = DB::Context::createCopy(QueryContext::globalContext()); // context1->setSetting("enable_named_columns_in_function_tuple", DB::Field(true)); auto settings = context1->getSettingsRef(); EXPECT_FALSE(settings.enable_named_columns_in_function_tuple) << "GLUTEN NEED set enable_named_columns_in_function_tuple to false"; @@ -81,7 +82,7 @@ INCBIN(_pr_65234_, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_ TEST(Clickhouse, PR65234) { const std::string split = R"({"items":[{"uriFile":"file:///foo","length":"84633","parquet":{},"schema":{},"metadataColumns":[{}]}]})"; - SerializedPlanParser parser(SerializedPlanParser::global_context); + SerializedPlanParser parser(QueryContext::globalContext()); parser.addSplitInfo(local_engine::JsonStringToBinary(split)); const auto plan = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_pr_65234_)); auto query_plan = parser.parse(plan); diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp index 45aaf3db6f85..e42a2a89a3c0 100644 --- a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp +++ 
b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp @@ -18,17 +18,16 @@ #include "config.h" #if USE_PARQUET +#include #include #include -#include +#include #include #include +#include #include #include #include - -#include -#include #include #include #include @@ -40,6 +39,8 @@ #include #include #include +#include +#include namespace DB::ErrorCodes { @@ -359,7 +360,7 @@ void testCondition(const std::string & exp, const std::vector & expected static const AnotherRowType name_and_types = buildTestRowType(); static const local_engine::ColumnIndexStore column_index_store = buildTestColumnIndexStore(); const local_engine::ColumnIndexFilter filter( - local_engine::test::parseFilter(exp, name_and_types).value(), local_engine::SerializedPlanParser::global_context); + local_engine::test::parseFilter(exp, name_and_types).value(), local_engine::QueryContext::globalContext()); assertRows(filter.calculateRowRanges(column_index_store, TOTALSIZE), expectedRows); } @@ -470,7 +471,6 @@ TEST(ColumnIndex, FilteringWithAllNullPages) } TEST(ColumnIndex, FilteringWithNotFoundColumnName) { - using namespace test_utils; using namespace local_engine; const local_engine::ColumnIndexStore column_index_store = buildTestColumnIndexStore(); @@ -480,7 +480,7 @@ TEST(ColumnIndex, FilteringWithNotFoundColumnName) const AnotherRowType upper_name_and_types{{"COLUMN5", BIGINT()}}; const local_engine::ColumnIndexFilter filter_upper( local_engine::test::parseFilter("COLUMN5 in (7, 20)", upper_name_and_types).value(), - local_engine::SerializedPlanParser::global_context); + local_engine::QueryContext::globalContext()); assertRows( filter_upper.calculateRowRanges(column_index_store, TOTALSIZE), std::vector(boost::counting_iterator(0), boost::counting_iterator(TOTALSIZE))); @@ -490,7 +490,7 @@ TEST(ColumnIndex, FilteringWithNotFoundColumnName) const AnotherRowType lower_name_and_types{{"column5", BIGINT()}}; const local_engine::ColumnIndexFilter filter_lower( local_engine::test::parseFilter("column5 
in (7, 20)", lower_name_and_types).value(), - local_engine::SerializedPlanParser::global_context); + local_engine::QueryContext::globalContext()); assertRows(filter_lower.calculateRowRanges(column_index_store, TOTALSIZE), {}); } } @@ -1053,7 +1053,7 @@ TEST(ColumnIndex, VectorizedParquetRecordReader) static const AnotherRowType name_and_types{{"11", BIGINT()}}; const auto filterAction = local_engine::test::parseFilter("`11` = 10 or `11` = 50", name_and_types); auto column_index_filter - = std::make_shared(filterAction.value(), local_engine::SerializedPlanParser::global_context); + = std::make_shared(filterAction.value(), local_engine::QueryContext::globalContext()); Block blockHeader({{BIGINT(), "11"}, {STRING(), "18"}}); diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp b/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp index 4436bf0cd0ac..5e2be65528fe 100644 --- a/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp +++ b/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp @@ -17,12 +17,13 @@ #include #include #include +#include #include #include #include #include #include - +#include using namespace local_engine; @@ -32,7 +33,7 @@ INCBIN(_pr_18_2, SOURCE_DIR "/utils/extern-local-engine/tests/decimal_filter_pus TEST(ColumnIndex, Decimal182) { // [precision,scale] = [18,2] - const auto context1 = DB::Context::createCopy(SerializedPlanParser::global_context); + const auto context1 = DB::Context::createCopy(QueryContext::globalMutableContext()); const auto config = ExecutorConfig::loadFromContext(context1); EXPECT_TRUE(config.use_local_format) << "gtest need set use_local_format to true"; diff --git a/cpp-ch/local-engine/tests/gtest_parser.cpp b/cpp-ch/local-engine/tests/gtest_parser.cpp index bf52bd54ccee..5f8d482d5828 100644 --- a/cpp-ch/local-engine/tests/gtest_parser.cpp +++ b/cpp-ch/local-engine/tests/gtest_parser.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -26,7 +27,7 
@@ #include #include #include - +#include using namespace local_engine; using namespace DB; @@ -39,7 +40,7 @@ TEST(LocalExecutor, ReadCSV) = R"({"items":[{"uriFile":"{replace_local_files}","length":"56","text":{"fieldDelimiter":",","maxBlockSize":"8192","header":"1"},"schema":{"names":["id","name","language"],"struct":{"types":[{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})"; const std::string split = replaceLocalFilesWildcards( split_template, GLUTEN_SOURCE_DIR("/backends-velox/src/test/resources/datasource/csv/student_option_schema.csv")); - SerializedPlanParser parser(SerializedPlanParser::global_context); + SerializedPlanParser parser(QueryContext::globalContext()); parser.addSplitInfo(local_engine::JsonStringToBinary(split)); auto plan = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_readcsv_plan)); diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp index 69b6fa0f6d44..3db6f2fd4cb9 100644 --- a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp +++ b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp @@ -17,10 +17,10 @@ #include #include - #include #include #include +#include #include #include #include @@ -42,6 +42,7 @@ #include #include #include +#include using namespace local_engine; using namespace DB; @@ -66,7 +67,7 @@ Chunk testChunk() TEST(LocalExecutor, StorageObjectStorageSink) { /// 0. 
Create ObjectStorage for HDFS - auto settings = SerializedPlanParser::global_context->getSettingsRef(); + auto settings = QueryContext::globalContext()->getSettingsRef(); const std::string query = R"(CREATE TABLE hdfs_engine_xxxx (name String, value UInt32) ENGINE=HDFS('hdfs://localhost:8020/clickhouse/test2', 'Parquet'))"; DB::ParserCreateQuery parser; @@ -90,10 +91,10 @@ TEST(LocalExecutor, StorageObjectStorageSink) EXPECT_TRUE(func && func->name == "HDFS"); DB::StorageHDFSConfiguration config; - StorageObjectStorage::Configuration::initialize(config, arg->children[0]->children, SerializedPlanParser::global_context, false); + StorageObjectStorage::Configuration::initialize(config, arg->children[0]->children, QueryContext::globalContext(), false); const std::shared_ptr object_storage - = std::dynamic_pointer_cast(config.createObjectStorage(SerializedPlanParser::global_context, false)); + = std::dynamic_pointer_cast(config.createObjectStorage(QueryContext::globalContext(), false)); EXPECT_TRUE(object_storage != nullptr); RelativePathsWithMetadata files_with_metadata; @@ -101,7 +102,7 @@ TEST(LocalExecutor, StorageObjectStorageSink) /// 1. Create ObjectStorageSink DB::StorageObjectStorageSink sink{ - object_storage, config.clone(), {}, {{STRING(), "name"}, {UINT(), "value"}}, SerializedPlanParser::global_context, ""}; + object_storage, config.clone(), {}, {{STRING(), "name"}, {UINT(), "value"}}, QueryContext::globalContext(), ""}; /// 2. 
Create Chunk auto chunk = testChunk(); @@ -114,7 +115,7 @@ TEST(LocalExecutor, StorageObjectStorageSink) INCBIN(native_write, SOURCE_DIR "/utils/extern-local-engine/tests/json/native_write_plan.json"); TEST(WritePipeline, SubstraitFileSink) { - const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); + const auto context = DB::Context::createCopy(QueryContext::globalContext()); GlutenWriteSettings settings{ .task_write_tmp_dir = "file:///tmp/test_table/test", .task_write_filename = "data.parquet", @@ -170,7 +171,7 @@ INCBIN(native_write_one_partition, SOURCE_DIR "/utils/extern-local-engine/tests/ TEST(WritePipeline, SubstraitPartitionedFileSink) { - const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); + const auto context = DB::Context::createCopy(QueryContext::globalContext()); GlutenWriteSettings settings{ .task_write_tmp_dir = "file:///tmp/test_table/test_partition", .task_write_filename = "data.parquet", @@ -221,7 +222,7 @@ TEST(WritePipeline, SubstraitPartitionedFileSink) TEST(WritePipeline, ComputePartitionedExpression) { - const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); + const auto context = DB::Context::createCopy(QueryContext::globalContext()); auto partition_by = SubstraitPartitionedFileSink::make_partition_expression({"s_nationkey", "name"}); @@ -299,7 +300,7 @@ TEST(WritePipeline, MergeTree) { ThreadStatus thread_status; - const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); + const auto context = DB::Context::createCopy(QueryContext::globalContext()); context->setPath("./"); const Settings & settings = context->getSettingsRef(); @@ -382,7 +383,7 @@ TEST(WritePipeline, SparkMergeTree) { ThreadStatus thread_status; - const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); + const auto context = DB::Context::createCopy(QueryContext::globalContext()); context->setPath("./"); const Settings & settings = 
context->getSettingsRef(); @@ -396,7 +397,7 @@ TEST(WritePipeline, SparkMergeTree) do_remove(merge_tree_table.relative_path); - const auto dest_storage = merge_tree_table.getStorage(SerializedPlanParser::global_context); + const auto dest_storage = merge_tree_table.getStorage(QueryContext::globalMutableContext()); EXPECT_TRUE(dest_storage); EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote()); DB::StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr(); @@ -437,7 +438,7 @@ TEST(WritePipeline, SparkMergeTree) EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs"); EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main"); - const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(SerializedPlanParser::global_context); + const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(QueryContext::globalMutableContext()); EXPECT_TRUE(dest_storage_hdfs); EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote()); }