From a8b316141d00e8c849f6098206f3decad9f1657a Mon Sep 17 00:00:00 2001
From: Hongbin Ma
Date: Fri, 12 Apr 2024 10:09:51 +0800
Subject: [PATCH] [Gluten-5152][CH] fix bugs for optimizing tables on s3
 (#5282)

What changes were proposed in this pull request?

Fix multiple bugs for optimizing tables on S3, including:

- take care of the metadata.gluten race condition that occurs when two tasks
  perform restoreMetadata on the same executor at the same time.
- avoid creating the tmp_ folder during merge, because renaming it on S3 is
  problematic.

(Fixes: #5152)

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
---
 ...utenClickHouseMergeTreeOptimizeSuite.scala | 61 +++++++++++++++----
 .../Parser/MergeTreeRelParser.cpp             | 28 ++++++++-
 .../Storages/CustomStorageMergeTree.cpp       |  4 +-
 .../Storages/CustomStorageMergeTree.h         |  2 +-
 .../Mergetree/MergeSparkMergeTreeTask.cpp     | 12 ++--
 .../Storages/Mergetree/MetaDataHelper.cpp     | 37 +++++++++--
 .../Storages/Mergetree/MetaDataHelper.h       |  7 ++-
 .../Mergetree/SparkMergeTreeWriter.cpp        | 25 +-------
 .../Storages/Mergetree/SparkMergeTreeWriter.h |  1 -
 cpp-ch/local-engine/local_engine_jni.cpp      |  8 ++-
 10 files changed, 135 insertions(+), 50 deletions(-)

diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
index e553a8c1707c..9b4b552f08e7 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -74,7 +74,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
 
     val ret = spark.sql("select count(*) from lineitem_mergetree_optimize").collect()
     assert(ret.apply(0).get(0) == 600572)
-    spark.sql("optimize lineitem_mergetree_optimize")
     assert(
       countFiles(new File(s"$basePath/lineitem_mergetree_optimize")) == 462
     ) // many merged parts
@@ -148,7 +147,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     assert(ret.apply(0).get(0) == 600572)
 
     spark.sql("set spark.gluten.enabled=false")
-    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 815)
+    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 812)
     spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 232)
     spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
@@ -179,12 +178,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     assert(ret.apply(0).get(0) == 600572)
 
     spark.sql("set spark.gluten.enabled=false")
-    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 411)
+    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 398)
     spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
-    // for tables with more than one layer of nested table (like partition + bucket, or two partition col
-    // the 'tmp_merge' folder is not guarantee to be removed, causing this file number to be unstable
-//    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 290)
-    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) > 270)
+    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 286)
     spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 270)
     spark.sql("set spark.gluten.enabled=true")
@@ -213,10 +209,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     assert(ret.apply(0).get(0) == 600572)
 
     spark.sql("set spark.gluten.enabled=false")
-    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 411)
+    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 398)
     spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
-//    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 290)
-    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) > 270)
+    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 286)
     spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 270)
     spark.sql("set spark.gluten.enabled=true")
@@ -308,7 +303,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
 
     spark.sql("set spark.gluten.enabled=false")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
-      if (sparkVersion.equals("3.2")) 940 else 1023
+      if (sparkVersion.equals("3.2")) 931 else 1014
     })
     spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
     spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
@@ -321,5 +316,49 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     assert(ret2.apply(0).get(0) == 600572)
   }
 
+  test("test skip index after optimize") {
+    withSQLConf(
+      "spark.databricks.delta.optimize.maxFileSize" -> "2000000",
+      "spark.sql.adaptive.enabled" -> "false") {
+      spark.sql(s"""
+                   |DROP TABLE IF EXISTS lineitem_mergetree_index;
+                   |""".stripMargin)
+
+      spark.sql(s"""
+                   |CREATE TABLE IF NOT EXISTS lineitem_mergetree_index
+                   |USING clickhouse
+                   |LOCATION '$basePath/lineitem_mergetree_index'
+                   |TBLPROPERTIES('bloomfilterIndexKey'='l_orderkey')
+                   | as select * from lineitem
+                   |""".stripMargin)
+
+      spark.sql("optimize lineitem_mergetree_index")
+      spark.sql("set spark.gluten.enabled=false")
+      spark.sql("vacuum lineitem_mergetree_index")
+      spark.sql("set spark.gluten.enabled=true")
+
+      val df = spark
+        .sql(s"""
+                |select count(*) from lineitem_mergetree_index where l_orderkey = '600000'
+                |""".stripMargin)
+
+      val scanExec = collect(df.queryExecution.executedPlan) {
+        case f: FileSourceScanExecTransformer => f
+      }
+      assert(scanExec.size == 1)
+      val mergetreeScan = scanExec(0)
+      val ret = df.collect()
+      assert(ret.apply(0).get(0) == 2)
+      val marks = mergetreeScan.metrics("selectedMarks").value
+      assert(marks == 1)
+
+      val directory = new File(s"$basePath/lineitem_mergetree_index")
+      val partDir = directory.listFiles().filter(f => f.getName.endsWith("merged")).head
+      assert(
+        partDir.listFiles().exists(p => p.getName.contains("skp_idx__bloomfilter_l_orderkey.idx")))
+
+    }
+  }
+
 }
 // scalastyle:off line.size.limit
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index eec239c73241..57ad97fae96f 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -80,6 +80,32 @@ CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(
     auto storage_factory = StorageMergeTreeFactory::instance();
     auto metadata = buildMetaData(names_and_types_list, context, merge_tree_table);
 
+    {
+        // use instance global table (without uuid) to restore metadata folder on current instance
+        // we need its lock
+
+        auto global_storage = storage_factory.getStorage(
+            StorageID(merge_tree_table.database, merge_tree_table.table),
+            merge_tree_table.snapshot_id,
+            metadata->getColumns(),
+            [&]() -> CustomStorageMergeTreePtr
+            {
+                auto custom_storage_merge_tree = std::make_shared<CustomStorageMergeTree>(
+                    StorageID(merge_tree_table.database, merge_tree_table.table),
+                    merge_tree_table.relative_path,
+                    *metadata,
+                    false,
+                    context,
+                    "",
+                    MergeTreeData::MergingParams(),
+                    buildMergeTreeSettings(merge_tree_table.table_configs));
+                return custom_storage_merge_tree;
+            });
+
+        restoreMetaData(global_storage, merge_tree_table, *context);
+    }
+
+    // return local table (with a uuid) for isolation
     auto storage = storage_factory.getStorage(
         StorageID(merge_tree_table.database, merge_tree_table.table, uuid),
         merge_tree_table.snapshot_id,
@@ -146,7 +172,7 @@ MergeTreeRelParser::parseReadRel(
             return custom_storage_merge_tree;
         });
 
-    restoreMetaData(storage, merge_tree_table, context);
+    restoreMetaData(storage, merge_tree_table, *context);
     for (const auto & [name, sizes] : storage->getColumnSizes())
         column_sizes[name] = sizes.data_compressed;
     query_context.storage_snapshot = std::make_shared<StorageSnapshot>(*storage, metadata);
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
index c8f0b1f323fd..ee481c93f83f 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
@@ -97,9 +97,9 @@ CustomStorageMergeTree::CustomStorageMergeTree(
 
 std::atomic CustomStorageMergeTree::part_num;
 
-DataPartsVector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<String> parts)
+MergeTreeData::MutableDataPartsVector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<String> parts)
 {
-    DataPartsVector data_parts;
+    MutableDataPartsVector data_parts;
     const auto disk = getStoragePolicy()->getDisks().at(0);
     for (const auto& name : parts)
     {
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
index 0d5e14998bb4..dfaba5b71b3b 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
@@ -52,7 +52,7 @@ class CustomStorageMergeTree final : public MergeTreeData
     std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
     bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override;
     std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;
-    DataPartsVector loadDataPartsWithNames(std::unordered_set<String> parts);
+    MutableDataPartsVector loadDataPartsWithNames(std::unordered_set<String> parts);
 
     MergeTreeDataWriter writer;
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
index 7ece1d7ce206..a741744950f9 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
@@ -149,7 +149,10 @@ void MergeSparkMergeTreeTask::prepare()
         deduplicate_by_columns,
         cleanup,
         storage.merging_params,
-        txn);
+        txn,
+        // need_prefix = false, so CH won't create a tmp_ folder while merging.
+        // the tmp_ folder is problematic on S3 (particularly when renaming)
+        false);
 }
 
@@ -157,9 +160,10 @@ void MergeSparkMergeTreeTask::finish()
 {
     new_part = merge_task->getFuture().get();
 
-    MergeTreeData::Transaction transaction(storage, txn.get());
-    storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction);
-    transaction.commit();
+    // Since there is no tmp_ folder, renaming the merged part is not needed.
+    // MergeTreeData::Transaction transaction(storage, txn.get());
+    // storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction);
+    // transaction.commit();
 
     ThreadFuzzer::maybeInjectSleep();
     ThreadFuzzer::maybeInjectMemoryLimitException();
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
index a7d167385337..688ca8be859f 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
@@ -43,7 +43,7 @@ std::unordered_map<String, String> extractPartMetaData(ReadBuffer & in)
     return result;
 }
 
-void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context)
+void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context)
 {
     auto data_disk = storage->getStoragePolicy()->getAnyDisk();
     if (!data_disk->isRemote())
@@ -60,11 +60,14 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable &
             not_exists_part.emplace(part);
     }
 
-    if (not_exists_part.empty())
-        return;
-
-    if (auto lock = storage->lockForAlter(context->getSettingsRef().lock_acquire_timeout))
+    if (auto lock = storage->lockForAlter(context.getSettingsRef().lock_acquire_timeout))
     {
+        // keep this early return inside lockForAlter so that it does not return
+        // until another thread has finished restoring
+        if (not_exists_part.empty())
+            return;
+
         auto s3 = data_disk->getObjectStorage();
 
         if (!metadata_disk->exists(table_path))
@@ -87,9 +90,35 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable &
                 auto item_path = part_path / item.first;
                 auto out = metadata_disk->writeFile(item_path);
                 out->write(item.second.data(), item.second.size());
+                out->finalize();
+                out->sync();
             }
         }
     }
 }
+
+void saveFileStatus(
+    const DB::MergeTreeData & storage,
+    const DB::ContextPtr& context,
+    IDataPartStorage & data_part_storage)
+{
+    const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
+    if (!disk->isRemote())
+        return;
+    if (auto * const disk_metadata = dynamic_cast<MetadataStorageFromDisk *>(disk->getMetadataStorage().get()))
+    {
+        const auto out = data_part_storage.writeFile("metadata.gluten", DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
+        for (const auto it = data_part_storage.iterate(); it->isValid(); it->next())
+        {
+            auto content = disk_metadata->readFileToString(it->path());
+            writeString(it->name(), *out);
+            writeChar('\t', *out);
+            writeIntText(content.length(), *out);
+            writeChar('\n', *out);
+            writeString(content, *out);
+        }
+        out->finalize();
+    }
+}
 }
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
index 47c5d615d757..59d7af4e35e7 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
@@ -23,7 +23,12 @@ namespace local_engine
 {
 
-void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context);
+void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context);
+
+void saveFileStatus(
+    const DB::MergeTreeData & storage,
+    const DB::ContextPtr& context,
+    IDataPartStorage & data_part_storage);
 
 }
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index 8df171f999dd..40e716a56da4 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -21,6 +21,7 @@
 #include
 #include
 #include
+#include <Storages/Mergetree/MetaDataHelper.h>
 
 using namespace DB;
 
@@ -77,32 +78,10 @@ SparkMergeTreeWriter::writeTempPartAndFinalize(
 {
     auto temp_part = writeTempPart(block_with_partition, metadata_snapshot);
     temp_part.finalize();
-    saveFileStatus(temp_part);
+    saveFileStatus(storage, context, temp_part.part->getDataPartStorage());
     return temp_part;
 }
 
-void SparkMergeTreeWriter::saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart & temp_part) const
-{
-    auto & data_part_storage = temp_part.part->getDataPartStorage();
-
-    const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
-    if (!disk->isRemote()) return;
-    if (auto *const disk_metadata = dynamic_cast<MetadataStorageFromDisk *>(disk->getMetadataStorage().get()))
-    {
-        const auto out = data_part_storage.writeFile("metadata.gluten", DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
-        for (const auto it = data_part_storage.iterate(); it->isValid(); it->next())
-        {
-            auto content = disk_metadata->readFileToString(it->path());
-            writeString(it->name(), *out);
-            writeChar('\t', *out);
-            writeIntText(content.length(), *out);
-            writeChar('\n', *out);
-            writeString(content, *out);
-        }
-        out->finalize();
-    }
-}
-
 MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
     BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
 {
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
index d316f208ebf9..000d009fe068 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
@@ -85,7 +85,6 @@ class SparkMergeTreeWriter
     writeTempPart(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
     DB::MergeTreeDataWriter::TemporaryPart
     writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
-    void saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart & temp_part) const;
 
     String uuid;
     String partition_dir;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 368227163117..1518f8518c0e 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -55,6 +55,7 @@
 #include
 #include
 #include
+#include <Storages/Mergetree/MetaDataHelper.h>
 
 
 #ifdef __cplusplus
@@ -1167,13 +1168,16 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
     executeHere(task);
 
     std::unordered_set<String> to_load{future_part->name};
-    std::vector<MergeTreeDataPartPtr> loaded = storage->loadDataPartsWithNames(to_load);
+    std::vector<std::shared_ptr<IMergeTreeDataPart>> loaded = storage->loadDataPartsWithNames(to_load);
     std::vector<local_engine::PartInfo> res;
-    for (const MergeTreeDataPartPtr & partPtr : loaded)
+    for (auto & partPtr : loaded)
+    {
+        local_engine::saveFileStatus(*storage, local_engine::SerializedPlanParser::global_context, partPtr->getDataPartStorage());
         res.emplace_back(
             local_engine::PartInfo{partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count,
             /*partition_value*/ partition_values, bucket_dir});
+    }
 
     auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res);
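
A note on the race-condition fix, for readers who do not want to trace the diff: restoreMetaData used to check not_exists_part.empty() and return before taking the table lock, so two tasks restoring the same table on one executor could both decide a restore was needed and then race while writing the metadata files; the patch moves that early return inside lockForAlter, so the second task blocks until the first finishes and only then re-checks. The sketch below only illustrates that check-under-lock ordering and is not Gluten code: MetadataRestorer, alter_mutex_, restore_done_ and fetchFromS3 are hypothetical stand-ins for the storage object, lockForAlter, the restored-state check and the real S3 restore.

// Illustrative sketch (not part of the patch): why the "is a restore needed?"
// check must happen under the same lock that serializes the restore itself.
#include <iostream>
#include <mutex>
#include <thread>

class MetadataRestorer // hypothetical stand-in for the storage plus lockForAlter
{
public:
    void restoreIfNeeded()
    {
        // Buggy variant (what the old code did): checking before taking the lock
        // lets a second task see "nothing restored yet", so both tasks run the
        // restore and race on the same metadata files.
        //
        // if (restore_done_) return;   // unsynchronized early return
        std::lock_guard<std::mutex> lock(alter_mutex_); // analogous to lockForAlter()
        if (restore_done_)   // re-checked under the lock: the second task waits,
            return;          // then sees the first task's finished restore
        fetchFromS3();       // hypothetical: download part metadata, write local files
        restore_done_ = true;
    }

private:
    void fetchFromS3() { std::cout << "restoring metadata once\n"; }

    std::mutex alter_mutex_;
    bool restore_done_ = false;
};

int main()
{
    MetadataRestorer restorer;
    // Two "tasks" on the same executor hitting the same table concurrently.
    std::thread t1([&] { restorer.restoreIfNeeded(); });
    std::thread t2([&] { restorer.restoreIfNeeded(); });
    t1.join();
    t2.join();
    return 0; // prints "restoring metadata once" exactly once
}

The parseStorage change in the patch follows the same idea one level up: it performs the restore through the instance-global storage (the StorageID without the uuid) so that every per-query copy on an executor contends on one shared lock instead of each taking its own.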