diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h index 23bd972cb9422..3c741c7ffa22a 100644 --- a/cpp-ch/local-engine/Common/CHUtil.h +++ b/cpp-ch/local-engine/Common/CHUtil.h @@ -82,7 +82,8 @@ class BlockUtil /// The column names may be different in two blocks. /// and the nullability also could be different, with TPCDS-Q1 as an example. - static DB::ColumnWithTypeAndName convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column); + static DB::ColumnWithTypeAndName + convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, const DB::ColumnWithTypeAndName & sample_column); }; class PODArrayUtil @@ -214,7 +215,8 @@ class BackendInitializerUtil static void registerAllFactories(); static void applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr, DB::Settings &); static void updateNewSettings(const DB::ContextMutablePtr &, const DB::Settings &); - static std::vector wrapDiskPathConfig(const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config); + static std::vector + wrapDiskPathConfig(const String & path_prefix, const String & path_suffix, Poco::Util::AbstractConfiguration & config); static std::map getBackendConfMap(std::string_view plan); @@ -260,64 +262,12 @@ class MemoryUtil static UInt64 getMemoryRSS(); }; -template -class ConcurrentDeque -{ -public: - std::optional pop_front() - { - std::lock_guard lock(mtx); - - if (deq.empty()) - return {}; - - T t = deq.front(); - deq.pop_front(); - return t; - } - - void emplace_back(T value) - { - std::lock_guard lock(mtx); - deq.emplace_back(value); - } - - void emplace_back(std::vector values) - { - std::lock_guard lock(mtx); - deq.insert(deq.end(), values.begin(), values.end()); - } - - void emplace_front(T value) - { - std::lock_guard lock(mtx); - deq.emplace_front(value); - } - - size_t size() - { - std::lock_guard lock(mtx); - return deq.size(); - } - - bool empty() - { - std::lock_guard lock(mtx); - return deq.empty(); - } - - std::deque unsafeGet() { return deq; } - -private: - std::deque deq; - mutable std::mutex mtx; -}; - class JoinUtil { public: static void reorderJoinOutput(DB::QueryPlan & plan, DB::Names cols); - static std::pair getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool is_existence_join); + static std::pair + getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool is_existence_join); static std::pair getCrossJoinKindAndStrictness(substrait::CrossRel_JoinType join_type); }; diff --git a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp index b91dc68423f9f..1a8b71eb1ecce 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp @@ -152,7 +152,7 @@ std::vector mergeParts( std::vector selected_parts, std::unordered_map & partition_values, const String & new_part_uuid, - CustomStorageMergeTreePtr storage, + CustomStorageMergeTree & storage, const String & partition_dir, const String & bucket_dir) { @@ -179,7 +179,7 @@ std::vector mergeParts( // Copying a vector of columns `deduplicate by columns. DB::IExecutableTask::TaskResultCallback f = [](bool) {}; auto task = std::make_shared( - *storage, storage->getInMemoryMetadataPtr(), false, std::vector{}, false, entry, + storage, storage.getInMemoryMetadataPtr(), false, std::vector{}, false, entry, DB::TableLockHolder{}, f); task->setCurrentTransaction(DB::MergeTreeTransactionHolder{}, DB::MergeTreeTransactionPtr{}); @@ -187,7 +187,7 @@ std::vector mergeParts( executeHere(task); std::unordered_set to_load{future_part->name}; - std::vector merged = storage->loadDataPartsWithNames(to_load); + std::vector merged = storage.loadDataPartsWithNames(to_load); return merged; } diff --git a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h index a2f9999ceec49..b093f687ab10c 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h +++ b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h @@ -26,16 +26,13 @@ namespace local_engine void restoreMetaData(const CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context); void saveFileStatus( - const DB::MergeTreeData & storage, - const DB::ContextPtr& context, - const String & part_name, - IDataPartStorage & data_part_storage); + const DB::MergeTreeData & storage, const DB::ContextPtr & context, const String & part_name, IDataPartStorage & data_part_storage); std::vector mergeParts( std::vector selected_parts, std::unordered_map & partition_values, const String & new_part_uuid, - CustomStorageMergeTreePtr storage, + CustomStorageMergeTree & storage, const String & partition_dir, const String & bucket_dir); diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp index f35b04dbeb998..7768b329a79fe 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp @@ -58,35 +58,13 @@ Block removeColumnSuffix(const Block & block) namespace local_engine { -StorageMergeTreeWrapperPtr StorageMergeTreeWrapper::create( - const MergeTreeTable & merge_tree_table, bool insert_with_local_storage, const DB::ContextMutablePtr & context) -{ - auto dest_storage = MergeTreeRelParser::parseStorage(merge_tree_table, context); - bool isRemoteStorage = dest_storage->getStoragePolicy()->getAnyDisk()->isRemote(); - if (insert_with_local_storage && isRemoteStorage) - { - auto temp = MergeTreeRelParser::copyToDefaultPolicyStorage(merge_tree_table, context); - LOG_DEBUG( - &Poco::Logger::get("SparkMergeTreeWriter"), - "Create temp table {} for local merge.", - temp->getStorageID().getFullNameNotQuoted()); - return std::make_shared(temp, dest_storage); - } - - return std::make_shared(dest_storage, isRemoteStorage); -} - SparkMergeTreeWriter::SparkMergeTreeWriter( const MergeTreeTable & merge_tree_table, const GlutenMergeTreeWriteSettings & write_settings_, const DB::ContextPtr & context_) : write_settings(write_settings_) + , dataWrapper(StorageMergeTreeWrapper::create(merge_tree_table, write_settings, SerializedPlanParser::global_context)) , context(context_) - , thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1, 100000) { const DB::Settings & settings = context->getSettingsRef(); - - dataWrapper = StorageMergeTreeWrapper::create( - merge_tree_table, !write_settings.insert_without_local_storage, SerializedPlanParser::global_context); - squashing = std::make_unique(dataWrapper->header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); if (!write_settings.partition_settings.partition_dir.empty()) @@ -104,7 +82,7 @@ void SparkMergeTreeWriter::write(const DB::Block & block) bool has_part = chunkToPart(squashing->add({new_block.getColumns(), new_block.rows()})); if (has_part && write_settings.merge_after_insert) - checkAndMerge(); + dataWrapper->checkAndMerge(); } bool SparkMergeTreeWriter::chunkToPart(Chunk && plan_chunk) @@ -133,7 +111,7 @@ bool SparkMergeTreeWriter::blockToPart(Block & block) before_write_memory = memory_tracker->get(); } - new_parts.emplace_back(writeTempPartAndFinalize(item, dataWrapper->metadata_snapshot).part); + dataWrapper->emplacePart(writeTempPartAndFinalize(item, dataWrapper->metadata_snapshot).part); part_num++; /// Reset earlier to free memory item.block.clear(); @@ -147,17 +125,48 @@ void SparkMergeTreeWriter::finalize() { chunkToPart(squashing->flush()); if (write_settings.merge_after_insert) - finalizeMerge(); + dataWrapper->finalizeMerge(); - dataWrapper->commitPartToRemoteStorageIfNeeded(new_parts.unsafeGet(), context->getReadSettings(), context->getWriteSettings()); - dataWrapper->saveMetadata(new_parts.unsafeGet(), context); + dataWrapper->commit(context->getReadSettings(), context->getWriteSettings()); + dataWrapper->saveMetadata(context); } -void StorageMergeTreeWrapper::saveMetadata(const std::deque & parts, const DB::ContextPtr & context) +StorageMergeTreeWrapperPtr StorageMergeTreeWrapper::create( + const MergeTreeTable & merge_tree_table, const GlutenMergeTreeWriteSettings & write_settings_, const DB::ContextMutablePtr & context) +{ + auto dest_storage = MergeTreeRelParser::parseStorage(merge_tree_table, context); + bool isRemoteStorage = dest_storage->getStoragePolicy()->getAnyDisk()->isRemote(); + bool insert_with_local_storage = !write_settings_.insert_without_local_storage; + if (insert_with_local_storage && isRemoteStorage) + { + auto temp = MergeTreeRelParser::copyToDefaultPolicyStorage(merge_tree_table, context); + LOG_DEBUG( + &Poco::Logger::get("SparkMergeTreeWriter"), + "Create temp table {} for local merge.", + temp->getStorageID().getFullNameNotQuoted()); + return std::make_shared(temp, dest_storage, write_settings_); + } + + return std::make_shared(dest_storage, write_settings_, isRemoteStorage); +} + +StorageMergeTreeWrapper::StorageMergeTreeWrapper( + const CustomStorageMergeTreePtr & data_, const GlutenMergeTreeWriteSettings & write_settings_, bool isRemoteStorage_) + : write_settings(write_settings_) + , data(data_) + , isRemoteStorage(isRemoteStorage_) + , metadata_snapshot(data->getInMemoryMetadataPtr()) + , header(metadata_snapshot->getSampleBlock()) + , thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1, 100000) +{ +} + +void StorageMergeTreeWrapper::saveMetadata(const DB::ContextPtr & context) { if (!isRemoteStorage) return; + const std::deque & parts = new_parts.unsafeGet(); for (const auto & merge_tree_data_part : parts) { auto part = dest_storage().loadDataPartsWithNames({merge_tree_data_part->name}); @@ -175,12 +184,13 @@ void StorageMergeTreeWrapper::saveMetadata(const std::deque & parts, const ReadSettings & read_settings, const WriteSettings & write_settings) +void CopyToRemoteStorageMergeTreeWrapper::commit(const ReadSettings & read_settings, const WriteSettings & write_settings) { LOG_DEBUG( &Poco::Logger::get("SparkMergeTreeWriter"), "Begin upload to disk {}.", dest_storage().getStoragePolicy()->getAnyDisk()->getName()); + const std::deque & parts = new_parts.unsafeGet(); + Stopwatch watch; for (const auto & merge_tree_data_part : parts) { @@ -219,7 +229,7 @@ void CopyToRemoteStorageMergeTreeWrapper::commitPartToRemoteStorageIfNeeded( &Poco::Logger::get("SparkMergeTreeWriter"), "Clean temp table {} success.", temp_storage()->getStorageID().getFullNameNotQuoted()); } -void SparkMergeTreeWriter::finalizeMerge() +void StorageMergeTreeWrapper::finalizeMerge() { LOG_DEBUG(&Poco::Logger::get("SparkMergeTreeWriter"), "Waiting all merge task end and do final merge"); // waiting all merge task end and do final merge @@ -232,32 +242,34 @@ void SparkMergeTreeWriter::finalizeMerge() checkAndMerge(true); thread_pool.wait(); } while (before_merge_size != new_parts.size()); + cleanup(); +} +void DirectStorageMergeTreeWrapper::cleanup() +{ + // default storage need clean temp. std::unordered_set final_parts; for (const auto & merge_tree_data_part : new_parts.unsafeGet()) final_parts.emplace(merge_tree_data_part->name); - // default storage need clean temp. - if (!dataWrapper->temp_storage()) + for (const auto & tmp_part : tmp_parts) { - for (const auto & tmp_part : tmp_parts) - { - if (final_parts.contains(tmp_part)) - continue; + if (final_parts.contains(tmp_part)) + continue; - GlobalThreadPool::instance().scheduleOrThrow( - [storage_ = dataWrapper->data(), tmp = tmp_part]() -> void + GlobalThreadPool::instance().scheduleOrThrow( + [storage_ = data, tmp = tmp_part]() -> void + { + for (const auto & disk : storage_->getDisks()) { - for (const auto & disk : storage_->getDisks()) - { - auto rel_path = storage_->getRelativeDataPath() + "/" + tmp; - disk->removeRecursive(rel_path); - } - }); - } + auto rel_path = storage_->getRelativeDataPath() + "/" + tmp; + disk->removeRecursive(rel_path); + } + }); } } + DB::MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPartAndFinalize( DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot) const { @@ -265,12 +277,13 @@ DB::MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPartAndFin return writer.writeTempPart(block_with_partition, metadata_snapshot, context, write_settings.partition_settings, part_num); } -std::vector SparkMergeTreeWriter::getAllPartInfo() +std::vector SparkMergeTreeWriter::getAllPartInfo() const { std::vector res; - res.reserve(new_parts.size()); + auto parts = dataWrapper->unsafeGet(); + res.reserve(parts.size()); - for (const auto & part : new_parts.unsafeGet()) + for (const auto & part : parts) { res.emplace_back(PartInfo{ part->name, @@ -315,57 +328,58 @@ String SparkMergeTreeWriter::partInfosToJson(const std::vector & part_ return result.GetString(); } -void SparkMergeTreeWriter::checkAndMerge(bool force) +void StorageMergeTreeWrapper::doMergePartsAsync(const std::vector & prepare_merge_parts) +{ + for (const auto & selected_part : prepare_merge_parts) + tmp_parts.emplace(selected_part->name); + + // check thread group initialized in task thread + currentThreadGroupMemoryUsage(); + thread_pool.scheduleOrThrow( + [this, prepare_merge_parts, thread_group = CurrentThread::getGroup()]() -> void + { + Stopwatch watch; + CurrentThread::detachFromGroupIfNotDetached(); + CurrentThread::attachToGroup(thread_group); + size_t before_size = 0; + size_t after_size = 0; + for (const auto & prepare_merge_part : prepare_merge_parts) + before_size += prepare_merge_part->getBytesOnDisk(); + + std::unordered_map partition_values; + const auto merged_parts = mergeParts( + prepare_merge_parts, + partition_values, + toString(UUIDHelpers::generateV4()), + dataRef(), + write_settings.partition_settings.partition_dir, + write_settings.partition_settings.bucket_dir); + for (const auto & merge_tree_data_part : merged_parts) + after_size += merge_tree_data_part->getBytesOnDisk(); + + new_parts.emplace_back(merged_parts); + watch.stop(); + LOG_INFO( + &Poco::Logger::get("SparkMergeTreeWriter"), + "Merge success. Before merge part size {}, part count {}, after part size {}, part count {}, " + "total elapsed {} ms", + before_size, + prepare_merge_parts.size(), + after_size, + merged_parts.size(), + watch.elapsedMilliseconds()); + }); +} + +void StorageMergeTreeWrapper::checkAndMerge(bool force) { // Only finalize should force merge. if (!force && new_parts.size() < write_settings.merge_limit_parts) return; - auto doMergeTask = [this](const std::vector & prepare_merge_parts) - { - for (const auto & selected_part : prepare_merge_parts) - tmp_parts.emplace(selected_part->name); - // check thread group initialized in task thread - currentThreadGroupMemoryUsage(); - thread_pool.scheduleOrThrow( - [this, prepare_merge_parts, thread_group = CurrentThread::getGroup()]() -> void - { - Stopwatch watch; - CurrentThread::detachFromGroupIfNotDetached(); - CurrentThread::attachToGroup(thread_group); - size_t before_size = 0; - size_t after_size = 0; - for (const auto & prepare_merge_part : prepare_merge_parts) - before_size += prepare_merge_part->getBytesOnDisk(); - - std::unordered_map partition_values; - const auto merged_parts = mergeParts( - prepare_merge_parts, - partition_values, - toString(UUIDHelpers::generateV4()), - dataWrapper->data(), - write_settings.partition_settings.partition_dir, - write_settings.partition_settings.bucket_dir); - for (const auto & merge_tree_data_part : merged_parts) - after_size += merge_tree_data_part->getBytesOnDisk(); - - new_parts.emplace_back(merged_parts); - watch.stop(); - LOG_INFO( - &Poco::Logger::get("SparkMergeTreeWriter"), - "Merge success. Before merge part size {}, part count {}, after part size {}, part count {}, " - "total elapsed {} ms", - before_size, - prepare_merge_parts.size(), - after_size, - merged_parts.size(), - watch.elapsedMilliseconds()); - }); - }; - std::vector selected_parts; selected_parts.reserve(write_settings.merge_limit_parts); - size_t totol_size = 0; + size_t total_size = 0; std::vector skip_parts; while (const auto merge_tree_data_part_option = new_parts.pop_front()) @@ -378,19 +392,19 @@ void SparkMergeTreeWriter::checkAndMerge(bool force) } selected_parts.emplace_back(merge_tree_data_part); - totol_size += merge_tree_data_part->getBytesOnDisk(); - if (write_settings.merge_min_size > totol_size && write_settings.merge_limit_parts > selected_parts.size()) + total_size += merge_tree_data_part->getBytesOnDisk(); + if (write_settings.merge_min_size > total_size && write_settings.merge_limit_parts > selected_parts.size()) continue; - doMergeTask(selected_parts); + doMergePartsAsync(selected_parts); selected_parts.clear(); - totol_size = 0; + total_size = 0; } if (!selected_parts.empty()) { if (force && selected_parts.size() > 1) - doMergeTask(selected_parts); + doMergePartsAsync(selected_parts); else new_parts.emplace_back(selected_parts); } diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h index 8a06d4cfff7f4..1c70f668bc13a 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h @@ -48,71 +48,137 @@ struct PartInfo bool operator<(const PartInfo & rhs) const { return disk_size < rhs.disk_size; } }; +// TODO: Remove ConcurrentDeque +template +class ConcurrentDeque +{ +public: + std::optional pop_front() + { + std::lock_guard lock(mtx); + + if (deq.empty()) + return {}; + + T t = deq.front(); + deq.pop_front(); + return t; + } + + void emplace_back(T value) + { + std::lock_guard lock(mtx); + deq.emplace_back(value); + } + + void emplace_back(std::vector values) + { + std::lock_guard lock(mtx); + deq.insert(deq.end(), values.begin(), values.end()); + } + + void emplace_front(T value) + { + std::lock_guard lock(mtx); + deq.emplace_front(value); + } + + size_t size() + { + std::lock_guard lock(mtx); + return deq.size(); + } + + bool empty() + { + std::lock_guard lock(mtx); + return deq.empty(); + } + + /// !!! unsafe get, only called when background tasks are finished + const std::deque & unsafeGet() const { return deq; } + +private: + std::deque deq; + mutable std::mutex mtx; +}; + class SparkMergeTreeWriter; class StorageMergeTreeWrapper; using StorageMergeTreeWrapperPtr = std::shared_ptr; class StorageMergeTreeWrapper { - friend class SparkMergeTreeWriter; - protected: - CustomStorageMergeTreePtr merge_tree = nullptr; + const GlutenMergeTreeWriteSettings write_settings; + CustomStorageMergeTreePtr data; bool isRemoteStorage; - DB::StorageMetadataPtr metadata_snapshot; - DB::Block header; + + ConcurrentDeque new_parts; + std::unordered_set tmp_parts{}; + ThreadPool thread_pool; public: - virtual ~StorageMergeTreeWrapper() = default; - explicit StorageMergeTreeWrapper(const CustomStorageMergeTreePtr & data_, bool isRemoteStorage_) - : merge_tree(data_) - , isRemoteStorage(isRemoteStorage_) - , metadata_snapshot(merge_tree->getInMemoryMetadataPtr()) - , header(metadata_snapshot->getSampleBlock()) - { - } - static StorageMergeTreeWrapperPtr - create(const MergeTreeTable & merge_tree_table, bool insert_with_local_storage, const DB::ContextMutablePtr & context); + const DB::StorageMetadataPtr metadata_snapshot; + const DB::Block header; + +protected: + void doMergePartsAsync(const std::vector & prepare_merge_parts); + virtual void cleanup() { } - virtual CustomStorageMergeTree & dest_storage() { return *merge_tree; } - virtual CustomStorageMergeTreePtr temp_storage() { return nullptr; } - CustomStorageMergeTreePtr data() const { return merge_tree; } - CustomStorageMergeTree & dataRef() const { return *merge_tree; } +public: + void emplacePart(const DB::MergeTreeDataPartPtr & part) { new_parts.emplace_back(part); } - virtual void commitPartToRemoteStorageIfNeeded( - const std::deque & parts, const ReadSettings & read_settings, const WriteSettings & write_settings) - { - } - void saveMetadata(const std::deque & parts, const DB::ContextPtr & context); + const std::deque & unsafeGet() const { return new_parts.unsafeGet(); } + void checkAndMerge(bool force = false); + void finalizeMerge(); + + virtual ~StorageMergeTreeWrapper() = default; + explicit StorageMergeTreeWrapper( + const CustomStorageMergeTreePtr & data_, const GlutenMergeTreeWriteSettings & write_settings_, bool isRemoteStorage_); + static StorageMergeTreeWrapperPtr create( + const MergeTreeTable & merge_tree_table, + const GlutenMergeTreeWriteSettings & write_settings_, + const DB::ContextMutablePtr & context); + + virtual CustomStorageMergeTree & dest_storage() { return *data; } + CustomStorageMergeTree & dataRef() const { return *data; } + + virtual void commit(const ReadSettings & read_settings, const WriteSettings & write_settings) { } + void saveMetadata(const DB::ContextPtr & context); }; class DirectStorageMergeTreeWrapper : public StorageMergeTreeWrapper { +protected: + void cleanup() override; + public: - explicit DirectStorageMergeTreeWrapper(const CustomStorageMergeTreePtr & data_, bool isRemoteStorage_) - : StorageMergeTreeWrapper(data_, isRemoteStorage_) + explicit DirectStorageMergeTreeWrapper( + const CustomStorageMergeTreePtr & data_, const GlutenMergeTreeWriteSettings & write_settings_, bool isRemoteStorage_) + : StorageMergeTreeWrapper(data_, write_settings_, isRemoteStorage_) { } }; class CopyToRemoteStorageMergeTreeWrapper : public StorageMergeTreeWrapper { - CustomStorageMergeTreePtr org_storage; + CustomStorageMergeTreePtr dest; public: - explicit CopyToRemoteStorageMergeTreeWrapper(const CustomStorageMergeTreePtr & data_, const CustomStorageMergeTreePtr & org_) - : StorageMergeTreeWrapper(data_, true), org_storage(org_) + explicit CopyToRemoteStorageMergeTreeWrapper( + const CustomStorageMergeTreePtr & temp, + const CustomStorageMergeTreePtr & dest_, + const GlutenMergeTreeWriteSettings & write_settings_) + : StorageMergeTreeWrapper(temp, write_settings_, true), dest(dest_) { - assert(merge_tree != org_storage); + assert(data != dest); } - CustomStorageMergeTree & dest_storage() override { return *org_storage; } - CustomStorageMergeTreePtr temp_storage() override { return merge_tree; } + CustomStorageMergeTree & dest_storage() override { return *dest; } + const CustomStorageMergeTreePtr & temp_storage() const { return data; } - void commitPartToRemoteStorageIfNeeded( - const std::deque & parts, - const ReadSettings & read_settings, - const WriteSettings & write_settings) override; + void commit(const ReadSettings & read_settings, const WriteSettings & write_settings) override; }; class SparkMergeTreeWriter @@ -124,32 +190,22 @@ class SparkMergeTreeWriter void write(const DB::Block & block); void finalize(); - std::vector getAllPartInfo(); + std::vector getAllPartInfo() const; private: DB::MergeTreeDataWriter::TemporaryPart writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot) const; - void checkAndMerge(bool force = false); - void safeEmplaceBackPart(DB::MergeTreeDataPartPtr); - void safeAddPart(DB::MergeTreeDataPartPtr); - void finalizeMerge(); bool chunkToPart(Chunk && plan_chunk); bool blockToPart(Block & block); const GlutenMergeTreeWriteSettings write_settings; + StorageMergeTreeWrapperPtr dataWrapper; DB::ContextPtr context; + std::unordered_map partition_values; - StorageMergeTreeWrapperPtr dataWrapper = nullptr; - std::unique_ptr squashing; - int part_num = 1; - ConcurrentDeque new_parts; - std::unordered_map partition_values; - std::unordered_set tmp_parts; - ThreadPool thread_pool; - - std::mutex memory_mutex; + int part_num = 1; }; } diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 3fd56062e875b..3e5369026ac36 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -1014,7 +1014,7 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn std::unordered_map partition_values; std::vector loaded - = local_engine::mergeParts(selected_parts, partition_values, uuid_str, temp_storage, partition_dir, bucket_dir); + = local_engine::mergeParts(selected_parts, partition_values, uuid_str, *temp_storage, partition_dir, bucket_dir); std::vector res; for (auto & partPtr : loaded) @@ -1081,7 +1081,15 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild DB::CompressedReadBuffer input(read_buffer_from_java_array); local_engine::configureCompressedReadBuffer(input); const auto * obj = make_wrapper(local_engine::BroadCastJoinBuilder::buildJoin( - hash_table_id, input, row_count_, join_key, join_type_, has_mixed_join_condition, is_existence_join, struct_string, is_null_aware_anti_join)); + hash_table_id, + input, + row_count_, + join_key, + join_type_, + has_mixed_join_condition, + is_existence_join, + struct_string, + is_null_aware_anti_join)); return obj->instance(); LOCAL_ENGINE_JNI_METHOD_END(env, 0) }