Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CH] Support merge MergeTree files #6472

Merged
merged 8 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
it.next()
files += 1
}
assertResult(72)(files)
assertResult(4)(files)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240711
CH_COMMIT=4ab4aa7fe04
CH_COMMIT=6632e76fd32940940749b53335ccc4843f3f2638

Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "CompactObjectStorageDiskTransaction.h"

#include <format>
#include <ranges>

namespace local_engine
{
int getFileOrder(const std::string & path)
{
if (path.ends_with("columns.txt"))
return 1;
if (path.ends_with("metadata_version.txt"))
return 2;
if (path.ends_with("count.txt"))
return 3;
if (path.ends_with("default_compression_codec.txt"))
return 4;
if (path.ends_with("checksums.txt"))
return 5;
if (path.ends_with("uuid.txt"))
return 6;
if (path.ends_with(".cmrk3") || path.ends_with(".cmrk2") || path.ends_with(".cmrk1") ||
path.ends_with(".mrk3") || path.ends_with(".mrk2") || path.ends_with(".mrk1"))
return 10;
if (path.ends_with("idx"))
return 20;
if (path.ends_with("bin"))
return 1000;
return 100;
}

bool isMetaDataFile(const std::string & path)
{
return !path.ends_with("bin");
}

using FileMappings = std::vector<std::pair<String, std::shared_ptr<DB::TemporaryFileOnDisk>>>;

void CompactObjectStorageDiskTransaction::commit()
{
auto metadata_tx = disk.getMetadataStorage()->createTransaction();
std::filesystem::path data_path = std::filesystem::path(prefix_path) / "data.bin";
std::filesystem::path meta_path = std::filesystem::path(prefix_path) / "meta.bin";

auto object_storage = disk.getObjectStorage();
auto data_key = object_storage->generateObjectKeyForPath(data_path);
auto meta_key = object_storage->generateObjectKeyForPath(meta_path);

disk.createDirectories(prefix_path);
auto data_write_buffer = object_storage->writeObject(DB::StoredObject(data_key.serialize(), data_path), DB::WriteMode::Rewrite);
auto meta_write_buffer = object_storage->writeObject(DB::StoredObject(meta_key.serialize(), meta_path), DB::WriteMode::Rewrite);
String buffer;
buffer.resize(1024 * 1024);

auto merge_files = [&](std::ranges::input_range auto && list, DB::WriteBuffer & out, const DB::ObjectStorageKey & key , const String &local_path)
{
size_t offset = 0;
std::ranges::for_each(
list,
[&](auto & item)
{
DB::DiskObjectStorageMetadata metadata(object_storage->getCommonKeyPrefix(), item.first);
DB::ReadBufferFromFilePRead read(item.second->getAbsolutePath());
int file_size = 0;
while (int count = read.readBig(buffer.data(), buffer.size()))
{
file_size += count;
out.write(buffer.data(), count);
}
metadata.addObject(key, offset, file_size);
metadata_tx->writeStringToFile(item.first, metadata.serializeToString());
offset += file_size;
});

// You can load the complete file in advance through this metadata original, which improves the download efficiency of mergetree metadata.
DB::DiskObjectStorageMetadata whole_meta(object_storage->getCommonKeyPrefix(), local_path);
whole_meta.addObject(key, 0, offset);
metadata_tx->writeStringToFile(local_path, whole_meta.serializeToString());
out.sync();
};

merge_files(files | std::ranges::views::filter([](auto file) { return !isMetaDataFile(file.first); }), *data_write_buffer, data_key, data_path);
merge_files(files | std::ranges::views::filter([](auto file) { return isMetaDataFile(file.first); }), *meta_write_buffer, meta_key, meta_path);

metadata_tx->commit();
files.clear();
}

std::unique_ptr<DB::WriteBufferFromFileBase> CompactObjectStorageDiskTransaction::writeFile(
const std::string & path,
size_t buf_size,
DB::WriteMode mode,
const DB::WriteSettings &,
bool)
{
if (mode != DB::WriteMode::Rewrite)
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `writeFile` with Append is not implemented");
}
if (prefix_path.empty())
prefix_path = path.substr(0, path.find_last_of('/'));
else if (!path.starts_with(prefix_path))
throw DB::Exception(
DB::ErrorCodes::NOT_IMPLEMENTED,
"Don't support write file in different dirs, path {}, prefix path: {}",
path,
prefix_path);
auto tmp = std::make_shared<DB::TemporaryFileOnDisk>(tmp_data);
files.emplace_back(path, tmp);
auto tx = disk.getMetadataStorage()->createTransaction();
tx->createDirectoryRecursive(std::filesystem::path(path).parent_path());
tx->createEmptyMetadataFile(path);
tx->commit();
return std::make_unique<DB::WriteBufferFromFile>(tmp->getAbsolutePath(), buf_size);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <Disks/IDiskTransaction.h>
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Interpreters/TemporaryDataOnDisk.h>


namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
}

namespace local_engine
{

class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction {
public:
explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const DB::DiskPtr tmp_)
: disk(disk_), tmp_data(tmp_)
{
chassert(!tmp_->isRemote());
}

void commit() override;

void undo() override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `undo` is not implemented");
}

void createDirectory(const std::string & path) override
{
disk.createDirectory(path);
}

void createDirectories(const std::string & path) override
{
disk.createDirectories(path);
}

void createFile(const std::string & path) override
{
disk.createFile(path);
}

void clearDirectory(const std::string & path) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `clearDirectory` is not implemented");
}

void moveDirectory(const std::string & from_path, const std::string & to_path) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `moveDirectory` is not implemented");
}

void moveFile(const String & from_path, const String & to_path) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `moveFile` is not implemented");
}

void replaceFile(const std::string & from_path, const std::string & to_path) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `replaceFile` is not implemented");
}

void copyFile(const std::string & from_file_path, const std::string & to_file_path, const DB::ReadSettings & read_settings, const DB::WriteSettings & write_settings) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `copyFile` is not implemented");
}

std::unique_ptr<DB::WriteBufferFromFileBase> writeFile( /// NOLINT
const std::string & path,
size_t buf_size,
DB::WriteMode mode,
const DB::WriteSettings & settings,
bool /*autocommit */) override;


void writeFileUsingBlobWritingFunction(const String & path, DB::WriteMode mode, WriteBlobFunction && write_blob_function) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `writeFileUsingBlobWritingFunction` is not implemented");
}

void removeFile(const std::string & path) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeFile` is not implemented");
}

void removeFileIfExists(const std::string & path) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeFileIfExists` is not implemented");
}

void removeDirectory(const std::string & path) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeDirectory` is not implemented");
}

void removeRecursive(const std::string & path) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeRecursive` is not implemented");
}

void removeSharedFile(const std::string & path, bool keep_shared_data) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeSharedFile` is not implemented");
}

void removeSharedRecursive(const std::string & path, bool keep_all_shared_data, const DB::NameSet & file_names_remove_metadata_only) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeSharedRecursive` is not implemented");
}

void removeSharedFileIfExists(const std::string & path, bool keep_shared_data) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeSharedFileIfExists` is not implemented");
}

void removeSharedFiles(const DB::RemoveBatchRequest & files, bool keep_all_batch_data, const DB::NameSet & file_names_remove_metadata_only) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `removeSharedFiles` is not implemented");
}

void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override
{
disk.setLastModified(path, timestamp);
}

void chmod(const String & path, mode_t mode) override
{
disk.chmod(path, mode);
}

void setReadOnly(const std::string & path) override
{
disk.setReadOnly(path);
}

void createHardLink(const std::string & src_path, const std::string & dst_path) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `createHardLink` is not implemented");
}

void truncateFile(const std::string & /* src_path */, size_t /* target_size */) override
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `truncateFile` is not implemented");
}

private:
DB::IDisk & disk;
DB::DiskPtr tmp_data;
std::vector<std::pair<String, std::shared_ptr<DB::TemporaryFileOnDisk>>> files;
String prefix_path = "";
};
}

7 changes: 7 additions & 0 deletions cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@

#include <Common/Throttler.h>
#include <Parser/SerializedPlanParser.h>

#include "CompactObjectStorageDiskTransaction.h"
#if USE_HDFS

namespace local_engine
{
using namespace DB;

DiskTransactionPtr GlutenDiskHDFS::createTransaction()
{
return std::make_shared<CompactObjectStorageDiskTransaction>(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk());
}

void GlutenDiskHDFS::createDirectory(const String & path)
{
DiskObjectStorage::createDirectory(path);
Expand Down
16 changes: 15 additions & 1 deletion cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include <Common/Throttler.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#if USE_HDFS
#include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
#endif
Expand Down Expand Up @@ -51,6 +53,8 @@ class GlutenDiskHDFS : public DB::DiskObjectStorage
throttler = std::make_shared<DB::Throttler>(max_speed);
}

DB::DiskTransactionPtr createTransaction() override;

void createDirectory(const String & path) override;

void createDirectories(const String & path) override;
Expand All @@ -72,7 +76,17 @@ class GlutenDiskHDFS : public DB::DiskObjectStorage
{
DB::ObjectStoragePtr tmp = object_storage_creator(config, context);
hdfs_object_storage = typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(tmp);
object_storage = hdfs_object_storage;
// only for java ut
bool is_cache = object_storage->supportsCache();
if (is_cache)
{
auto cache_os = reinterpret_cast<DB::CachedObjectStorage*>(object_storage.get());
object_storage = hdfs_object_storage;
auto cache = DB::FileCacheFactory::instance().getOrCreate(cache_os->getCacheName(), cache_os->getCacheSettings(), "storage_configuration.disks.hdfs_cache");
wrapWithCache(cache, cache_os->getCacheSettings(), cache_os->getCacheConfigName());
}
else
object_storage = hdfs_object_storage;
}
private:
std::shared_ptr<GlutenHDFSObjectStorage> hdfs_object_storage;
Expand Down
Loading
Loading