Skip to content

Commit

Permalink
compatible for s3
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Jul 16, 2024
1 parent 83061b5 commit 616c5b0
Showing 1 changed file with 30 additions and 11 deletions.
41 changes: 30 additions & 11 deletions cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,23 +232,42 @@ void SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded()
auto read_settings = context->getReadSettings();
auto write_settings = context->getWriteSettings();
Stopwatch watch;

// Temporary support for S3
bool s3_disk = dest_storage->getStoragePolicy()->getAnyDisk()->getName().contains("s3");
for (const auto & merge_tree_data_part : new_parts.unsafeGet())
{
String local_relative_path = storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;
String remote_relative_path = dest_storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;
std::vector<String> files;
storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files);
auto src_disk = storage->getStoragePolicy()->getAnyDisk();
auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
auto tx = dest_disk->createTransaction();
for (const auto & file : files)

if (s3_disk)
{
storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent(
local_relative_path,
dest_storage->getStoragePolicy()->getAnyDisk(),
remote_relative_path,
read_settings,
write_settings,
nullptr);
}
else
{
auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);
auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
copyData(*read_buffer, *write_buffer);
write_buffer->finalize();
std::vector<String> files;
storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files);
auto src_disk = storage->getStoragePolicy()->getAnyDisk();
auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
auto tx = dest_disk->createTransaction();
for (const auto & file : files)
{
auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);
auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
copyData(*read_buffer, *write_buffer);
write_buffer->finalize();
}
tx->commit();
}
tx->commit();


LOG_DEBUG(
&Poco::Logger::get("SparkMergeTreeWriter"),
"Upload part {} to disk {} success.",
Expand Down

0 comments on commit 616c5b0

Please sign in to comment.