diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp index 4bff46b6cd49e..b85ed25e15dc6 100644 --- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp +++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp @@ -57,10 +57,10 @@ static Int64 findMinPosition(const NameSet & condition_table_columns, const Name DB::QueryPlanPtr MergeTreeRelParser::parseReadRel( DB::QueryPlanPtr query_plan, const substrait::ReadRel & rel, const substrait::ReadRel::ExtensionTable & extension_table) { - auto merge_tree_table = MergeTreeTableInstance::parseMergeTreeTable(extension_table); + MergeTreeTableInstance merge_tree_table(extension_table); // ignore snapshot id for query merge_tree_table.snapshot_id = ""; - auto storage = MergeTreeTableInstance::restoreStorage(merge_tree_table, global_context); + auto storage = merge_tree_table.restoreStorage(global_context); DB::Block input; if (rel.has_base_schema() && rel.base_schema().names_size()) @@ -316,10 +316,10 @@ String MergeTreeRelParser::getCHFunctionName(const substrait::Expression_ScalarF String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_rel) { - auto merge_tree_table = MergeTreeTableInstance::parseFromAny(read_rel.advanced_extension().enhancement()); + MergeTreeTableInstance merge_tree_table(read_rel.advanced_extension().enhancement()); // ignore snapshot id for query merge_tree_table.snapshot_id = ""; - auto custom_storage_mergetree = MergeTreeTableInstance::restoreStorage(merge_tree_table, global_context); + auto storage = merge_tree_table.restoreStorage(global_context); auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema()); auto names_and_types_list = input.getNamesAndTypesList(); @@ -330,11 +330,10 @@ String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_ std::vector selected_parts = StorageMergeTreeFactory::getDataPartsByNames( StorageID(merge_tree_table.database, merge_tree_table.table), merge_tree_table.snapshot_id, merge_tree_table.getPartNames()); - auto storage_snapshot - = std::make_shared(*custom_storage_mergetree, custom_storage_mergetree->getInMemoryMetadataPtr()); + auto storage_snapshot = std::make_shared(*storage, storage->getInMemoryMetadataPtr()); if (selected_parts.empty()) throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "no data part found."); - auto read_step = custom_storage_mergetree->reader.readFromParts( + auto read_step = storage->reader.readFromParts( selected_parts, /* alter_conversions = */ {}, diff --git a/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp b/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp index 7b6c450398ea2..e72454ba6fb24 100644 --- a/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp +++ b/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp @@ -17,7 +17,7 @@ #include "SubstraitParserUtils.h" -#include +#include #include using namespace DB; @@ -38,4 +38,10 @@ void logDebugMessage(const google::protobuf::Message & message, const char * typ LOG_DEBUG(logger, "{}:\n{}", type, json); } } +std::string toString(const google::protobuf::Any & any) +{ + google::protobuf::StringValue sv; + sv.ParseFromString(any.value()); + return sv.value(); +} } \ No newline at end of file diff --git a/cpp-ch/local-engine/Parser/SubstraitParserUtils.h b/cpp-ch/local-engine/Parser/SubstraitParserUtils.h index 4c8ef1218400e..d93b80cdacaf3 100644 --- a/cpp-ch/local-engine/Parser/SubstraitParserUtils.h +++ b/cpp-ch/local-engine/Parser/SubstraitParserUtils.h @@ -69,4 +69,5 @@ Message BinaryToMessage(const std::string_view binary) void logDebugMessage(const google::protobuf::Message & message, const char * type); +std::string toString(const google::protobuf::Any & any); } // namespace local_engine diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp index fb15cb74ad924..e74a74ae0fa69 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp @@ -86,7 +86,7 @@ Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTr { try { - auto storage = MergeTreeTableInstance::restoreStorage(job_detail.table, context); + auto storage = job_detail.table.restoreStorage(context); auto storage_snapshot = std::make_shared(*storage, storage->getInMemoryMetadataPtr()); NamesAndTypesList names_and_types_list; diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp index 95f30eb17cdea..788b1b39fc203 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp @@ -16,10 +16,6 @@ */ #include "SparkMergeTreeMeta.h" -#include -#include -#include - #include #include #include @@ -29,6 +25,8 @@ #include #include #include +#include +#include #include using namespace DB; @@ -126,49 +124,6 @@ doBuildMetadata(const DB::NamesAndTypesList & columns, const ContextPtr & contex return metadata; } -} -namespace local_engine -{ - - -SparkStorageMergeTreePtr MergeTreeTable::getStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context) -{ - const DB::Block header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema, merge_tree_table.low_card_key); - const auto metadata = buildMetaData(header, context, merge_tree_table); - - return StorageMergeTreeFactory::getStorage( - StorageID(merge_tree_table.database, merge_tree_table.table), - merge_tree_table.snapshot_id, - merge_tree_table, - [&]() -> SparkStorageMergeTreePtr - { - auto custom_storage_merge_tree = std::make_shared(merge_tree_table, *metadata, context); - return custom_storage_merge_tree; - }); -} - -SparkStorageMergeTreePtr MergeTreeTable::copyToDefaultPolicyStorage(const MergeTreeTable & table, ContextMutablePtr context) -{ - MergeTreeTable merge_tree_table{table}; - auto temp_uuid = UUIDHelpers::generateV4(); - String temp_uuid_str = toString(temp_uuid); - merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str; - merge_tree_table.snapshot_id = ""; - merge_tree_table.table_configs.storage_policy = ""; - merge_tree_table.relative_path = merge_tree_table.relative_path + "_" + temp_uuid_str; - return getStorage(merge_tree_table, context); -} - -SparkStorageMergeTreePtr MergeTreeTable::copyToVirtualStorage(const MergeTreeTable & table, const ContextMutablePtr & context) -{ - MergeTreeTable merge_tree_table{table}; - auto temp_uuid = UUIDHelpers::generateV4(); - String temp_uuid_str = toString(temp_uuid); - merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str; - merge_tree_table.snapshot_id = ""; - return getStorage(merge_tree_table, context); -} - void doParseMergeTreeTableString(MergeTreeTable & table, ReadBufferFromString & in) { assertString("MergeTree;", in); @@ -207,11 +162,52 @@ void doParseMergeTreeTableString(MergeTreeTable & table, ReadBufferFromString & assertChar('\n', in); } -MergeTreeTableInstance MergeTreeTableInstance::parseMergeTreeTableString(const std::string & info) +} +namespace local_engine +{ + +SparkStorageMergeTreePtr MergeTreeTable::getStorage(ContextMutablePtr context) const +{ + const DB::Block header = TypeParser::buildBlockFromNamedStruct(schema, low_card_key); + const auto metadata = buildMetaData(header, context); + + return StorageMergeTreeFactory::getStorage( + StorageID(database, table), + snapshot_id, + *this, + [&]() -> SparkStorageMergeTreePtr + { + auto custom_storage_merge_tree = std::make_shared(*this, *metadata, context); + return custom_storage_merge_tree; + }); +} + +SparkStorageMergeTreePtr MergeTreeTable::copyToDefaultPolicyStorage(const ContextMutablePtr & context) const +{ + MergeTreeTable merge_tree_table{*this}; + auto temp_uuid = UUIDHelpers::generateV4(); + String temp_uuid_str = toString(temp_uuid); + merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str; + merge_tree_table.snapshot_id = ""; + merge_tree_table.table_configs.storage_policy = ""; + merge_tree_table.relative_path = merge_tree_table.relative_path + "_" + temp_uuid_str; + return merge_tree_table.getStorage(context); +} + +SparkStorageMergeTreePtr MergeTreeTable::copyToVirtualStorage(const ContextMutablePtr & context) const +{ + MergeTreeTable merge_tree_table{*this}; + auto temp_uuid = UUIDHelpers::generateV4(); + String temp_uuid_str = toString(temp_uuid); + merge_tree_table.table = merge_tree_table.table + "_" + temp_uuid_str; + merge_tree_table.snapshot_id = ""; + return merge_tree_table.getStorage(context); +} + +MergeTreeTableInstance::MergeTreeTableInstance(const std::string & info) { - MergeTreeTableInstance result; ReadBufferFromString in(info); - doParseMergeTreeTableString(result, in); + doParseMergeTreeTableString(*this, in); while (!in.eof()) { @@ -222,37 +218,30 @@ MergeTreeTableInstance MergeTreeTableInstance::parseMergeTreeTableString(const s assertChar('\n', in); readIntText(part.end, in); assertChar('\n', in); - result.parts.emplace_back(part); + parts.emplace_back(part); } - - return result; } -MergeTreeTableInstance MergeTreeTableInstance::parseFromAny(const google::protobuf::Any & any) +MergeTreeTableInstance::MergeTreeTableInstance(const google::protobuf::Any & any) : MergeTreeTableInstance(toString(any)) { - google::protobuf::StringValue table; - table.ParseFromString(any.value()); - return parseMergeTreeTableString(table.value()); } -MergeTreeTableInstance MergeTreeTableInstance::parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table) +MergeTreeTableInstance::MergeTreeTableInstance(const substrait::ReadRel::ExtensionTable & extension_table) + : MergeTreeTableInstance(extension_table.detail()) { logDebugMessage(extension_table, "merge_tree_table"); - return parseFromAny(extension_table.detail()); } -SparkStorageMergeTreePtr -MergeTreeTableInstance::restoreStorage(const MergeTreeTableInstance & merge_tree_table, const ContextMutablePtr & context) +SparkStorageMergeTreePtr MergeTreeTableInstance::restoreStorage(const ContextMutablePtr & context) const { - auto result = getStorage(merge_tree_table, context); - restoreMetaData(result, merge_tree_table, *context); + auto result = getStorage(context); + restoreMetaData(result, *this, *context); return result; } -std::shared_ptr -buildMetaData(const DB::Block & header, const ContextPtr & context, const MergeTreeTable & table) +std::shared_ptr MergeTreeTable::buildMetaData(const DB::Block & header, const ContextPtr & context) const { - return doBuildMetadata(header.getNamesAndTypesList(), context, table); + return doBuildMetadata(header.getNamesAndTypesList(), context, *this); } std::unique_ptr buildMergeTreeSettings(const MergeTreeTableSettings & config) diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h index 942187b8caa0a..2a20430205a63 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h @@ -67,14 +67,16 @@ struct MergeTreeTable bool sameTable(const MergeTreeTable & other) const; - static SparkStorageMergeTreePtr getStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr context); + SparkStorageMergeTreePtr getStorage(ContextMutablePtr context) const; - // Create random table name and table path and use default storage policy. - // In insert case, mergetree data can be uploaded after merges in default storage(Local Disk). - static SparkStorageMergeTreePtr copyToDefaultPolicyStorage(const MergeTreeTable & table, ContextMutablePtr context); + /// Create random table name and table path and use default storage policy. + /// In insert case, mergetree data can be uploaded after merges in default storage(Local Disk). + SparkStorageMergeTreePtr copyToDefaultPolicyStorage(const ContextMutablePtr & context) const; - // Use same table path and data path as the original table. - static SparkStorageMergeTreePtr copyToVirtualStorage(const MergeTreeTable & table, const ContextMutablePtr & context); + /// Use same table path and data path as the original table. + SparkStorageMergeTreePtr copyToVirtualStorage(const ContextMutablePtr & context) const; + + std::shared_ptr buildMetaData(const DB::Block & header, const ContextPtr & context) const; }; struct MergeTreeTableInstance : MergeTreeTable @@ -83,14 +85,12 @@ struct MergeTreeTableInstance : MergeTreeTable std::unordered_set getPartNames() const; RangesInDataParts extractRange(DataPartsVector parts_vector) const; - static SparkStorageMergeTreePtr restoreStorage(const MergeTreeTableInstance & merge_tree_table, const ContextMutablePtr & context); - static MergeTreeTableInstance parseFromAny(const google::protobuf::Any & any); - static MergeTreeTableInstance parseMergeTreeTable(const substrait::ReadRel::ExtensionTable & extension_table); - static MergeTreeTableInstance parseMergeTreeTableString(const std::string & info); -}; + SparkStorageMergeTreePtr restoreStorage(const ContextMutablePtr & context) const; -std::shared_ptr -buildMetaData(const DB::Block & header, const ContextPtr & context, const MergeTreeTable & table); + explicit MergeTreeTableInstance(const google::protobuf::Any & any); + explicit MergeTreeTableInstance(const substrait::ReadRel::ExtensionTable & extension_table); + explicit MergeTreeTableInstance(const std::string & info); +}; std::unique_ptr buildMergeTreeSettings(const MergeTreeTableSettings & config); diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp index c1820ff034f84..32662a5abb677 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp @@ -72,12 +72,12 @@ void SparkMergeTreeSink::onFinish() SinkHelperPtr SparkMergeTreeSink::create( const MergeTreeTable & merge_tree_table, const SparkMergeTreeWriteSettings & write_settings_, const DB::ContextMutablePtr & context) { - auto dest_storage = MergeTreeTable::getStorage(merge_tree_table, context); + auto dest_storage = merge_tree_table.getStorage(context); bool isRemoteStorage = dest_storage->getStoragePolicy()->getAnyDisk()->isRemote(); bool insert_with_local_storage = !write_settings_.insert_without_local_storage; if (insert_with_local_storage && isRemoteStorage) { - auto temp = MergeTreeTable::copyToDefaultPolicyStorage(merge_tree_table, context); + auto temp = merge_tree_table.copyToDefaultPolicyStorage(context); LOG_DEBUG( &Poco::Logger::get("SparkMergeTreeWriter"), "Create temp table {} for local merge.", diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp index 088aa6126d5b7..1ede4960aafe4 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp @@ -54,7 +54,7 @@ std::unique_ptr SparkMergeTreeWriter::create( const DB::ContextMutablePtr & context) { const DB::Settings & settings = context->getSettingsRef(); - const auto dest_storage = MergeTreeTable::getStorage(merge_tree_table, context); + const auto dest_storage = merge_tree_table.getStorage(context); StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr(); Block header = metadata_snapshot->getSampleBlock(); ASTPtr none; diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp index 010c773c28113..b5f8ac048fb63 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp @@ -500,7 +500,7 @@ SinkToStoragePtr SparkWriteStorageMergeTree::write( settings.load(context); SinkHelperPtr sink_helper = SparkMergeTreeSink::create(table, settings, getContext()); #ifndef NDEBUG - auto dest_storage = MergeTreeTable::getStorage(table, getContext()); + auto dest_storage = table.getStorage(getContext()); assert(dest_storage.get() == this); #endif diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index ebd827ee70f13..5e862040ca727 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -901,9 +901,9 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW const auto split_info_a = local_engine::getByteArrayElementsSafe(env, split_info_); auto extension_table = local_engine::BinaryToMessage( {reinterpret_cast(split_info_a.elems()), static_cast(split_info_a.length())}); - auto merge_tree_table = local_engine::MergeTreeTableInstance::parseMergeTreeTable(extension_table); - + local_engine::MergeTreeTableInstance merge_tree_table(extension_table); auto * writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, settings, query_context).release(); + return reinterpret_cast(writer); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } @@ -995,13 +995,10 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn auto extension_table = local_engine::BinaryToMessage( {reinterpret_cast(split_info_a.elems()), static_cast(split_info_a.length())}); - google::protobuf::StringValue table; - table.ParseFromString(extension_table.detail().value()); - auto merge_tree_table = local_engine::MergeTreeTableInstance::parseMergeTreeTableString(table.value()); - + local_engine::MergeTreeTableInstance merge_tree_table(extension_table); auto context = local_engine::QueryContextManager::instance().currentQueryContext(); // each task using its own CustomStorageMergeTree, don't reuse - auto temp_storage = local_engine::MergeTreeTable::copyToVirtualStorage(merge_tree_table, context); + auto temp_storage = merge_tree_table.copyToVirtualStorage(context); // prefetch all needed parts metadata before merge local_engine::restoreMetaData(temp_storage, merge_tree_table, *context); @@ -1296,7 +1293,7 @@ Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv * std::unordered_set column_set; for (const auto & col : tokenizer) column_set.insert(col); - auto table = local_engine::MergeTreeTableInstance::parseMergeTreeTableString(table_def); + local_engine::MergeTreeTableInstance table(table_def); auto id = local_engine::CacheManager::instance().cacheParts(table, column_set); return local_engine::charTojstring(env, id.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr); diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp index 66a25f7ea1741..69b6fa0f6d446 100644 --- a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp +++ b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp @@ -387,7 +387,7 @@ TEST(WritePipeline, SparkMergeTree) const Settings & settings = context->getSettingsRef(); const auto extension_table = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_)); - const auto merge_tree_table = MergeTreeTableInstance::parseMergeTreeTable(extension_table); + MergeTreeTableInstance merge_tree_table(extension_table); EXPECT_EQ(merge_tree_table.database, "default"); EXPECT_EQ(merge_tree_table.table, "lineitem_mergetree"); @@ -396,7 +396,7 @@ TEST(WritePipeline, SparkMergeTree) do_remove(merge_tree_table.relative_path); - const auto dest_storage = MergeTreeTable::getStorage(merge_tree_table, SerializedPlanParser::global_context); + const auto dest_storage = merge_tree_table.getStorage(SerializedPlanParser::global_context); EXPECT_TRUE(dest_storage); EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote()); DB::StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr(); @@ -431,13 +431,13 @@ TEST(WritePipeline, SparkMergeTree) { const auto extension_table_hdfs = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_hdfs_)); - const auto merge_tree_table_hdfs = MergeTreeTableInstance::parseMergeTreeTable(extension_table_hdfs); + MergeTreeTableInstance merge_tree_table_hdfs(extension_table_hdfs); EXPECT_EQ(merge_tree_table_hdfs.database, "default"); EXPECT_EQ(merge_tree_table_hdfs.table, "lineitem_mergetree_hdfs"); EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs"); EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main"); - const auto dest_storage_hdfs = MergeTreeTable::getStorage(merge_tree_table_hdfs, SerializedPlanParser::global_context); + const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(SerializedPlanParser::global_context); EXPECT_TRUE(dest_storage_hdfs); EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote()); }