Skip to content

Commit

Permalink
Sink with MergeTreeTable table;
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Sep 3, 2024
1 parent 6d8b7e8 commit 7b6f4dd
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 38 deletions.
37 changes: 14 additions & 23 deletions cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,21 @@ extern const int UNKNOWN_TYPE;
}
}

namespace
{
local_engine::MergeTreeTableInstance parseFromAny(const google::protobuf::Any & any)
{
google::protobuf::StringValue table;
table.ParseFromString(any.value());
return local_engine::parseMergeTreeTableString(table.value());
}
}

namespace local_engine
{
using namespace DB;

/// Find minimal position of any of the column in primary key.
/// Find minimal position of the column in primary key.
static Int64 findMinPosition(const NameSet & condition_table_columns, const NameToIndexMap & primary_key_positions)
{
Int64 min_position = std::numeric_limits<Int64>::max() - 1;
Expand All @@ -58,9 +68,7 @@ static Int64 findMinPosition(const NameSet & condition_table_columns, const Name
MergeTreeTableInstance MergeTreeRelParser::parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table)
{
logDebugMessage(extension_table, "merge_tree_table");
google::protobuf::StringValue table;
table.ParseFromString(extension_table.detail().value());
return parseMergeTreeTableString(table.value());
return parseFromAny(extension_table.detail());
}

CustomStorageMergeTreePtr MergeTreeRelParser::getStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context)
Expand All @@ -74,15 +82,7 @@ CustomStorageMergeTreePtr MergeTreeRelParser::getStorage(const MergeTreeTable &
merge_tree_table,
[&]() -> CustomStorageMergeTreePtr
{
auto custom_storage_merge_tree = std::make_shared<SparkStorageMergeTree>(
StorageID(merge_tree_table.database, merge_tree_table.table),
merge_tree_table.relative_path,
*metadata,
false,
context,
"",
MergeTreeData::MergingParams(),
buildMergeTreeSettings(merge_tree_table.table_configs));
auto custom_storage_merge_tree = std::make_shared<SparkStorageMergeTree>(merge_tree_table, *metadata, context);
return custom_storage_merge_tree;
});
}
Expand Down Expand Up @@ -377,18 +377,9 @@ String MergeTreeRelParser::getCHFunctionName(const substrait::Expression_ScalarF
return getPlanParser()->getFunctionName(func_signature, substrait_func);
}

MergeTreeTableInstance xparseMergeTreeTable(const substrait::extensions::AdvancedExtension & extension)
{
logDebugMessage(extension, "merge_tree_table");

google::protobuf::StringValue table;
table.ParseFromString(extension.enhancement().value());
return parseMergeTreeTableString(table.value());
}

String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_rel)
{
auto merge_tree_table = xparseMergeTreeTable(read_rel.advanced_extension());
auto merge_tree_table = parseFromAny(read_rel.advanced_extension().enhancement());
// ignore snapshot id for query
merge_tree_table.snapshot_id = "";
auto custom_storage_mergetree = restoreStorage(merge_tree_table, global_context);
Expand Down
27 changes: 12 additions & 15 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
#pragma once

#include "MergeTreeTool.h"


#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/MergeTree/CustomStorageMergeTree.h>
#include <Common/CHUtil.h>
Expand Down Expand Up @@ -78,25 +81,18 @@ class SparkStorageMergeTree final : public CustomStorageMergeTree
friend class SparkMergeTreeSink;

public:
SparkStorageMergeTree(
const StorageID & table_id_,
const String & relative_data_path_,
const StorageInMemoryMetadata & metadata,
bool attach,
const ContextMutablePtr & context_,
const String & date_column_name,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_)
SparkStorageMergeTree(const MergeTreeTable & table_, const StorageInMemoryMetadata & metadata, const ContextMutablePtr & context_)
: CustomStorageMergeTree(
table_id_,
relative_data_path_,
StorageID(table_.database, table_.table),
table_.relative_path,
metadata,
attach,
false,
context_,
date_column_name,
merging_params_,
std::move(settings_),
"",
MergingParams(),
buildMergeTreeSettings(table_.table_configs),
false /*has_force_restore_data_flag*/)
, table(table_)
, writer(*this)
{
}
Expand All @@ -105,6 +101,7 @@ class SparkStorageMergeTree final : public CustomStorageMergeTree
write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override;

private:
MergeTreeTable table;
SparkMergeTreeDataWriter writer;
};

Expand Down

0 comments on commit 7b6f4dd

Please sign in to comment.