diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp index 02b8867dbb912..f35b04dbeb998 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp @@ -93,11 +93,6 @@ SparkMergeTreeWriter::SparkMergeTreeWriter( extractPartitionValues(write_settings.partition_settings.partition_dir, partition_values); } -bool SparkMergeTreeWriter::useLocalStorage() const -{ - return dataWrapper->useLocalStorage(); -} - void SparkMergeTreeWriter::write(const DB::Block & block) { auto new_block = removeColumnSuffix(block); @@ -155,17 +150,17 @@ void SparkMergeTreeWriter::finalize() finalizeMerge(); dataWrapper->commitPartToRemoteStorageIfNeeded(new_parts.unsafeGet(), context->getReadSettings(), context->getWriteSettings()); - saveMetadata(); + dataWrapper->saveMetadata(new_parts.unsafeGet(), context); } -void SparkMergeTreeWriter::saveMetadata() +void StorageMergeTreeWrapper::saveMetadata(const std::deque & parts, const DB::ContextPtr & context) { - if (!dataWrapper->isRemoteStorage) + if (!isRemoteStorage) return; - for (const auto & merge_tree_data_part : new_parts.unsafeGet()) + for (const auto & merge_tree_data_part : parts) { - auto part = dataWrapper->dest_storage().loadDataPartsWithNames({merge_tree_data_part->name}); + auto part = dest_storage().loadDataPartsWithNames({merge_tree_data_part->name}); if (part.empty()) { LOG_WARNING( @@ -176,10 +171,7 @@ void SparkMergeTreeWriter::saveMetadata() } saveFileStatus( - dataWrapper->dest_storage(), - context, - merge_tree_data_part->name, - const_cast(part.at(0)->getDataPartStorage())); + dest_storage(), context, merge_tree_data_part->name, const_cast(part.at(0)->getDataPartStorage())); } } diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h index 4e0789a483297..8a06d4cfff7f4 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h @@ -59,16 +59,14 @@ class StorageMergeTreeWrapper protected: CustomStorageMergeTreePtr merge_tree = nullptr; bool isRemoteStorage; - bool localStorageFirst; DB::StorageMetadataPtr metadata_snapshot; DB::Block header; public: virtual ~StorageMergeTreeWrapper() = default; - explicit StorageMergeTreeWrapper(const CustomStorageMergeTreePtr & data_, bool isRemoteStorage_, bool useLocalStorage) + explicit StorageMergeTreeWrapper(const CustomStorageMergeTreePtr & data_, bool isRemoteStorage_) : merge_tree(data_) , isRemoteStorage(isRemoteStorage_) - , localStorageFirst(useLocalStorage) , metadata_snapshot(merge_tree->getInMemoryMetadataPtr()) , header(metadata_snapshot->getSampleBlock()) { @@ -76,8 +74,6 @@ class StorageMergeTreeWrapper static StorageMergeTreeWrapperPtr create(const MergeTreeTable & merge_tree_table, bool insert_with_local_storage, const DB::ContextMutablePtr & context); - bool useLocalStorage() const { return localStorageFirst; }; - virtual CustomStorageMergeTree & dest_storage() { return *merge_tree; } virtual CustomStorageMergeTreePtr temp_storage() { return nullptr; } CustomStorageMergeTreePtr data() const { return merge_tree; } @@ -87,13 +83,14 @@ class StorageMergeTreeWrapper const std::deque & parts, const ReadSettings & read_settings, const WriteSettings & write_settings) { } + void saveMetadata(const std::deque & parts, const DB::ContextPtr & context); }; class DirectStorageMergeTreeWrapper : public StorageMergeTreeWrapper { public: explicit DirectStorageMergeTreeWrapper(const CustomStorageMergeTreePtr & data_, bool isRemoteStorage_) - : StorageMergeTreeWrapper(data_, isRemoteStorage_, false) + : StorageMergeTreeWrapper(data_, isRemoteStorage_) { } }; @@ -104,7 +101,7 @@ class CopyToRemoteStorageMergeTreeWrapper : public StorageMergeTreeWrapper public: explicit CopyToRemoteStorageMergeTreeWrapper(const CustomStorageMergeTreePtr & data_, const CustomStorageMergeTreePtr & org_) - : StorageMergeTreeWrapper(data_, true, true), org_storage(org_) + : StorageMergeTreeWrapper(data_, true), org_storage(org_) { assert(merge_tree != org_storage); } @@ -135,11 +132,9 @@ class SparkMergeTreeWriter void checkAndMerge(bool force = false); void safeEmplaceBackPart(DB::MergeTreeDataPartPtr); void safeAddPart(DB::MergeTreeDataPartPtr); - void saveMetadata(); void finalizeMerge(); bool chunkToPart(Chunk && plan_chunk); bool blockToPart(Block & block); - bool useLocalStorage() const; const GlutenMergeTreeWriteSettings write_settings; DB::ContextPtr context;