Skip to content

Commit

Permalink
fix: Enable true async mode #35
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-f committed Dec 19, 2021
1 parent c378f94 commit 640da1e
Show file tree
Hide file tree
Showing 33 changed files with 565 additions and 122 deletions.
68 changes: 33 additions & 35 deletions examples/stress.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <cstdint>
#include <random>

#include <atomic>
#include <iostream>

#include "kademlia/endpoint.hpp"
Expand Down Expand Up @@ -29,9 +29,9 @@ using Session = Kademlia::Session;
namespace {


int _saved = 0, _loaded = 0;
int _savedBytes = 0, _loadedBytes = 0;
int _saveTime = 0, _loadTime = 0;
std::atomic<int> _saved = 0, _loaded = 0;
std::atomic<int> _savedBytes = 0, _loadedBytes = 0;
std::atomic<int> _saveTime = 0, _loadTime = 0;

LogStream _logger(Logger::get("kademlia_stress"));

Expand All @@ -50,14 +50,14 @@ void load(S& session, std::string const& key)
Timestamp ts;
auto on_load = [key, ts] (std::error_code const& error, Session::DataType const& data)
{
++_loaded;
if (error)
_logger.error() << "Failed to load \"" << key << "\", error: " << error.message() << std::endl;
else
{
Timespan elapsed = Timestamp() - ts;
_loadTime += elapsed.microseconds();
++_loaded;
_loadedBytes += data.size();
_loadedBytes += static_cast<int>(data.size());
_logger.information() << "Loaded \"" << key << "\" (" << data.size() << " bytes) in " << elapsed.microseconds() << " us" << std::endl;
}
};
Expand All @@ -73,16 +73,16 @@ void save(S& session, std::string const& key, std::string const& val)
std::vector<uint8_t> val_vec(val.begin(), val.end());
std::size_t sz = val.size();
Timestamp ts;
auto on_save = [key, ts, sz] (std::error_code const& error)
auto on_save = [key, ts, sz] (const std::error_code& error)
{
++_saved;
if (error)
_logger.error() << "Failed to save \"" << key << "\", error: " << error.message() << std::endl;
else
{
Timespan elapsed = Timestamp() - ts;
_saveTime += elapsed.microseconds();
++_saved;
_savedBytes += sz;
_savedBytes += static_cast<int>(sz);
_logger.information() << "Saved \"" << key << "\" (" << sz << " bytes) in " << elapsed.microseconds() << " us" << std::endl;
}
};
Expand Down Expand Up @@ -117,66 +117,63 @@ int main(int argc, char** argv)
std::uint16_t bootPort6 = kd::getAvailablePort(SocketAddress::IPv6);

int peers = 3;
pool.addCapacity(peers*2);
pool.addCapacity(peers * 2);
int chunks = 24, chunkSize = 50000;

Session firstSession{ k::endpoint{bootAddr4, bootPort4}, k::endpoint{bootAddr6, bootPort6} };
std::vector<Session*> sessions;
sessions.push_back(new Session{ k::endpoint{bootAddr4, bootPort4}, k::endpoint{bootAddr6, bootPort6} });
_logger.information() << "bootstrap session listening on " << bootAddr4 << ':' << bootPort4 << ", " <<
'[' << bootAddr6 << "]:" << bootPort6 << std::endl;

uint16_t sessPort4 = kd::getAvailablePort(SocketAddress::IPv4, bootPort4 + 1);
uint16_t sessPort6 = kd::getAvailablePort(SocketAddress::IPv6, bootPort6 + 1);

std::vector<Session*> sessions;
for (int i = 0; i < peers; ++i)
{
sessions.push_back(new Session{ k::endpoint{ "127.0.0.1", bootPort4 }
, k::endpoint{ "127.0.0.1", sessPort4 }
, k::endpoint{ "::1", sessPort6} });
Session* pSession = new Session{ k::endpoint{ "127.0.0.1", bootPort4 }
, k::endpoint{ "127.0.0.1", sessPort4 }
, k::endpoint{ "::1", sessPort6} };
sessions.push_back(pSession);
_logger.information() << "peer session connected to 127.0.0.1:" << bootPort4 <<
", listening on 127.0.0.1:" << sessPort4 << ", " <<
"[::1]:" << sessPort6 << std::endl;
sessPort4 = kd::getAvailablePort(SocketAddress::IPv4, ++sessPort4);
sessPort6 = kd::getAvailablePort(SocketAddress::IPv6, ++sessPort6);
}
Thread::sleep(100);
for (int i = 0; i < chunks; ++i)

int i = 0;
for (; i < chunks; ++i)
{
std::string k("k"), v;
k += std::to_string(i);
v += std::to_string(i);
v.resize(chunkSize);
save(*sessions[randomSession(peers-1)], k, v);
while (_saved < i + 1) Thread::sleep(1);
}
// wait for all save ops to complete
while (_saved < i) Thread::sleep(10);

_loaded = 0;
for (int i = 0; i < chunks; ++i)
i = 0;
for (; i < chunks; ++i)
{
std::string k("k");
k += std::to_string(i);
load(*sessions[randomSession(peers-1)], k);
while (_loaded < i + 1) Thread::sleep(1);
}
/*
_loaded = 0;
for (int i = 0; i < peers; ++i)
{
std::string k("k");
k += std::to_string(i);
load(firstSession, k);
while (_loaded < i + 1) Thread::sleep(1);
load(*sessions[randomSession(peers - 1)], k);
}
// wait for all load ops to complete
while (_loaded < i) Thread::sleep(10);

abortSession(firstSession);
/*
_loaded = 0;
for (int i = 0; i < peers; ++i)
i = 0;
for (; i < peers; ++i)
{
std::string k("k");
k += std::to_string(i);
load(*sessions[i], k);
while (_loaded < i + 1) Thread::sleep(1);
}
while (_loaded < i) Thread::sleep(1);
*/
for (auto& pS : sessions)
{
Expand All @@ -188,8 +185,9 @@ int main(int argc, char** argv)
ostr << std::endl << "Summary\n=======\n" << peers << " peers, " <<
chunks << " chunks of " << chunkSize << " bytes\n"
"saved " << _savedBytes << " bytes, loaded " << _loadedBytes << " bytes\n"
"Total save time: " << float(_saveTime)/1000 << " [ms]\n"
"Total load time: " << float(_loadTime)/1000 << " [ms]" << std::endl;
"Save time: " << float(_saveTime)/1000 << " [ms]\n"
"Load time: " << float(_loadTime)/1000 << " [ms]\n"
"Total time:" << (float(_saveTime)+float(_loadTime))/1000 << " [ms]" << std::endl;
_logger.information(ostr.str());

return 0;
Expand Down
13 changes: 13 additions & 0 deletions include/kademlia/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "Poco/Net/DatagramSocket.h"
#include "Poco/Net/SocketProactor.h"
#include "kademlia/endpoint.hpp"
#include "kademlia/value_store.hpp"


namespace Kademlia {
Expand All @@ -36,6 +37,7 @@ class KADEMLIA_SYMBOL_VISIBILITY Session
using Endpoint = kademlia::endpoint;
using SaveHandlerType = std::function<void (const std::error_code&)>;
using LoadHandlerType = std::function<void (const std::error_code&, const DataType& data)>;
using ValueStoreType = kademlia::detail::value_store_type;

static const std::uint16_t DEFAULT_PORT;

Expand All @@ -46,6 +48,8 @@ class KADEMLIA_SYMBOL_VISIBILITY Session

~Session();

bool initialized() const;

void asyncSave(KeyType const& key, DataType&& data, SaveHandlerType&& handler);

template<typename K, typename D>
Expand All @@ -63,16 +67,25 @@ class KADEMLIA_SYMBOL_VISIBILITY Session
asyncLoad(KeyType(std::begin(key), std::end(key)), std::move(handler));
}

const ValueStoreType& data() const;

std::error_code run();

void abort();
std::error_code wait();

const Poco::Net::SocketProactor& ioService() const
{
return _ioService;
}

private:
using RunType = Poco::ActiveMethod<std::error_code, void, Session>;
using Result = Poco::ActiveResult<std::error_code>;
using ResultPtr = std::unique_ptr<Result>;

bool tryWaitForIOService(int ms);

Result& result()
{
if (!_pResult)
Expand Down
2 changes: 1 addition & 1 deletion poco
1 change: 0 additions & 1 deletion src/kademlia/DiscoverNeighborsTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ class DiscoverNeighborsTask final
<< int(h.type_) << ")" << std::endl;
search_ourselves(task);
return;

};

FindPeerResponseBody response;
Expand Down
54 changes: 43 additions & 11 deletions src/kademlia/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
#include "NotifyPeerTask.h"
#include "Tracker.h"
#include "Message.h"

#include "Poco/Mutex.h"
#include "Poco/ScopedLock.h"

namespace kademlia {
namespace detail {
Expand All @@ -68,14 +69,13 @@ class Engine final
{
public:
using key_type = std::vector<std::uint8_t>;
using data_type = std::vector<std::uint8_t>;
using routing_table_type = routing_table<Poco::Net::SocketAddress>;
using value_store_type = value_store<id, data_type>;

public:
Engine(Poco::Net::SocketProactor& io_service, endpoint const& ipv4, endpoint const& ipv6, id const& new_id = id{}):
Engine(Poco::Net::SocketProactor& io_service, endpoint const& ipv4, endpoint const& ipv6, id const& new_id = id{}, bool initialized = true):
random_engine_(std::random_device{}()),
my_id_(new_id == id{} ? id{ random_engine_ } : new_id),
_initialized(initialized),
network_(io_service,
MessageSocketType::ipv4(io_service, ipv4),
MessageSocketType::ipv6(io_service, ipv6),
Expand All @@ -93,39 +93,64 @@ class Engine final

Engine(Poco::Net::SocketProactor& io_service, endpoint const& initial_peer,
endpoint const& ipv4, endpoint const& ipv6, id const& new_id = id{}):
Engine(io_service, ipv4, ipv6, new_id)
Engine(io_service, ipv4, ipv6, new_id, false)
{
LOG_DEBUG(Engine, this) << "Engine bootstrapping using peer '" << initial_peer << "'." << std::endl;
bool initialized = false;
auto on_initialized = [&initialized]
auto on_initialized = [this]
{
initialized = true;
_initialized = true;
};

discover_neighbors(initial_peer, on_initialized);

while (!initialized)
while (!_initialized)
io_service.poll();
}

Engine(Engine const&) = delete;

Engine & operator = (Engine const&) = delete;

bool initialized() const
{
return _initialized;
}

void start()
{
network_.start();
}

template<typename HandlerType>
void asyncSave(key_type const& key, data_type&& data, HandlerType&& handler)
{
LOG_DEBUG( engine, this ) << "executing async save of key '" << toString(key) << "'." << std::endl;
LOG_DEBUG(Engine, this) << "executing async save of key '" << toString(key) << "'." << std::endl;
id valID(key);
Poco::Mutex::ScopedLock l(_mutex);
value_store_[valID] = data;
start_store_value_task(id(key), std::move(data), tracker_, routing_table_, std::forward<HandlerType>(handler));
}

template<typename HandlerType>
void asyncLoad(key_type const& key, HandlerType&& handler)
{
LOG_DEBUG( engine, this ) << "executing async load of key '" << toString( key ) << "'." << std::endl;
LOG_DEBUG(Engine, this) << "executing async load of key '" << toString( key ) << "'." << std::endl;
id valID(key);
Poco::Mutex::ScopedLock l(_mutex);
auto it = value_store_.find(valID);
if (it != value_store_.end())
{
handler(std::error_code(), it->second);
return;
}
start_find_value_task< data_type >(id(key), tracker_, routing_table_, std::forward<HandlerType>(handler));
}

const value_store_type& data() const
{
return value_store_;
}

private:
using pending_task_type = std::function<void ()>;
using MessageSocketType = MessageSocket<UnderlyingSocketType>;
Expand Down Expand Up @@ -175,6 +200,7 @@ class Engine final
<< failure.message() << ")." << std::endl;
return;
}
Poco::Mutex::ScopedLock l(_mutex);
value_store_[request.data_key_hash_] = std::move(request.data_value_);
}

Expand Down Expand Up @@ -230,9 +256,13 @@ class Engine final
return;
}

Poco::ScopedLockWithUnlock<Poco::Mutex> l(_mutex);
auto found = value_store_.find(request.value_to_find_);
if (found == value_store_.end())
{
l.unlock();
send_find_peer_response(sender, h.random_token_, request.value_to_find_);
}
else
{
FindValueResponseBody const response{ found->second };
Expand Down Expand Up @@ -314,11 +344,13 @@ class Engine final
private:
random_engine_type random_engine_;
id my_id_;
std::atomic<bool> _initialized;
NetworkType network_;
TrackerType tracker_;
routing_table_type routing_table_;
value_store_type value_store_;
std::size_t pending_notifications_count_;
Poco::Mutex _mutex;
};

} // namespace detail
Expand Down
7 changes: 4 additions & 3 deletions src/kademlia/FindValueTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ class FindValueTask final : public LookupTask
RoutingTableType & routing_table, LoadHandlerType load_handler):
LookupTask(searched_key,
routing_table.find(searched_key),
routing_table.end()),
routing_table.end(),
tracker.addressV4(),
tracker.addressV6()),
tracker_(tracker),
load_handler_(std::move(load_handler)),
is_finished_()
Expand Down Expand Up @@ -132,8 +134,7 @@ class FindValueTask final : public LookupTask
static void try_candidates(std::shared_ptr<FindValueTask> task,
std::size_t concurrent_requests_count = CONCURRENT_FIND_PEER_REQUESTS_COUNT)
{
auto const closest_candidates = task->select_new_closest_candidates
(concurrent_requests_count);
auto closest_candidates = task->select_new_closest_candidates(concurrent_requests_count);

FindValueRequestBody const request{ task->get_key() };
for (auto const& c : closest_candidates)
Expand Down
Loading

0 comments on commit 640da1e

Please sign in to comment.