Skip to content

Commit

Permalink
[GLUTEN-7028][CH][Part-1] Using PushingPipelineExecutor to write me…
Browse files Browse the repository at this point in the history
…rge tree (apache#7029)

* 1. Rename Storages/Mergetree to Storages/MergeTree
2. Move MergeTreeTool.cpp/.h from Common to Storages/MergeTree
3. Move CustomStorageMergeTree.cpp/.h and StorageMergeTreeFactory.cpp/.h to MergeTree  folderMove CustomStorageMergeTree.cpp/.h and StorageMergeTreeFactory.cpp/.h to MergeTree  folder
4. Add CustomMergeTreeDataWriter
5. Remove TempStorageFreer
6. Add SubstraitParserUtils

* Make query_map_ as QueryContextManager member

* EMBEDDED_PLAN and create_plan_and_executor

* minor refactor

* tmp

* SparkStorageMergeTree
CustomMergeTreeDataWriter => SparkMergeTreeDataWriter

* Add SparkMergeTreeSink

* use SparkStorageMergeTree and SparkMergeTreeSink

* Introduce GlutenSettings.h

* GlutenMergeTreeWriteSettings

* Fix Test Build

* typo

* ContextPtr => const ContextPtr &

* minor refactor

* fix style

* using GlutenMergeTreeWriteSettings

* [TMP] GlutenMergeTreeWriteSettings refactor

* [TMP] StorageMergeTreeWrapper

* [TMP] StorageMergeTreeWrapper::commitPartToRemoteStorageIfNeeded

* [TMP] StorageMergeTreeWrapper::saveMetadata

* move thread pool

* tmp

* rename

* move to sparkmergetreesink.h/cpp

* MergeTreeTableInstance

* sameStructWith => sameTable

* parseStorageAndRestore => restoreStorage
parseStorage => getStorage

* Sink with MergeTreeTable table;

* remvoe SparkMergeTreeWriter::writeTempPartAndFinalize

* refactor SinkHelper::writeTempPart

* Remove write_setting of SparkMergeTreeWriter

* SparkMergeTreeWriter using PushingPipelineExecutor

* SparkMergeTreeWriteSettings

* tmp

* GlutenMergeTreeWriteSettings => SparkMergeTreeWriteSettings

* make CustomStorageMergeTree constructor protected

* MergeTreeTool.cpp/.h => SparkMergeTreeMeta.cpp/.h

* CustomStorageMergeTree.cpp/.h => SparkStorageMergeTree.cpp/.h

* CustomStorageMergeTree => SparkStorageMergeTree
SparkStorageMergeTree => SparkWriteStorageMergeTree

* Refactor move codes from MergeTreeRelParser to MergeTreeTable and MergeTreeTableInstance

* Refactor Make static member to normal member
  • Loading branch information
baibaichen authored and shamirchen committed Oct 14, 2024
1 parent e8b457e commit e335fc9
Show file tree
Hide file tree
Showing 55 changed files with 2,239 additions and 1,348 deletions.
3 changes: 1 addition & 2 deletions cpp-ch/local-engine/Builder/SerializedPlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
*/
#pragma once

#include <DataTypes/IDataType.h>
#include <substrait/plan.pb.h>
#include <Common/MergeTreeTool.h>


namespace dbms
{
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ add_headers_and_sources(storages Storages)
add_headers_and_sources(storages Storages/Output)
add_headers_and_sources(storages Storages/Serializations)
add_headers_and_sources(storages Storages/IO)
add_headers_and_sources(storages Storages/Mergetree)
add_headers_and_sources(storages Storages/MergeTree)
add_headers_and_sources(storages Storages/Cache)
add_headers_and_sources(common Common)
add_headers_and_sources(external External)
Expand Down
16 changes: 5 additions & 11 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Parser/RelParser.h>
#include <Parser/SerializedPlanParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Planner/PlannerActionsVisitor.h>
#include <Processors/Chunk.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/printPipeline.h>
#include <Storages/Cache/CacheManager.h>
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
#include <Storages/Output/WriteBufferBuilder.h>
#include <Storages/StorageMergeTreeFactory.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/predicate.hpp>
Expand Down Expand Up @@ -519,16 +520,9 @@ std::map<std::string, std::string> BackendInitializerUtil::getBackendConfMap(std
if (!success)
break;

if (logger && logger->debug())
{
namespace pb_util = google::protobuf::util;
pb_util::JsonOptions options;
std::string json;
auto s = pb_util::MessageToJsonString(sPlan, &json, options);
if (!s.ok())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not convert Substrait Plan to Json");
LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Update Config Map Plan:\n{}", json);
}
/// see initLoggers, logger == nullptr which meanas initLoggers is not called.
if (logger != nullptr)
logDebugMessage(sPlan, "Update Config Map Plan");

if (!sPlan.has_advanced_extensions() || !sPlan.advanced_extensions().has_enhancement())
break;
Expand Down
64 changes: 6 additions & 58 deletions cpp-ch/local-engine/Common/CHUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ namespace local_engine
static const String MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE = "mergetree.insert_without_local_storage";
static const String MERGETREE_MERGE_AFTER_INSERT = "mergetree.merge_after_insert";
static const std::string DECIMAL_OPERATIONS_ALLOW_PREC_LOSS = "spark.sql.decimalOperations.allowPrecisionLoss";
static const std::string SPARK_TASK_WRITE_TMEP_DIR = "gluten.write.temp.dir";
static const std::string SPARK_TASK_WRITE_FILENAME = "gluten.write.file.name";

static const std::unordered_set<String> BOOL_VALUE_SETTINGS{
MERGETREE_MERGE_AFTER_INSERT, MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE, DECIMAL_OPERATIONS_ALLOW_PREC_LOSS};
Expand Down Expand Up @@ -84,7 +82,8 @@ class BlockUtil

/// The column names may be different in two blocks.
/// and the nullability also could be different, with TPCDS-Q1 as an example.
static DB::ColumnWithTypeAndName convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column);
static DB::ColumnWithTypeAndName
convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column);
};

class PODArrayUtil
Expand Down Expand Up @@ -216,7 +215,8 @@ class BackendInitializerUtil
static void registerAllFactories();
static void applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr, DB::Settings &);
static void updateNewSettings(const DB::ContextMutablePtr &, const DB::Settings &);
static std::vector<String> wrapDiskPathConfig(const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config);
static std::vector<String>
wrapDiskPathConfig(const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config);


static std::map<std::string, std::string> getBackendConfMap(std::string_view plan);
Expand Down Expand Up @@ -262,64 +262,12 @@ class MemoryUtil
static UInt64 getMemoryRSS();
};

template <typename T>
class ConcurrentDeque
{
public:
std::optional<T> pop_front()
{
std::lock_guard<std::mutex> lock(mtx);

if (deq.empty())
return {};

T t = deq.front();
deq.pop_front();
return t;
}

void emplace_back(T value)
{
std::lock_guard<std::mutex> lock(mtx);
deq.emplace_back(value);
}

void emplace_back(std::vector<T> values)
{
std::lock_guard<std::mutex> lock(mtx);
deq.insert(deq.end(), values.begin(), values.end());
}

void emplace_front(T value)
{
std::lock_guard<std::mutex> lock(mtx);
deq.emplace_front(value);
}

size_t size()
{
std::lock_guard<std::mutex> lock(mtx);
return deq.size();
}

bool empty()
{
std::lock_guard<std::mutex> lock(mtx);
return deq.empty();
}

std::deque<T> unsafeGet() { return deq; }

private:
std::deque<T> deq;
mutable std::mutex mtx;
};

class JoinUtil
{
public:
static void reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols);
static std::pair<DB::JoinKind, DB::JoinStrictness> getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool is_existence_join);
static std::pair<DB::JoinKind, DB::JoinStrictness>
getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool is_existence_join);
static std::pair<DB::JoinKind, DB::JoinStrictness> getCrossJoinKindAndStrictness(substrait::CrossRel_JoinType join_type);
};

Expand Down
14 changes: 7 additions & 7 deletions cpp-ch/local-engine/Common/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct MemoryConfig
size_t off_heap_per_task = 0;
double spill_mem_ratio = 0.9;

static MemoryConfig loadFromContext(DB::ContextPtr context)
static MemoryConfig loadFromContext(const DB::ContextPtr & context)
{
MemoryConfig config;
config.extra_memory_hard_limit = context->getConfigRef().getUInt64(EXTRA_MEMORY_HARD_LIMIT, 0);
Expand All @@ -58,7 +58,7 @@ struct GraceMergingAggregateConfig
size_t max_pending_flush_blocks_per_grace_aggregate_merging_bucket = 1_MiB;
double max_allowed_memory_usage_ratio_for_aggregate_merging = 0.9;

static GraceMergingAggregateConfig loadFromContext(DB::ContextPtr context)
static GraceMergingAggregateConfig loadFromContext(const DB::ContextPtr & context)
{
GraceMergingAggregateConfig config;
config.max_grace_aggregate_merging_buckets = context->getConfigRef().getUInt64(MAX_GRACE_AGGREGATE_MERGING_BUCKETS, 32);
Expand All @@ -82,7 +82,7 @@ struct StreamingAggregateConfig
double high_cardinality_threshold_for_streaming_aggregating = 0.8;
bool enable_streaming_aggregating = true;

static StreamingAggregateConfig loadFromContext(DB::ContextPtr context)
static StreamingAggregateConfig loadFromContext(const DB::ContextPtr & context)
{
StreamingAggregateConfig config;
config.aggregated_keys_before_streaming_aggregating_evict = context->getConfigRef().getUInt64(AGGREGATED_KEYS_BEFORE_STREAMING_AGGREGATING_EVICT, 1024);
Expand Down Expand Up @@ -122,7 +122,7 @@ struct ExecutorConfig
bool dump_pipeline = false;
bool use_local_format = false;

static ExecutorConfig loadFromContext(DB::ContextPtr context)
static ExecutorConfig loadFromContext(const DB::ContextPtr & context)
{
ExecutorConfig config;
config.dump_pipeline = context->getConfigRef().getBool(DUMP_PIPELINE, false);
Expand Down Expand Up @@ -161,7 +161,7 @@ struct S3Config
String s3_local_cache_cache_path = "";
bool s3_gcs_issue_compose_request = false;

static S3Config loadFromContext(DB::ContextPtr context)
static S3Config loadFromContext(const DB::ContextPtr & context)
{
S3Config config;

Expand All @@ -187,7 +187,7 @@ struct MergeTreeConfig
size_t table_part_metadata_cache_max_count = 5000;
size_t table_metadata_cache_max_count = 500;

static MergeTreeConfig loadFromContext(DB::ContextPtr context)
static MergeTreeConfig loadFromContext(const DB::ContextPtr & context)
{
MergeTreeConfig config;
config.table_part_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_PART_METADATA_CACHE_MAX_COUNT, 5000);
Expand All @@ -202,7 +202,7 @@ struct GlutenJobSchedulerConfig

size_t job_scheduler_max_threads = 10;

static GlutenJobSchedulerConfig loadFromContext(DB::ContextPtr context)
static GlutenJobSchedulerConfig loadFromContext(const DB::ContextPtr & context)
{
GlutenJobSchedulerConfig config;
config.job_scheduler_max_threads = context->getConfigRef().getUInt64(JOB_SCHEDULER_MAX_THREADS, 10);
Expand Down
63 changes: 63 additions & 0 deletions cpp-ch/local-engine/Common/GlutenSettings.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <Interpreters/Context_fwd.h>

namespace local_engine
{

#define SKIP_ALIAS(ALIAS_NAME)

#define DECLARE_GLUTEN_SETTING_STATIC_MEMBER_(TYPE, NAME, DEFAULT, DESCRIPTION, UNIQ) TYPE NAME{DEFAULT};

#define GLUTEN_SETTING_STATIC_MEMBER_(NAME) s_##NAME##_

#define INITIALIZE_GLUTEN_SETTING_STATIC_MEMBER_(TYPE, NAME, DEFAULT, DESCRIPTION, UNIQ) \
static constexpr std::string_view GLUTEN_SETTING_STATIC_MEMBER_(NAME) = "g." #UNIQ "." #NAME;

#define DECLARE_GLUTEN_SETTINGS(SETTINGS_CLASS_NAME, LIST_OF_SETTINGS_MACRO) \
struct SETTINGS_CLASS_NAME \
{ \
LIST_OF_SETTINGS_MACRO(DECLARE_GLUTEN_SETTING_STATIC_MEMBER_, SKIP_ALIAS, _) \
static SETTINGS_CLASS_NAME get(const DB::ContextPtr & context); \
void set(const DB::ContextMutablePtr & context) const; \
\
private: \
LIST_OF_SETTINGS_MACRO(INITIALIZE_GLUTEN_SETTING_STATIC_MEMBER_, SKIP_ALIAS, __COUNTER__) \
};

#define IMPLEMENT_GLUTEN_GET_(TYPE, NAME, DEFAULT, DESCRIPTION, UNIQ) \
if (DB::Field field_##NAME; settings.tryGet(GLUTEN_SETTING_STATIC_MEMBER_(NAME), field_##NAME)) \
result.NAME = field_##NAME.safeGet<TYPE>();

#define IMPLEMENT_GLUTEN_SET_(TYPE, NAME, DEFAULT, DESCRIPTION, UNIQ) context->setSetting(GLUTEN_SETTING_STATIC_MEMBER_(NAME), NAME);

#define IMPLEMENT_GLUTEN_SETTINGS(SETTINGS_CLASS_NAME, LIST_OF_SETTINGS_MACRO) \
SETTINGS_CLASS_NAME SETTINGS_CLASS_NAME::get(const DB::ContextPtr & context) \
{ \
SETTINGS_CLASS_NAME result; \
const DB::Settings & settings = context->getSettingsRef(); \
LIST_OF_SETTINGS_MACRO(IMPLEMENT_GLUTEN_GET_, SKIP_ALIAS, _) \
return result; \
} \
void SETTINGS_CLASS_NAME::SETTINGS_CLASS_NAME::set(const DB::ContextMutablePtr & context) const \
{ \
LIST_OF_SETTINGS_MACRO(IMPLEMENT_GLUTEN_SET_, SKIP_ALIAS, _) \
}


}
Loading

0 comments on commit e335fc9

Please sign in to comment.