Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-7028][CH][Part-1] Using PushingPipelineExecutor to write merge tree #7029

Merged
merged 41 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
dda16a3
1. Rename Storages/Mergetree to Storages/MergeTree
baibaichen Aug 28, 2024
dabedb8
Make query_map_ as QueryContextManager member
baibaichen Aug 28, 2024
62ad0d9
EMBEDDED_PLAN and create_plan_and_executor
baibaichen Aug 28, 2024
bee854f
minor refactor
baibaichen Aug 28, 2024
e2ced23
tmp
baibaichen Aug 28, 2024
c09a531
SparkStorageMergeTree
baibaichen Aug 29, 2024
8800bc0
Add SparkMergeTreeSink
baibaichen Aug 29, 2024
069a3cb
use SparkStorageMergeTree and SparkMergeTreeSink
baibaichen Aug 29, 2024
a780dc1
Introduce GlutenSettings.h
baibaichen Sep 2, 2024
5a168c7
GlutenMergeTreeWriteSettings
baibaichen Sep 2, 2024
beb6b51
Fix Test Build
baibaichen Sep 2, 2024
a7b04fc
typo
baibaichen Sep 2, 2024
05b1595
ContextPtr => const ContextPtr &
baibaichen Sep 2, 2024
91b8ea6
minor refactor
baibaichen Sep 2, 2024
acc9d2d
fix style
baibaichen Sep 2, 2024
6df785a
using GlutenMergeTreeWriteSettings
baibaichen Sep 2, 2024
dac797b
[TMP] GlutenMergeTreeWriteSettings refactor
baibaichen Sep 2, 2024
f681bb8
[TMP] StorageMergeTreeWrapper
baibaichen Sep 2, 2024
ff13665
[TMP] StorageMergeTreeWrapper::commitPartToRemoteStorageIfNeeded
baibaichen Sep 2, 2024
34bd4cd
[TMP] StorageMergeTreeWrapper::saveMetadata
baibaichen Sep 2, 2024
3e688aa
move thread pool
baibaichen Sep 3, 2024
cb6ece7
tmp
baibaichen Sep 3, 2024
4cdec0d
rename
baibaichen Sep 3, 2024
4741bc0
move to sparkmergetreesink.h/cpp
baibaichen Sep 3, 2024
4a3d71f
MergeTreeTableInstance
baibaichen Sep 3, 2024
eb59e3b
sameStructWith => sameTable
baibaichen Sep 3, 2024
6008674
parseStorageAndRestore => restoreStorage
baibaichen Sep 3, 2024
cc644b4
Sink with MergeTreeTable table;
baibaichen Sep 3, 2024
bf25c6c
remvoe SparkMergeTreeWriter::writeTempPartAndFinalize
baibaichen Sep 4, 2024
f12ef0a
refactor SinkHelper::writeTempPart
baibaichen Sep 4, 2024
b6267d4
Remove write_setting of SparkMergeTreeWriter
baibaichen Sep 4, 2024
81c9025
SparkMergeTreeWriter using PushingPipelineExecutor
baibaichen Sep 4, 2024
29983f9
SparkMergeTreeWriteSettings
baibaichen Sep 5, 2024
4910601
tmp
baibaichen Sep 5, 2024
a263a01
GlutenMergeTreeWriteSettings => SparkMergeTreeWriteSettings
baibaichen Sep 5, 2024
05ff165
make CustomStorageMergeTree constructor protected
baibaichen Sep 5, 2024
22fdf73
MergeTreeTool.cpp/.h => SparkMergeTreeMeta.cpp/.h
baibaichen Sep 5, 2024
24bfe01
CustomStorageMergeTree.cpp/.h => SparkStorageMergeTree.cpp/.h
baibaichen Sep 5, 2024
ef7cd23
CustomStorageMergeTree => SparkStorageMergeTree
baibaichen Sep 5, 2024
b758f9b
Refactor move codes from MergeTreeRelParser to MergeTreeTable and Mer…
baibaichen Sep 5, 2024
67f377b
Refactor Make static member to normal member
baibaichen Sep 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cpp-ch/local-engine/Builder/SerializedPlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
*/
#pragma once

#include <DataTypes/IDataType.h>
#include <substrait/plan.pb.h>
#include <Common/MergeTreeTool.h>


namespace dbms
{
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ add_headers_and_sources(storages Storages)
add_headers_and_sources(storages Storages/Output)
add_headers_and_sources(storages Storages/Serializations)
add_headers_and_sources(storages Storages/IO)
add_headers_and_sources(storages Storages/Mergetree)
add_headers_and_sources(storages Storages/MergeTree)
add_headers_and_sources(storages Storages/Cache)
add_headers_and_sources(common Common)
add_headers_and_sources(external External)
Expand Down
16 changes: 5 additions & 11 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Parser/RelParser.h>
#include <Parser/SerializedPlanParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Planner/PlannerActionsVisitor.h>
#include <Processors/Chunk.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/printPipeline.h>
#include <Storages/Cache/CacheManager.h>
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
#include <Storages/Output/WriteBufferBuilder.h>
#include <Storages/StorageMergeTreeFactory.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/predicate.hpp>
Expand Down Expand Up @@ -519,16 +520,9 @@ std::map<std::string, std::string> BackendInitializerUtil::getBackendConfMap(std
if (!success)
break;

if (logger && logger->debug())
{
namespace pb_util = google::protobuf::util;
pb_util::JsonOptions options;
std::string json;
auto s = pb_util::MessageToJsonString(sPlan, &json, options);
if (!s.ok())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not convert Substrait Plan to Json");
LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Update Config Map Plan:\n{}", json);
}
/// see initLoggers, logger == nullptr which meanas initLoggers is not called.
if (logger != nullptr)
logDebugMessage(sPlan, "Update Config Map Plan");

if (!sPlan.has_advanced_extensions() || !sPlan.advanced_extensions().has_enhancement())
break;
Expand Down
64 changes: 6 additions & 58 deletions cpp-ch/local-engine/Common/CHUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ namespace local_engine
static const String MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE = "mergetree.insert_without_local_storage";
static const String MERGETREE_MERGE_AFTER_INSERT = "mergetree.merge_after_insert";
static const std::string DECIMAL_OPERATIONS_ALLOW_PREC_LOSS = "spark.sql.decimalOperations.allowPrecisionLoss";
static const std::string SPARK_TASK_WRITE_TMEP_DIR = "gluten.write.temp.dir";
static const std::string SPARK_TASK_WRITE_FILENAME = "gluten.write.file.name";

static const std::unordered_set<String> BOOL_VALUE_SETTINGS{
MERGETREE_MERGE_AFTER_INSERT, MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE, DECIMAL_OPERATIONS_ALLOW_PREC_LOSS};
Expand Down Expand Up @@ -84,7 +82,8 @@ class BlockUtil

/// The column names may be different in two blocks.
/// and the nullability also could be different, with TPCDS-Q1 as an example.
static DB::ColumnWithTypeAndName convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column);
static DB::ColumnWithTypeAndName
convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column);
};

class PODArrayUtil
Expand Down Expand Up @@ -216,7 +215,8 @@ class BackendInitializerUtil
static void registerAllFactories();
static void applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr, DB::Settings &);
static void updateNewSettings(const DB::ContextMutablePtr &, const DB::Settings &);
static std::vector<String> wrapDiskPathConfig(const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config);
static std::vector<String>
wrapDiskPathConfig(const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config);


static std::map<std::string, std::string> getBackendConfMap(std::string_view plan);
Expand Down Expand Up @@ -262,64 +262,12 @@ class MemoryUtil
static UInt64 getMemoryRSS();
};

template <typename T>
class ConcurrentDeque
{
public:
std::optional<T> pop_front()
{
std::lock_guard<std::mutex> lock(mtx);

if (deq.empty())
return {};

T t = deq.front();
deq.pop_front();
return t;
}

void emplace_back(T value)
{
std::lock_guard<std::mutex> lock(mtx);
deq.emplace_back(value);
}

void emplace_back(std::vector<T> values)
{
std::lock_guard<std::mutex> lock(mtx);
deq.insert(deq.end(), values.begin(), values.end());
}

void emplace_front(T value)
{
std::lock_guard<std::mutex> lock(mtx);
deq.emplace_front(value);
}

size_t size()
{
std::lock_guard<std::mutex> lock(mtx);
return deq.size();
}

bool empty()
{
std::lock_guard<std::mutex> lock(mtx);
return deq.empty();
}

std::deque<T> unsafeGet() { return deq; }

private:
std::deque<T> deq;
mutable std::mutex mtx;
};

class JoinUtil
{
public:
static void reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols);
static std::pair<DB::JoinKind, DB::JoinStrictness> getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool is_existence_join);
static std::pair<DB::JoinKind, DB::JoinStrictness>
getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool is_existence_join);
static std::pair<DB::JoinKind, DB::JoinStrictness> getCrossJoinKindAndStrictness(substrait::CrossRel_JoinType join_type);
};

Expand Down
14 changes: 7 additions & 7 deletions cpp-ch/local-engine/Common/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct MemoryConfig
size_t off_heap_per_task = 0;
double spill_mem_ratio = 0.9;

static MemoryConfig loadFromContext(DB::ContextPtr context)
static MemoryConfig loadFromContext(const DB::ContextPtr & context)
{
MemoryConfig config;
config.extra_memory_hard_limit = context->getConfigRef().getUInt64(EXTRA_MEMORY_HARD_LIMIT, 0);
Expand All @@ -58,7 +58,7 @@ struct GraceMergingAggregateConfig
size_t max_pending_flush_blocks_per_grace_aggregate_merging_bucket = 1_MiB;
double max_allowed_memory_usage_ratio_for_aggregate_merging = 0.9;

static GraceMergingAggregateConfig loadFromContext(DB::ContextPtr context)
static GraceMergingAggregateConfig loadFromContext(const DB::ContextPtr & context)
{
GraceMergingAggregateConfig config;
config.max_grace_aggregate_merging_buckets = context->getConfigRef().getUInt64(MAX_GRACE_AGGREGATE_MERGING_BUCKETS, 32);
Expand All @@ -82,7 +82,7 @@ struct StreamingAggregateConfig
double high_cardinality_threshold_for_streaming_aggregating = 0.8;
bool enable_streaming_aggregating = true;

static StreamingAggregateConfig loadFromContext(DB::ContextPtr context)
static StreamingAggregateConfig loadFromContext(const DB::ContextPtr & context)
{
StreamingAggregateConfig config;
config.aggregated_keys_before_streaming_aggregating_evict = context->getConfigRef().getUInt64(AGGREGATED_KEYS_BEFORE_STREAMING_AGGREGATING_EVICT, 1024);
Expand Down Expand Up @@ -122,7 +122,7 @@ struct ExecutorConfig
bool dump_pipeline = false;
bool use_local_format = false;

static ExecutorConfig loadFromContext(DB::ContextPtr context)
static ExecutorConfig loadFromContext(const DB::ContextPtr & context)
{
ExecutorConfig config;
config.dump_pipeline = context->getConfigRef().getBool(DUMP_PIPELINE, false);
Expand Down Expand Up @@ -161,7 +161,7 @@ struct S3Config
String s3_local_cache_cache_path = "";
bool s3_gcs_issue_compose_request = false;

static S3Config loadFromContext(DB::ContextPtr context)
static S3Config loadFromContext(const DB::ContextPtr & context)
{
S3Config config;

Expand All @@ -187,7 +187,7 @@ struct MergeTreeConfig
size_t table_part_metadata_cache_max_count = 5000;
size_t table_metadata_cache_max_count = 500;

static MergeTreeConfig loadFromContext(DB::ContextPtr context)
static MergeTreeConfig loadFromContext(const DB::ContextPtr & context)
{
MergeTreeConfig config;
config.table_part_metadata_cache_max_count = context->getConfigRef().getUInt64(TABLE_PART_METADATA_CACHE_MAX_COUNT, 5000);
Expand All @@ -202,7 +202,7 @@ struct GlutenJobSchedulerConfig

size_t job_scheduler_max_threads = 10;

static GlutenJobSchedulerConfig loadFromContext(DB::ContextPtr context)
static GlutenJobSchedulerConfig loadFromContext(const DB::ContextPtr & context)
{
GlutenJobSchedulerConfig config;
config.job_scheduler_max_threads = context->getConfigRef().getUInt64(JOB_SCHEDULER_MAX_THREADS, 10);
Expand Down
63 changes: 63 additions & 0 deletions cpp-ch/local-engine/Common/GlutenSettings.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <Interpreters/Context_fwd.h>

namespace local_engine
{

#define SKIP_ALIAS(ALIAS_NAME)

#define DECLARE_GLUTEN_SETTING_STATIC_MEMBER_(TYPE, NAME, DEFAULT, DESCRIPTION, UNIQ) TYPE NAME{DEFAULT};

#define GLUTEN_SETTING_STATIC_MEMBER_(NAME) s_##NAME##_

#define INITIALIZE_GLUTEN_SETTING_STATIC_MEMBER_(TYPE, NAME, DEFAULT, DESCRIPTION, UNIQ) \
static constexpr std::string_view GLUTEN_SETTING_STATIC_MEMBER_(NAME) = "g." #UNIQ "." #NAME;

#define DECLARE_GLUTEN_SETTINGS(SETTINGS_CLASS_NAME, LIST_OF_SETTINGS_MACRO) \
struct SETTINGS_CLASS_NAME \
{ \
LIST_OF_SETTINGS_MACRO(DECLARE_GLUTEN_SETTING_STATIC_MEMBER_, SKIP_ALIAS, _) \
static SETTINGS_CLASS_NAME get(const DB::ContextPtr & context); \
void set(const DB::ContextMutablePtr & context) const; \
\
private: \
LIST_OF_SETTINGS_MACRO(INITIALIZE_GLUTEN_SETTING_STATIC_MEMBER_, SKIP_ALIAS, __COUNTER__) \
};

#define IMPLEMENT_GLUTEN_GET_(TYPE, NAME, DEFAULT, DESCRIPTION, UNIQ) \
if (DB::Field field_##NAME; settings.tryGet(GLUTEN_SETTING_STATIC_MEMBER_(NAME), field_##NAME)) \
result.NAME = field_##NAME.safeGet<TYPE>();

#define IMPLEMENT_GLUTEN_SET_(TYPE, NAME, DEFAULT, DESCRIPTION, UNIQ) context->setSetting(GLUTEN_SETTING_STATIC_MEMBER_(NAME), NAME);

#define IMPLEMENT_GLUTEN_SETTINGS(SETTINGS_CLASS_NAME, LIST_OF_SETTINGS_MACRO) \
SETTINGS_CLASS_NAME SETTINGS_CLASS_NAME::get(const DB::ContextPtr & context) \
{ \
SETTINGS_CLASS_NAME result; \
const DB::Settings & settings = context->getSettingsRef(); \
LIST_OF_SETTINGS_MACRO(IMPLEMENT_GLUTEN_GET_, SKIP_ALIAS, _) \
return result; \
} \
void SETTINGS_CLASS_NAME::SETTINGS_CLASS_NAME::set(const DB::ContextMutablePtr & context) const \
{ \
LIST_OF_SETTINGS_MACRO(IMPLEMENT_GLUTEN_SET_, SKIP_ALIAS, _) \
}


}
Loading
Loading