diff --git a/examples/stress.cpp b/examples/stress.cpp index 40c5ef3..40d69db 100644 --- a/examples/stress.cpp +++ b/examples/stress.cpp @@ -1,5 +1,5 @@ #include -#include +#include #include @@ -30,9 +30,19 @@ namespace { int _saved = 0, _loaded = 0; +int _savedBytes = 0, _loadedBytes = 0; +int _saveTime = 0, _loadTime = 0; LogStream _logger(Logger::get("kademlia_stress")); +int randomSession(int maxID) +{ + std::random_device rd; + std::default_random_engine eng(rd()); + std::uniform_int_distribution distr(0, maxID); + return distr(eng); +} + template void load(S& session, std::string const& key) { @@ -45,12 +55,13 @@ void load(S& session, std::string const& key) else { Timespan elapsed = Timestamp() - ts; + _loadTime += elapsed.microseconds(); ++_loaded; - std::string const& str{ data.begin(), data.end() }; - _logger.information() << "Loaded \"" << key << /*"\" as \"" << str << */"\" in " << elapsed.microseconds() << " us" << std::endl; + _loadedBytes += data.size(); + _logger.information() << "Loaded \"" << key << "\" (" << data.size() << " bytes) in " << elapsed.microseconds() << " us" << std::endl; } }; - + ts.update(); session.asyncLoad(key_vec, std::move(on_load)); _logger.debug() << "Async loading \"" << key << "\" ..." << std::endl; } @@ -60,19 +71,22 @@ void save(S& session, std::string const& key, std::string const& val) { std::vector key_vec(key.begin(), key.end()); std::vector val_vec(val.begin(), val.end()); + std::size_t sz = val.size(); Timestamp ts; - auto on_save = [key, ts] (std::error_code const& error) + auto on_save = [key, ts, sz] (std::error_code const& error) { if (error) _logger.error() << "Failed to save \"" << key << "\", error: " << error.message() << std::endl; else { Timespan elapsed = Timestamp() - ts; + _saveTime += elapsed.microseconds(); ++_saved; - _logger.information() << "Saved \"" << key << "\" in " << elapsed.microseconds() << " us" << std::endl; + _savedBytes += sz; + _logger.information() << "Saved \"" << key << "\" (" << sz << " bytes) in " << elapsed.microseconds() << " us" << std::endl; } }; - + ts.update(); session.asyncSave(key_vec, std::move(val_vec), std::move(on_save)); _logger.debug() << "Async saving \"" << key << /*"\": \"" << val <<*/ "\"" << std::endl; } @@ -102,8 +116,9 @@ int main(int argc, char** argv) std::uint16_t bootPort4 = kd::getAvailablePort(SocketAddress::IPv4); std::uint16_t bootPort6 = kd::getAvailablePort(SocketAddress::IPv6); - int reps = 5; - pool.addCapacity(reps*2); + int peers = 3; + pool.addCapacity(peers*2); + int chunks = 24, chunkSize = 50000; Session firstSession{ k::endpoint{bootAddr4, bootPort4}, k::endpoint{bootAddr6, bootPort6} }; _logger.information() << "bootstrap session listening on " << bootAddr4 << ':' << bootPort4 << ", " << @@ -113,7 +128,7 @@ int main(int argc, char** argv) uint16_t sessPort6 = kd::getAvailablePort(SocketAddress::IPv6, bootPort6 + 1); std::vector sessions; - for (int i = 0; i < reps; ++i) + for (int i = 0; i < peers; ++i) { sessions.push_back(new Session{ k::endpoint{ "127.0.0.1", bootPort4 } , k::endpoint{ "127.0.0.1", sessPort4 } @@ -125,26 +140,27 @@ int main(int argc, char** argv) sessPort6 = kd::getAvailablePort(SocketAddress::IPv6, ++sessPort6); } Thread::sleep(100); - for (int i = 0; i < reps; ++i) + for (int i = 0; i < chunks; ++i) { - std::string k("k"), v(65000, 'v'); + std::string k("k"), v; k += std::to_string(i); v += std::to_string(i); - save(*sessions[0], k, v); + v.resize(chunkSize); + save(*sessions[randomSession(peers-1)], k, v); while (_saved < i + 1) Thread::sleep(1); } _loaded = 0; - for (int i = 0; i < reps; ++i) + for (int i = 0; i < chunks; ++i) { std::string k("k"); k += std::to_string(i); - load(*sessions[i], k); + load(*sessions[randomSession(peers-1)], k); while (_loaded < i + 1) Thread::sleep(1); } - +/* _loaded = 0; - for (int i = 0; i < reps; ++i) + for (int i = 0; i < peers; ++i) { std::string k("k"); k += std::to_string(i); @@ -154,18 +170,28 @@ int main(int argc, char** argv) abortSession(firstSession); _loaded = 0; - for (int i = 0; i < reps; ++i) + for (int i = 0; i < peers; ++i) { std::string k("k"); k += std::to_string(i); load(*sessions[i], k); while (_loaded < i + 1) Thread::sleep(1); } - +*/ for (auto& pS : sessions) + { abortSession(*pS); + delete pS; + } + + std::ostringstream ostr; + ostr << std::endl << "Summary\n=======\n" << peers << " peers, " << + chunks << " chunks of " << chunkSize << " bytes\n" + "saved " << _savedBytes << " bytes, loaded " << _loadedBytes << " bytes\n" + "Total save time: " << float(_saveTime)/1000 << " [ms]\n" + "Total load time: " << float(_loadTime)/1000 << " [ms]" << std::endl; + _logger.information(ostr.str()); - std::cout << "Goodbye!" << std::endl; return 0; } catch (std::exception& ex) diff --git a/src/kademlia/log.hpp b/src/kademlia/log.hpp index cbb4952..ff500ab 100644 --- a/src/kademlia/log.hpp +++ b/src/kademlia/log.hpp @@ -60,7 +60,7 @@ bool isLogEnabled(std::string const& module); * can be costly, its result is cached in a static variable. */ -#define KADEMLIA_ENABLE_DEBUG +//#define KADEMLIA_ENABLE_DEBUG #ifdef KADEMLIA_ENABLE_DEBUG # define LOG_DEBUG(module, thiz) \ for (bool used = false; ! used; used = true) \