Skip to content

Commit

Permalink
[Gluten-5152][CH] fix bugs for optimizing tables on s3 (apache#5282)
Browse files Browse the repository at this point in the history
What changes were proposed in this pull request?
fix multiple bugs for optimizing tables on s3, including:

handle the metadata.gluten file correctly
fix a race condition when two tasks perform restoreMetadata on the same executor at the same time
avoid creating a tmp folder, because renaming it on S3 is problematic
(Fixes: apache#5152)

How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
  • Loading branch information
binmahone authored Apr 12, 2024
1 parent 5af585d commit a8b3161
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize").collect()
assert(ret.apply(0).get(0) == 600572)

spark.sql("optimize lineitem_mergetree_optimize")
assert(
countFiles(new File(s"$basePath/lineitem_mergetree_optimize")) == 462
) // many merged parts
Expand Down Expand Up @@ -148,7 +147,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(ret.apply(0).get(0) == 600572)

spark.sql("set spark.gluten.enabled=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 815)
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 812)
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 232)
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
Expand Down Expand Up @@ -179,12 +178,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(ret.apply(0).get(0) == 600572)

spark.sql("set spark.gluten.enabled=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 411)
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 398)
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
// for tables with more than one layer of nested table (like partition + bucket, or two partition col
// the 'tmp_merge' folder is not guarantee to be removed, causing this file number to be unstable
// assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 290)
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) > 270)
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 286)
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 270)
spark.sql("set spark.gluten.enabled=true")
Expand Down Expand Up @@ -213,10 +209,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(ret.apply(0).get(0) == 600572)

spark.sql("set spark.gluten.enabled=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 411)
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 398)
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
// assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 290)
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) > 270)
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 286)
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 270)
spark.sql("set spark.gluten.enabled=true")
Expand Down Expand Up @@ -308,7 +303,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite

spark.sql("set spark.gluten.enabled=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
if (sparkVersion.equals("3.2")) 940 else 1023
if (sparkVersion.equals("3.2")) 931 else 1014
})
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
Expand All @@ -321,5 +316,49 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(ret2.apply(0).get(0) == 600572)
}

test("test skip index after optimize") {
withSQLConf(
"spark.databricks.delta.optimize.maxFileSize" -> "2000000",
"spark.sql.adaptive.enabled" -> "false") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_index;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS lineitem_mergetree_index
|USING clickhouse
|LOCATION '$basePath/lineitem_mergetree_index'
|TBLPROPERTIES('bloomfilterIndexKey'='l_orderkey')
| as select * from lineitem
|""".stripMargin)

spark.sql("optimize lineitem_mergetree_index")
spark.sql("set spark.gluten.enabled=false")
spark.sql("vacuum lineitem_mergetree_index")
spark.sql("set spark.gluten.enabled=true")

val df = spark
.sql(s"""
|select count(*) from lineitem_mergetree_index where l_orderkey = '600000'
|""".stripMargin)

val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assert(scanExec.size == 1)
val mergetreeScan = scanExec(0)
val ret = df.collect()
assert(ret.apply(0).get(0) == 2)
val marks = mergetreeScan.metrics("selectedMarks").value
assert(marks == 1)

val directory = new File(s"$basePath/lineitem_mergetree_index")
val partDir = directory.listFiles().filter(f => f.getName.endsWith("merged")).head
assert(
partDir.listFiles().exists(p => p.getName.contains("skp_idx__bloomfilter_l_orderkey.idx")))

}
}

}
// scalastyle:off line.size.limit
28 changes: 27 additions & 1 deletion cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,32 @@ CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(
auto storage_factory = StorageMergeTreeFactory::instance();
auto metadata = buildMetaData(names_and_types_list, context, merge_tree_table);

{
        // Use the instance-global table (without a uuid) to restore the metadata folder
        // on the current instance; we need its lock to serialize concurrent restores.

auto global_storage = storage_factory.getStorage(
StorageID(merge_tree_table.database, merge_tree_table.table),
merge_tree_table.snapshot_id,
metadata->getColumns(),
[&]() -> CustomStorageMergeTreePtr
{
auto custom_storage_merge_tree = std::make_shared<CustomStorageMergeTree>(
StorageID(merge_tree_table.database, merge_tree_table.table),
merge_tree_table.relative_path,
*metadata,
false,
context,
"",
MergeTreeData::MergingParams(),
buildMergeTreeSettings(merge_tree_table.table_configs));
return custom_storage_merge_tree;
});

restoreMetaData(global_storage, merge_tree_table, *context);
}

// return local table (with a uuid) for isolation
auto storage = storage_factory.getStorage(
StorageID(merge_tree_table.database, merge_tree_table.table, uuid),
merge_tree_table.snapshot_id,
Expand Down Expand Up @@ -146,7 +172,7 @@ MergeTreeRelParser::parseReadRel(
return custom_storage_merge_tree;
});

restoreMetaData(storage, merge_tree_table, context);
restoreMetaData(storage, merge_tree_table, *context);
for (const auto & [name, sizes] : storage->getColumnSizes())
column_sizes[name] = sizes.data_compressed;
query_context.storage_snapshot = std::make_shared<StorageSnapshot>(*storage, metadata);
Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ CustomStorageMergeTree::CustomStorageMergeTree(

std::atomic<int> CustomStorageMergeTree::part_num;

DataPartsVector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<std::string> parts)
MergeTreeData::MutableDataPartsVector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<std::string> parts)
{
DataPartsVector data_parts;
MutableDataPartsVector data_parts;
const auto disk = getStoragePolicy()->getDisks().at(0);
for (const auto& name : parts)
{
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class CustomStorageMergeTree final : public MergeTreeData
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override;
std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;
DataPartsVector loadDataPartsWithNames(std::unordered_set<std::string> parts);
MutableDataPartsVector loadDataPartsWithNames(std::unordered_set<std::string> parts);


MergeTreeDataWriter writer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,21 @@ void MergeSparkMergeTreeTask::prepare()
deduplicate_by_columns,
cleanup,
storage.merging_params,
txn);
txn,
        // need_prefix = false, so CH won't create a tmp_ folder while merging.
        // The tmp_ folder is problematic on S3 (particularly when renaming).
false);
}


void MergeSparkMergeTreeTask::finish()
{
new_part = merge_task->getFuture().get();

MergeTreeData::Transaction transaction(storage, txn.get());
storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction);
transaction.commit();
    // Since there is no tmp_ folder, we don't need renaming
// MergeTreeData::Transaction transaction(storage, txn.get());
// storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction);
// transaction.commit();

ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();
Expand Down
37 changes: 33 additions & 4 deletions cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ std::unordered_map<String, String> extractPartMetaData(ReadBuffer & in)
return result;
}

void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context)
void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context)
{
auto data_disk = storage->getStoragePolicy()->getAnyDisk();
if (!data_disk->isRemote())
Expand All @@ -60,11 +60,14 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable &
not_exists_part.emplace(part);
}

if (not_exists_part.empty())
return;

if (auto lock = storage->lockForAlter(context->getSettingsRef().lock_acquire_timeout))
if (auto lock = storage->lockForAlter(context.getSettingsRef().lock_acquire_timeout))
{
        // Keep this early-return inside lockForAlter so that the call does not
        // return until any other thread has finished restoring
if (not_exists_part.empty())
return;

auto s3 = data_disk->getObjectStorage();

if (!metadata_disk->exists(table_path))
Expand All @@ -87,9 +90,35 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable &
auto item_path = part_path / item.first;
auto out = metadata_disk->writeFile(item_path);
out->write(item.second.data(), item.second.size());
out->finalize();
out->sync();
}
}
}
}


void saveFileStatus(
const DB::MergeTreeData & storage,
const DB::ContextPtr& context,
IDataPartStorage & data_part_storage)
{
const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
if (!disk->isRemote())
return;
if (auto * const disk_metadata = dynamic_cast<MetadataStorageFromDisk *>(disk->getMetadataStorage().get()))
{
const auto out = data_part_storage.writeFile("metadata.gluten", DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
for (const auto it = data_part_storage.iterate(); it->isValid(); it->next())
{
auto content = disk_metadata->readFileToString(it->path());
writeString(it->name(), *out);
writeChar('\t', *out);
writeIntText(content.length(), *out);
writeChar('\n', *out);
writeString(content, *out);
}
out->finalize();
}
}
}
7 changes: 6 additions & 1 deletion cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
namespace local_engine
{

void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context);
void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context);

void saveFileStatus(
const DB::MergeTreeData & storage,
const DB::ContextPtr& context,
IDataPartStorage & data_part_storage);

}

25 changes: 2 additions & 23 deletions cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Interpreters/ActionsDAG.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <rapidjson/prettywriter.h>
#include <Storages/Mergetree/MetaDataHelper.h>

using namespace DB;

Expand Down Expand Up @@ -77,32 +78,10 @@ SparkMergeTreeWriter::writeTempPartAndFinalize(
{
auto temp_part = writeTempPart(block_with_partition, metadata_snapshot);
temp_part.finalize();
saveFileStatus(temp_part);
saveFileStatus(storage, context, temp_part.part->getDataPartStorage());
return temp_part;
}

void SparkMergeTreeWriter::saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart & temp_part) const
{
auto & data_part_storage = temp_part.part->getDataPartStorage();

const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
if (!disk->isRemote()) return;
if (auto *const disk_metadata = dynamic_cast<MetadataStorageFromDisk *>(disk->getMetadataStorage().get()))
{
const auto out = data_part_storage.writeFile("metadata.gluten", DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
for (const auto it = data_part_storage.iterate(); it->isValid(); it->next())
{
auto content = disk_metadata->readFileToString(it->path());
writeString(it->name(), *out);
writeChar('\t', *out);
writeIntText(content.length(), *out);
writeChar('\n', *out);
writeString(content, *out);
}
out->finalize();
}
}

MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ class SparkMergeTreeWriter
writeTempPart(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
DB::MergeTreeDataWriter::TemporaryPart
writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
void saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart & temp_part) const;

String uuid;
String partition_dir;
Expand Down
8 changes: 6 additions & 2 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include <google/protobuf/wrappers.pb.h>
#include <Storages/StorageMergeTreeFactory.h>
#include <Storages/Mergetree/MergeSparkMergeTreeTask.h>
#include <Storages/Mergetree/MetaDataHelper.h>


#ifdef __cplusplus
Expand Down Expand Up @@ -1167,13 +1168,16 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
executeHere(task);

std::unordered_set<std::string> to_load{future_part->name};
std::vector<DataPartPtr> loaded = storage->loadDataPartsWithNames(to_load);
std::vector<std::shared_ptr<DB::IMergeTreeDataPart>> loaded = storage->loadDataPartsWithNames(to_load);
std::vector<local_engine::PartInfo> res;
for (const MergeTreeDataPartPtr & partPtr : loaded)
for (auto & partPtr : loaded)
{
local_engine::saveFileStatus(*storage, local_engine::SerializedPlanParser::global_context, partPtr->getDataPartStorage());
res.emplace_back(
local_engine::PartInfo{partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count,
/*partition_value*/ partition_values,
bucket_dir});
}

auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res);

Expand Down

0 comments on commit a8b3161

Please sign in to comment.