Skip to content

Commit

Permalink
add save/load time measuring
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-f committed Dec 6, 2021
1 parent 07558fe commit c378f94
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 21 deletions.
66 changes: 46 additions & 20 deletions examples/stress.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include <cstdint>
#include <cstdlib>
#include <random>

#include <iostream>

Expand Down Expand Up @@ -30,9 +30,19 @@ namespace {


int _saved = 0, _loaded = 0;
int _savedBytes = 0, _loadedBytes = 0;
int _saveTime = 0, _loadTime = 0;

LogStream _logger(Logger::get("kademlia_stress"));

int randomSession(int maxID)
{
std::random_device rd;
std::default_random_engine eng(rd());
std::uniform_int_distribution<int> distr(0, maxID);
return distr(eng);
}

template <typename S>
void load(S& session, std::string const& key)
{
Expand All @@ -45,12 +55,13 @@ void load(S& session, std::string const& key)
else
{
Timespan elapsed = Timestamp() - ts;
_loadTime += elapsed.microseconds();
++_loaded;
std::string const& str{ data.begin(), data.end() };
_logger.information() << "Loaded \"" << key << /*"\" as \"" << str << */"\" in " << elapsed.microseconds() << " us" << std::endl;
_loadedBytes += data.size();
_logger.information() << "Loaded \"" << key << "\" (" << data.size() << " bytes) in " << elapsed.microseconds() << " us" << std::endl;
}
};

ts.update();
session.asyncLoad(key_vec, std::move(on_load));
_logger.debug() << "Async loading \"" << key << "\" ..." << std::endl;
}
Expand All @@ -60,19 +71,22 @@ void save(S& session, std::string const& key, std::string const& val)
{
std::vector<uint8_t> key_vec(key.begin(), key.end());
std::vector<uint8_t> val_vec(val.begin(), val.end());
std::size_t sz = val.size();
Timestamp ts;
auto on_save = [key, ts] (std::error_code const& error)
auto on_save = [key, ts, sz] (std::error_code const& error)
{
if (error)
_logger.error() << "Failed to save \"" << key << "\", error: " << error.message() << std::endl;
else
{
Timespan elapsed = Timestamp() - ts;
_saveTime += elapsed.microseconds();
++_saved;
_logger.information() << "Saved \"" << key << "\" in " << elapsed.microseconds() << " us" << std::endl;
_savedBytes += sz;
_logger.information() << "Saved \"" << key << "\" (" << sz << " bytes) in " << elapsed.microseconds() << " us" << std::endl;
}
};

ts.update();
session.asyncSave(key_vec, std::move(val_vec), std::move(on_save));
_logger.debug() << "Async saving \"" << key << /*"\": \"" << val <<*/ "\"" << std::endl;
}
Expand Down Expand Up @@ -102,8 +116,9 @@ int main(int argc, char** argv)
std::uint16_t bootPort4 = kd::getAvailablePort(SocketAddress::IPv4);
std::uint16_t bootPort6 = kd::getAvailablePort(SocketAddress::IPv6);

int reps = 5;
pool.addCapacity(reps*2);
int peers = 3;
pool.addCapacity(peers*2);
int chunks = 24, chunkSize = 50000;

Session firstSession{ k::endpoint{bootAddr4, bootPort4}, k::endpoint{bootAddr6, bootPort6} };
_logger.information() << "bootstrap session listening on " << bootAddr4 << ':' << bootPort4 << ", " <<
Expand All @@ -113,7 +128,7 @@ int main(int argc, char** argv)
uint16_t sessPort6 = kd::getAvailablePort(SocketAddress::IPv6, bootPort6 + 1);

std::vector<Session*> sessions;
for (int i = 0; i < reps; ++i)
for (int i = 0; i < peers; ++i)
{
sessions.push_back(new Session{ k::endpoint{ "127.0.0.1", bootPort4 }
, k::endpoint{ "127.0.0.1", sessPort4 }
Expand All @@ -125,26 +140,27 @@ int main(int argc, char** argv)
sessPort6 = kd::getAvailablePort(SocketAddress::IPv6, ++sessPort6);
}
Thread::sleep(100);
for (int i = 0; i < reps; ++i)
for (int i = 0; i < chunks; ++i)
{
std::string k("k"), v(65000, 'v');
std::string k("k"), v;
k += std::to_string(i);
v += std::to_string(i);
save(*sessions[0], k, v);
v.resize(chunkSize);
save(*sessions[randomSession(peers-1)], k, v);
while (_saved < i + 1) Thread::sleep(1);
}

_loaded = 0;
for (int i = 0; i < reps; ++i)
for (int i = 0; i < chunks; ++i)
{
std::string k("k");
k += std::to_string(i);
load(*sessions[i], k);
load(*sessions[randomSession(peers-1)], k);
while (_loaded < i + 1) Thread::sleep(1);
}

/*
_loaded = 0;
for (int i = 0; i < reps; ++i)
for (int i = 0; i < peers; ++i)
{
std::string k("k");
k += std::to_string(i);
Expand All @@ -154,18 +170,28 @@ int main(int argc, char** argv)
abortSession(firstSession);
_loaded = 0;
for (int i = 0; i < reps; ++i)
for (int i = 0; i < peers; ++i)
{
std::string k("k");
k += std::to_string(i);
load(*sessions[i], k);
while (_loaded < i + 1) Thread::sleep(1);
}

*/
for (auto& pS : sessions)
{
abortSession(*pS);
delete pS;
}

std::ostringstream ostr;
ostr << std::endl << "Summary\n=======\n" << peers << " peers, " <<
chunks << " chunks of " << chunkSize << " bytes\n"
"saved " << _savedBytes << " bytes, loaded " << _loadedBytes << " bytes\n"
"Total save time: " << float(_saveTime)/1000 << " [ms]\n"
"Total load time: " << float(_loadTime)/1000 << " [ms]" << std::endl;
_logger.information(ostr.str());

std::cout << "Goodbye!" << std::endl;
return 0;
}
catch (std::exception& ex)
Expand Down
2 changes: 1 addition & 1 deletion src/kademlia/log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ bool isLogEnabled(std::string const& module);
* can be costly, its result is cached in a static variable.
*/

#define KADEMLIA_ENABLE_DEBUG
//#define KADEMLIA_ENABLE_DEBUG
#ifdef KADEMLIA_ENABLE_DEBUG
# define LOG_DEBUG(module, thiz) \
for (bool used = false; ! used; used = true) \
Expand Down

0 comments on commit c378f94

Please sign in to comment.