Skip to content

Commit

Permalink
Add async #207
Browse files Browse the repository at this point in the history
  • Loading branch information
nikkitashl committed Sep 14, 2022
1 parent e0755f0 commit 0db1e1d
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 38 deletions.
1 change: 1 addition & 0 deletions remote_helper/engines/base_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct IEngine {
sourc3::ReporterType progress;
int64_t verbosity = 0;
uint32_t depth = kInfiniteDepth;
bool is_async = true;

virtual ~BaseOptions() = default;

Expand Down
221 changes: 183 additions & 38 deletions remote_helper/engines/ipfs/ipfs_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,11 @@ std::vector<ObjectWithContent> GetAllObjects(const std::string& root_ipfs_hash,
template <typename Context>
std::vector<ObjectWithContent> GetAllObjectsAsync(const std::string& root_ipfs_hash,
sourc3::IProgressReporter& progress,
IWalletClient& client,
Context& context) {
IWalletClient& client, Context& base_context) {
namespace ba = boost::asio;
std::vector<ObjectWithContent> objects;
ba::spawn(context, [&objects, &root_ipfs_hash, &client,
&progress](IWalletClient::AsyncContext context) {
ba::spawn(base_context, [&objects, &root_ipfs_hash, &client,
&progress](IWalletClient::AsyncContext context) {
CommitMetaBlock commit(GetStringFromIPFSAsync(root_ipfs_hash, client, context));
ba::spawn(context, [&](IWalletClient::AsyncContext context2) {
auto commit_content = GetStringFromIPFSAsync(commit.hash.ipfs, client, context2);
Expand All @@ -157,8 +156,12 @@ std::vector<ObjectWithContent> GetAllObjectsAsync(const std::string& root_ipfs_h
auto tree_objects = GetObjectsFromTreeMetaAsync(tree, progress, client, context3);
std::move(tree_objects.begin(), tree_objects.end(), std::back_inserter(objects));
for (auto&& parent_hash : commit.parent_hashes) {
auto parent_objects = GetAllObjectsAsync(parent_hash.ipfs, progress, client, context3);
std::move(parent_objects.begin(), parent_objects.end(), std::back_inserter(objects));
ba::spawn(context3, [&](IWalletClient::AsyncContext context4) {
auto parent_objects =
GetAllObjectsAsync(parent_hash.ipfs, progress, client, context4);
std::move(parent_objects.begin(), parent_objects.end(),
std::back_inserter(objects));
});
}
});
});
Expand All @@ -181,6 +184,21 @@ std::vector<ObjectWithContent> GetUploadedObjects(const std::vector<sourc3::Ref>
return objects;
}

std::vector<ObjectWithContent> GetUploadedObjectsAsync(const std::vector<sourc3::Ref>& refs,
IWalletClient& client,
sourc3::ReporterType reporter_type,
boost::asio::io_context& context) {
std::vector<ObjectWithContent> objects;
auto progress =
MakeProgress("Enumerate uploaded objects", client.GetUploadedObjectCount(), reporter_type);
for (const auto& ref : refs) {
auto ref_objects = GetAllObjectsAsync(ref.ipfs_hash, *progress, client, context);
std::move(ref_objects.begin(), ref_objects.end(), std::back_inserter(objects));
}
progress->Done();
return objects;
}

struct ObjectsAndMetas {
std::vector<ObjectWithContent> objects;
Metas metas;
Expand Down Expand Up @@ -211,6 +229,47 @@ ObjectsAndMetas GetAllObjectsWithMeta(const std::string& root_ipfs_hash,
return {std::move(objects), std::move(metas)};
}

template <typename Context>
ObjectsAndMetas GetAllObjectsWithMetaAsync(const std::string& root_ipfs_hash,
sourc3::IProgressReporter& progress,
IWalletClient& client, Context& base_context) {
namespace ba = boost::asio;
std::vector<ObjectWithContent> objects;
unordered_map<string, variant<TreeMetaBlock, CommitMetaBlock>> metas;
ba::spawn(base_context, [&](IWalletClient::AsyncContext context) {
CommitMetaBlock commit(GetStringFromIPFSAsync(root_ipfs_hash, client, context));
metas[root_ipfs_hash] = commit;
ba::spawn(context, [&](IWalletClient::AsyncContext context2) {
auto commit_content = GetStringFromIPFSAsync(commit.hash.ipfs, client, context2);
objects.emplace_back(GIT_OBJECT_COMMIT, std::move(commit.hash.oid),
std::move(commit.hash.ipfs), std::move(commit_content));
progress.AddProgress(1);
ba::spawn(context2, [&](IWalletClient::AsyncContext context3) {
TreeMetaBlock tree(GetStringFromIPFSAsync(commit.tree_meta_hash, client, context3));
metas[commit.tree_meta_hash] = tree;
ba::spawn(context3, [&](IWalletClient::AsyncContext context4) {
auto tree_objects =
GetObjectsFromTreeMetaAsync(tree, progress, client, context4);
std::move(tree_objects.begin(), tree_objects.end(),
std::back_inserter(objects));
for (auto&& parent_hash : commit.parent_hashes) {
ba::spawn(context4, [&](IWalletClient::AsyncContext context5) {
auto [parent_objects, parent_metas] = GetAllObjectsWithMetaAsync(
parent_hash.ipfs, progress, client, context5);
std::move(parent_objects.begin(), parent_objects.end(),
std::back_inserter(objects));
for (auto&& [key, value] : parent_metas) {
metas[std::move(key)] = std::move(value);
}
});
}
});
});
});
});
return {std::move(objects), std::move(metas)};
}

ObjectsAndMetas GetUploadedObjectsWithMetas(const std::vector<sourc3::Ref>& refs,
IWalletClient& client,
sourc3::ReporterType reporter_type) {
Expand All @@ -229,6 +288,26 @@ ObjectsAndMetas GetUploadedObjectsWithMetas(const std::vector<sourc3::Ref>& refs
return {std::move(objects), std::move(metas)};
}

ObjectsAndMetas GetUploadedObjectsWithMetasAsync(const std::vector<sourc3::Ref>& refs,
IWalletClient& client,
sourc3::ReporterType reporter_type,
boost::asio::io_context& context) {
std::vector<ObjectWithContent> objects;
unordered_map<string, variant<TreeMetaBlock, CommitMetaBlock>> metas;
auto progress =
MakeProgress("Enumerate uploaded objects", client.GetUploadedObjectCount(), reporter_type);
for (const auto& ref : refs) {
auto [ref_objects, ref_metas] =
GetAllObjectsWithMetaAsync(ref.ipfs_hash, *progress, client, context);
std::move(ref_objects.begin(), ref_objects.end(), std::back_inserter(objects));
for (auto&& [key, value] : ref_metas) {
metas[std::move(key)] = std::move(value);
}
}
progress->Done();
return {std::move(objects), std::move(metas)};
}

std::set<git_oid> GetOidsFromObjects(const std::vector<ObjectWithContent>& objects) {
std::set<git_oid> oids;
for (const auto& object : objects) {
Expand All @@ -237,6 +316,81 @@ std::set<git_oid> GetOidsFromObjects(const std::vector<ObjectWithContent>& objec
return oids;
}

void UploadObjects(ObjectCollector& collector, uint32_t& new_objects, uint32_t& new_metas,
Metas& metas, HashMapping& oid_to_ipfs, HashMapping& oid_to_meta,
sourc3::ReporterType reporter_type, IWalletClient& client) {
auto progress =
MakeProgress("Uploading objects to IPFS", collector.m_objects.size(), reporter_type);
size_t i = 0;
for (auto& obj : collector.m_objects) {
if (obj.type == GIT_OBJECT_BLOB) {
++new_objects;
} else {
++new_metas;
}

auto res = client.SaveObjectToIPFS(obj.GetData(), obj.GetSize());
auto r = ParseJsonAndTest(res);
auto hash_str = r.as_object()["result"].as_object()["hash"].as_string();
obj.ipfsHash = ByteBuffer(hash_str.cbegin(), hash_str.cend());
oid_to_ipfs[obj.oid] = std::string(hash_str.cbegin(), hash_str.cend());
auto meta_object = GetMetaBlock(collector, obj, oid_to_meta, oid_to_ipfs);
if (meta_object != nullptr) {
auto meta_buffer = StringToByteBuffer(meta_object->Serialize());
auto meta_res =
ParseJsonAndTest(client.SaveObjectToIPFS(meta_buffer.data(), meta_buffer.size()));
std::string hash =
meta_res.as_object()["result"].as_object()["hash"].as_string().c_str();
oid_to_meta[obj.oid] = hash;
if (obj.type == GIT_OBJECT_COMMIT) {
metas[hash] = *static_cast<CommitMetaBlock*>(meta_object.get());
} else if (obj.type == GIT_OBJECT_TREE) {
metas[hash] = *static_cast<TreeMetaBlock*>(meta_object.get());
}
}
progress->UpdateProgress(++i);
}
}

template <typename Context>
void UploadObjectsAsync(ObjectCollector& collector, uint32_t& new_objects, uint32_t& new_metas,
Metas& metas, HashMapping& oid_to_ipfs, HashMapping& oid_to_meta,
sourc3::ReporterType reporter_type, IWalletClient& client,
Context& base_context) {
auto progress =
MakeProgress("Uploading objects to IPFS", collector.m_objects.size(), reporter_type);
size_t i = 0;
for (auto& obj : collector.m_objects) {
if (obj.type == GIT_OBJECT_BLOB) {
++new_objects;
} else {
++new_metas;
}
boost::asio::spawn(base_context, [&](IWalletClient::AsyncContext context) {
auto res = client.SaveObjectToIPFSAsync(obj.GetData(), obj.GetSize(), context);
auto r = ParseJsonAndTest(res);
auto hash_str = r.as_object()["result"].as_object()["hash"].as_string();
obj.ipfsHash = ByteBuffer(hash_str.cbegin(), hash_str.cend());
oid_to_ipfs[obj.oid] = std::string(hash_str.cbegin(), hash_str.cend());
auto meta_object = GetMetaBlock(collector, obj, oid_to_meta, oid_to_ipfs);
if (meta_object != nullptr) {
auto meta_buffer = StringToByteBuffer(meta_object->Serialize());
auto meta_res = ParseJsonAndTest(
client.SaveObjectToIPFSAsync(meta_buffer.data(), meta_buffer.size(), context));
std::string hash =
meta_res.as_object()["result"].as_object()["hash"].as_string().c_str();
oid_to_meta[obj.oid] = hash;
if (obj.type == GIT_OBJECT_COMMIT) {
metas[hash] = *static_cast<CommitMetaBlock*>(meta_object.get());
} else if (obj.type == GIT_OBJECT_TREE) {
metas[hash] = *static_cast<TreeMetaBlock*>(meta_object.get());
}
}
progress->UpdateProgress(++i);
});
}
}

} // namespace

IEngine::CommandResult FullIPFSEngine::DoCommand(std::string_view command,
Expand Down Expand Up @@ -289,7 +443,10 @@ IEngine::CommandResult FullIPFSEngine::DoFetch(const vector<std::string_view>& a
git::RepoAccessor accessor(client_.GetRepoDir());
size_t total_objects = 0;

auto objects = GetUploadedObjects(RequestRefs(), client_, options_.progress);
auto objects = options_.is_async
? GetUploadedObjectsAsync(RequestRefs(), client_, options_.progress,
client_.GetContext())
: GetUploadedObjects(RequestRefs(), client_, options_.progress);
for (const auto& obj : objects) {
if (git_odb_exists(*accessor.m_odb, &obj.hash) != 0) {
received_objects.insert(ToString(obj.hash));
Expand Down Expand Up @@ -382,9 +539,13 @@ IEngine::CommandResult FullIPFSEngine::DoPush(const vector<std::string_view>& ar
git_oid_cpy(&lr, git_reference_target(*local_ref));
}

boost::asio::io_context& context = client_.GetContext();

auto remote_refs = RequestRefs();
auto [uploaded_objects, metas] =
GetUploadedObjectsWithMetas(remote_refs, client_, options_.progress);
(options_.is_async
? GetUploadedObjectsWithMetasAsync(remote_refs, client_, options_.progress, context)
: GetUploadedObjectsWithMetas(remote_refs, client_, options_.progress));
auto uploaded_oids = GetOidsFromObjects(uploaded_objects);
std::vector<git_oid> merge_bases;
for (const auto& remote_ref : remote_refs) {
Expand Down Expand Up @@ -486,36 +647,12 @@ IEngine::CommandResult FullIPFSEngine::DoPush(const vector<std::string_view>& ar
uint32_t new_objects = 0;
uint32_t new_metas = 0;
{
auto progress = MakeProgress("Uploading objects to IPFS", collector.m_objects.size(),
options_.progress);
size_t i = 0;
for (auto& obj : collector.m_objects) {
if (obj.type == GIT_OBJECT_BLOB) {
++new_objects;
} else {
++new_metas;
}

auto res = client_.SaveObjectToIPFS(obj.GetData(), obj.GetSize());
auto r = ParseJsonAndTest(res);
auto hash_str = r.as_object()["result"].as_object()["hash"].as_string();
obj.ipfsHash = ByteBuffer(hash_str.cbegin(), hash_str.cend());
oid_to_ipfs[obj.oid] = std::string(hash_str.cbegin(), hash_str.cend());
auto meta_object = GetMetaBlock(collector, obj, oid_to_meta, oid_to_ipfs);
if (meta_object != nullptr) {
auto meta_buffer = StringToByteBuffer(meta_object->Serialize());
auto meta_res = ParseJsonAndTest(
client_.SaveObjectToIPFS(meta_buffer.data(), meta_buffer.size()));
std::string hash =
meta_res.as_object()["result"].as_object()["hash"].as_string().c_str();
oid_to_meta[obj.oid] = hash;
if (obj.type == GIT_OBJECT_COMMIT) {
metas[hash] = *static_cast<CommitMetaBlock*>(meta_object.get());
} else if (obj.type == GIT_OBJECT_TREE) {
metas[hash] = *static_cast<TreeMetaBlock*>(meta_object.get());
}
}
progress->UpdateProgress(++i);
if (options_.is_async) {
UploadObjectsAsync(collector, new_objects, new_metas, metas, oid_to_ipfs, oid_to_meta,
options_.progress, client_, context);
} else {
UploadObjects(collector, new_objects, new_metas, metas, oid_to_ipfs, oid_to_meta,
options_.progress, client_);
}
}
if (!is_forced && !CheckCommitsLinking(metas, collector.m_refs, oid_to_meta)) {
Expand Down Expand Up @@ -584,6 +721,14 @@ IEngine::BaseOptions::SetResult FullIPFSEngine::Options::Set(std::string_view op
return SetResult::InvalidValue;
}
return SetResult::Ok;
} else if (option == "is_async") {
if (value == "true") {
is_async = true;
} else if (value == "false") {
is_async = false;
} else {
return SetResult::InvalidValue;
}
} /* else if (option == "verbosity") {
char* endPos;
auto v = std::strtol(value.data(), &endPos, 10);
Expand Down
1 change: 1 addition & 0 deletions remote_helper/wallets/base_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct IWalletClient {
virtual bool WaitForCompletion(WaitFunc&&) = 0;

virtual size_t GetTransactionCount() const = 0;
virtual boost::asio::io_context& GetContext() = 0;

const Options& GetOptions() const {
return options_;
Expand Down
4 changes: 4 additions & 0 deletions remote_helper/wallets/beam_wallet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class BeamWalletClient final : public IWalletClient {
return transactions_.size();
}

boost::asio::io_context& GetContext() final {
return ioc_;
}

private:
std::string InvokeWallet(std::string args) {
args.append(",repo_id=")
Expand Down

0 comments on commit 0db1e1d

Please sign in to comment.