Skip to content

Commit

Permalink
Test benchmark (#36)
Browse files Browse the repository at this point in the history
* add save/load time measuring

* fix: Enable true async mode #35

* chore(stress): Linux compile fixes
  • Loading branch information
aleks-f authored Dec 19, 2021
1 parent 07558fe commit d70a9b0
Show file tree
Hide file tree
Showing 33 changed files with 597 additions and 128 deletions.
106 changes: 65 additions & 41 deletions examples/stress.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <cstdint>
#include <cstdlib>

#include <random>
#include <atomic>
#include <iostream>

#include "kademlia/endpoint.hpp"
Expand Down Expand Up @@ -29,28 +29,39 @@ using Session = Kademlia::Session;
namespace {


int _saved = 0, _loaded = 0;
std::atomic<int> _saved(0), _loaded(0);
std::atomic<int> _savedBytes(0), _loadedBytes(0);
std::atomic<int> _saveTime(0), _loadTime(0);

LogStream _logger(Logger::get("kademlia_stress"));

int randomSession(int maxID)
{
std::random_device rd;
std::default_random_engine eng(rd());
std::uniform_int_distribution<int> distr(0, maxID);
return distr(eng);
}

template <typename S>
void load(S& session, std::string const& key)
{
std::vector<uint8_t> key_vec(key.begin(), key.end());
Timestamp ts;
auto on_load = [key, ts] (std::error_code const& error, Session::DataType const& data)
{
++_loaded;
if (error)
_logger.error() << "Failed to load \"" << key << "\", error: " << error.message() << std::endl;
else
{
Timespan elapsed = Timestamp() - ts;
++_loaded;
std::string const& str{ data.begin(), data.end() };
_logger.information() << "Loaded \"" << key << /*"\" as \"" << str << */"\" in " << elapsed.microseconds() << " us" << std::endl;
_loadTime += elapsed.microseconds();
_loadedBytes += static_cast<int>(data.size());
_logger.information() << "Loaded \"" << key << "\" (" << data.size() << " bytes) in " << elapsed.microseconds() << " us" << std::endl;
}
};

ts.update();
session.asyncLoad(key_vec, std::move(on_load));
_logger.debug() << "Async loading \"" << key << "\" ..." << std::endl;
}
Expand All @@ -60,19 +71,22 @@ void save(S& session, std::string const& key, std::string const& val)
{
std::vector<uint8_t> key_vec(key.begin(), key.end());
std::vector<uint8_t> val_vec(val.begin(), val.end());
std::size_t sz = val.size();
Timestamp ts;
auto on_save = [key, ts] (std::error_code const& error)
auto on_save = [key, ts, sz] (const std::error_code& error)
{
++_saved;
if (error)
_logger.error() << "Failed to save \"" << key << "\", error: " << error.message() << std::endl;
else
{
Timespan elapsed = Timestamp() - ts;
++_saved;
_logger.information() << "Saved \"" << key << "\" in " << elapsed.microseconds() << " us" << std::endl;
_saveTime += elapsed.microseconds();
_savedBytes += static_cast<int>(sz);
_logger.information() << "Saved \"" << key << "\" (" << sz << " bytes) in " << elapsed.microseconds() << " us" << std::endl;
}
};

ts.update();
session.asyncSave(key_vec, std::move(val_vec), std::move(on_save));
_logger.debug() << "Async saving \"" << key << /*"\": \"" << val <<*/ "\"" << std::endl;
}
Expand Down Expand Up @@ -102,70 +116,80 @@ int main(int argc, char** argv)
std::uint16_t bootPort4 = kd::getAvailablePort(SocketAddress::IPv4);
std::uint16_t bootPort6 = kd::getAvailablePort(SocketAddress::IPv6);

int reps = 5;
pool.addCapacity(reps*2);
int peers = 3;
pool.addCapacity(peers * 2);
int chunks = 24, chunkSize = 50000;

Session firstSession{ k::endpoint{bootAddr4, bootPort4}, k::endpoint{bootAddr6, bootPort6} };
std::vector<Session*> sessions;
sessions.push_back(new Session{ k::endpoint{bootAddr4, bootPort4}, k::endpoint{bootAddr6, bootPort6} });
_logger.information() << "bootstrap session listening on " << bootAddr4 << ':' << bootPort4 << ", " <<
'[' << bootAddr6 << "]:" << bootPort6 << std::endl;

uint16_t sessPort4 = kd::getAvailablePort(SocketAddress::IPv4, bootPort4 + 1);
uint16_t sessPort6 = kd::getAvailablePort(SocketAddress::IPv6, bootPort6 + 1);

std::vector<Session*> sessions;
for (int i = 0; i < reps; ++i)
for (int i = 0; i < peers; ++i)
{
sessions.push_back(new Session{ k::endpoint{ "127.0.0.1", bootPort4 }
, k::endpoint{ "127.0.0.1", sessPort4 }
, k::endpoint{ "::1", sessPort6} });
Session* pSession = new Session{ k::endpoint{ "127.0.0.1", bootPort4 }
, k::endpoint{ "127.0.0.1", sessPort4 }
, k::endpoint{ "::1", sessPort6} };
sessions.push_back(pSession);
_logger.information() << "peer session connected to 127.0.0.1:" << bootPort4 <<
", listening on 127.0.0.1:" << sessPort4 << ", " <<
"[::1]:" << sessPort6 << std::endl;
sessPort4 = kd::getAvailablePort(SocketAddress::IPv4, ++sessPort4);
sessPort6 = kd::getAvailablePort(SocketAddress::IPv6, ++sessPort6);
}
Thread::sleep(100);
for (int i = 0; i < reps; ++i)
{
std::string k("k"), v(65000, 'v');
k += std::to_string(i);
v += std::to_string(i);
save(*sessions[0], k, v);
while (_saved < i + 1) Thread::sleep(1);
}

_loaded = 0;
for (int i = 0; i < reps; ++i)
int i = 0;
for (; i < chunks; ++i)
{
std::string k("k");
std::string k("k"), v;
k += std::to_string(i);
load(*sessions[i], k);
while (_loaded < i + 1) Thread::sleep(1);
v += std::to_string(i);
v.resize(chunkSize);
save(*sessions[randomSession(peers-1)], k, v);
}
// wait for all save ops to complete
while (_saved < i) Thread::sleep(10);

_loaded = 0;
for (int i = 0; i < reps; ++i)
i = 0;
for (; i < chunks; ++i)
{
std::string k("k");
k += std::to_string(i);
load(firstSession, k);
while (_loaded < i + 1) Thread::sleep(1);
load(*sessions[randomSession(peers - 1)], k);
}
// wait for all load ops to complete
while (_loaded < i) Thread::sleep(10);

abortSession(firstSession);
/*
_loaded = 0;
for (int i = 0; i < reps; ++i)
i = 0;
for (; i < peers; ++i)
{
std::string k("k");
k += std::to_string(i);
load(*sessions[i], k);
while (_loaded < i + 1) Thread::sleep(1);
}

while (_loaded < i) Thread::sleep(1);
*/
for (auto& pS : sessions)
{
abortSession(*pS);
delete pS;
}

std::ostringstream ostr;
ostr << std::endl << "Summary\n=======\n" << peers << " peers, " <<
chunks << " chunks of " << chunkSize << " bytes\n"
"saved " << _savedBytes << " bytes, loaded " << _loadedBytes << " bytes\n"
"Save time: " << float(_saveTime)/1000 << " [ms]\n"
"Load time: " << float(_loadTime)/1000 << " [ms]\n"
"Total time:" << (float(_saveTime)+float(_loadTime))/1000 << " [ms]" << std::endl;
_logger.information(ostr.str());

std::cout << "Goodbye!" << std::endl;
return 0;
}
catch (std::exception& ex)
Expand Down
13 changes: 13 additions & 0 deletions include/kademlia/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "Poco/Net/DatagramSocket.h"
#include "Poco/Net/SocketProactor.h"
#include "kademlia/endpoint.hpp"
#include "kademlia/value_store.hpp"


namespace Kademlia {
Expand All @@ -36,6 +37,7 @@ class KADEMLIA_SYMBOL_VISIBILITY Session
using Endpoint = kademlia::endpoint;
using SaveHandlerType = std::function<void (const std::error_code&)>;
using LoadHandlerType = std::function<void (const std::error_code&, const DataType& data)>;
using ValueStoreType = kademlia::detail::value_store_type;

static const std::uint16_t DEFAULT_PORT;

Expand All @@ -46,6 +48,8 @@ class KADEMLIA_SYMBOL_VISIBILITY Session

~Session();

bool initialized() const;

void asyncSave(KeyType const& key, DataType&& data, SaveHandlerType&& handler);

template<typename K, typename D>
Expand All @@ -63,16 +67,25 @@ class KADEMLIA_SYMBOL_VISIBILITY Session
asyncLoad(KeyType(std::begin(key), std::end(key)), std::move(handler));
}

const ValueStoreType& data() const;

std::error_code run();

void abort();
std::error_code wait();

const Poco::Net::SocketProactor& ioService() const
{
return _ioService;
}

private:
using RunType = Poco::ActiveMethod<std::error_code, void, Session>;
using Result = Poco::ActiveResult<std::error_code>;
using ResultPtr = std::unique_ptr<Result>;

bool tryWaitForIOService(int ms);

Result& result()
{
if (!_pResult)
Expand Down
2 changes: 1 addition & 1 deletion poco
1 change: 0 additions & 1 deletion src/kademlia/DiscoverNeighborsTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ class DiscoverNeighborsTask final
<< int(h.type_) << ")" << std::endl;
search_ourselves(task);
return;

};

FindPeerResponseBody response;
Expand Down
54 changes: 43 additions & 11 deletions src/kademlia/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
#include "NotifyPeerTask.h"
#include "Tracker.h"
#include "Message.h"

#include "Poco/Mutex.h"
#include "Poco/ScopedLock.h"

namespace kademlia {
namespace detail {
Expand All @@ -68,14 +69,13 @@ class Engine final
{
public:
using key_type = std::vector<std::uint8_t>;
using data_type = std::vector<std::uint8_t>;
using routing_table_type = routing_table<Poco::Net::SocketAddress>;
using value_store_type = value_store<id, data_type>;

public:
Engine(Poco::Net::SocketProactor& io_service, endpoint const& ipv4, endpoint const& ipv6, id const& new_id = id{}):
Engine(Poco::Net::SocketProactor& io_service, endpoint const& ipv4, endpoint const& ipv6, id const& new_id = id{}, bool initialized = true):
random_engine_(std::random_device{}()),
my_id_(new_id == id{} ? id{ random_engine_ } : new_id),
_initialized(initialized),
network_(io_service,
MessageSocketType::ipv4(io_service, ipv4),
MessageSocketType::ipv6(io_service, ipv6),
Expand All @@ -93,39 +93,64 @@ class Engine final

Engine(Poco::Net::SocketProactor& io_service, endpoint const& initial_peer,
endpoint const& ipv4, endpoint const& ipv6, id const& new_id = id{}):
Engine(io_service, ipv4, ipv6, new_id)
Engine(io_service, ipv4, ipv6, new_id, false)
{
LOG_DEBUG(Engine, this) << "Engine bootstrapping using peer '" << initial_peer << "'." << std::endl;
bool initialized = false;
auto on_initialized = [&initialized]
auto on_initialized = [this]
{
initialized = true;
_initialized = true;
};

discover_neighbors(initial_peer, on_initialized);

while (!initialized)
while (!_initialized)
io_service.poll();
}

Engine(Engine const&) = delete;

Engine & operator = (Engine const&) = delete;

bool initialized() const
{
return _initialized;
}

void start()
{
network_.start();
}

template<typename HandlerType>
void asyncSave(key_type const& key, data_type&& data, HandlerType&& handler)
{
LOG_DEBUG( engine, this ) << "executing async save of key '" << toString(key) << "'." << std::endl;
LOG_DEBUG(Engine, this) << "executing async save of key '" << toString(key) << "'." << std::endl;
id valID(key);
Poco::Mutex::ScopedLock l(_mutex);
value_store_[valID] = data;
start_store_value_task(id(key), std::move(data), tracker_, routing_table_, std::forward<HandlerType>(handler));
}

template<typename HandlerType>
void asyncLoad(key_type const& key, HandlerType&& handler)
{
LOG_DEBUG( engine, this ) << "executing async load of key '" << toString( key ) << "'." << std::endl;
LOG_DEBUG(Engine, this) << "executing async load of key '" << toString( key ) << "'." << std::endl;
id valID(key);
Poco::Mutex::ScopedLock l(_mutex);
auto it = value_store_.find(valID);
if (it != value_store_.end())
{
handler(std::error_code(), it->second);
return;
}
start_find_value_task< data_type >(id(key), tracker_, routing_table_, std::forward<HandlerType>(handler));
}

const value_store_type& data() const
{
return value_store_;
}

private:
using pending_task_type = std::function<void ()>;
using MessageSocketType = MessageSocket<UnderlyingSocketType>;
Expand Down Expand Up @@ -175,6 +200,7 @@ class Engine final
<< failure.message() << ")." << std::endl;
return;
}
Poco::Mutex::ScopedLock l(_mutex);
value_store_[request.data_key_hash_] = std::move(request.data_value_);
}

Expand Down Expand Up @@ -230,9 +256,13 @@ class Engine final
return;
}

Poco::ScopedLockWithUnlock<Poco::Mutex> l(_mutex);
auto found = value_store_.find(request.value_to_find_);
if (found == value_store_.end())
{
l.unlock();
send_find_peer_response(sender, h.random_token_, request.value_to_find_);
}
else
{
FindValueResponseBody const response{ found->second };
Expand Down Expand Up @@ -314,11 +344,13 @@ class Engine final
private:
random_engine_type random_engine_;
id my_id_;
std::atomic<bool> _initialized;
NetworkType network_;
TrackerType tracker_;
routing_table_type routing_table_;
value_store_type value_store_;
std::size_t pending_notifications_count_;
Poco::Mutex _mutex;
};

} // namespace detail
Expand Down
Loading

0 comments on commit d70a9b0

Please sign in to comment.