diff --git a/examples/stress.cpp b/examples/stress.cpp index 40c5ef3..07eb31e 100644 --- a/examples/stress.cpp +++ b/examples/stress.cpp @@ -1,6 +1,6 @@ #include -#include - +#include +#include #include #include "kademlia/endpoint.hpp" @@ -29,10 +29,20 @@ using Session = Kademlia::Session; namespace { -int _saved = 0, _loaded = 0; +std::atomic _saved(0), _loaded(0); +std::atomic _savedBytes(0), _loadedBytes(0); +std::atomic _saveTime(0), _loadTime(0); LogStream _logger(Logger::get("kademlia_stress")); +int randomSession(int maxID) +{ + std::random_device rd; + std::default_random_engine eng(rd()); + std::uniform_int_distribution distr(0, maxID); + return distr(eng); +} + template void load(S& session, std::string const& key) { @@ -40,17 +50,18 @@ void load(S& session, std::string const& key) Timestamp ts; auto on_load = [key, ts] (std::error_code const& error, Session::DataType const& data) { + ++_loaded; if (error) _logger.error() << "Failed to load \"" << key << "\", error: " << error.message() << std::endl; else { Timespan elapsed = Timestamp() - ts; - ++_loaded; - std::string const& str{ data.begin(), data.end() }; - _logger.information() << "Loaded \"" << key << /*"\" as \"" << str << */"\" in " << elapsed.microseconds() << " us" << std::endl; + _loadTime += elapsed.microseconds(); + _loadedBytes += static_cast(data.size()); + _logger.information() << "Loaded \"" << key << "\" (" << data.size() << " bytes) in " << elapsed.microseconds() << " us" << std::endl; } }; - + ts.update(); session.asyncLoad(key_vec, std::move(on_load)); _logger.debug() << "Async loading \"" << key << "\" ..." << std::endl; } @@ -60,19 +71,22 @@ void save(S& session, std::string const& key, std::string const& val) { std::vector key_vec(key.begin(), key.end()); std::vector val_vec(val.begin(), val.end()); + std::size_t sz = val.size(); Timestamp ts; - auto on_save = [key, ts] (std::error_code const& error) + auto on_save = [key, ts, sz] (const std::error_code& error) { + ++_saved; if (error) _logger.error() << "Failed to save \"" << key << "\", error: " << error.message() << std::endl; else { Timespan elapsed = Timestamp() - ts; - ++_saved; - _logger.information() << "Saved \"" << key << "\" in " << elapsed.microseconds() << " us" << std::endl; + _saveTime += elapsed.microseconds(); + _savedBytes += static_cast(sz); + _logger.information() << "Saved \"" << key << "\" (" << sz << " bytes) in " << elapsed.microseconds() << " us" << std::endl; } }; - + ts.update(); session.asyncSave(key_vec, std::move(val_vec), std::move(on_save)); _logger.debug() << "Async saving \"" << key << /*"\": \"" << val <<*/ "\"" << std::endl; } @@ -102,70 +116,80 @@ int main(int argc, char** argv) std::uint16_t bootPort4 = kd::getAvailablePort(SocketAddress::IPv4); std::uint16_t bootPort6 = kd::getAvailablePort(SocketAddress::IPv6); - int reps = 5; - pool.addCapacity(reps*2); + int peers = 3; + pool.addCapacity(peers * 2); + int chunks = 24, chunkSize = 50000; - Session firstSession{ k::endpoint{bootAddr4, bootPort4}, k::endpoint{bootAddr6, bootPort6} }; + std::vector sessions; + sessions.push_back(new Session{ k::endpoint{bootAddr4, bootPort4}, k::endpoint{bootAddr6, bootPort6} }); _logger.information() << "bootstrap session listening on " << bootAddr4 << ':' << bootPort4 << ", " << '[' << bootAddr6 << "]:" << bootPort6 << std::endl; uint16_t sessPort4 = kd::getAvailablePort(SocketAddress::IPv4, bootPort4 + 1); uint16_t sessPort6 = kd::getAvailablePort(SocketAddress::IPv6, bootPort6 + 1); - std::vector sessions; - for (int i = 0; i < reps; ++i) + for (int i = 0; i < peers; ++i) { - sessions.push_back(new Session{ k::endpoint{ "127.0.0.1", bootPort4 } - , k::endpoint{ "127.0.0.1", sessPort4 } - , k::endpoint{ "::1", sessPort6} }); + Session* pSession = new Session{ k::endpoint{ "127.0.0.1", bootPort4 } + , k::endpoint{ "127.0.0.1", sessPort4 } + , k::endpoint{ "::1", sessPort6} }; + sessions.push_back(pSession); _logger.information() << "peer session connected to 127.0.0.1:" << bootPort4 << ", listening on 127.0.0.1:" << sessPort4 << ", " << "[::1]:" << sessPort6 << std::endl; sessPort4 = kd::getAvailablePort(SocketAddress::IPv4, ++sessPort4); sessPort6 = kd::getAvailablePort(SocketAddress::IPv6, ++sessPort6); } - Thread::sleep(100); - for (int i = 0; i < reps; ++i) - { - std::string k("k"), v(65000, 'v'); - k += std::to_string(i); - v += std::to_string(i); - save(*sessions[0], k, v); - while (_saved < i + 1) Thread::sleep(1); - } - _loaded = 0; - for (int i = 0; i < reps; ++i) + int i = 0; + for (; i < chunks; ++i) { - std::string k("k"); + std::string k("k"), v; k += std::to_string(i); - load(*sessions[i], k); - while (_loaded < i + 1) Thread::sleep(1); + v += std::to_string(i); + v.resize(chunkSize); + save(*sessions[randomSession(peers-1)], k, v); } + // wait for all save ops to complete + while (_saved < i) Thread::sleep(10); _loaded = 0; - for (int i = 0; i < reps; ++i) + i = 0; + for (; i < chunks; ++i) { std::string k("k"); k += std::to_string(i); - load(firstSession, k); - while (_loaded < i + 1) Thread::sleep(1); + load(*sessions[randomSession(peers - 1)], k); } + // wait for all load ops to complete + while (_loaded < i) Thread::sleep(10); - abortSession(firstSession); +/* _loaded = 0; - for (int i = 0; i < reps; ++i) + i = 0; + for (; i < peers; ++i) { std::string k("k"); k += std::to_string(i); load(*sessions[i], k); - while (_loaded < i + 1) Thread::sleep(1); } - + while (_loaded < i) Thread::sleep(1); +*/ for (auto& pS : sessions) + { abortSession(*pS); + delete pS; + } + + std::ostringstream ostr; + ostr << std::endl << "Summary\n=======\n" << peers << " peers, " << + chunks << " chunks of " << chunkSize << " bytes\n" + "saved " << _savedBytes << " bytes, loaded " << _loadedBytes << " bytes\n" + "Save time: " << float(_saveTime)/1000 << " [ms]\n" + "Load time: " << float(_loadTime)/1000 << " [ms]\n" + "Total time:" << (float(_saveTime)+float(_loadTime))/1000 << " [ms]" << std::endl; + _logger.information(ostr.str()); - std::cout << "Goodbye!" << std::endl; return 0; } catch (std::exception& ex) diff --git a/include/kademlia/Session.h b/include/kademlia/Session.h index ff6ff93..551059b 100644 --- a/include/kademlia/Session.h +++ b/include/kademlia/Session.h @@ -22,6 +22,7 @@ #include "Poco/Net/DatagramSocket.h" #include "Poco/Net/SocketProactor.h" #include "kademlia/endpoint.hpp" +#include "kademlia/value_store.hpp" namespace Kademlia { @@ -36,6 +37,7 @@ class KADEMLIA_SYMBOL_VISIBILITY Session using Endpoint = kademlia::endpoint; using SaveHandlerType = std::function; using LoadHandlerType = std::function; + using ValueStoreType = kademlia::detail::value_store_type; static const std::uint16_t DEFAULT_PORT; @@ -46,6 +48,8 @@ class KADEMLIA_SYMBOL_VISIBILITY Session ~Session(); + bool initialized() const; + void asyncSave(KeyType const& key, DataType&& data, SaveHandlerType&& handler); template @@ -63,16 +67,25 @@ class KADEMLIA_SYMBOL_VISIBILITY Session asyncLoad(KeyType(std::begin(key), std::end(key)), std::move(handler)); } + const ValueStoreType& data() const; + std::error_code run(); void abort(); std::error_code wait(); + const Poco::Net::SocketProactor& ioService() const + { + return _ioService; + } + private: using RunType = Poco::ActiveMethod; using Result = Poco::ActiveResult; using ResultPtr = std::unique_ptr; + bool tryWaitForIOService(int ms); + Result& result() { if (!_pResult) diff --git a/poco b/poco index da70f91..79affbb 160000 --- a/poco +++ b/poco @@ -1 +1 @@ -Subproject commit da70f917965a5f1d0ad26e658bca48bf8a901545 +Subproject commit 79affbbe6e1b77b5b0380faee280a1a6f7289271 diff --git a/src/kademlia/DiscoverNeighborsTask.h b/src/kademlia/DiscoverNeighborsTask.h index 04e8fc4..95e2096 100644 --- a/src/kademlia/DiscoverNeighborsTask.h +++ b/src/kademlia/DiscoverNeighborsTask.h @@ -111,7 +111,6 @@ class DiscoverNeighborsTask final << int(h.type_) << ")" << std::endl; search_ourselves(task); return; - }; FindPeerResponseBody response; diff --git a/src/kademlia/Engine.h b/src/kademlia/Engine.h index ca5cb26..86bc09e 100644 --- a/src/kademlia/Engine.h +++ b/src/kademlia/Engine.h @@ -57,7 +57,8 @@ #include "NotifyPeerTask.h" #include "Tracker.h" #include "Message.h" - +#include "Poco/Mutex.h" +#include "Poco/ScopedLock.h" namespace kademlia { namespace detail { @@ -68,14 +69,13 @@ class Engine final { public: using key_type = std::vector; - using data_type = std::vector; using routing_table_type = routing_table; - using value_store_type = value_store; public: - Engine(Poco::Net::SocketProactor& io_service, endpoint const& ipv4, endpoint const& ipv6, id const& new_id = id{}): + Engine(Poco::Net::SocketProactor& io_service, endpoint const& ipv4, endpoint const& ipv6, id const& new_id = id{}, bool initialized = true): random_engine_(std::random_device{}()), my_id_(new_id == id{} ? id{ random_engine_ } : new_id), + _initialized(initialized), network_(io_service, MessageSocketType::ipv4(io_service, ipv4), MessageSocketType::ipv6(io_service, ipv6), @@ -93,18 +93,17 @@ class Engine final Engine(Poco::Net::SocketProactor& io_service, endpoint const& initial_peer, endpoint const& ipv4, endpoint const& ipv6, id const& new_id = id{}): - Engine(io_service, ipv4, ipv6, new_id) + Engine(io_service, ipv4, ipv6, new_id, false) { LOG_DEBUG(Engine, this) << "Engine bootstrapping using peer '" << initial_peer << "'." << std::endl; - bool initialized = false; - auto on_initialized = [&initialized] + auto on_initialized = [this] { - initialized = true; + _initialized = true; }; discover_neighbors(initial_peer, on_initialized); - while (!initialized) + while (!_initialized) io_service.poll(); } @@ -112,20 +111,46 @@ class Engine final Engine & operator = (Engine const&) = delete; + bool initialized() const + { + return _initialized; + } + + void start() + { + network_.start(); + } + template void asyncSave(key_type const& key, data_type&& data, HandlerType&& handler) { - LOG_DEBUG( engine, this ) << "executing async save of key '" << toString(key) << "'." << std::endl; + LOG_DEBUG(Engine, this) << "executing async save of key '" << toString(key) << "'." << std::endl; + id valID(key); + Poco::Mutex::ScopedLock l(_mutex); + value_store_[valID] = data; start_store_value_task(id(key), std::move(data), tracker_, routing_table_, std::forward(handler)); } template void asyncLoad(key_type const& key, HandlerType&& handler) { - LOG_DEBUG( engine, this ) << "executing async load of key '" << toString( key ) << "'." << std::endl; + LOG_DEBUG(Engine, this) << "executing async load of key '" << toString( key ) << "'." << std::endl; + id valID(key); + Poco::Mutex::ScopedLock l(_mutex); + auto it = value_store_.find(valID); + if (it != value_store_.end()) + { + handler(std::error_code(), it->second); + return; + } start_find_value_task< data_type >(id(key), tracker_, routing_table_, std::forward(handler)); } + const value_store_type& data() const + { + return value_store_; + } + private: using pending_task_type = std::function; using MessageSocketType = MessageSocket; @@ -175,6 +200,7 @@ class Engine final << failure.message() << ")." << std::endl; return; } + Poco::Mutex::ScopedLock l(_mutex); value_store_[request.data_key_hash_] = std::move(request.data_value_); } @@ -230,9 +256,13 @@ class Engine final return; } + Poco::ScopedLockWithUnlock l(_mutex); auto found = value_store_.find(request.value_to_find_); if (found == value_store_.end()) + { + l.unlock(); send_find_peer_response(sender, h.random_token_, request.value_to_find_); + } else { FindValueResponseBody const response{ found->second }; @@ -314,11 +344,13 @@ class Engine final private: random_engine_type random_engine_; id my_id_; + std::atomic _initialized; NetworkType network_; TrackerType tracker_; routing_table_type routing_table_; value_store_type value_store_; std::size_t pending_notifications_count_; + Poco::Mutex _mutex; }; } // namespace detail diff --git a/src/kademlia/FindValueTask.h b/src/kademlia/FindValueTask.h index 236e8ec..1ea5e24 100644 --- a/src/kademlia/FindValueTask.h +++ b/src/kademlia/FindValueTask.h @@ -101,7 +101,9 @@ class FindValueTask final : public LookupTask RoutingTableType & routing_table, LoadHandlerType load_handler): LookupTask(searched_key, routing_table.find(searched_key), - routing_table.end()), + routing_table.end(), + tracker.addressV4(), + tracker.addressV6()), tracker_(tracker), load_handler_(std::move(load_handler)), is_finished_() @@ -132,8 +134,7 @@ class FindValueTask final : public LookupTask static void try_candidates(std::shared_ptr task, std::size_t concurrent_requests_count = CONCURRENT_FIND_PEER_REQUESTS_COUNT) { - auto const closest_candidates = task->select_new_closest_candidates - (concurrent_requests_count); + auto closest_candidates = task->select_new_closest_candidates(concurrent_requests_count); FindValueRequestBody const request{ task->get_key() }; for (auto const& c : closest_candidates) diff --git a/src/kademlia/LookupTask.cpp b/src/kademlia/LookupTask.cpp index b5406c0..b2c5924 100644 --- a/src/kademlia/LookupTask.cpp +++ b/src/kademlia/LookupTask.cpp @@ -1,5 +1,5 @@ #include "LookupTask.h" - +#include "constants.hpp" namespace kademlia { namespace detail { @@ -7,20 +7,24 @@ namespace detail { void LookupTask::flag_candidate_as_valid(id const& candidate_id) { + Poco::Mutex::ScopedLock l(_mutex); auto i = find_candidate(candidate_id); if (i == candidates_.end()) return; if (in_flight_requests_count_) --in_flight_requests_count_; + i->second.attempts_ = 0; i->second.state_ = candidate::STATE_RESPONDED; } void LookupTask::flag_candidate_as_invalid(id const& candidate_id) { + Poco::Mutex::ScopedLock l(_mutex); auto i = find_candidate(candidate_id); if (i == candidates_.end()) return; if (in_flight_requests_count_) --in_flight_requests_count_; + ++i->second.attempts_; i->second.state_ = candidate::STATE_TIMEDOUT; } @@ -28,17 +32,27 @@ void LookupTask::flag_candidate_as_invalid(id const& candidate_id) std::vector LookupTask::select_new_closest_candidates(std::size_t max_count) { std::vector candidates; + Poco::Mutex::ScopedLock l(_mutex); // Iterate over all candidates until we picked // candidates_max_count not-contacted candidates. for (auto i = candidates_.begin(), e = candidates_.end() ; i != e && in_flight_requests_count_ < max_count ; ++ i) { - if (i->second.state_ == candidate::STATE_UNKNOWN) + if (!isSelf(i->second.peer_.endpoint_)) { - i->second.state_ = candidate::STATE_CONTACTED; - ++ in_flight_requests_count_; - candidates.push_back(i->second.peer_); + // TODO: strictly speaking, only STATE_UNKNOWN should be checked here, + // but we also check STATE_TIMEDOUT as well because, + // when running in truly async mode with a significant I/O load, + // some peers fail to respond with the lookup value (the exact + // reason why should be investigated) + if (i->second.state_ == candidate::STATE_UNKNOWN || + (i->second.state_ == candidate::STATE_TIMEDOUT && i->second.attempts_ < MAX_FIND_PEER_ATTEMPT_COUNT)) + { + i->second.state_ = candidate::STATE_CONTACTED; + ++in_flight_requests_count_; + candidates.push_back(i->second.peer_); + } } } @@ -46,14 +60,26 @@ std::vector LookupTask::select_new_closest_candidates(std::size_t max_coun } +bool LookupTask::has_valid_candidate() const +{ + Poco::Mutex::ScopedLock l(_mutex); + for (auto i = candidates_.begin(); i != candidates_.end(); ++i) + { + if (i->second.state_ == candidate::STATE_RESPONDED) + return true; + } + return false; +} + + std::vector LookupTask::select_closest_valid_candidates(std::size_t max_count) { std::vector candidates; - + Poco::Mutex::ScopedLock l(_mutex); // Iterate over all candidates until we picked // candidates_max_count responsive candidates. for (auto i = candidates_.begin(), e = candidates_.end() - ; i != e && candidates.size() second.state_ == candidate::STATE_RESPONDED) @@ -77,6 +103,7 @@ id const& LookupTask::get_key() const void LookupTask::add_candidate(Peer const& p) { + Poco::Mutex::ScopedLock l(_mutex); LOG_DEBUG(LookupTask, this) << "adding (" << candidates_.size() << ")'" << p <<"' key:(" << key_ << ')' << std::endl; diff --git a/src/kademlia/LookupTask.h b/src/kademlia/LookupTask.h index bb18273..fc0156a 100644 --- a/src/kademlia/LookupTask.h +++ b/src/kademlia/LookupTask.h @@ -36,6 +36,7 @@ #include "Peer.h" #include "kademlia/log.hpp" +#include "Poco/Mutex.h" namespace kademlia { namespace detail { @@ -52,6 +53,8 @@ class LookupTask std::vector select_closest_valid_candidates(std::size_t max_count); + bool has_valid_candidate() const; + template void add_candidates(Peers const& peers) { @@ -69,15 +72,24 @@ class LookupTask ~LookupTask() = default; template - LookupTask(id const & key, Iterator i, Iterator e) + LookupTask(id const & key, Iterator i, Iterator e, + const Poco::Net::SocketAddress& addressV4, + const Poco::Net::SocketAddress& addressV6) : key_{ key }, in_flight_requests_count_{ 0 }, - candidates_{} + candidates_{}, + _addressV4(addressV4), + _addressV6(addressV6) { for (; i != e; ++i) add_candidate(Peer{ i->first, i->second }); } + virtual bool isSelf(Poco::Net::SocketAddress& endpoint) + { + return (endpoint == _addressV4 || endpoint == _addressV6); + } + private: struct candidate final { @@ -88,20 +100,22 @@ class LookupTask STATE_CONTACTED, STATE_RESPONDED, STATE_TIMEDOUT, - } state_; + } state_ = STATE_UNKNOWN; + int attempts_ = 0; }; using candidates_type = std::map; -private: void add_candidate(Peer const& p); candidates_type::iterator find_candidate(id const& candidate_id); -private: id key_; std::size_t in_flight_requests_count_; candidates_type candidates_; + Poco::Net::SocketAddress _addressV4; + Poco::Net::SocketAddress _addressV6; + mutable Poco::Mutex _mutex; }; inline std::size_t LookupTask::inFlightRequests() const diff --git a/src/kademlia/MessageSocket.h b/src/kademlia/MessageSocket.h index 3346e8d..d95ca44 100644 --- a/src/kademlia/MessageSocket.h +++ b/src/kademlia/MessageSocket.h @@ -178,7 +178,6 @@ class MessageSocket final if (!failure) std::advance(e, bytes_received); callback(failure, current_message_sender_, i, e); }; - _socket.asyncReceiveFrom(reception_buffer_, current_message_sender_, std::move(on_completion)); } @@ -195,9 +194,9 @@ class MessageSocket final _socket.asyncSendTo(std::move(message), to, std::move(on_completion)); } - SocketAddress local_endpoint() const + SocketAddress address() const { - return { _socket.address().host(), _socket.address().port() }; + return _socket.address(); } private: diff --git a/src/kademlia/Network.h b/src/kademlia/Network.h index 732841b..2a8ae9b 100644 --- a/src/kademlia/Network.h +++ b/src/kademlia/Network.h @@ -58,17 +58,25 @@ class Network final on_message_received_(on_message_received) { start_message_reception(on_message_received); - LOG_DEBUG(Network, this) << "Network created at '" << socket_ipv4_.local_endpoint().toString() - << "' and '" << socket_ipv6_.local_endpoint().toString() << "'." << std::endl; + LOG_DEBUG(Network, this) << "Network created at '" << socket_ipv4_.address().toString() + << "' and '" << socket_ipv6_.address().toString() << "'." << std::endl; } Network(Network const&) = delete; Network& operator = (Network const&) = delete; + void start() + { + start_message_reception(on_message_received_); + LOG_DEBUG(Network, this) << "started message reception." << std::endl; + } + template void send(Message&& message, const Poco::Net::SocketAddress& e, OnMessageSent const& on_message_sent) { + if (get_socket_for(e).address().toString() == e.toString()) + std::cout << this << '\t' << get_socket_for(e).address().toString() << " => " << e.toString() << std::endl; get_socket_for(e).async_send(std::move(message), e, on_message_sent); } @@ -78,6 +86,16 @@ class Network final return MessageSocketType::resolve_endpoint(e); } + Poco::Net::SocketAddress addressV4() + { + return socket_ipv4_.address(); + } + + Poco::Net::SocketAddress addressV6() + { + return socket_ipv6_.address(); + } + private: void start_message_reception(on_message_received_type on_message_received) { @@ -106,7 +124,8 @@ class Network final std::cerr << "failure=" << failure.message() << std::endl; throw std::system_error{failure}; } - + if (current_subnet.address().toString() == sender.toString()) + std::cout << this << '\t' << current_subnet.address().toString() << " <= " << sender.toString() << std::endl; on_message_received_(sender, i, e); schedule_receive_on_socket(current_subnet); }; diff --git a/src/kademlia/NotifyPeerTask.h b/src/kademlia/NotifyPeerTask.h index 3aa1d27..d3a7666 100644 --- a/src/kademlia/NotifyPeerTask.h +++ b/src/kademlia/NotifyPeerTask.h @@ -58,7 +58,11 @@ class NotifyPeerTask final : public LookupTask private: template NotifyPeerTask(detail::id const & key, TrackerType & tracker, RoutingTableType & routing_table, OnFinishType on_finish): - LookupTask(key, routing_table.find(key), routing_table.end()), + LookupTask(key, + routing_table.find(key), + routing_table.end(), + tracker.addressV4(), + tracker.addressV6()), tracker_(tracker), on_finish_( on_finish ) { @@ -72,7 +76,7 @@ class NotifyPeerTask final : public LookupTask FindPeerRequestBody const request{ task->get_key() }; - auto const closest_peers = task->select_new_closest_candidates(CONCURRENT_FIND_PEER_REQUESTS_COUNT); + auto closest_peers = task->select_new_closest_candidates(CONCURRENT_FIND_PEER_REQUESTS_COUNT); LOG_DEBUG(NotifyPeerTask, task.get()) << "sending find Peer to notify " << closest_peers.size() << " owner buckets." << std::endl; @@ -127,6 +131,7 @@ class NotifyPeerTask final : public LookupTask return; } + task->add_candidates(response.peers_); // If new candidate have been discovered, notify them. task->add_candidates(response.peers_); try_to_notify_neighbors(task); diff --git a/src/kademlia/ResponseCallbacks.h b/src/kademlia/ResponseCallbacks.h index 5cc9e64..51bd85b 100644 --- a/src/kademlia/ResponseCallbacks.h +++ b/src/kademlia/ResponseCallbacks.h @@ -55,11 +55,6 @@ class ResponseCallbacks final std::error_code dispatch_response(Poco::Net::SocketAddress const& sender, Header const& h , buffer::const_iterator i, buffer::const_iterator e ); - bool has(id const& message_id) const - { - return callbacks_.find(message_id) != callbacks_.end(); - } - private: using callbacks = std::map; callbacks callbacks_; diff --git a/src/kademlia/ResponseRouter.cpp b/src/kademlia/ResponseRouter.cpp index e9aeed5..c882ec3 100644 --- a/src/kademlia/ResponseRouter.cpp +++ b/src/kademlia/ResponseRouter.cpp @@ -18,10 +18,14 @@ namespace detail { buffer::const_iterator i, buffer::const_iterator e) { LOG_DEBUG(ResponseRouter, this) << "dispatching response from " << sender.toString() << std::endl; + std::error_code failure; // Try to forward the message to its associated callback. - auto failure = response_callbacks_.dispatch_response(sender, h, i, e); + { + Poco::Mutex::ScopedLock l(_mutex); + failure = response_callbacks_.dispatch_response(sender, h, i, e); + } if (failure == UNASSOCIATED_MESSAGE_ID)// Unknown or unassociated responses discarded. - LOG_DEBUG(ResponseRouter, this) << "dropping unknown response." << std::endl; + LOG_DEBUG(ResponseRouter, this) << "dropping unknown response from " << sender.toString() << std::endl; } } } diff --git a/src/kademlia/ResponseRouter.h b/src/kademlia/ResponseRouter.h index fdc139a..cb6335b 100644 --- a/src/kademlia/ResponseRouter.h +++ b/src/kademlia/ResponseRouter.h @@ -33,6 +33,7 @@ #include "ResponseCallbacks.h" #include "kademlia/Timer.h" #include "kademlia/log.hpp" +#include "Poco/Mutex.h" namespace kademlia { @@ -57,22 +58,27 @@ class ResponseRouter final { auto on_timeout = [ this, on_error, response_id ] () { - // If a callback has been removed, that means + // If a callback is removed, that means // the message has never been received // hence report the timeout to the client. - if (response_callbacks_.remove_callback( response_id )) + Poco::Mutex::ScopedLock l(_mutex); + if (response_callbacks_.remove_callback( response_id )) on_error(make_error_code(std::errc::timed_out)); }; // Associate the response id with the // on_response_received callback. - response_callbacks_.push_callback(response_id, on_response_received); + { + Poco::Mutex::ScopedLock l(_mutex); + response_callbacks_.push_callback(response_id, on_response_received); + } timer_.expires_from_now(callback_ttl, on_timeout); } private: ResponseCallbacks response_callbacks_; Timer timer_; + Poco::Mutex _mutex; }; } // namespace detail diff --git a/src/kademlia/Session.cpp b/src/kademlia/Session.cpp index 0477022..7b2ffc9 100644 --- a/src/kademlia/Session.cpp +++ b/src/kademlia/Session.cpp @@ -15,14 +15,20 @@ #include "kademlia/Session.h" #include "kademlia/error.hpp" +#include "kademlia/constants.hpp" #include "error_impl.hpp" #include "SocketAdapter.h" #include "Engine.h" #include "Poco/Timespan.h" +#include "Poco/Thread.h" +#include "Poco/Stopwatch.h" namespace Kademlia { using Endpoint = Session::Endpoint; +using Poco::Timespan; +using Poco::Thread; +using Poco::Stopwatch; class EngineImpl { @@ -54,10 +60,13 @@ const std::uint16_t Session::DEFAULT_PORT = 27980; Session::Session(Endpoint const& ipv4, Endpoint const& ipv6, int ms) try: _runMethod(this, &Kademlia::Session::run), - _ioService(Poco::Timespan(Poco::Timespan::TimeDiff(ms)*1000)), + _ioService(Timespan(Timespan::TimeDiff(ms)*1000)), _pEngine(new EngineImpl(_ioService, ipv4, ipv6)) { result(); + if (!tryWaitForIOService(static_cast(kademlia::detail::INITIAL_CONTACT_RECEIVE_TIMEOUT.count()))) + throw Poco::TimeoutException("Session: IO service not available."); + Poco::Thread::sleep(100); } catch (std::exception& ex) { @@ -72,12 +81,15 @@ catch (...) Session::Session(Endpoint const& initPeer, Endpoint const& ipv4, Endpoint const& ipv6, int ms) -try: +try : _runMethod(this, &Kademlia::Session::run), - _ioService(Poco::Timespan(Poco::Timespan::TimeDiff(ms)*1000)), + _ioService(Poco::Timespan(Poco::Timespan::TimeDiff(ms) * 1000)), _pEngine(new EngineImpl(_ioService, initPeer, ipv4, ipv6)) { result(); + if (!tryWaitForIOService(static_cast(kademlia::detail::INITIAL_CONTACT_RECEIVE_TIMEOUT.count()))) + throw Poco::TimeoutException("Session: IO service not available."); + Poco::Thread::sleep(100); } catch (std::exception& ex) { @@ -98,6 +110,47 @@ Session::~Session() } +bool Session::tryWaitForIOService(int ms) +{ + Stopwatch sw; + sw.start(); + + while (!_ioService.isRunning()) + { + if ((sw.elapsed()/1000) > ms) return false; + Poco::Thread::sleep(10); + } + + while (_ioService.hasSocketHandlers() == 0) + { + if ((sw.elapsed()/1000) > ms) return false; + Poco::Thread::sleep(10); + } + + // TODO: there is a small opportunity here for this check + // to be performed before IO completion has started, + // in which case, we'll consider inititialization + // complete when it is not, opening a possibilty + // for "misssing peers" errors; there should be a + // way for proactor to tell us that the IO completion + // (1) was going on, and now (2) it is not; currently, + // we are only checking (2) + while (_ioService.ioCompletionInProgress()) + { + if ((sw.elapsed()/1000) > ms) return false; + Poco::Thread::sleep(10); + } + + return true; +} + + +bool Session::initialized() const +{ + return _pEngine->engine().initialized(); +} + + std::error_code Session::run() { Poco::FastMutex::ScopedLock l(_mutex); @@ -136,5 +189,9 @@ void Session::asyncLoad(KeyType const& key, LoadHandlerType handler ) _pEngine->engine().asyncLoad(key, std::move(handler)); } +const Session::ValueStoreType& Session::data() const +{ + return _pEngine->engine().data(); +} } // namespace kademlia diff --git a/src/kademlia/StoreValueTask.h b/src/kademlia/StoreValueTask.h index aeec8d8..bc818fe 100644 --- a/src/kademlia/StoreValueTask.h +++ b/src/kademlia/StoreValueTask.h @@ -62,15 +62,19 @@ class StoreValueTask final : public LookupTask template< typename RoutingTableType, typename HandlerType > StoreValueTask(detail::id const & key, DataType&& data, TrackerType & tracker , RoutingTableType & routing_table, HandlerType && save_handler): - LookupTask(key, routing_table.find(key), routing_table.end()) + LookupTask(key, + routing_table.find(key), + routing_table.end(), + tracker.addressV4(), + tracker.addressV6()) , tracker_(tracker) , data_(std::move(data)) , save_handler_(std::forward< HandlerType >(save_handler)) { LOG_DEBUG(StoreValueTask, this) << "create store value task for '" - << key << "' value(" << toString(data) - << ")." << std::endl; + << key /*<< "' value(" << toString(data) + << ")."*/ << std::endl; } void notify_caller(std::error_code const& failure) @@ -173,17 +177,16 @@ class StoreValueTask final : public LookupTask static void send_store_requests(std::shared_ptr task) { auto const & candidates = task->select_closest_valid_candidates(REDUNDANT_SAVE_COUNT); - - LOG_DEBUG(StoreValueTask, task.get()) - << "sending store request to " - << candidates.size() << " candidates" << std::endl; - for (auto c : candidates) - send_store_request(c, task); - if (candidates.empty()) task->notify_caller(make_error_code(MISSING_PEERS)); else + { + LOG_DEBUG(StoreValueTask, task.get()) + << "sending store request to " + << candidates.size() << " candidates" << std::endl; + for (auto c : candidates) send_store_request(c, task); task->notify_caller(std::error_code{}); + } } static void send_store_request(Peer const& current_candidate, std::shared_ptr task) diff --git a/src/kademlia/Timer.cpp b/src/kademlia/Timer.cpp index c585c17..5cf1303 100644 --- a/src/kademlia/Timer.cpp +++ b/src/kademlia/Timer.cpp @@ -24,9 +24,7 @@ // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "Timer.h" - #include "kademlia/error_impl.hpp" -#include "Poco/Clock.h" using namespace std::chrono; using namespace Poco; @@ -49,6 +47,7 @@ void Timer::schedule_next_tick(time_point const& expiration_time) Poco::Mutex::ScopedLock l(_mutex); if (!timeouts_.empty()) { + LOG_DEBUG(Timer, this) << "\ttimeouts=" << timeouts_.size() << std::endl; // The callbacks to execute are the first // n callbacks with the same keys. auto begin = timeouts_.begin(); @@ -83,7 +82,7 @@ void Timer::schedule_next_tick(time_point const& expiration_time) Poco::Timestamp::TimeDiff Timer::getTimeout(time_point const& expiration_time) { auto t = duration_cast(expiration_time.time_since_epoch()); - Poco::Timestamp ts(t.count()-Clock().raw()); + Poco::Timestamp ts(t.count() - duration_cast(clock().now().time_since_epoch()).count()); return ts.epochMicroseconds()/1000; } diff --git a/src/kademlia/Timer.h b/src/kademlia/Timer.h index 95fd5d9..ac78072 100644 --- a/src/kademlia/Timer.h +++ b/src/kademlia/Timer.h @@ -43,7 +43,7 @@ class Timer final { public: using clock = std::chrono::steady_clock; - using duration = clock::duration; + using duration = std::chrono::milliseconds; public: explicit Timer(Poco::Net::SocketProactor& ioService); @@ -51,7 +51,7 @@ class Timer final template< typename Callback > void expires_from_now(duration const& timeout, Callback const& on_timer_expired) { - auto expiration_time = clock::now() + timeout; + auto expiration_time = clock::now() + std::chrono::duration_cast(timeout); // this lambda is a workaround to enforce asio behavior (timer cancellation // is considered a task); it removes all but one scheduled completion handler, diff --git a/src/kademlia/Tracker.h b/src/kademlia/Tracker.h index d30b1ff..472189c 100644 --- a/src/kademlia/Tracker.h +++ b/src/kademlia/Tracker.h @@ -55,6 +55,7 @@ class Tracker final public: Tracker(Poco::Net::SocketProactor& io_service, id const& my_id, NetworkType & network, random_engine_type & random_engine): + io_service_(io_service), response_router_(io_service), message_serializer_(my_id), network_(network), @@ -66,10 +67,16 @@ class Tracker final Tracker(Tracker const&) = delete; Tracker& operator = (Tracker const&) = delete; + void waitOnIO() + { + //while (!io_service_.isRunning()) Poco::Thread::sleep(10); + } + template< typename Request, typename OnResponseReceived, typename OnError > void send_request(Request const& request, const Poco::Net::SocketAddress& e, Timer::duration const& timeout , OnResponseReceived const& on_response_received, OnError const& on_error) { + waitOnIO(); id const response_id(random_engine_); // Generate the request buffer. auto message = message_serializer_.serialize(request, response_id); @@ -94,6 +101,7 @@ class Tracker final template void send_request(Request const& request, Poco::Net::SocketAddress const& e) { + waitOnIO(); id const response_id(random_engine_); send_response(response_id, request, e); } @@ -115,7 +123,18 @@ class Tracker final response_router_.handle_new_response(s, h, i, e); } + Poco::Net::SocketAddress addressV4() + { + return network_.addressV4(); + } + + Poco::Net::SocketAddress addressV6() + { + return network_.addressV6(); + } + private: + Poco::Net::SocketProactor& io_service_; ResponseRouter response_router_; MessageSerializer message_serializer_; NetworkType & network_; diff --git a/src/kademlia/constants.cpp b/src/kademlia/constants.cpp index 167e28b..a3bb219 100644 --- a/src/kademlia/constants.cpp +++ b/src/kademlia/constants.cpp @@ -30,6 +30,7 @@ namespace detail { std::size_t const ROUTING_TABLE_BUCKET_SIZE{ 20 }; std::size_t const CONCURRENT_FIND_PEER_REQUESTS_COUNT{ 3 }; +std::size_t const MAX_FIND_PEER_ATTEMPT_COUNT{ 3 }; std::size_t const REDUNDANT_SAVE_COUNT{ 3 }; std::chrono::milliseconds const INITIAL_CONTACT_RECEIVE_TIMEOUT{ 1000 }; diff --git a/src/kademlia/constants.hpp b/src/kademlia/constants.hpp index 9d93109..1c3d20d 100644 --- a/src/kademlia/constants.hpp +++ b/src/kademlia/constants.hpp @@ -35,16 +35,13 @@ namespace kademlia { namespace detail { -// k + extern std::size_t const ROUTING_TABLE_BUCKET_SIZE; -// a extern std::size_t const CONCURRENT_FIND_PEER_REQUESTS_COUNT; -// c +extern std::size_t const MAX_FIND_PEER_ATTEMPT_COUNT; extern std::size_t const REDUNDANT_SAVE_COUNT; -// extern std::chrono::milliseconds const INITIAL_CONTACT_RECEIVE_TIMEOUT; -// extern std::chrono::milliseconds const PEER_LOOKUP_TIMEOUT; } // namespace detail diff --git a/src/kademlia/id.cpp b/src/kademlia/id.cpp index 5a92744..c8d019f 100644 --- a/src/kademlia/id.cpp +++ b/src/kademlia/id.cpp @@ -76,6 +76,8 @@ id::id std::generate( blocks_.begin(), blocks_.end() , std::bind( distribution, std::ref( random_engine ) ) ); + // an additional effort to prevent duplicates + std::shuffle(blocks_.begin(), blocks_.end(), random_engine); } id::id diff --git a/src/kademlia/log.cpp b/src/kademlia/log.cpp index 564d4d1..065b6ae 100644 --- a/src/kademlia/log.cpp +++ b/src/kademlia/log.cpp @@ -119,7 +119,7 @@ struct LogInitializer //enableLogFor("NotifyPeerTask"); //enableLogFor("ResponseCallbacks"); //enableLogFor("ResponseRouter"); - enableLogFor("routing_table"); + //enableLogFor("routing_table"); //enableLogFor("Session"); //enableLogFor("SessionImpl"); //enableLogFor("SocketAdapter"); diff --git a/src/kademlia/log.hpp b/src/kademlia/log.hpp index cbb4952..569f5c4 100644 --- a/src/kademlia/log.hpp +++ b/src/kademlia/log.hpp @@ -60,7 +60,7 @@ bool isLogEnabled(std::string const& module); * can be costly, its result is cached in a static variable. */ -#define KADEMLIA_ENABLE_DEBUG + #ifdef KADEMLIA_ENABLE_DEBUG # define LOG_DEBUG(module, thiz) \ for (bool used = false; ! used; used = true) \ diff --git a/src/kademlia/value_store.hpp b/src/kademlia/value_store.hpp index bd8e638..e114abf 100644 --- a/src/kademlia/value_store.hpp +++ b/src/kademlia/value_store.hpp @@ -34,6 +34,7 @@ #include #include #include +#include "id.hpp" #include "Poco/Hash.h" @@ -59,6 +60,10 @@ using value_store = std::unordered_map , Value , value_store_key_hasher< Key > >; +using data_type = std::vector; + +using value_store_type = value_store; + } // namespace detail } // namespace kademlia diff --git a/test/TestEngine.h b/test/TestEngine.h index 89b26a6..3c79d36 100644 --- a/test/TestEngine.h +++ b/test/TestEngine.h @@ -26,9 +26,10 @@ #include #include "Poco/Net/SocketProactor.h" #include "kademlia/Session.h" -#include +#include "kademlia/endpoint.hpp" #include "kademlia/log.hpp" #include "kademlia/buffer.hpp" +#include "kademlia/value_store.hpp" #include "kademlia/Engine.h" #include "FakeSocket.h" @@ -57,14 +58,14 @@ class TestEngine final void asyncSave(std::string const& key, std::string&& data, Callable & callable) { impl::key_type const k{ key.begin(), key.end() }; - engine_.asyncSave(k, impl::data_type{data.begin(), data.end()}, callable); + engine_.asyncSave(k, kademlia::detail::data_type{data.begin(), data.end()}, callable); } template< typename Callable > void asyncLoad(std::string const& key, Callable & callable) { impl::key_type const k{ key.begin(), key.end() }; - auto c = [ callable ](std::error_code const& failure, impl::data_type const& data) + auto c = [ callable ](std::error_code const& failure, kademlia::detail::data_type const& data) { callable(failure, std::string{ data.begin(), data.end() }); }; diff --git a/test/unit_tests/CMakeLists.txt b/test/unit_tests/CMakeLists.txt index e553137..205d74a 100644 --- a/test/unit_tests/CMakeLists.txt +++ b/test/unit_tests/CMakeLists.txt @@ -76,6 +76,7 @@ build_test(unit_tests_lib EndpointTest.cpp MessageTest.cpp MessageSerializerTest.cpp + IntegrationTest.cpp LookupTaskTest.cpp StoreValueTaskTest.cpp FindValueTaskTest.cpp diff --git a/test/unit_tests/EngineTest.cpp b/test/unit_tests/EngineTest.cpp index e3f15d9..daf82d9 100644 --- a/test/unit_tests/EngineTest.cpp +++ b/test/unit_tests/EngineTest.cpp @@ -108,8 +108,6 @@ TEST(EngineTest, two_engines_can_save_and_load) throw std::runtime_error{ "Unexpected data" }; }; e2->asyncLoad("key", on_load); - - EXPECT_GT(io_service.poll(), 0); } } diff --git a/test/unit_tests/FindValueTaskTest.cpp b/test/unit_tests/FindValueTaskTest.cpp index 6eb5fdd..a3abc0b 100644 --- a/test/unit_tests/FindValueTaskTest.cpp +++ b/test/unit_tests/FindValueTaskTest.cpp @@ -92,7 +92,8 @@ TEST_F(FindValueTaskTest, can_notify_error_when_unique_peer_fails_to_respond) // Task asked p1 for a closer peer or the value. kd::FindValueRequestBody const fv{ searched_key }; - EXPECT_TRUE(tracker_.has_sent_message(p1.endpoint_, fv)); + for (int i = 0; i < kd::MAX_FIND_PEER_ATTEMPT_COUNT; ++i) + EXPECT_TRUE(tracker_.has_sent_message(p1.endpoint_, fv)); // Task didn't send any more message. EXPECT_TRUE(! tracker_.has_sent_message()); @@ -121,8 +122,11 @@ TEST_F(FindValueTaskTest, can_notify_error_when_all_peers_fail_to_respond) // Task asked p1 & p2 for a closer peer or the value. kd::FindValueRequestBody const fv{ searched_key }; - EXPECT_TRUE(tracker_.has_sent_message(p1.endpoint_, fv)); - EXPECT_TRUE(tracker_.has_sent_message(p2.endpoint_, fv)); + for (int i = 0; i < kd::MAX_FIND_PEER_ATTEMPT_COUNT; ++i) + { + EXPECT_TRUE(tracker_.has_sent_message(p1.endpoint_, fv)); + EXPECT_TRUE(tracker_.has_sent_message(p2.endpoint_, fv)); + } // Task didn't send any more message. EXPECT_TRUE(! tracker_.has_sent_message()); diff --git a/test/unit_tests/IntegrationTest.cpp b/test/unit_tests/IntegrationTest.cpp new file mode 100644 index 0000000..10d6df7 --- /dev/null +++ b/test/unit_tests/IntegrationTest.cpp @@ -0,0 +1,229 @@ +// Copyright (c) 2013-2014, David Keller +// All rights reserved. +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of the University of California, Berkeley nor the +// names of its contributors may be used to endorse or promote products +// derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY DAVID KELLER AND CONTRIBUTORS ``AS IS'' AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +#include "kademlia/endpoint.hpp" +#include "kademlia/Session.h" +#include "kademlia/error.hpp" +#include "kademlia/detail/Util.h" +#include "Poco/Timestamp.h" +#include "Poco/Timespan.h" +#include "Poco/Thread.h" +#include "Poco/ThreadPool.h" +#include "Poco/LogStream.h" +#include "gtest/gtest.h" +#include "TaskFixture.h" + +namespace k = kademlia; +namespace kd = kademlia::detail; +using Poco::Thread; +using Poco::ThreadPool; +using Poco::Timestamp; +using Poco::Timespan; +using Poco::Logger; +using Poco::LogStream; +using Poco::Net::SocketAddress; +using Session = Kademlia::Session; + +namespace { + + +//struct IntegrationTest : k::test::TaskFixture +//{ + /*IntegrationTest(): k::test::TaskFixture() + { + LOG_DEBUG(IntegrationTest, this) << "create IntegrationTest." << std::endl; + }*/ + + std::atomic _saved = 0, _loaded = 0, _errors = 0; + std::atomic _savedBytes = 0, _loadedBytes = 0; + std::atomic _saveTime = 0, _loadTime = 0; + + int randomSession(int maxID) + { + std::random_device rd; + std::default_random_engine eng(rd()); + std::uniform_int_distribution distr(0, maxID); + return distr(eng); + } + + template + void load(S& session, std::string const& key) + { + std::vector key_vec(key.begin(), key.end()); + Timestamp ts; + auto on_load = [key, ts](std::error_code const& error, Session::DataType const& data) + { + ++_loaded; + if (error) + { + ++_errors; + std::cerr << "Failed to load \"" << key << "\", error: " << error.message() << std::endl; + } + else + { + Timespan elapsed = Timestamp() - ts; + _loadTime += elapsed.microseconds(); + _loadedBytes += static_cast(data.size()); + //std::cout << "Loaded \"" << key << "\" (" << data.size() << " bytes) in " << elapsed.microseconds() << " us" << std::endl; + } + }; + ts.update(); + session.asyncLoad(key_vec, std::move(on_load)); + //std::cout << "Async loading \"" << key << "\" ..." << std::endl; + } + + template + void save(S& session, std::string const& key, std::string const& val) + { + std::vector key_vec(key.begin(), key.end()); + std::vector val_vec(val.begin(), val.end()); + std::size_t sz = val.size(); + Timestamp ts; + auto on_save = [key, ts, sz](const std::error_code& error) + { + ++_saved; + if (error) + { + ++_errors; + std::cerr << "Failed to save \"" << key << "\", error: " << error.message() << std::endl; + } + else + { + Timespan elapsed = Timestamp() - ts; + _saveTime += elapsed.microseconds(); + _savedBytes += static_cast(sz); + //std::cout << "Saved \"" << key << "\" (" << sz << " bytes) in " << elapsed.microseconds() << " us" << std::endl; + } + }; + ts.update(); + session.asyncSave(key_vec, std::move(val_vec), std::move(on_save)); + //std::cout << "Async saving \"" << key << /*"\": \"" << val <<*/ "\"" << std::endl; + } + + template + void abortSession(S& sess) + { + // Stop the session loop + sess.abort(); + + // Wait for the session termination + auto failure = sess.wait(); + if (failure != k::RUN_ABORTED) + std::cerr << failure.message() << std::endl; + } +//}; + +TEST(IntegrationTest, integrationTest) +{ + try + { + ThreadPool& pool = ThreadPool::defaultPool(); + + std::string bootAddr4 = "0.0.0.0"; + std::string bootAddr6 = "::"; + std::uint16_t bootPort4 = kd::getAvailablePort(SocketAddress::IPv4); + std::uint16_t bootPort6 = kd::getAvailablePort(SocketAddress::IPv6); + + int peers = 3; + pool.addCapacity(peers * 2); + int chunks = 24, chunkSize = 50000; + + std::vector sessions; + sessions.push_back(new Session{ k::endpoint{bootAddr4, bootPort4}, k::endpoint{bootAddr6, bootPort6} }); + //std::cout << "bootstrap session listening on " << bootAddr4 << ':' << bootPort4 << ", " << + // '[' << bootAddr6 << "]:" << bootPort6 << std::endl; + + uint16_t sessPort4 = kd::getAvailablePort(SocketAddress::IPv4, bootPort4 + 1); + uint16_t sessPort6 = kd::getAvailablePort(SocketAddress::IPv6, bootPort6 + 1); + + for (int i = 0; i < peers; ++i) + { + Session* pSession = new Session{ k::endpoint{ "127.0.0.1", bootPort4 } + , k::endpoint{ "127.0.0.1", sessPort4 } + , k::endpoint{ "::1", sessPort6} }; + sessions.push_back(pSession); + //std::cout << "peer session connected to 127.0.0.1:" << bootPort4 << + // ", listening on 127.0.0.1:" << sessPort4 << ", " << + // "[::1]:" << sessPort6 << std::endl; + sessPort4 = kd::getAvailablePort(SocketAddress::IPv4, ++sessPort4); + sessPort6 = kd::getAvailablePort(SocketAddress::IPv6, ++sessPort6); + } + + int i = 0; + for (; i < chunks; ++i) + { + std::string k("k"), v; + k += std::to_string(i); + v += std::to_string(i); + v.resize(chunkSize); + save(*sessions[randomSession(peers - 1)], k, v); + } + // wait for all save ops to complete + while (_saved < i) Thread::sleep(10); + + _loaded = 0; + i = 0; + for (; i < chunks; ++i) + { + std::string k("k"); + k += std::to_string(i); + load(*sessions[randomSession(peers - 1)], k); + } + // wait for all load ops to complete + while (_loaded < i) Thread::sleep(10); + + for (auto& pS : sessions) + { + abortSession(*pS); + delete pS; + } + + std::cout << std::endl << "Summary\n=======\n" << peers << " peers, " << + chunks << " chunks of " << chunkSize << " bytes\n" + "saved " << _savedBytes << " bytes, loaded " << _loadedBytes << " bytes\n" + "Save time:\t" << float(_saveTime) / 1000 << " [ms]\n" + "Load time:\t" << float(_loadTime) / 1000 << " [ms]\n" + "Total time:\t" << (float(_saveTime) + float(_loadTime)) / 1000 << " [ms]" << std::endl; + + EXPECT_EQ(_errors, 0); + EXPECT_EQ(_savedBytes, chunks*chunkSize); + EXPECT_EQ(_savedBytes, _loadedBytes); + } + catch (std::exception& ex) + { + std::cerr << ex.what() << std::endl; + EXPECT_TRUE(false); + } + catch (...) + { + std::cerr << "unknown exception" << std::endl; + EXPECT_TRUE(false); + } +} + + +} + diff --git a/test/unit_tests/LookupTaskTest.cpp b/test/unit_tests/LookupTaskTest.cpp index 5468ee2..743be48 100644 --- a/test/unit_tests/LookupTaskTest.cpp +++ b/test/unit_tests/LookupTaskTest.cpp @@ -27,6 +27,7 @@ #include "common.hpp" #include "PeerFactory.h" #include "kademlia/id.hpp" +#include "kademlia/constants.hpp" #include "kademlia/LookupTask.h" #include "gtest/gtest.h" #include @@ -43,7 +44,7 @@ struct test_task : kd::LookupTask { test_task (kd::id const& key , Iterator i, Iterator e) - : LookupTask{ key, i, e } + : LookupTask{ key, i, e, SocketAddress("127.0.0.1", 1234), SocketAddress("::1", 1234) } { } }; @@ -101,7 +102,8 @@ TEST(LookupTaskTest, can_select_candidates) EXPECT_EQ(kd::id{ "1" }, valid_candidates[ 0 ].id_); EXPECT_TRUE(! c.have_all_requests_completed()); - c.flag_candidate_as_invalid(kd::id{ "2" }); + for (int i = 0; i < kd::MAX_FIND_PEER_ATTEMPT_COUNT; ++i) + c.flag_candidate_as_invalid(kd::id{ "2" }); EXPECT_TRUE(c.have_all_requests_completed()); closest_candidates = c.select_new_closest_candidates(2); diff --git a/test/unit_tests/StoreValueTaskTest.cpp b/test/unit_tests/StoreValueTaskTest.cpp index 718e909..8a2e631 100644 --- a/test/unit_tests/StoreValueTaskTest.cpp +++ b/test/unit_tests/StoreValueTaskTest.cpp @@ -83,14 +83,15 @@ TEST_F(StoreValueTaskTest, CanNotifyErrorWhenUniquePeerFailsToRespond) auto p1 = create_and_add_peer("192.168.1.1", kd::id{"b"}); kd::start_store_value_task(chosen_key, std::move(data), tracker_, routing_table_, std::ref(*this)); - io_service_.poll(); + while (!io_service_.poll()); // Task queried routing table to find closest known peers. EXPECT_EQ(1, routing_table_.find_call_count_); // Task asked p1 for a closer peer. kd::FindPeerRequestBody const fv{chosen_key}; - EXPECT_TRUE(tracker_.has_sent_message(p1.endpoint_, fv)); + for (int i = 0; i < kd::MAX_FIND_PEER_ATTEMPT_COUNT; ++i) + EXPECT_TRUE(tracker_.has_sent_message(p1.endpoint_, fv)); // Task didn't send any more message. EXPECT_TRUE(! tracker_.has_sent_message()); @@ -114,15 +115,18 @@ TEST_F(StoreValueTaskTest, CanNotifyErrorWhenAllPeersFailToRespond) , tracker_ , routing_table_ , std::ref(*this)); - io_service_.poll(); + while (!io_service_.poll()); // Task queried routing table to find closest known peers. EXPECT_EQ(1, routing_table_.find_call_count_); // Task asked p1 & p2 for a closer peer. kd::FindPeerRequestBody const fv{chosen_key}; - EXPECT_TRUE(tracker_.has_sent_message(p1.endpoint_, fv)); - EXPECT_TRUE(tracker_.has_sent_message(p2.endpoint_, fv)); + for (int i = 0; i < kd::MAX_FIND_PEER_ATTEMPT_COUNT; ++i) + { + EXPECT_TRUE(tracker_.has_sent_message(p1.endpoint_, fv)); + EXPECT_TRUE(tracker_.has_sent_message(p2.endpoint_, fv)); + } // Task didn't send any more message. EXPECT_TRUE(! tracker_.has_sent_message()); diff --git a/test/unit_tests/TrackerMock.h b/test/unit_tests/TrackerMock.h index 08423f9..d3b36d7 100644 --- a/test/unit_tests/TrackerMock.h +++ b/test/unit_tests/TrackerMock.h @@ -66,7 +66,6 @@ class TrackerMock auto const c = sent_messages_.front(); sent_messages_.pop(); auto const m = message_serializer_.serialize(message, detail::id{}); - return c.endpoint == endpoint && c.message == m; } @@ -116,6 +115,16 @@ class TrackerMock save_sent_message(r, e); } + Poco::Net::SocketAddress addressV4() + { + return Poco::Net::SocketAddress();// network_.addressV4(); + } + + Poco::Net::SocketAddress addressV6() + { + return Poco::Net::SocketAddress();// network_.addressV6(); + } + private: struct sent_message final {