Commit 5bc3278: add more log and ut

loneylee committed May 13, 2024
1 parent 2201ca9

Showing 4 changed files with 115 additions and 40 deletions.
@@ -633,5 +633,31 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
       .count()
     assert(result == 600572)
   }
+
+  test("test mergetree insert with optimize basic") {
+    val table_name = "lineitem_mergetree_insert_optimize_basic_s3"
+    val dataPath = s"s3a://$BUCKET_NAME/$table_name"
+
+    withSQLConf(
+      ("spark.databricks.delta.optimize.minFileSize" -> "200000000"),
+      ("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true")
+    ) {
+      spark.sql(s"""
+                   |DROP TABLE IF EXISTS $table_name;
+                   |""".stripMargin)
+
+      spark.sql(s"""
+                   |CREATE TABLE IF NOT EXISTS $table_name
+                   |USING clickhouse
+                   |LOCATION '$dataPath'
+                   | as select * from lineitem
+                   |""".stripMargin)
+
+      val ret = spark.sql(s"select count(*) from $table_name").collect()
+      assert(ret.apply(0).get(0) == 600572)
+      assert(
+        !new File(s"$CH_DEFAULT_STORAGE_DIR/lineitem_mergetree_insert_optimize_basic").exists())
+    }
+  }
 }
 // scalastyle:off line.size.limit
6 changes: 5 additions & 1 deletion cpp-ch/local-engine/Common/CHUtil.cpp
@@ -552,7 +552,11 @@ DB::Context::ConfigurationPtr BackendInitializerUtil::initConfig(std::map<std::s
     if (key.starts_with(CH_RUNTIME_CONFIG_PREFIX) && key != CH_RUNTIME_CONFIG_FILE)
     {
         // Apply spark.gluten.sql.columnar.backend.ch.runtime_config.* to config
-        config->setString(key.substr(CH_RUNTIME_CONFIG_PREFIX.size()), value);
+        const auto name = key.substr(CH_RUNTIME_CONFIG_PREFIX.size());
+        if ((name == "storage_configuration.disks.s3.metadata_path" || name == "path") && !value.ends_with("/"))
+            config->setString(name, value + "/");
+        else
+            config->setString(name, value);
     }
 }
 return config;
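The hunk above makes the two directory-style settings ("path" and the S3 disk's "metadata_path") tolerant of a missing trailing slash, presumably because downstream code appends relative paths directly onto these prefixes. A standalone sketch of the same normalization (not the Gluten code itself; assumes C++20 for std::string::ends_with):

#include <cassert>
#include <string>

// Sketch: append "/" to a directory-style config value unless it already ends with one.
std::string normalizeDirPath(std::string value)
{
    if (!value.ends_with("/")) // C++20
        value += "/";
    return value;
}

int main()
{
    assert(normalizeDirPath("/data/gluten") == "/data/gluten/");
    assert(normalizeDirPath("/data/gluten/") == "/data/gluten/"); // already normalized: unchanged
    return 0;
}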
118 changes: 80 additions & 38 deletions cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -73,8 +73,13 @@ SparkMergeTreeWriter::SparkMergeTreeWriter(

     if (dest_storage->getStoragePolicy()->getAnyDisk()->isRemote())
     {
+        isRemoteStorage = true;
         temp_storage = MergeTreeRelParser::copyToDefaultPolicyStorage(merge_tree_table, SerializedPlanParser::global_context);
         storage = temp_storage;
+        LOG_DEBUG(
+            &Poco::Logger::get("SparkMergeTreeWriter"),
+            "Create temp table {} for local merge.",
+            temp_storage->getStorageID().getFullNameNotQuoted());
     }
     else
         storage = dest_storage;
@@ -140,10 +145,10 @@ void SparkMergeTreeWriter::manualFreeMemory(size_t before_write_memory)
     // It may alloc memory in the current thread and free it on a global thread.
     // For now we have no way to clear global memory via the Spark thread tracker,
     // so we manually correct the memory usage.
-    auto disk = storage->getStoragePolicy()->getAnyDisk();
-    if (!disk->isRemote())
+    if (!isRemoteStorage)
         return;
 
+    auto disk = storage->getStoragePolicy()->getAnyDisk();
     std::lock_guard lock(memory_mutex);
     auto * memory_tracker = CurrentThread::getMemoryTracker();
     if (memory_tracker && CurrentMemoryTracker::before_free)
@@ -188,38 +193,72 @@ void SparkMergeTreeWriter::finalize()
 }
 }
 
-    SCOPE_EXIT({
-        if (temp_storage)
-        {
-            auto read_settings = context->getReadSettings();
-            auto write_settings = context->getWriteSettings();
-            for (auto merge_tree_data_part : new_parts.unsafeGet())
-            {
-                String local_relative_path = storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;
-                String remote_relative_path = dest_storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;
-
-                storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent(
-                    local_relative_path,
-                    dest_storage->getStoragePolicy()->getAnyDisk(),
-                    remote_relative_path,
-                    read_settings,
-                    write_settings,
-                    nullptr);
-            }
-            temp_storage->dropAllData();
-            StorageMergeTreeFactory::freeStorage(temp_storage->getStorageID());
-        }
-
-        for (auto merge_tree_data_part : new_parts.unsafeGet())
-        {
-            auto part = dest_storage->loadDataPartsWithNames({merge_tree_data_part->name});
-            saveFileStatus(
-                *dest_storage, context, merge_tree_data_part->name, const_cast<IDataPartStorage &>(part.at(0)->getDataPartStorage()));
-        }
-    });
-
-    if (!merge_after_insert)
-        return;
+    if (merge_after_insert)
+    {
+        finalizeMerge();
+    }
+
+    commitPartToRemoteStorage();
+    saveMetadata();
+}
+
+void SparkMergeTreeWriter::saveMetadata()
+{
+    if (!isRemoteStorage)
+        return;
+
+    for (const auto merge_tree_data_part : new_parts.unsafeGet())
+    {
+        auto part = dest_storage->loadDataPartsWithNames({merge_tree_data_part->name});
+        if (part.empty())
+        {
+            LOG_WARNING(
+                &Poco::Logger::get("SparkMergeTreeWriter"),
+                "Save metadata failed because dest storage load part name {} empty.",
+                merge_tree_data_part->name);
+            continue;
+        }
+
+        saveFileStatus(
+            *dest_storage, context, merge_tree_data_part->name, const_cast<IDataPartStorage &>(part.at(0)->getDataPartStorage()));
+    }
+}
+
+void SparkMergeTreeWriter::commitPartToRemoteStorage()
+{
+    if (!isRemoteStorage)
+        return;
+
+    auto read_settings = context->getReadSettings();
+    auto write_settings = context->getWriteSettings();
+    Stopwatch watch;
+    for (const auto & merge_tree_data_part : new_parts.unsafeGet())
+    {
+        String local_relative_path = storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;
+        String remote_relative_path = dest_storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;
+
+        storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent(
+            local_relative_path,
+            dest_storage->getStoragePolicy()->getAnyDisk(),
+            remote_relative_path,
+            read_settings,
+            write_settings,
+            nullptr);
+    }
+    watch.stop();
+    LOG_INFO(
+        &Poco::Logger::get("SparkMergeTreeWriter"),
+        "Upload to disk {} success, total elapsed {} ms",
+        storage->getStoragePolicy()->getAnyDisk()->getName(),
+        watch.elapsedMilliseconds());
+    StorageMergeTreeFactory::freeStorage(temp_storage->getStorageID());
+    temp_storage->dropAllData();
+    LOG_DEBUG(
+        &Poco::Logger::get("SparkMergeTreeWriter"), "Clean temp table {} success.", temp_storage->getStorageID().getFullNameNotQuoted());
+}
+
+void SparkMergeTreeWriter::finalizeMerge()
+{
     // wait all merge task end and do final merge
     thread_pool.wait();
 
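Net effect of the hunk above: the old SCOPE_EXIT teardown in finalize() is split into three named steps that run in a fixed order. A dependency-free C++ sketch of that control flow (a hypothetical WriterFlowSketch type, not the Gluten class; all storage work is reduced to prints):

#include <cstdio>

// Models only the ordering of the refactored finalize(); field names mirror
// the real writer's merge_after_insert setting and isRemoteStorage flag.
struct WriterFlowSketch
{
    bool merge_after_insert = true; // mergetree.merge_after_insert runtime setting
    bool is_remote_storage = true;  // true when the destination disk is remote (e.g. S3)

    void finalize()
    {
        if (merge_after_insert)
            finalizeMerge();         // wait for background merge tasks, settle the final part set
        commitPartToRemoteStorage(); // upload local parts, then drop the local temp table
        saveMetadata();              // record file status for parts found on the destination
    }

    void finalizeMerge() { std::printf("merge local parts\n"); }

    void commitPartToRemoteStorage()
    {
        if (!is_remote_storage)
            return; // local writes need no upload step
        std::printf("copy parts to remote disk, clean temp table\n");
    }

    void saveMetadata()
    {
        if (!is_remote_storage)
            return;
        std::printf("save file status for each committed part\n");
    }
};

int main()
{
    WriterFlowSketch{}.finalize();
    return 0;
}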
@@ -232,7 +271,7 @@ void SparkMergeTreeWriter::finalize()
     } while (before_merge_size != new_parts.size());
 
     std::unordered_set<String> final_parts;
-    for (auto merge_tree_data_part : new_parts.unsafeGet())
+    for (const auto & merge_tree_data_part : new_parts.unsafeGet())
         final_parts.emplace(merge_tree_data_part->name);
 
     if (!temp_storage)
@@ -245,7 +284,7 @@ void SparkMergeTreeWriter::finalize()
     GlobalThreadPool::instance().scheduleOrThrow(
         [&]() -> void
         {
-            for (auto disk : storage->getDisks())
+            for (const auto & disk : storage->getDisks())
             {
                 auto full_path = storage->getFullPathOnDisk(disk);
                 disk->removeRecursive(full_path + "/" + tmp_part);
@@ -402,7 +441,7 @@ std::vector<PartInfo> SparkMergeTreeWriter::getAllPartInfo()
     std::vector<PartInfo> res;
     res.reserve(new_parts.size());
 
-    for (const auto part : new_parts.unsafeGet())
+    for (const auto & part : new_parts.unsafeGet())
     {
         res.emplace_back(
             PartInfo{part->name, part->getMarksCount(), part->getBytesOnDisk(), part->rows_count, partition_values, bucket_dir});
@@ -451,12 +490,13 @@ void SparkMergeTreeWriter::checkAndMerge(bool force)

     auto doMergeTask = [this](const std::vector<MergeTreeDataPartPtr> & prepare_merge_parts)
     {
-        for (auto selected_part : prepare_merge_parts)
+        for (const auto & selected_part : prepare_merge_parts)
             tmp_parts.emplace(selected_part->name);
 
         thread_pool.scheduleOrThrow(
             [this, prepare_merge_parts, thread_group = CurrentThread::getGroup()]() -> void
             {
+                Stopwatch watch;
                 setThreadName("InsertWithMerge");
                 ThreadStatus thread_status;
                 thread_status.attachToGroup(thread_group);
@@ -467,20 +507,22 @@ void SparkMergeTreeWriter::checkAndMerge(bool force)
                 before_size += prepare_merge_part->getBytesOnDisk();
 
                 std::unordered_map<String, String> partition_values;
-                auto merged_parts = mergeParts(
+                const auto merged_parts = mergeParts(
                     prepare_merge_parts, partition_values, toString(UUIDHelpers::generateV4()), storage, partition_dir, bucket_dir);
                 for (const auto & merge_tree_data_part : merged_parts)
                     after_size += merge_tree_data_part->getBytesOnDisk();
 
-                LOG_DEBUG(
+                new_parts.emplace_back(merged_parts);
+                watch.stop();
+                LOG_INFO(
                     &Poco::Logger::get("SparkMergeTreeWriter"),
-                    "Mergetree merge on insert finished, before merge part size {}, part count {}, after part size {}, part count {}.",
+                    "Merge success. Before merge part size {}, part count {}, after part size {}, part count {}, "
+                    "total elapsed {} ms",
                     before_size,
                     prepare_merge_parts.size(),
                     after_size,
-                    merged_parts.size());
-
-                new_parts.emplace_back(merged_parts);
+                    merged_parts.size(),
+                    watch.elapsedMilliseconds());
             });
     };

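The "add more log" half of this commit wraps the upload and merge steps in a Stopwatch and logs elapsed milliseconds. A dependency-free sketch of the same pattern (ClickHouse's Stopwatch and LOG_INFO are replaced with std::chrono and printf so the snippet compiles standalone; the message mirrors the new LOG_INFO wording):

#include <chrono>
#include <cstdio>

int main()
{
    const auto start = std::chrono::steady_clock::now();

    // ... copyDirectoryContent(...) or mergeParts(...) would run here ...

    const auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                std::chrono::steady_clock::now() - start)
                                .count();
    std::printf("Upload to disk %s success, total elapsed %lld ms\n", "s3", static_cast<long long>(elapsed_ms));
    return 0;
}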
@@ -72,6 +72,9 @@ class SparkMergeTreeWriter
     void safeEmplaceBackPart(DB::MergeTreeDataPartPtr);
     void safeAddPart(DB::MergeTreeDataPartPtr);
     void manualFreeMemory(size_t before_write_memory);
+    void saveMetadata();
+    void commitPartToRemoteStorage();
+    void finalizeMerge();
 
     CustomStorageMergeTreePtr storage = nullptr;
     CustomStorageMergeTreePtr dest_storage = nullptr;
@@ -94,7 +97,7 @@ class SparkMergeTreeWriter
     size_t merge_min_size = 1024 * 1024 * 1024;
     size_t merge_limit_parts = 10;
     std::mutex memory_mutex;
-    bool isRemoteDisk = false;
+    bool isRemoteStorage = false;
 };
 
 }
