diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
index e553a8c1707c..9b4b552f08e7 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -74,7 +74,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     val ret = spark.sql("select count(*) from lineitem_mergetree_optimize").collect()
     assert(ret.apply(0).get(0) == 600572)
 
-    spark.sql("optimize lineitem_mergetree_optimize")
     assert(
       countFiles(new File(s"$basePath/lineitem_mergetree_optimize")) == 462
     ) // many merged parts
@@ -148,7 +147,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     assert(ret.apply(0).get(0) == 600572)
 
     spark.sql("set spark.gluten.enabled=false")
-    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 815)
+    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 812)
     spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 232)
     spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
@@ -179,12 +178,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     assert(ret.apply(0).get(0) == 600572)
 
     spark.sql("set spark.gluten.enabled=false")
-    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 411)
+    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 398)
     spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
-    // for tables with more than one layer of nested table (like partition + bucket, or two partition col
-    // the 'tmp_merge' folder is not guarantee to be removed, causing this file number to be unstable
-//    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 290)
-    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) > 270)
+    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 286)
     spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 270)
     spark.sql("set spark.gluten.enabled=true")
@@ -213,10 +209,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     assert(ret.apply(0).get(0) == 600572)
 
     spark.sql("set spark.gluten.enabled=false")
-    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 411)
+    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 398)
     spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
-//    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 290)
-    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) > 270)
+    assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 286)
     spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 270)
     spark.sql("set spark.gluten.enabled=true")
@@ -308,7 +303,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
 
     spark.sql("set spark.gluten.enabled=false")
     assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
-      if (sparkVersion.equals("3.2")) 940 else 1023
+      if (sparkVersion.equals("3.2")) 931 else 1014
     })
     spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
     spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
@@ -321,5 +316,49 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     assert(ret2.apply(0).get(0) == 600572)
   }
 
+  test("test skip index after optimize") {
+    withSQLConf(
+      "spark.databricks.delta.optimize.maxFileSize" -> "2000000",
+      "spark.sql.adaptive.enabled" -> "false") {
+      spark.sql(s"""
+                   |DROP TABLE IF EXISTS lineitem_mergetree_index;
+                   |""".stripMargin)
+
+      spark.sql(s"""
+                   |CREATE TABLE IF NOT EXISTS lineitem_mergetree_index
+                   |USING clickhouse
+                   |LOCATION '$basePath/lineitem_mergetree_index'
+                   |TBLPROPERTIES('bloomfilterIndexKey'='l_orderkey')
+                   | as select * from lineitem
+                   |""".stripMargin)
+
+      spark.sql("optimize lineitem_mergetree_index")
+      spark.sql("set spark.gluten.enabled=false")
+      spark.sql("vacuum lineitem_mergetree_index")
+      spark.sql("set spark.gluten.enabled=true")
+
+      val df = spark
+        .sql(s"""
+                |select count(*) from lineitem_mergetree_index where l_orderkey = '600000'
+                |""".stripMargin)
+
+      val scanExec = collect(df.queryExecution.executedPlan) {
+        case f: FileSourceScanExecTransformer => f
+      }
+      assert(scanExec.size == 1)
+      val mergetreeScan = scanExec(0)
+      val ret = df.collect()
+      assert(ret.apply(0).get(0) == 2)
+      val marks = mergetreeScan.metrics("selectedMarks").value
+      assert(marks == 1)
+
+      val directory = new File(s"$basePath/lineitem_mergetree_index")
+      val partDir = directory.listFiles().filter(f => f.getName.endsWith("merged")).head
+      assert(
+        partDir.listFiles().exists(p => p.getName.contains("skp_idx__bloomfilter_l_orderkey.idx")))
+    }
+  }
+
 }
 // scalastyle:off line.size.limit
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index eec239c73241..57ad97fae96f 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -80,6 +80,32 @@ CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(
     auto storage_factory = StorageMergeTreeFactory::instance();
     auto metadata = buildMetaData(names_and_types_list, context, merge_tree_table);
 
+    {
+        // use instance global table (without uuid) to restore metadata folder on current instance
+        // we need its lock
+
+        auto global_storage = storage_factory.getStorage(
+            StorageID(merge_tree_table.database, merge_tree_table.table),
+            merge_tree_table.snapshot_id,
+            metadata->getColumns(),
+            [&]() -> CustomStorageMergeTreePtr
+            {
+                auto custom_storage_merge_tree = std::make_shared<CustomStorageMergeTree>(
+                    StorageID(merge_tree_table.database, merge_tree_table.table),
+                    merge_tree_table.relative_path,
+                    *metadata,
+                    false,
+                    context,
+                    "",
+                    MergeTreeData::MergingParams(),
+                    buildMergeTreeSettings(merge_tree_table.table_configs));
+                return custom_storage_merge_tree;
+            });
+
+        restoreMetaData(global_storage, merge_tree_table, *context);
+    }
+
+    // return local table (with a uuid) for isolation
     auto storage = storage_factory.getStorage(
         StorageID(merge_tree_table.database, merge_tree_table.table, uuid),
         merge_tree_table.snapshot_id,
@@ -146,7 +172,7 @@ MergeTreeRelParser::parseReadRel(
             return custom_storage_merge_tree;
         });
 
-    restoreMetaData(storage, merge_tree_table, context);
+    restoreMetaData(storage, merge_tree_table, *context);
     for (const auto & [name, sizes] : storage->getColumnSizes())
         column_sizes[name] = sizes.data_compressed;
     query_context.storage_snapshot = std::make_shared<StorageSnapshot>(*storage, metadata);
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
index c8f0b1f323fd..ee481c93f83f 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
@@ -97,9 +97,9 @@ CustomStorageMergeTree::CustomStorageMergeTree(
 
 std::atomic<int> CustomStorageMergeTree::part_num;
 
-DataPartsVector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<String> parts)
+MergeTreeData::MutableDataPartsVector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<String> parts)
 {
-    DataPartsVector data_parts;
+    MutableDataPartsVector data_parts;
     const auto disk = getStoragePolicy()->getDisks().at(0);
     for (const auto& name : parts)
     {
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
index 0d5e14998bb4..dfaba5b71b3b 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
@@ -52,7 +52,7 @@ class CustomStorageMergeTree final : public MergeTreeData
     std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
     bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override;
     std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;
-    DataPartsVector loadDataPartsWithNames(std::unordered_set<String> parts);
+    MutableDataPartsVector loadDataPartsWithNames(std::unordered_set<String> parts);
 
     MergeTreeDataWriter writer;
 
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
index 7ece1d7ce206..a741744950f9 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
@@ -149,7 +149,10 @@ void MergeSparkMergeTreeTask::prepare()
         deduplicate_by_columns,
         cleanup,
         storage.merging_params,
-        txn);
+        txn,
+        // need_prefix = false, so CH won't create a tmp_ folder while merging.
+        // the tmp_ folder is problematic on S3 (particularly when renaming)
+        false);
 }
 
 
@@ -157,9 +160,10 @@ void MergeSparkMergeTreeTask::finish()
 {
     new_part = merge_task->getFuture().get();
 
-    MergeTreeData::Transaction transaction(storage, txn.get());
-    storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction);
-    transaction.commit();
+    // Since there is no tmp_ folder, we don't need renaming
+    // MergeTreeData::Transaction transaction(storage, txn.get());
+    // storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction);
+    // transaction.commit();
 
     ThreadFuzzer::maybeInjectSleep();
     ThreadFuzzer::maybeInjectMemoryLimitException();
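The prepare()/finish() change above follows from how renames behave on object storage: with need_prefix = false the merged part is written directly under its final name, so the tmp_merge_ directory and the renameMergedTemporaryPart step are no longer needed. A toy model of the cost being avoided (a plain map pretending to be an S3-like store where "rename" can only be expressed as copy plus delete; part names and keys are made up for illustration):

    #include <iostream>
    #include <map>
    #include <string>

    // Toy object store: there is no native rename, so a rename is a copy
    // followed by a delete, paid once per object under the part directory.
    struct ToyObjectStore
    {
        std::map<std::string, std::string> objects;
        int copies = 0;
        int deletes = 0;

        void put(const std::string & key, std::string payload) { objects[key] = std::move(payload); }

        void rename(const std::string & from, const std::string & to)
        {
            objects[to] = objects.at(from);
            ++copies;
            objects.erase(from);
            ++deletes;
        }
    };

    int main()
    {
        ToyObjectStore store;

        // need_prefix = true: output lands in tmp_merge_* and every file is renamed afterwards.
        store.put("lineitem/tmp_merge_all_1_5_1/data.bin", "...");
        store.put("lineitem/tmp_merge_all_1_5_1/columns.txt", "...");
        store.rename("lineitem/tmp_merge_all_1_5_1/data.bin", "lineitem/all_1_5_1/data.bin");
        store.rename("lineitem/tmp_merge_all_1_5_1/columns.txt", "lineitem/all_1_5_1/columns.txt");

        // need_prefix = false: the merge writes straight to the final part name, no rename step.
        store.put("lineitem/all_2_6_1/data.bin", "...");
        store.put("lineitem/all_2_6_1/columns.txt", "...");

        std::cout << "copies: " << store.copies << ", deletes: " << store.deletes << '\n'; // copies: 2, deletes: 2
    }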
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
index a7d167385337..688ca8be859f 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
@@ -43,7 +43,7 @@ std::unordered_map<String, String> extractPartMetaData(ReadBuffer & in)
     return result;
 }
 
-void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context)
+void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context)
 {
     auto data_disk = storage->getStoragePolicy()->getAnyDisk();
     if (!data_disk->isRemote())
         return;
@@ -60,11 +60,14 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable &
             not_exists_part.emplace(part);
     }
 
-    if (not_exists_part.empty())
-        return;
-
-    if (auto lock = storage->lockForAlter(context->getSettingsRef().lock_acquire_timeout))
+    if (auto lock = storage->lockForAlter(context.getSettingsRef().lock_acquire_timeout))
     {
+        // keep this early return inside lockForAlter,
+        // so that it does not return until other threads finish restoring
+        if (not_exists_part.empty())
+            return;
+
         auto s3 = data_disk->getObjectStorage();
 
         if (!metadata_disk->exists(table_path))
@@ -87,9 +90,35 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable &
                 auto item_path = part_path / item.first;
                 auto out = metadata_disk->writeFile(item_path);
                 out->write(item.second.data(), item.second.size());
+                out->finalize();
+                out->sync();
             }
         }
     }
 }
+
+void saveFileStatus(
+    const DB::MergeTreeData & storage,
+    const DB::ContextPtr& context,
+    IDataPartStorage & data_part_storage)
+{
+    const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
+    if (!disk->isRemote())
+        return;
+    if (auto * const disk_metadata = dynamic_cast<const MetadataStorageFromDisk *>(disk->getMetadataStorage().get()))
+    {
+        const auto out = data_part_storage.writeFile("metadata.gluten", DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
+        for (const auto it = data_part_storage.iterate(); it->isValid(); it->next())
+        {
+            auto content = disk_metadata->readFileToString(it->path());
+            writeString(it->name(), *out);
+            writeChar('\t', *out);
+            writeIntText(content.length(), *out);
+            writeChar('\n', *out);
+            writeString(content, *out);
+        }
+        out->finalize();
+    }
+}
 }
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
index 47c5d615d757..59d7af4e35e7 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
@@ -23,7 +23,12 @@
 namespace local_engine
 {
 
-void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context);
+void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context);
+
+void saveFileStatus(
+    const DB::MergeTreeData & storage,
+    const DB::ContextPtr& context,
+    IDataPartStorage & data_part_storage);
 
 }
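For reference, the metadata.gluten file written by saveFileStatus above is a flat sequence of records: file name, a tab, the content length as text, a newline, then the raw content (the same layout that extractPartMetaData parses on the read side). A self-contained sketch of a writer/reader pair for that layout, using plain iostreams instead of ClickHouse's WriteBuffer/ReadBuffer helpers (the sample file names are illustrative):

    #include <iostream>
    #include <map>
    #include <sstream>
    #include <string>

    // Write records in the layout used by saveFileStatus:
    //   <name> '\t' <length> '\n' <content>
    std::string writeFileStatus(const std::map<std::string, std::string> & files)
    {
        std::ostringstream out;
        for (const auto & [name, content] : files)
            out << name << '\t' << content.size() << '\n' << content;
        return out.str();
    }

    // Counterpart of extractPartMetaData: read the records back into a map.
    std::map<std::string, std::string> readFileStatus(std::istream & in)
    {
        std::map<std::string, std::string> result;
        std::string name;
        while (std::getline(in, name, '\t'))
        {
            std::string length_str;
            std::getline(in, length_str, '\n');
            std::string content(std::stoul(length_str), '\0');
            in.read(content.data(), static_cast<std::streamsize>(content.size()));
            result[name] = std::move(content);
        }
        return result;
    }

    int main()
    {
        const std::map<std::string, std::string> files = {{"columns.txt", "l_orderkey Int64\n"}, {"count.txt", "2"}};
        std::istringstream in(writeFileStatus(files));
        for (const auto & [name, content] : readFileStatus(in))
            std::cout << name << " -> " << content.size() << " bytes\n";
    }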
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index 8df171f999dd..40e716a56da4 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -21,6 +21,7 @@
 #include
 #include
 #include
+#include <Storages/Mergetree/MetaDataHelper.h>
 
 using namespace DB;
 
@@ -77,32 +78,10 @@ SparkMergeTreeWriter::writeTempPartAndFinalize(
 {
     auto temp_part = writeTempPart(block_with_partition, metadata_snapshot);
     temp_part.finalize();
-    saveFileStatus(temp_part);
+    saveFileStatus(storage, context, temp_part.part->getDataPartStorage());
     return temp_part;
 }
 
-void SparkMergeTreeWriter::saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart & temp_part) const
-{
-    auto & data_part_storage = temp_part.part->getDataPartStorage();
-
-    const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
-    if (!disk->isRemote()) return;
-    if (auto *const disk_metadata = dynamic_cast<const MetadataStorageFromDisk *>(disk->getMetadataStorage().get()))
-    {
-        const auto out = data_part_storage.writeFile("metadata.gluten", DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
-        for (const auto it = data_part_storage.iterate(); it->isValid(); it->next())
-        {
-            auto content = disk_metadata->readFileToString(it->path());
-            writeString(it->name(), *out);
-            writeChar('\t', *out);
-            writeIntText(content.length(), *out);
-            writeChar('\n', *out);
-            writeString(content, *out);
-        }
-        out->finalize();
-    }
-}
-
 MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
     BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
 {
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
index d316f208ebf9..000d009fe068 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
@@ -85,7 +85,6 @@ class SparkMergeTreeWriter
     writeTempPart(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
     DB::MergeTreeDataWriter::TemporaryPart
     writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
-    void saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart & temp_part) const;
 
     String uuid;
     String partition_dir;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 368227163117..1518f8518c0e 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -55,6 +55,7 @@
 #include
 #include
 #include
+#include <Storages/Mergetree/MetaDataHelper.h>
 
 
 #ifdef __cplusplus
@@ -1167,13 +1168,16 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
     executeHere(task);
 
     std::unordered_set<String> to_load{future_part->name};
-    std::vector<MergeTreeDataPartPtr> loaded = storage->loadDataPartsWithNames(to_load);
+    std::vector<std::shared_ptr<IMergeTreeDataPart>> loaded = storage->loadDataPartsWithNames(to_load);
     std::vector<local_engine::PartInfo> res;
-    for (const MergeTreeDataPartPtr & partPtr : loaded)
+    for (auto & partPtr : loaded)
+    {
+        local_engine::saveFileStatus(*storage, local_engine::SerializedPlanParser::global_context, partPtr->getDataPartStorage());
         res.emplace_back(
             local_engine::PartInfo{partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count,
                                    /*partition_value*/ partition_values, bucket_dir});
+    }
 
     auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res);