From 8893a43ae8859acd684b0f3587ebd54f3671966d Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Wed, 4 Sep 2024 11:57:20 +0800 Subject: [PATCH] remvoe SparkMergeTreeWriter::writeTempPartAndFinalize --- .../Storages/MergeTree/SparkMergeTreeSink.cpp | 20 ++++++++++++++++++- .../Storages/MergeTree/SparkMergeTreeSink.h | 14 +++++++++---- .../MergeTree/SparkMergeTreeWriter.cpp | 10 +--------- .../Storages/MergeTree/SparkMergeTreeWriter.h | 2 -- 4 files changed, 30 insertions(+), 16 deletions(-) diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp index 2fd2deacfe35d..793a4a2326762 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp @@ -192,6 +192,14 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeDataWriter::writeTempPart( SinkToStoragePtr SparkStorageMergeTree::write( const ASTPtr &, const StorageMetadataPtr & storage_in_memory_metadata, ContextPtr context, bool /*async_insert*/) { + GlutenMergeTreeWriteSettings settings{.partition_settings{MergeTreePartitionWriteSettings::get(context)}}; + settings.load(context); + SinkHelperPtr sink_helper = SinkHelper::create(table, settings, getContext()); +#ifndef NDEBUG + auto dest_storage = MergeTreeRelParser::getStorage(table, getContext()); + assert(dest_storage.get() == this); +#endif + return std::make_shared(*this, storage_in_memory_metadata, context); } @@ -254,9 +262,9 @@ SinkHelper::SinkHelper(const CustomStorageMergeTreePtr & data_, const GlutenMerg : write_settings(write_settings_) , data(data_) , isRemoteStorage(isRemoteStorage_) + , thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1, 100000) , metadata_snapshot(data->getInMemoryMetadataPtr()) , header(metadata_snapshot->getSampleBlock()) - , thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1, 100000) { } @@ -325,6 +333,16 @@ void SinkHelper::doMergePartsAsync(const std::vector & watch.elapsedMilliseconds()); }); } +void SinkHelper::writeTempPart( + DB::BlockWithPartition & block_with_partition, + const DB::StorageMetadataPtr & metadata_snapshot, + const ContextPtr & context, + const MergeTreePartitionWriteSettings & write_settings, + int part_num) +{ + auto tmp = dataRef().writer.writeTempPart(block_with_partition, metadata_snapshot, context, write_settings, part_num); + new_parts.emplace_back(tmp.part); +} void SinkHelper::checkAndMerge(bool force) { diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h index 9b617d7c8eed2..2b28fcd20110d 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.h @@ -75,10 +75,13 @@ class SparkMergeTreeDataWriter }; class SparkMergeTreeSink; +class SinkHelper; +using SinkHelperPtr = std::shared_ptr; class SparkStorageMergeTree final : public CustomStorageMergeTree { friend class SparkMergeTreeSink; + friend class SinkHelper; public: SparkStorageMergeTree(const MergeTreeTable & table_, const StorageInMemoryMetadata & metadata, const ContextMutablePtr & context_) @@ -161,8 +164,6 @@ class ConcurrentDeque mutable std::mutex mtx; }; -class SinkHelper; -using SinkHelperPtr = std::shared_ptr; class SinkHelper { protected: @@ -181,9 +182,15 @@ class SinkHelper protected: void doMergePartsAsync(const std::vector & prepare_merge_parts); virtual void cleanup() { } + SparkStorageMergeTree & dataRef() const { return assert_cast(*data); } public: - void emplacePart(const DB::MergeTreeDataPartPtr & part) { new_parts.emplace_back(part); } + void writeTempPart( + DB::BlockWithPartition & block_with_partition, + const DB::StorageMetadataPtr & metadata_snapshot, + const ContextPtr & context, + const MergeTreePartitionWriteSettings & write_settings, + int part_num); const std::deque & unsafeGet() const { return new_parts.unsafeGet(); } void checkAndMerge(bool force = false); @@ -198,7 +205,6 @@ class SinkHelper const DB::ContextMutablePtr & context); virtual CustomStorageMergeTree & dest_storage() { return *data; } - CustomStorageMergeTree & dataRef() const { return *data; } virtual void commit(const ReadSettings & read_settings, const WriteSettings & write_settings) { } void saveMetadata(const DB::ContextPtr & context); diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp index 576aa750714fb..f3c98f283934f 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp @@ -99,8 +99,7 @@ bool SparkMergeTreeWriter::blockToPart(Block & block) CurrentThread::flushUntrackedMemory(); before_write_memory = memory_tracker->get(); } - - dataWrapper->emplacePart(writeTempPartAndFinalize(item, dataWrapper->metadata_snapshot).part); + dataWrapper->writeTempPart(item, dataWrapper->metadata_snapshot, context, write_settings.partition_settings, part_num); part_num++; /// Reset earlier to free memory item.block.clear(); @@ -120,13 +119,6 @@ void SparkMergeTreeWriter::finalize() dataWrapper->saveMetadata(context); } -DB::MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPartAndFinalize( - DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot) const -{ - const SparkMergeTreeDataWriter writer(dataWrapper->dataRef()); - return writer.writeTempPart(block_with_partition, metadata_snapshot, context, write_settings.partition_settings, part_num); -} - std::vector SparkMergeTreeWriter::getAllPartInfo() const { std::vector res; diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h index 17d2ccb9ba169..7bf664ed6ad5a 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h @@ -59,8 +59,6 @@ class SparkMergeTreeWriter std::vector getAllPartInfo() const; private: - DB::MergeTreeDataWriter::TemporaryPart - writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot) const; bool chunkToPart(Chunk && plan_chunk); bool blockToPart(Block & block);