Skip to content

Commit

Permalink
version 2
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Jul 16, 2024
1 parent f32127b commit 83061b5
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,39 +1,87 @@
#include "CompactObjectStorageDiskTransaction.h"

#include <format>
#include <ranges>

namespace local_engine
{
/// Maps a file path to an integer rank chosen purely from the path's suffix.
///
/// The suffix table is scanned top to bottom and the first matching entry wins;
/// a path that matches no entry gets rank 100.
///
/// @param path  File path (only its trailing characters are inspected).
/// @return      1..6 for the well-known part text files, 10 for mark files,
///              20 for paths ending in "idx", 1000 for paths ending in "bin",
///              100 otherwise.
int getFileOrder(const std::string & path)
{
    /// True when `path` ends with `suffix` (same semantics as std::string::ends_with).
    const auto has_suffix = [&path](const char * suffix)
    {
        const size_t suffix_len = std::char_traits<char>::length(suffix);
        return path.size() >= suffix_len && path.compare(path.size() - suffix_len, suffix_len, suffix) == 0;
    };

    struct SuffixRank
    {
        const char * suffix;
        int rank;
    };

    static constexpr SuffixRank suffix_ranks[] = {
        {"columns.txt", 1},
        {"metadata_version.txt", 2},
        {"count.txt", 3},
        {"default_compression_codec.txt", 4},
        {"checksums.txt", 5},
        {"uuid.txt", 6},
        {".cmrk3", 10},
        {".cmrk2", 10},
        {".cmrk1", 10},
        {".mrk3", 10},
        {".mrk2", 10},
        {".mrk1", 10},
        {"idx", 20},
        {"bin", 1000},
    };

    for (const auto & [suffix, rank] : suffix_ranks)
    {
        if (has_suffix(suffix))
            return rank;
    }

    /// Anything unrecognised sits between the index files and the "bin" files.
    return 100;
}

/// Classifies a path as a metadata file: every path that does NOT end in "bin".
///
/// @param path  File path (only its trailing characters are inspected).
/// @return      false when the path ends with "bin", true otherwise (including for an empty path).
bool isMetaDataFile(const std::string & path)
{
    static constexpr char data_suffix[] = "bin";
    constexpr size_t suffix_len = sizeof(data_suffix) - 1;

    const bool ends_with_bin
        = path.size() >= suffix_len && path.compare(path.size() - suffix_len, suffix_len, data_suffix) == 0;
    return !ends_with_bin;
}

// Ordered list of (String, temporary-file handle) pairs.
// NOTE(review): the String is presumably the destination path of the file — confirm against the users of this alias.
using FileMappings = std::vector<std::pair<String, std::shared_ptr<DB::TemporaryFileOnDisk>>>;

// NOTE(review): this span comes from a commit diff rendered without +/- markers, so lines from before
// and after the change are interleaved (the single `path` / `object_key` / `write_buffer` variables and
// the plain `for` loop sit next to the `data_*` / `meta_*` pairs and the `merge_files` lambda). The braces
// do not balance as shown.
//
// Purpose: concatenates the buffered temporary files in `files` into merged objects under `prefix_path`.
// Entries whose path ends in "bin" go to "data.bin", all other entries go to "meta.bin" (see isMetaDataFile).
// For each entry a metadata record (object key, offset inside the merged object, amount copied) is written
// through `metadata_tx`. The metadata transaction is then committed and `files` is cleared.
void CompactObjectStorageDiskTransaction::commit()
{
auto metadata_tx = disk.getMetadataStorage()->createTransaction();
std::filesystem::path path = prefix_path;
path /= "data.bin";
std::filesystem::path data_path = std::filesystem::path(prefix_path) / "data.bin";
std::filesystem::path meta_path = std::filesystem::path(prefix_path) / "meta.bin";

auto object_storage = disk.getObjectStorage();
auto object_key = object_storage->generateObjectKeyForPath(path);
auto data_key = object_storage->generateObjectKeyForPath(data_path);
auto meta_key = object_storage->generateObjectKeyForPath(meta_path);

disk.createDirectories(prefix_path);
auto write_buffer = object_storage->writeObject(DB::StoredObject(object_key.serialize(), path), DB::WriteMode::Rewrite);
auto data_write_buffer = object_storage->writeObject(DB::StoredObject(data_key.serialize(), data_path), DB::WriteMode::Rewrite);
auto meta_write_buffer = object_storage->writeObject(DB::StoredObject(meta_key.serialize(), meta_path), DB::WriteMode::Rewrite);
// Scratch buffer of 1024 * 1024 chars, reused for every chunk copied from a temporary file.
String buffer;
buffer.resize(1024*1024);
size_t offset = 0;
buffer.resize(1024 * 1024);

for (const auto & item : files)
// merge_files: copies every entry of `list` into `out` back to back, writes one metadata record per entry
// pointing at (`key`, running offset, size), then one more record for `local_path` covering the whole object.
auto merge_files = [&](std::ranges::input_range auto && list, DB::WriteBuffer & out, const DB::ObjectStorageKey & key , const String &local_path)
{
DB::DiskObjectStorageMetadata metadata(object_storage->getCommonKeyPrefix(), item.first);
DB::ReadBufferFromFilePRead read(item.second->getAbsolutePath());
// NOTE(review): `file_size` and `count` are `int` here and in the second copy of this loop below.
// If `readBig` returns a wider unsigned type, a temporary file larger than INT_MAX would overflow
// the accumulator — confirm the return type and consider `size_t`.
int file_size = 0;
while (int count = read.readBig(buffer.data(), buffer.size()))
{
file_size += count;
write_buffer->write(buffer.data(), count);
}
metadata.addObject(object_key, offset, file_size);
metadata_tx->writeStringToFile(item.first, metadata.serializeToString());
//
offset += file_size;
}
write_buffer->sync();
// Running write position inside the merged object; reset for each merge_files call.
size_t offset = 0;
std::ranges::for_each(
list,
[&](auto & item)
{
DB::DiskObjectStorageMetadata metadata(object_storage->getCommonKeyPrefix(), item.first);
DB::ReadBufferFromFilePRead read(item.second->getAbsolutePath());
int file_size = 0;
// Copy the temporary file chunk by chunk; the loop ends when readBig returns 0.
while (int count = read.readBig(buffer.data(), buffer.size()))
{
file_size += count;
out.write(buffer.data(), count);
}
// Record where this file lives inside the merged object.
metadata.addObject(key, offset, file_size);
metadata_tx->writeStringToFile(item.first, metadata.serializeToString());
offset += file_size;
});

// Also write a metadata entry for the merged object itself (offset 0, length = total copied). With it the
// complete file can be loaded in advance, which improves the download efficiency of MergeTree metadata.
DB::DiskObjectStorageMetadata whole_meta(object_storage->getCommonKeyPrefix(), local_path);
whole_meta.addObject(key, 0, offset);
metadata_tx->writeStringToFile(local_path, whole_meta.serializeToString());
out.sync();
};

// Paths ending in "bin" are merged into data.bin; everything else into meta.bin.
merge_files(files | std::ranges::views::filter([](auto file) { return !isMetaDataFile(file.first); }), *data_write_buffer, data_key, data_path);
merge_files(files | std::ranges::views::filter([](auto file) { return isMetaDataFile(file.first); }), *meta_write_buffer, meta_key, meta_path);

metadata_tx->commit();
files.clear();
}
Expand All @@ -42,7 +90,7 @@ std::unique_ptr<DB::WriteBufferFromFileBase> CompactObjectStorageDiskTransaction
const std::string & path,
size_t buf_size,
DB::WriteMode mode,
const DB::WriteSettings & ,
const DB::WriteSettings &,
bool)
{
if (mode != DB::WriteMode::Rewrite)
Expand All @@ -51,11 +99,18 @@ std::unique_ptr<DB::WriteBufferFromFileBase> CompactObjectStorageDiskTransaction
}
if (prefix_path.empty())
prefix_path = path.substr(0, path.find_last_of('/'));
else
if (!path.starts_with(prefix_path))
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Don't support write file in different dirs, path {}, prefix path: {}", path, prefix_path);
else if (!path.starts_with(prefix_path))
throw DB::Exception(
DB::ErrorCodes::NOT_IMPLEMENTED,
"Don't support write file in different dirs, path {}, prefix path: {}",
path,
prefix_path);
auto tmp = std::make_shared<DB::TemporaryFileOnDisk>(tmp_data);
files.emplace_back(path, tmp);
auto tx = disk.getMetadataStorage()->createTransaction();
tx->createDirectoryRecursive(std::filesystem::path(path).parent_path());
tx->createEmptyMetadataFile(path);
tx->commit();
return std::make_unique<DB::WriteBufferFromFile>(tmp->getAbsolutePath(), buf_size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ extern const int NOT_IMPLEMENTED;

namespace local_engine
{

class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction {
public:
explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const DB::DiskPtr tmp_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace local_engine
{
using namespace DB;

// Creates a CompactObjectStorageDiskTransaction bound to this disk. The second constructor argument is the
// disk of the global context's temp-data volume (presumably where temporary files are staged — confirm).
// NOTE(review): the two signature lines below are the pre- and post-change versions from the diff
// (with and without the `DB::` qualifier); only one of them belongs in the real file.
DB::DiskTransactionPtr GlutenDiskHDFS::createTransaction()
DiskTransactionPtr GlutenDiskHDFS::createTransaction()
{
return std::make_shared<CompactObjectStorageDiskTransaction>(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk());
}
Expand Down
25 changes: 22 additions & 3 deletions cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,26 @@ CustomStorageMergeTree::CustomStorageMergeTree(

std::atomic<int> CustomStorageMergeTree::part_num;



void CustomStorageMergeTree::prefectchMetaDataFile(std::unordered_set<std::string> parts)
{
auto disk = getDisks().front();
if (!disk->isRemote()) return;
std::vector<String> meta_paths;
std::ranges::for_each(parts, [&](const String & name) { meta_paths.emplace_back(fs::path(relative_data_path) / name / "meta.bin"); });
for (const auto & meta_path: meta_paths)
{
if (!disk->exists(meta_path)) continue;
auto in = disk->readFile(meta_path);
String ignore_data;
readStringUntilEOF(ignore_data, *in);
}
}

std::vector<MergeTreeDataPartPtr> CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<std::string> parts)
{
auto parts_lock = lockParts();
prefectchMetaDataFile(parts);
std::vector<MergeTreeDataPartPtr> data_parts;
const auto disk = getStoragePolicy()->getDisks().at(0);
for (const auto& name : parts)
Expand All @@ -161,8 +178,6 @@ std::vector<MergeTreeDataPartPtr> CustomStorageMergeTree::loadDataPartsWithNames
data_parts.emplace_back(res.part);
}

// without it "test mergetree optimize partitioned by one low card column" will log ERROR
calculateColumnAndSecondaryIndexSizesImpl();
return data_parts;
}

Expand Down Expand Up @@ -211,6 +226,7 @@ MergeTreeData::LoadPartResult CustomStorageMergeTree::loadDataPart(
res.part->loadVersionMetadata();

res.part->setState(to_state);
auto parts_lock = lockParts();

DataPartIteratorByInfo it;
bool inserted;
Expand Down Expand Up @@ -239,6 +255,9 @@ MergeTreeData::LoadPartResult CustomStorageMergeTree::loadDataPart(
if (res.part->hasLightweightDelete())
has_lightweight_delete_parts.store(true);

// without it "test mergetree optimize partitioned by one low card column" will log ERROR
calculateColumnAndSecondaryIndexSizesImpl();

LOG_TRACE(log, "Finished loading {} part {} on disk {}", magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName());
return res;
}
Expand Down
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class CustomStorageMergeTree final : public MergeTreeData
private:
SimpleIncrement increment;

void prefectchMetaDataFile(std::unordered_set<std::string> parts);
void startBackgroundMovesIfNeeded() override;
std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
LoadPartResult loadDataPart(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void MergeSparkMergeTreeTask::finish()
// MergeTreeData::Transaction transaction(storage, txn.get());
// storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction);
// transaction.commit();

new_part->getDataPartStoragePtr()->commitTransaction();
ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ void SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded()
auto src_disk = storage->getStoragePolicy()->getAnyDisk();
auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
auto tx = dest_disk->createTransaction();
std::sort(files.begin(), files.end());
for (const auto & file : files)
{
auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);
Expand Down

0 comments on commit 83061b5

Please sign in to comment.