diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index bf3f92afb1c..5e400be1c6a 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -9,8 +9,7 @@ #include #include -#include -#include +#include #include #include #include @@ -151,8 +150,8 @@ int Server::main(const std::vector & /*args*/) = global_context.getConfigRef().getUInt("keeper.raft_settings.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS); /// start server - std::shared_ptr> server; - std::shared_ptr> conn_acceptor; + AsyncSocketReactorPtr server; + std::shared_ptr> conn_acceptor; int32_t port = config().getInt("keeper.port", 8101); auto cpu_core_size = getNumberOfPhysicalCPUCores(); @@ -167,17 +166,17 @@ int Server::main(const std::vector & /*args*/) socket.setBlocking(false); Poco::Timespan timeout(operation_timeout_ms * 1000); - server = std::make_shared>(timeout, "IO-Acptr"); + server = std::make_shared(timeout, "IO-Acptr"); /// TODO add io thread count to config - conn_acceptor = std::make_shared>( - "IO-Hdlr", global_context, socket, *server, timeout, cpu_core_size); + conn_acceptor + = std::make_shared>("IO-Hdlr", global_context, socket, server, timeout, cpu_core_size); LOG_INFO(log, "Listening for user connections on {}", socket.address().toString()); }); /// start forwarding server - std::shared_ptr> forwarding_server; - std::shared_ptr> forwarding_conn_acceptor; + AsyncSocketReactorPtr forwarding_server; + std::shared_ptr> forwarding_conn_acceptor; int32_t forwarding_port = config().getInt("keeper.forwarding_port", 8102); createServer( @@ -190,11 +189,11 @@ int Server::main(const std::vector & /*args*/) socket.setBlocking(false); Poco::Timespan timeout(operation_timeout_ms * 1000); - forwarding_server = std::make_shared>(timeout, "IO-FwdAcptr"); + forwarding_server = std::make_shared(timeout, "IO-FwdAcptr"); /// TODO add io thread count to config - forwarding_conn_acceptor = std::make_shared>( - "IO-FwdHdlr", global_context, socket, *forwarding_server, timeout, cpu_core_size); + forwarding_conn_acceptor = std::make_shared>( + "IO-FwdHdlr", global_context, socket, forwarding_server, timeout, cpu_core_size); LOG_INFO(log, "Listening for forwarding connections on {}", socket.address().toString()); }); diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index f3ec68b27f8..2e3659e215a 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -86,6 +86,9 @@ M(74, CANNOT_PARSE_ELF) \ M(75, CANNOT_STAT) \ M(76, NETLINK_ERROR) \ + M(78, EPOLL_CTL) \ + M(79, EPOLL_CREATE) \ + M(80, EPOLL_WAIT) \ \ M(102, KEEPER_EXCEPTION) \ M(103, POCO_EXCEPTION) \ @@ -94,13 +97,13 @@ M(106, INVALID_LOG_LEVEL) \ M(107, UNEXPECTED_ZOOKEEPER_ERROR) \ M(108, UNEXPECTED_NODE_IN_ZOOKEEPER) \ - M(109, RAFT_FORWARDING_ERROR) \ + M(109, RAFT_FORWARD_ERROR) \ M(110, CANNOT_PTHREAD_ATTR) \ M(111, UNEXPECTED_FORWARD_PACKET) \ M(112, RAFT_IS_LEADER) \ M(113, RAFT_NO_LEADER) \ M(114, RAFT_FWD_NO_CONN) \ - M(115, FORWARDING_DISCONNECTED) \ + M(115, FORWARD_NOT_CONNECTED) \ M(116, ILLEGAL_SETTING_VALUE) \ /* See END */ @@ -126,7 +129,7 @@ namespace ErrorCodes } } error_codes_names; - std::string_view getName(ErrorCode error_code) + [[maybe_unused]] std::string_view getName(ErrorCode error_code) { if (error_code >= END) return std::string_view(); diff --git a/src/Common/ErrorCodes.h b/src/Common/ErrorCodes.h index 877fc289f94..789440e0817 100644 --- a/src/Common/ErrorCodes.h +++ b/src/Common/ErrorCodes.h @@ -22,7 +22,7 @@ namespace ErrorCodes /// Get name of error_code by identifier. /// Returns statically allocated string. - std::string_view getName(ErrorCode error_code); + [[maybe_unused]] std::string_view getName(ErrorCode error_code); /// ErrorCode identifier -> current value of error_code. extern std::atomic values[]; diff --git a/src/Common/NIO/Notification.h b/src/Common/NIO/Notification.h new file mode 100644 index 00000000000..9795f513951 --- /dev/null +++ b/src/Common/NIO/Notification.h @@ -0,0 +1,27 @@ +/** +* Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH. and Contributors. +* SPDX-License-Identifier: BSL-1.0 +*/ +#pragma once + +#include +#include + + +namespace RK +{ + +/// The base class for all notification classes. +class Notification +{ +public: + Notification() = default; + virtual std::string name() const { return typeid(*this).name(); } + +protected: + virtual ~Notification() = default; +}; + +using NotificationPtr = std::shared_ptr; + +} diff --git a/src/Common/NIO/NotificationCenter.cpp b/src/Common/NIO/NotificationCenter.cpp index b97281fed94..2a3697557d1 100644 --- a/src/Common/NIO/NotificationCenter.cpp +++ b/src/Common/NIO/NotificationCenter.cpp @@ -18,24 +18,24 @@ namespace RK bool NotificationCenter::addObserverIfNotExist(const AbstractObserver & observer) { - Mutex::ScopedLock lock(_mutex); - for (const auto & p : _observers) + Mutex::ScopedLock lock(mutex); + for (const auto & p : observers) if (observer.equals(*p)) return false; - _observers.push_back(observer.clone()); + observers.push_back(observer.clone()); return true; } bool NotificationCenter::removeObserverIfExist(const AbstractObserver & observer) { - Mutex::ScopedLock lock(_mutex); - for (auto it = _observers.begin(); it != _observers.end(); ++it) + Mutex::ScopedLock lock(mutex); + for (auto it = observers.begin(); it != observers.end(); ++it) { if (observer.equals(**it)) { (*it)->disable(); - _observers.erase(it); + observers.erase(it); return true; } } @@ -45,8 +45,8 @@ bool NotificationCenter::removeObserverIfExist(const AbstractObserver & observer bool NotificationCenter::hasObserver(const AbstractObserver & observer) const { - Mutex::ScopedLock lock(_mutex); - for (const auto & p : _observers) + Mutex::ScopedLock lock(mutex); + for (const auto & p : observers) if (observer.equals(*p)) return true; @@ -56,49 +56,51 @@ bool NotificationCenter::hasObserver(const AbstractObserver & observer) const bool NotificationCenter::onlyHas(const AbstractObserver & observer) const { - Mutex::ScopedLock lock(_mutex); - return _observers.size() == 1 && observer.equals(*_observers[0]); + Mutex::ScopedLock lock(mutex); + return observers.size() == 1 && observer.equals(*observers[0]); } -bool NotificationCenter::accept(Poco::Notification * pNotification) const +bool NotificationCenter::accept(const Notification & notification) const { - Mutex::ScopedLock lock(_mutex); - for (const auto & observer : _observers) + Mutex::ScopedLock lock(mutex); + for (const auto & observer : observers) { - if (observer->accepts(pNotification)) + if (observer->accepts(notification)) return true; } return false; } -void NotificationCenter::postNotification(Notification::Ptr pNotification) +void NotificationCenter::postNotification(const Notification & notification) { - poco_check_ptr(pNotification); - - Poco::ScopedLockWithUnlock lock(_mutex); - ObserverList copied(_observers); - lock.unlock(); - - for (auto & p : copied) + Observers copied; { - p->notify(pNotification); + Mutex::ScopedLock lock(mutex); + for (auto & observer : observers) + { + if (observer->accepts(notification)) + copied.push_back(observer); + } } + + for (auto & observer : copied) + observer->notify(notification); } bool NotificationCenter::hasObservers() const { - Mutex::ScopedLock lock(_mutex); - return !_observers.empty(); + Mutex::ScopedLock lock(mutex); + return !observers.empty(); } -std::size_t NotificationCenter::countObservers() const +std::size_t NotificationCenter::size() const { - Mutex::ScopedLock lock(_mutex); - return _observers.size(); + Mutex::ScopedLock lock(mutex); + return observers.size(); } } diff --git a/src/Common/NIO/NotificationCenter.h b/src/Common/NIO/NotificationCenter.h index 90398a64ad6..bcafe8b51df 100644 --- a/src/Common/NIO/NotificationCenter.h +++ b/src/Common/NIO/NotificationCenter.h @@ -1,12 +1,3 @@ -// -// NotificationCenter.h -// -// Library: Foundation -// Package: Notifications -// Module: NotificationCenter -// -// Definition of the NotificationCenter class. -// // Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH. // and Contributors. // @@ -18,20 +9,16 @@ #include #include -#include #include #include -#include #include -#include +#include namespace RK { - -class Foundation_API NotificationCenter /// A NotificationCenter is essentially a notification dispatcher. /// It notifies all observers of notifications meeting specific criteria. /// This information is encapsulated in Notification objects. @@ -40,102 +27,56 @@ class Foundation_API NotificationCenter /// posts an appropriate notification to the notification center. The notification /// center invokes the registered method on each matching observer, passing the notification /// as argument. -/// -/// The order in which observers receive notifications is undefined. -/// It is possible for the posting object and the observing object to be the same. -/// The NotificationCenter delivers notifications to observers synchronously. -/// In other words the postNotification() method does not return until all observers have -/// received and processed the notification. -/// If an observer throws an exception while handling a notification, the NotificationCenter -/// stops dispatching the notification and postNotification() rethrows the exception. -/// -/// In a multithreaded scenario, notifications are always delivered in the thread in which the -/// notification was posted, which may not be the same thread in which an observer registered itself. -/// -/// The NotificationCenter class is basically a C++ implementation of the NSNotificationCenter class -/// found in Apple's Cocoa (or OpenStep). -/// -/// While handling a notification, an observer can unregister itself from the notification center, -/// or it can register or unregister other observers. Observers added during a dispatch cycle -/// will not receive the current notification. -/// -/// The method receiving the notification must be implemented as -/// void handleNotification(MyNotification* pNf); -/// The handler method gets co-ownership of the Notification object -/// and must release it when done. This is best done with an AutoPtr: -/// void MyClass::handleNotification(MyNotification* pNf) -/// { -/// AutoPtr nf(pNf); -/// ... -/// } -/// -/// Alternatively, the NObserver class template can be used to register a callback -/// method. In this case, the callback method receives the Notification in an -/// AutoPtr and thus does not have to deal with object ownership issues: -/// void MyClass::handleNotification(const AutoPtr& pNf) -/// { -/// ... -/// } +class NotificationCenter { public: using Mutex = Poco::Mutex; - using Notification = Poco::Notification; - using AbstractObserver = Poco::AbstractObserver; NotificationCenter() = default; - /// Creates the NotificationCenter. ~NotificationCenter() = default; - /// Destroys the NotificationCenter. - bool addObserverIfNotExist(const AbstractObserver & observer); /// Registers an observer with the NotificationCenter. /// Usage: /// Observer obs(*this, &MyClass::handleNotification); /// notificationCenter.addObserver(obs); /// - /// Alternatively, the NObserver template class can be used instead of Observer. + /// Alternatively, the Observer template class can be used instead of Observer. + bool addObserverIfNotExist(const AbstractObserver & observer); - bool removeObserverIfExist(const AbstractObserver & observer); /// Unregisters an observer with the NotificationCenter. + bool removeObserverIfExist(const AbstractObserver & observer); - bool hasObserver(const AbstractObserver & observer) const; /// Returns true if the observer is registered with this NotificationCenter. + bool hasObserver(const AbstractObserver & observer) const; - bool onlyHas(const AbstractObserver & observer) const; /// Returns true if only has the given observer + bool onlyHas(const AbstractObserver & observer) const; - void postNotification(Notification::Ptr pNotification); /// Posts a notification to the NotificationCenter. /// The NotificationCenter then delivers the notification /// to all interested observers. /// If an observer throws an exception, dispatching terminates /// and the exception is rethrown to the caller. - /// Ownership of the notification object is claimed and the - /// notification is released before returning. Therefore, - /// a call like - /// notificationCenter.postNotification(new MyNotification); - /// does not result in a memory leak. + void postNotification(const Notification & notification); - bool hasObservers() const; /// Returns true iff there is at least one registered observer. /// /// Can be used to improve performance if an expensive notification /// shall only be created and posted if there are any observers. + bool hasObservers() const; - std::size_t countObservers() const; /// Returns the number of registered observers. + std::size_t size() const; - bool accept(Poco::Notification * pNotification) const; + bool accept(const Notification & notification) const; private: using AbstractObserverPtr = Poco::SharedPtr; - using ObserverList = std::vector; - using EventSet = std::multiset; + using Observers = std::vector; - mutable Poco::Mutex _mutex; - /// All observers - ObserverList _observers; + mutable Poco::Mutex mutex; + Observers observers; /// All observers }; diff --git a/src/Common/NIO/Observer.h b/src/Common/NIO/Observer.h new file mode 100644 index 00000000000..5efa038324b --- /dev/null +++ b/src/Common/NIO/Observer.h @@ -0,0 +1,104 @@ +/** +* Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH. and Contributors. +* SPDX-License-Identifier: BSL-1.0 +*/ +#pragma once + +#include + +namespace RK +{ + +/// The base class for all instantiations of +/// the Observer template classes. +class AbstractObserver +{ +public: + AbstractObserver() = default; + AbstractObserver(const AbstractObserver &) = default; + virtual ~AbstractObserver() = default; + + AbstractObserver & operator=(const AbstractObserver &) = default; + + virtual void notify(const Notification & notification) const = 0; + virtual bool accepts(const Notification & notification) const = 0; + + virtual bool equals(const AbstractObserver & observer) const = 0; + virtual AbstractObserver * clone() const = 0; + + virtual void disable() = 0; +}; + + +/// This template class implements an adapter that sits between +/// a NotificationCenter and an object receiving notifications +/// from it. It is quite similar in concept to the +/// RunnableAdapter, but provides some NotificationCenter +/// specific additional methods. +/// See the NotificationCenter class for information on how +/// to use this template class. +/// +/// This class template is quite similar to the Observer class +/// template. The only difference is that the Observer +/// expects the callback function to accept a const std::shared_ptr& +/// instead of a plain pointer as argument, thus simplifying memory +/// management. +template +class Observer : public AbstractObserver +{ +public: + using Callback = void (C::*)(const Notification &); + + Observer(C & object_, Callback method_) : object(&object_), method(method_) { } + Observer(const Observer & observer) : AbstractObserver(observer), object(observer.object), method(observer.method) { } + + ~Observer() override = default; + + Observer & operator=(const Observer & observer) + { + if (&observer != this) + { + object = observer.object; + method = observer.method; + } + return *this; + } + + void notify(const Notification & notification) const override + { + if (const N * casted = dynamic_cast(¬ification)) + { + Poco::Mutex::ScopedLock lock(mutex); + if (object) + { + (object->*method)(*casted); + } + } + } + + bool equals(const AbstractObserver & other) const override + { + const Observer * casted = dynamic_cast(&other); + return casted && casted->object == object && casted->method == method; + } + + bool accepts(const Notification & notification) const override { return dynamic_cast(¬ification); } + + AbstractObserver * clone() const override { return new Observer(*this); } + + void disable() override + { + Poco::Mutex::ScopedLock lock(mutex); + object = nullptr; + } + +private: + Observer() = default; + + C * object; + Callback method; + + mutable Poco::Mutex mutex; +}; + +} diff --git a/src/Common/NIO/PollSet.cpp b/src/Common/NIO/PollSet.cpp index 3d0ff82c5a7..a5916f2bfe1 100644 --- a/src/Common/NIO/PollSet.cpp +++ b/src/Common/NIO/PollSet.cpp @@ -4,502 +4,341 @@ * */ #include -#include -#include +#include +#include + +#include #include +#include #include #include -#include -#include -#include - - -#if defined(POCO_HAVE_FD_EPOLL) -#include -#include -#elif defined(POCO_HAVE_FD_POLL) -#include -#endif +#include +#include +#include using Poco::Net::SocketImpl; -namespace RK { +namespace RK +{ -#if defined(POCO_HAVE_FD_EPOLL) +namespace ErrorCodes +{ + extern const int EPOLL_ERROR; + extern const int EPOLL_CTL; + extern const int EPOLL_CREATE; + extern const int EPOLL_WAIT; + extern const int LOGICAL_ERROR; +} +namespace +{ + /// Return local address string for server socket, or return remote address for stream socket. + std::string getAddressName(const Socket & sock) + { + return sock.isStream() ? sock.peerAddress().toString() : sock.address().toString(); + } +} -// -// Linux implementation using epoll -// +/// PollSet implementation with epoll class PollSetImpl { -private: - Poco::Logger * log; public: - PollSetImpl(): _epollfd(epoll_create(1)), - _events(1024), - _eventfd(eventfd(0, EFD_NONBLOCK)) - { - log = &Poco::Logger::get("PollSet"); - int err = addImpl(_eventfd, PollSet::POLL_READ, log); - if ((err) || (_epollfd < 0)) - { - errno; - } - } + PollSetImpl(); + ~PollSetImpl(); - ~PollSetImpl() - { - if (_epollfd >= 0) ::close(_epollfd); - } + void add(const Socket & socket, int mode); + void remove(const Socket & socket); - void add(const Socket& socket, int mode) - { - Poco::FastMutex::ScopedLock lock(_mutex); + bool has(const Socket & socket) const; + bool empty() const; - SocketImpl* sockImpl = socket.impl(); + void update(const Socket & socket, int mode); + void clear(); - int err = addImpl(sockImpl->sockfd(), mode, sockImpl); + PollSet::SocketModeMap poll(const Poco::Timespan & timeout); - if (err) - { - if (errno == EEXIST) update(socket, mode); - else errno; - } + void wakeUp(); + int count() const; - if (_socketMap.find(sockImpl) == _socketMap.end()) - _socketMap[sockImpl] = socket; - } +private: + int addImpl(int fd, int mode, void * data); - void remove(const Socket& socket) - { - Poco::FastMutex::ScopedLock lock(_mutex); + mutable Poco::FastMutex mutex; - poco_socket_t fd = socket.impl()->sockfd(); - struct epoll_event ev; - ev.events = 0; - ev.data.ptr = nullptr; - int err = epoll_ctl(_epollfd, EPOLL_CTL_DEL, fd, &ev); - if (err) errno; + /// Monitored epoll events + std::map socket_map; - _socketMap.erase(socket.impl()); - } + /// epoll fd + int epoll_fd; - bool has(const Socket& socket) const - { - Poco::FastMutex::ScopedLock lock(_mutex); - SocketImpl* sockImpl = socket.impl(); - return sockImpl && - (_socketMap.find(sockImpl) != _socketMap.end()); - } + /// Monitored epoll events + std::vector events; - bool empty() const - { - Poco::FastMutex::ScopedLock lock(_mutex); - return _socketMap.empty(); - } + /// Only used to wake up poll set by writing 8 bytes. + int waking_up_fd; - void update(const Socket& socket, int mode) - { - poco_socket_t fd = socket.impl()->sockfd(); - struct epoll_event ev; - ev.events = 0; - if (mode & PollSet::POLL_READ) - ev.events |= EPOLLIN; - if (mode & PollSet::POLL_WRITE) - ev.events |= EPOLLOUT; - if (mode & PollSet::POLL_ERROR) - ev.events |= EPOLLERR; - ev.data.ptr = socket.impl(); - int err = epoll_ctl(_epollfd, EPOLL_CTL_MOD, fd, &ev); - if (err) - { - errno; - } - } + Poco::Logger * log; +}; - void clear() - { - Poco::FastMutex::ScopedLock lock(_mutex); - ::close(_epollfd); - _socketMap.clear(); - _epollfd = epoll_create(1); - if (_epollfd < 0) - { - errno; - } - } - - PollSet::SocketModeMap poll(const Poco::Timespan& timeout) +PollSetImpl::PollSetImpl() + : epoll_fd(epoll_create(1)), events(1024), waking_up_fd(eventfd(0, EFD_NONBLOCK)), log(&Poco::Logger::get("PollSet")) +{ + /// Monitor waking up fd, use this as waking up event marker. + int err = addImpl(waking_up_fd, PollSet::POLL_READ, this); + if ((err) || (epoll_fd < 0)) { - PollSet::SocketModeMap result; - Poco::Timespan remainingTime(timeout); - int rc; - do - { - Poco::Timestamp start; - rc = epoll_wait(_epollfd, &_events[0], _events.size(), remainingTime.totalMilliseconds()); - if (rc == 0) return result; - if (rc < 0 && errno == POCO_EINTR) - { - Poco::Timestamp end; - Poco::Timespan waited = end - start; - if (waited < remainingTime) - remainingTime -= waited; - else - remainingTime = 0; - LOG_TRACE(log, "Poll wait encounter error EINTR {}ms", remainingTime.totalMilliseconds()); - } - } - while (rc < 0 && errno == POCO_EINTR); - if (rc < 0) errno; + throwFromErrno("Error when initializing poll set", ErrorCodes::EPOLL_ERROR, errno); + } +} - Poco::FastMutex::ScopedLock lock(_mutex); - for (int i = 0; i < rc; i++) - { - if (_events[i].data.ptr == log) - { - /// read char eventfd - uint64_t val; - auto n = ::read(_eventfd, &val, sizeof(val)); - LOG_TRACE(log, "Poll wakeup {} {} {} {}", Poco::Thread::current() ? Poco::Thread::current()->name() : "main", _eventfd, n, errno); - if (n < 0) errno; - } - else if (_events[i].data.ptr) - { - std::map::iterator it = _socketMap.find(static_cast(_events[i].data.ptr)); - if (it != _socketMap.end()) - { - if (_events[i].events & EPOLLIN) - result[it->second] |= PollSet::POLL_READ; - if (_events[i].events & EPOLLOUT) - result[it->second] |= PollSet::POLL_WRITE; - if (_events[i].events & EPOLLERR) - result[it->second] |= PollSet::POLL_ERROR; - } - } - else - { - LOG_ERROR(log, "Poll receive null data socket event {}", static_cast(_events[i].events)); - } - } +PollSetImpl::~PollSetImpl() +{ + if (epoll_fd >= 0) + ::close(epoll_fd); +} - return result; - } +void PollSetImpl::add(const Socket & socket, int mode) +{ + Poco::FastMutex::ScopedLock lock(mutex); + SocketImpl * socket_impl = socket.impl(); + int err = addImpl(socket_impl->sockfd(), mode, socket_impl); - void wakeUp() + if (err) { - uint64_t val = 1; - int n = ::write(_eventfd, &val, sizeof(val)); - LOG_TRACE(log, "Poll trigger wakeup {} {} {}", Poco::Thread::current() ? Poco::Thread::current()->name() : "main", _eventfd, n); - if (n < 0) errno; + if (errno == EEXIST) + update(socket, mode); + else + throwFromErrno("Error when updating epoll event to " + getAddressName(socket), ErrorCodes::EPOLL_CTL, errno); } - int count() const - { - Poco::FastMutex::ScopedLock lock(_mutex); - return static_cast(_socketMap.size()); - } + if (socket_map.find(socket_impl) == socket_map.end()) + socket_map[socket_impl] = socket; +} -private: - int addImpl(int fd, int mode, void* ptr) - { - struct epoll_event ev; - ev.events = 0; - if (mode & PollSet::POLL_WRITE) - ev.events |= EPOLLOUT; - if (mode & PollSet::POLL_ERROR) - ev.events |= EPOLLERR; - if (mode & PollSet::POLL_READ) - ev.events |= EPOLLIN; - ev.data.ptr = ptr; - return epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, &ev); - } +int PollSetImpl::addImpl(int fd, int mode, void * data) +{ + struct epoll_event ev; + ev.events = 0; + if (mode & PollSet::POLL_WRITE) + ev.events |= EPOLLOUT; + if (mode & PollSet::POLL_ERROR) + ev.events |= EPOLLERR; + if (mode & PollSet::POLL_READ) + ev.events |= EPOLLIN; + ev.data.ptr = data; + return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev); +} - mutable Poco::FastMutex _mutex; - int _epollfd; - std::map _socketMap; - std::vector _events; - int _eventfd; -}; +void PollSetImpl::remove(const Socket & socket) +{ + Poco::FastMutex::ScopedLock lock(mutex); + poco_socket_t fd = socket.impl()->sockfd(); + struct epoll_event ev; + ev.events = 0; + ev.data.ptr = nullptr; + int err = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &ev); + if (err) + throwFromErrno("Error when updating epoll event to " + getAddressName(socket), ErrorCodes::EPOLL_CTL, errno); -#elif defined(POCO_HAVE_FD_POLL) + socket_map.erase(socket.impl()); +} +bool PollSetImpl::has(const Socket & socket) const +{ + Poco::FastMutex::ScopedLock lock(mutex); + SocketImpl * socket_impl = socket.impl(); + return socket_impl && (socket_map.find(socket_impl) != socket_map.end()); +} -// -// BSD/Windows implementation using poll/WSAPoll -// -class PollSetImpl +bool PollSetImpl::empty() const { -public: - PollSetImpl() - { - pollfd fd{_pipe.readHandle(), POLLIN, 0}; - _pollfds.push_back(fd); - } + Poco::FastMutex::ScopedLock lock(mutex); + return socket_map.empty(); +} - ~PollSetImpl() - { - _pipe.close(); - } +void PollSetImpl::update(const Socket & socket, int mode) +{ + poco_socket_t fd = socket.impl()->sockfd(); + struct epoll_event ev; + ev.events = 0; + + if (mode & PollSet::POLL_READ) + ev.events |= EPOLLIN; + if (mode & PollSet::POLL_WRITE) + ev.events |= EPOLLOUT; + if (mode & PollSet::POLL_ERROR) + ev.events |= EPOLLERR; + + ev.data.ptr = socket.impl(); + int err = epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev); + + if (err) + throwFromErrno("Error when updating epoll event to " + getAddressName(socket), ErrorCodes::EPOLL_CTL, errno); +} - void add(const Socket& socket, int mode) - { - Poco::FastMutex::ScopedLock lock(_mutex); - poco_socket_t fd = socket.impl()->sockfd(); - _addMap[fd] = mode; - _removeSet.erase(fd); - _socketMap[fd] = socket; - } +void PollSetImpl::clear() +{ + Poco::FastMutex::ScopedLock lock(mutex); - void remove(const Socket& socket) + ::close(epoll_fd); + socket_map.clear(); + epoll_fd = epoll_create(1); + if (epoll_fd < 0) { - Poco::FastMutex::ScopedLock lock(_mutex); - poco_socket_t fd = socket.impl()->sockfd(); - _removeSet.insert(fd); - _addMap.erase(fd); - _socketMap.erase(fd); + throwFromErrno("Error when creating epoll fd", ErrorCodes::EPOLL_CREATE, errno); } +} - bool has(const Socket& socket) const - { - Poco::FastMutex::ScopedLock lock(_mutex); - SocketImpl* sockImpl = socket.impl(); - return sockImpl && - (_socketMap.find(sockImpl->sockfd()) != _socketMap.end()); - } +PollSet::SocketModeMap PollSetImpl::poll(const Poco::Timespan & timeout) +{ + PollSet::SocketModeMap result; + Poco::Timespan remaining_time(timeout); - bool empty() const + int rc; + do { - Poco::FastMutex::ScopedLock lock(_mutex); - return _socketMap.empty(); - } + Poco::Timestamp start; + rc = epoll_wait(epoll_fd, &events[0], events.size(), remaining_time.totalMilliseconds()); - void update(const Socket& socket, int mode) - { - Poco::FastMutex::ScopedLock lock(_mutex); - poco_socket_t fd = socket.impl()->sockfd(); - for (auto it = _pollfds.begin(); it != _pollfds.end(); ++it) + if (rc == 0) { - if (it->fd == fd) - { - it->events = 0; - it->revents = 0; - setMode(it->events, mode); - } + LOG_TRACE(log, "epoll_wait got 0 events"); + return result; } - } - - void clear() - { - Poco::FastMutex::ScopedLock lock(_mutex); - - _socketMap.clear(); - _addMap.clear(); - _removeSet.clear(); - _pollfds.reserve(1); - } - PollSet::SocketModeMap poll(const Poco::Timespan& timeout) - { - PollSet::SocketModeMap result; + if (rc < 0 && errno == POCO_EINTR) { - Poco::FastMutex::ScopedLock lock(_mutex); + Poco::Timestamp end; + Poco::Timespan waited = end - start; + if (waited < remaining_time) + remaining_time -= waited; + else + break; /// timeout + } + } while (rc < 0 && errno == POCO_EINTR); - if (!_removeSet.empty()) - { - for (auto it = _pollfds.begin(); it != _pollfds.end();) - { - if (_removeSet.find(it->fd) != _removeSet.end()) - { - it = _pollfds.erase(it); - } - else ++it; - } - _removeSet.clear(); - } + if (rc < 0 && errno != POCO_EINTR) + throwFromErrno("Error when epoll waiting", ErrorCodes::EPOLL_WAIT, errno); - _pollfds.reserve(_pollfds.size() + _addMap.size()); - for (auto it = _addMap.begin(); it != _addMap.end(); ++it) - { - pollfd pfd; - pfd.fd = it->first; - pfd.events = 0; - pfd.revents = 0; - setMode(pfd.events, it->second); - _pollfds.push_back(pfd); - } - _addMap.clear(); - } + LOG_TRACE(log, "Got {} events", rc); - if (_pollfds.empty()) return result; + Poco::FastMutex::ScopedLock lock(mutex); - Poco::Timespan remainingTime(timeout); - int rc; - do + for (int i = 0; i < rc; i++) + { + /// Read data from 'wakeUp' method + if (events[i].data.ptr == this) { - Poco::Timestamp start; -#ifdef _WIN32 - rc = WSAPoll(&_pollfds[0], static_cast(_pollfds.size()), static_cast(remainingTime.totalMilliseconds())); -#else - rc = ::poll(&_pollfds[0], _pollfds.size(), remainingTime.totalMilliseconds()); -#endif - if (rc < 0 && errno == POCO_EINTR) - { - Poco::Timestamp end; - Poco::Timespan waited = end - start; - if (waited < remainingTime) - remainingTime -= waited; - else - remainingTime = 0; - } + uint64_t val; + auto n = ::read(waking_up_fd, &val, sizeof(val)); + if (n < 0) + throwFromErrno("Error when reading data from 'wakeUp' method", ErrorCodes::EPOLL_CREATE, errno); } - while (rc < 0 && errno == POCO_EINTR); - if (rc < 0) errno; - + /// Handle IO events + else if (events[i].data.ptr) { - if (_pollfds[0].revents & POLLIN) + std::map::iterator it = socket_map.find(static_cast(events[i].data.ptr)); + if (it != socket_map.end()) { - char c; - _pipe.readBytes(&c, 1); - } - - Poco::FastMutex::ScopedLock lock(_mutex); - - if (!_socketMap.empty()) - { - for (auto it = _pollfds.begin() + 1; it != _pollfds.end(); ++it) - { - std::map::const_iterator its = _socketMap.find(it->fd); - if (its != _socketMap.end()) - { - if ((it->revents & POLLIN) -#ifdef _WIN32 - || (it->revents & POLLHUP) -#endif - ) - result[its->second] |= PollSet::POLL_READ; - if (it->revents & POLLOUT) - result[its->second] |= PollSet::POLL_WRITE; - if (it->revents & POLLERR || (it->revents & POLLHUP)) - result[its->second] |= PollSet::POLL_ERROR; - } - it->revents = 0; - } + if (events[i].events & EPOLLIN) + result[it->second] |= PollSet::POLL_READ; + if (events[i].events & EPOLLOUT) + result[it->second] |= PollSet::POLL_WRITE; + if (events[i].events & EPOLLERR) + result[it->second] |= PollSet::POLL_ERROR; } } - - return result; - } - - void wakeUp() - { - char c = 1; - _pipe.writeBytes(&c, 1); - } - - int count() const - { - Poco::FastMutex::ScopedLock lock(_mutex); - return static_cast(_socketMap.size()); - } - -private: - - void setMode(short& target, int mode) - { - if (mode & PollSet::POLL_READ) - target |= POLLIN; - - if (mode & PollSet::POLL_WRITE) - target |= POLLOUT; + else + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Should never reach here."); + } } - mutable Poco::FastMutex _mutex; - std::map _socketMap; - std::map _addMap; - std::set _removeSet; - std::vector _pollfds; - Poco::Pipe _pipe; - /// Add _pipe to head of _pollfds used to wake up poll blocking -}; - - -#else - + return result; +} +void PollSetImpl::wakeUp() +{ + LOG_TRACE(log, "Try to wakeup poll set"); + uint64_t val = 0; + int n = ::write(waking_up_fd, &val, sizeof(val)); + if (n < 0) + throwFromErrno("Error when trying to wakeup poll set", ErrorCodes::EPOLL_CREATE, errno); +} -#endif +int PollSetImpl::count() const +{ + Poco::FastMutex::ScopedLock lock(mutex); + return static_cast(socket_map.size()); +} -PollSet::PollSet(): - _pImpl(new PollSetImpl) +PollSet::PollSet() : impl(new PollSetImpl) { } PollSet::~PollSet() { - delete _pImpl; + delete impl; } -void PollSet::add(const Socket& socket, int mode) +void PollSet::add(const Socket & socket, int mode) { - _pImpl->add(socket, mode); + impl->add(socket, mode); } -void PollSet::remove(const Socket& socket) +void PollSet::remove(const Socket & socket) { - _pImpl->remove(socket); + impl->remove(socket); } -void PollSet::update(const Socket& socket, int mode) +void PollSet::update(const Socket & socket, int mode) { - _pImpl->update(socket, mode); + impl->update(socket, mode); } -bool PollSet::has(const Socket& socket) const +bool PollSet::has(const Socket & socket) const { - return _pImpl->has(socket); + return impl->has(socket); } bool PollSet::empty() const { - return _pImpl->empty(); + return impl->empty(); } void PollSet::clear() { - _pImpl->clear(); + impl->clear(); } -PollSet::SocketModeMap PollSet::poll(const Poco::Timespan& timeout) +PollSet::SocketModeMap PollSet::poll(const Poco::Timespan & timeout) { - return _pImpl->poll(timeout); + return impl->poll(timeout); } int PollSet::count() const { - return _pImpl->count(); + return impl->count(); } void PollSet::wakeUp() { - _pImpl->wakeUp(); + impl->wakeUp(); } - } diff --git a/src/Common/NIO/PollSet.h b/src/Common/NIO/PollSet.h index 2921eba2aa9..d6a4a5eed47 100644 --- a/src/Common/NIO/PollSet.h +++ b/src/Common/NIO/PollSet.h @@ -4,28 +4,27 @@ */ #pragma once -#include "Poco/Net/Socket.h" #include +#include -using Poco::Net::Socket; -namespace RK { +namespace RK +{ +using Socket = Poco::Net::Socket; class PollSetImpl; - -class Net_API PollSet /// A set of sockets that can be efficiently polled as a whole. /// -/// If supported, PollSet is implemented using epoll (Linux) or -/// poll (BSD) APIs. A fallback implementation using select() -/// is also provided. +/// PollSet is implemented using epoll (Linux) or poll (BSD) APIs. +/// A fallback implementation using select() is also provided. +class PollSet { public: enum Mode { - POLL_READ = 0x01, + POLL_READ = 0x01, POLL_WRITE = 0x02, POLL_ERROR = 0x04 }; @@ -33,49 +32,43 @@ class Net_API PollSet using SocketModeMap = std::map; PollSet(); - /// Creates an empty PollSet. - ~PollSet(); - /// Destroys the PollSet. - void add(const Poco::Net::Socket& socket, int mode); - /// Adds the given socket to the set, for polling with - /// the given mode, which can be an OR'd combination of - /// POLL_READ, POLL_WRITE and POLL_ERROR. + /// Adds the given socket to the set, for polling with the given mode. + void add(const Socket & socket, int mode); - void remove(const Poco::Net::Socket& socket); /// Removes the given socket from the set. + void remove(const Socket & socket); - void update(const Poco::Net::Socket& socket, int mode); /// Updates the mode of the given socket. + void update(const Socket & socket, int mode); - bool has(const Socket& socket) const; /// Returns true if socket is registered for polling. + bool has(const Socket & socket) const; - bool empty() const; /// Returns true if no socket is registered for polling. + bool empty() const; - void clear(); /// Removes all sockets from the PollSet. + void clear(); - SocketModeMap poll(const Poco::Timespan& timeout); /// Waits until the state of at least one of the PollSet's sockets /// changes accordingly to its mode, or the timeout expires. /// Returns a PollMap containing the sockets that have had /// their state changed. + SocketModeMap poll(const Poco::Timespan & timeout); + /// Returns the number of sockets monitored. int count() const; - /// Returns the numberof sockets monitored. - void wakeUp(); /// Wakes up a waiting PollSet. - /// On platforms/implementations where this functionality - /// is not available, it does nothing. + void wakeUp(); + private: - PollSetImpl* _pImpl; + PollSetImpl * impl; - PollSet(const PollSet&); - PollSet& operator = (const PollSet&); + PollSet(const PollSet &); + PollSet & operator=(const PollSet &); }; diff --git a/src/Common/NIO/SocketAcceptor.h b/src/Common/NIO/SocketAcceptor.h new file mode 100644 index 00000000000..bde667a5d09 --- /dev/null +++ b/src/Common/NIO/SocketAcceptor.h @@ -0,0 +1,178 @@ +/** +* Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH. and Contributors. +* SPDX-License-Identifier: BSL-1.0 +*/ +#pragma once + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + + +namespace RK +{ + +namespace ErrorCodes +{ + extern const int EPOLL_ERROR; +} + +using Poco::Net::ServerSocket; +using Poco::Net::Socket; +using Poco::Net::StreamSocket; + +/// This class implements the Acceptor part of the Acceptor-Connector design pattern. +/// +/// This is a multi-threaded version of SocketAcceptor, it differs from the +/// single-threaded version in number of getWorkerReactors (defaulting to number of processors) +/// that can be specified at construction time and is rotated in a round-robin fashion +/// by event handler. See ParallelSocketAcceptor::onAccept and +/// ParallelSocketAcceptor::createServiceHandler documentation and implementation for +/// details. +template +class SocketAcceptor +{ +public: + using MainReactorPtr = AsyncSocketReactorPtr; + using WorkerReactor = AsyncSocketReactor; + using WorkerReactorPtr = AsyncSocketReactorPtr; + using WorkerReactors = std::vector; + using AcceptorObserver = Observer; + using ErrorObserver = Observer; + + SocketAcceptor() = delete; + SocketAcceptor(const SocketAcceptor &) = delete; + SocketAcceptor & operator=(const SocketAcceptor &) = delete; + + explicit SocketAcceptor( + const String & name_, + Context & keeper_context_, + ServerSocket & socket_, + MainReactorPtr & main_reactor_, + const Poco::Timespan & timeout_, + size_t worker_count_ = getNumberOfPhysicalCPUCores()) + : name(name_) + , socket(socket_) + , main_reactor(main_reactor_) + , worker_count(worker_count_) + , keeper_context(keeper_context_) + , timeout(timeout_) + , log(&Poco::Logger::get("SocketAcceptor")) + { + initialize(); + } + + virtual ~SocketAcceptor() + { + try + { + if (main_reactor) + { + main_reactor->removeEventHandler(socket, AcceptorObserver(*this, &SocketAcceptor::onAccept)); + main_reactor->removeEventHandler(socket, ErrorObserver(*this, &SocketAcceptor::onError)); + } + } + catch (...) + { + } + } + + /// Accepts connection and dispatches event handler. + void onAccept(const Notification &) + { + StreamSocket sock; + try + { + LOG_TRACE(log, "{} tries to accept connection", socket.address().toString()); + sock = socket.acceptConnection(); + LOG_TRACE(log, "Successfully accepted {}", sock.peerAddress().toString()); + createServiceHandler(sock); + } + catch (...) + { + LOG_ERROR(log, "Failed to accept a connection"); + sock.close(); + } + } + + virtual void onError(const Notification & enf) + { + auto error_no = dynamic_cast(&enf)->getErrorNo(); + throwFromErrno("Error when accepting connection for " + socket.address().toString(), ErrorCodes::EPOLL_ERROR, error_no); + } + + /// Returns a reference to the listening socket. + [[maybe_unused]] Socket & getSocket() { return socket; } + +protected: + void initialize() + { + /// Initialize worker getWorkerReactors + poco_assert(worker_count > 0); + for (size_t i = 0; i < worker_count; ++i) + worker_reactors.push_back(std::make_shared(timeout, name + "#" + std::to_string(i))); + + /// Register accept event handler to main reactor + main_reactor->addEventHandler(socket, AcceptorObserver(*this, &SocketAcceptor::onAccept)); + main_reactor->addEventHandler(socket, ErrorObserver(*this, &SocketAcceptor::onError)); + + /// It is necessary to wake up the getWorkerReactor. + main_reactor->wakeUp(); + } + + /// Socket will be dispatched by socket_fd % worker_count. + WorkerReactorPtr getWorkerReactor(const StreamSocket & socket_) + { + auto fd = socket_.impl()->sockfd(); + return worker_reactors[fd % worker_count]; + } + + /// Create and initialize a new ServiceHandler instance. + virtual ServiceHandler * createServiceHandler(StreamSocket & socket_) + { + socket_.setBlocking(false); + + auto worker_reactor = getWorkerReactor(socket_); + auto * handler = new ServiceHandler(keeper_context, socket_, *worker_reactor); + + /// It is necessary to wake up the getWorkerReactor. + worker_reactor->wakeUp(); + return handler; + } + +private: + /// Thread name prefix of the worker reactor threads. + String name; + + /// Socket the main reactor bounded. + ServerSocket socket; + + /// Main reactor which only concerns about the accept socket events of the socket. + MainReactorPtr main_reactor; + + /// Number of workers which works in an independent thread. + size_t worker_count; + WorkerReactors worker_reactors; + + /// Keeper context + Context & keeper_context; + + /// A period in which worker reactor poll waits. + Poco::Timespan timeout; + + Poco::Logger * log; +}; + + +} diff --git a/src/Common/NIO/SocketNotification.cpp b/src/Common/NIO/SocketNotification.cpp deleted file mode 100644 index 9c19e75c22c..00000000000 --- a/src/Common/NIO/SocketNotification.cpp +++ /dev/null @@ -1,93 +0,0 @@ -/** -* Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH. and Contributors. -* SPDX-License-Identifier: BSL-1.0 -*/ -#include - - -namespace RK { - -SocketNotification::SocketNotification(SocketReactor* pReactor): - _pReactor(pReactor) -{ -} - - -SocketNotification::~SocketNotification() -{ -} - - -void SocketNotification::setSocket(const Socket& socket) -{ - _socket = socket; -} - - -ReadableNotification::ReadableNotification(SocketReactor* pReactor): - SocketNotification(pReactor) -{ -} - - -ReadableNotification::~ReadableNotification() -{ -} - - -WritableNotification::WritableNotification(SocketReactor* pReactor): - SocketNotification(pReactor) -{ -} - - -WritableNotification::~WritableNotification() -{ -} - - -ErrorNotification::ErrorNotification(SocketReactor* pReactor): - SocketNotification(pReactor) -{ -} - - -ErrorNotification::~ErrorNotification() -{ -} - - -TimeoutNotification::TimeoutNotification(SocketReactor* pReactor): - SocketNotification(pReactor) -{ -} - - -TimeoutNotification::~TimeoutNotification() -{ -} - - -IdleNotification::IdleNotification(SocketReactor* pReactor): - SocketNotification(pReactor) -{ -} - - -IdleNotification::~IdleNotification() -{ -} - - -ShutdownNotification::ShutdownNotification(SocketReactor* pReactor): - SocketNotification(pReactor) -{ -} - - -ShutdownNotification::~ShutdownNotification() -{ -} - - -} diff --git a/src/Common/NIO/SocketNotification.h b/src/Common/NIO/SocketNotification.h index 0c55e6cdf36..3403f3b2adf 100644 --- a/src/Common/NIO/SocketNotification.h +++ b/src/Common/NIO/SocketNotification.h @@ -1,134 +1,130 @@ /** * Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH. and Contributors. * SPDX-License-Identifier: BSL-1.0 -* */ #pragma once -#include "Poco/Net/Net.h" -#include "Poco/Net/Socket.h" -#include "Poco/Notification.h" +#include +#include + +#include +#include using Poco::Net::Socket; -namespace RK { +namespace RK +{ class SocketReactor; -class SocketNotification: public Poco::Notification /// The base class for all notifications generated by /// the SocketReactor. +class SocketNotification : public Notification { public: - explicit SocketNotification(SocketReactor* pReactor); - /// Creates the SocketNotification for the given SocketReactor. - - virtual ~SocketNotification() override; - /// Destroys the SocketNotification. - - SocketReactor& source() const; - /// Returns the SocketReactor that generated the notification. + explicit SocketNotification(SocketReactor * reactor_) : reactor(reactor_) { } + virtual ~SocketNotification() override = default; - Socket socket() const; - /// Returns the socket that caused the notification. + [[maybe_unused]] SocketReactorPtr getSocketReactor() const; + [[maybe_unused]] Socket getSocket() const; private: - void setSocket(const Socket& socket); + [[maybe_unused]] void setSocket(const Socket & socket_); - SocketReactor* _pReactor; - Socket _socket; - - friend class SocketNotifier; + SocketReactorPtr reactor; + Socket socket; /// TODO delete }; - -class ReadableNotification: public SocketNotification /// This notification is sent if a socket has become readable. +class ReadableNotification : public SocketNotification { public: - ReadableNotification(SocketReactor* pReactor); - /// Creates the ReadableNotification for the given SocketReactor. + explicit ReadableNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { } + ~ReadableNotification() override = default; - ~ReadableNotification() override; - /// Destroys the ReadableNotification. + std::string name() const override { return "read"; } }; - -class WritableNotification: public SocketNotification /// This notification is sent if a socket has become writable. +class WritableNotification : public SocketNotification { public: - WritableNotification(SocketReactor* pReactor); - /// Creates the WritableNotification for the given SocketReactor. + explicit WritableNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { } + ~WritableNotification() override = default; - ~WritableNotification() override; - /// Destroys the WritableNotification. + std::string name() const override { return "write"; } }; - -class ErrorNotification: public SocketNotification /// This notification is sent if a socket has signalled an error. +class ErrorNotification : public SocketNotification { public: - ErrorNotification(SocketReactor* pReactor); - /// Creates the ErrorNotification for the given SocketReactor. + explicit ErrorNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { } + ~ErrorNotification() override = default; - ~ErrorNotification() override; - /// Destroys the ErrorNotification. -}; + std::string name() const override { return "error"; } + int getErrorNo() const { return error_no; } + void setErrorNo(int error_no_) { error_no = error_no_; } +private: + int error_no; +}; -class TimeoutNotification: public SocketNotification /// This notification is sent if no other event has occurred /// for a specified time. +class TimeoutNotification : public SocketNotification { public: - TimeoutNotification(SocketReactor* pReactor); - /// Creates the TimeoutNotification for the given SocketReactor. + explicit TimeoutNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { } + ~TimeoutNotification() override = default; - ~TimeoutNotification() override; - /// Destroys the TimeoutNotification. + std::string name() const override { return "timeout"; } }; - -class IdleNotification: public SocketNotification /// This notification is sent when the SocketReactor does /// not have any sockets to react to. +class IdleNotification : public SocketNotification { public: - IdleNotification(SocketReactor* pReactor); - /// Creates the IdleNotification for the given SocketReactor. + explicit IdleNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { } + ~IdleNotification() override = default; - ~IdleNotification() override; - /// Destroys the IdleNotification. + std::string name() const override { return "idle"; } }; - -class ShutdownNotification: public SocketNotification /// This notification is sent when the SocketReactor is /// about to shut down. +class ShutdownNotification : public SocketNotification { public: - ShutdownNotification(SocketReactor* pReactor); - /// Creates the ShutdownNotification for the given SocketReactor. + explicit ShutdownNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { } + ~ShutdownNotification() override = default; - ~ShutdownNotification() override; - /// Destroys the ShutdownNotification. + std::string name() const override { return "shutdown"; } }; +using SocketNotificationPtr [[maybe_unused]] = std::shared_ptr; +using ReadableNotificationPtr [[maybe_unused]] = std::shared_ptr; +using WritableNotificationPtr [[maybe_unused]] = std::shared_ptr; +using ErrorNotificationPtr [[maybe_unused]] = std::shared_ptr; +using TimeoutNotificationPtr [[maybe_unused]] = std::shared_ptr; +using IdleNotificationPtr [[maybe_unused]] = std::shared_ptr; +using ShutdownNotificationPtr [[maybe_unused]] = std::shared_ptr; -// -// inlines -// -inline SocketReactor& SocketNotification::source() const +[[maybe_unused]] [[maybe_unused]] inline SocketReactorPtr SocketNotification::getSocketReactor() const { - return *_pReactor; + return reactor; } -inline Socket SocketNotification::socket() const +[[maybe_unused]] inline Socket SocketNotification::getSocket() const +{ + return socket; +} + +[[maybe_unused]] inline void SocketNotification::setSocket(const Socket & socket_) { - return _socket; + socket = socket_; } diff --git a/src/Common/NIO/SocketNotifier.cpp b/src/Common/NIO/SocketNotifier.cpp index da5527d576d..fd9d162fd6f 100644 --- a/src/Common/NIO/SocketNotifier.cpp +++ b/src/Common/NIO/SocketNotifier.cpp @@ -3,52 +3,40 @@ * SPDX-License-Identifier: BSL-1.0 * */ +#include #include #include -#include - - -namespace RK { -SocketNotifier::SocketNotifier(const Socket& socket): - _socket(socket) +namespace RK { -} -bool SocketNotifier::addObserverIfNotExist(SocketReactor*, const Poco::AbstractObserver& observer) +SocketNotifier::SocketNotifier(const Socket & socket_) : socket(socket_) { - return _nc.addObserverIfNotExist(observer); } - -bool SocketNotifier::removeObserverIfExist(SocketReactor*, const Poco::AbstractObserver& observer) +bool SocketNotifier::addObserverIfNotExist(const AbstractObserver & observer) { - return _nc.removeObserverIfExist(observer); + return nc.addObserverIfNotExist(observer); } -namespace +bool SocketNotifier::removeObserverIfExist(const AbstractObserver & observer) { - static Socket nullSocket; + return nc.removeObserverIfExist(observer); } -void SocketNotifier::dispatch(SocketNotification* pNotification) +void SocketNotifier::dispatch(const Notification & notification) { - pNotification->setSocket(_socket); - pNotification->duplicate(); try { - _nc.postNotification(pNotification); + nc.postNotification(notification); } catch (...) { - pNotification->setSocket(nullSocket); throw; } - pNotification->setSocket(nullSocket); } - } diff --git a/src/Common/NIO/SocketNotifier.h b/src/Common/NIO/SocketNotifier.h index 43a67eb7a63..8c2c916a0ad 100644 --- a/src/Common/NIO/SocketNotifier.h +++ b/src/Common/NIO/SocketNotifier.h @@ -1,95 +1,98 @@ /** * Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH. and Contributors. * SPDX-License-Identifier: BSL-1.0 -* */ #pragma once -#include "Poco/Net/Net.h" -#include "Poco/Net/Socket.h" -#include "Poco/RefCountedObject.h" -#include -#include "Poco/Observer.h" - -namespace RK { +#include +#include -class SocketReactor; -class SocketNotification; - -using Poco::Net::Socket; +#include +#include +#include +namespace RK +{ -class Net_API SocketNotifier: public Poco::RefCountedObject /// This class is used internally by SocketReactor /// to notify registered event handlers of socket events. +class SocketNotifier { public: - explicit SocketNotifier(const Socket& socket); + using Socket = Poco::Net::Socket; + using MutexType = Poco::FastMutex; + using ScopedLock = MutexType::ScopedLock; + /// Creates the SocketNotifier for the given socket. + explicit SocketNotifier(const Socket & socket); - bool addObserverIfNotExist(SocketReactor* pReactor, const Poco::AbstractObserver& observer); /// Adds the given observer. + bool addObserverIfNotExist(const AbstractObserver & observer); - bool removeObserverIfExist(SocketReactor* pReactor, const Poco::AbstractObserver& observer); /// Removes the given observer. + bool removeObserverIfExist(const AbstractObserver & observer); - bool hasObserver(const Poco::AbstractObserver& observer) const; /// Returns true if the given observer is registered. + bool hasObserver(const AbstractObserver & observer) const; - bool onlyHas(const Poco::AbstractObserver& observer) const; /// Returns true if only has the given observer + bool onlyHas(const AbstractObserver & observer) const; - bool accepts(SocketNotification* pNotification); /// Returns true if there is at least one observer for the given notification. + bool accepts(const Notification & notification); - void dispatch(SocketNotification* pNotification); /// Dispatches the notification to all observers. + void dispatch(const Notification & notification); - bool hasObservers() const; /// Returns true if there are subscribers. + [[maybe_unused]] bool hasObservers() const; - std::size_t countObservers() const; /// Returns the number of subscribers; + size_t size() const; -protected: - ~SocketNotifier() override = default; - /// Destroys the SocketNotifier. + const Socket & getSocket() const; -private: - using MutexType = Poco::FastMutex; - using ScopedLock = MutexType::ScopedLock; + ~SocketNotifier() = default; - NotificationCenter _nc; - Socket _socket; +private: + NotificationCenter nc; + Socket socket; }; +using SocketNotifierPtr = std::shared_ptr; + -inline bool SocketNotifier::accepts(SocketNotification* pNotification) +inline bool SocketNotifier::accepts(const Notification & notification) { - return _nc.accept(pNotification); + return nc.accept(notification); } -inline bool SocketNotifier::hasObserver(const Poco::AbstractObserver& observer) const +inline bool SocketNotifier::hasObserver(const AbstractObserver & observer) const { - return _nc.hasObserver(observer); + return nc.hasObserver(observer); } -inline bool SocketNotifier::onlyHas(const Poco::AbstractObserver& observer) const +inline bool SocketNotifier::onlyHas(const AbstractObserver & observer) const { - return _nc.onlyHas(observer); + return nc.onlyHas(observer); } -inline bool SocketNotifier::hasObservers() const +[[maybe_unused]] inline bool SocketNotifier::hasObservers() const { - return _nc.hasObservers(); + return nc.hasObservers(); } -inline std::size_t SocketNotifier::countObservers() const +inline size_t SocketNotifier::size() const +{ + return nc.size(); +} + +inline const Socket & SocketNotifier::getSocket() const { - return _nc.countObservers(); + return socket; } diff --git a/src/Common/NIO/SocketReactor.cpp b/src/Common/NIO/SocketReactor.cpp index 8f840b2646b..4113014089b 100644 --- a/src/Common/NIO/SocketReactor.cpp +++ b/src/Common/NIO/SocketReactor.cpp @@ -3,88 +3,89 @@ * SPDX-License-Identifier: BSL-1.0 * */ -#include +#include +#include +#include + +#include #include #include -#include "Poco/ErrorHandler.h" -#include "Poco/Thread.h" -#include "Poco/Exception.h" +#include +using Poco::ErrorHandler; using Poco::Exception; using Poco::Thread; -using Poco::ErrorHandler; using Poco::Net::SocketImpl; -namespace RK { - +namespace RK +{ -SocketReactor::SocketReactor(): SocketReactor(DEFAULT_TIMEOUT) +SocketReactor::SocketReactor() : SocketReactor(DEFAULT_TIMEOUT) { } -SocketReactor::SocketReactor(const Poco::Timespan& timeout): - _stop(false), - _timeout(timeout), - _pReadableNotification(new ReadableNotification(this)), - _pWritableNotification(new WritableNotification(this)), - _pErrorNotification(new ErrorNotification(this)), - _pTimeoutNotification(new TimeoutNotification(this)), - _pIdleNotification(new IdleNotification(this)), - _pShutdownNotification(new ShutdownNotification(this)), - _pThread(nullptr) +SocketReactor::SocketReactor(const Poco::Timespan & timeout_) + : timeout(timeout_) + , stopped(false) + , rnf(new ReadableNotification(this)) + , wnf(new WritableNotification(this)) + , enf(new ErrorNotification(this)) + , tnf(new TimeoutNotification(this)) + , inf(new IdleNotification(this)) + , snf(new ShutdownNotification(this)) + , log(&Poco::Logger::get("SocketReactor")) { } void SocketReactor::run() { - _pThread = Thread::current(); - while (!_stop) + while (!stopped) { try { if (!hasSocketHandlers()) { onIdle(); - Thread::trySleep(static_cast(_timeout.totalMilliseconds())); + sleep(); } else { bool readable = false; - PollSet::SocketModeMap sm = _pollSet.poll(_timeout); - if (sm.size() > 0) + PollSet::SocketModeMap sm = poll_set.poll(timeout); + + if (!sm.empty()) { onBusy(); - PollSet::SocketModeMap::iterator it = sm.begin(); - PollSet::SocketModeMap::iterator end = sm.end(); - for (; it != end; ++it) + for (auto & socket_and_events : sm) { - if (it->second & PollSet::POLL_READ) + if (socket_and_events.second & PollSet::POLL_READ) { - dispatch(it->first, _pReadableNotification); + dispatch(socket_and_events.first, *rnf); readable = true; } - if (it->second & PollSet::POLL_WRITE) dispatch(it->first, _pWritableNotification); - if (it->second & PollSet::POLL_ERROR) dispatch(it->first, _pErrorNotification); + if (socket_and_events.second & PollSet::POLL_WRITE) + { + dispatch(socket_and_events.first, *wnf); + } + if (socket_and_events.second & PollSet::POLL_ERROR) + { + dynamic_cast(enf.get())->setErrorNo(errno); + dispatch(socket_and_events.first, *enf); + } } } - if (!readable) onTimeout(); + + if (!readable) + onTimeout(); } } - catch (Exception& exc) - { - ErrorHandler::handle(exc); - } - catch (std::exception& exc) - { - ErrorHandler::handle(exc); - } catch (...) { - ErrorHandler::handle(); + tryLogCurrentException(log, "Failed to handle socket event"); } } onShutdown(); @@ -93,14 +94,13 @@ void SocketReactor::run() bool SocketReactor::hasSocketHandlers() { - if (!_pollSet.empty()) + ScopedLock lock(mutex); + if (!notifiers.empty()) { - ScopedLock lock(_mutex); - for (auto& p: _handlers) + for (auto & p : notifiers) { - if (p.second->accepts(_pReadableNotification) || - p.second->accepts(_pWritableNotification) || - p.second->accepts(_pErrorNotification)) return true; + if (p.second->accepts(*rnf) || p.second->accepts(*wnf) || p.second->accepts(*enf)) + return true; } } @@ -110,143 +110,163 @@ bool SocketReactor::hasSocketHandlers() void SocketReactor::stop() { - _stop = true; + stopped = true; wakeUp(); } -/// Wake up reactor, if invoker running in event loop thread there is no need to wake up. void SocketReactor::wakeUp() { - auto * copy = _pThread.load(); /// to avoid data race - if (copy && copy != Thread::current()) - { - copy->wakeUp(); - _pollSet.wakeUp(); - } + if (stopped) + return; + poll_set.wakeUp(); + event.set(); +} + +void SocketReactor::sleep() +{ + event.tryWait(timeout.totalMilliseconds()); } -void SocketReactor::setTimeout(const Poco::Timespan& timeout) +void SocketReactor::setTimeout(const Poco::Timespan & timeout_) { - _timeout = timeout; + timeout = timeout_; } -const Poco::Timespan& SocketReactor::getTimeout() const +const Poco::Timespan & SocketReactor::getTimeout() const { - return _timeout; + return timeout; } -void SocketReactor::addEventHandler(const Socket& socket, const Poco::AbstractObserver& observer) +void SocketReactor::addEventHandler(const Socket & socket, const AbstractObserver & observer) { - NotifierPtr pNotifier = getNotifier(socket, true); - if (pNotifier->addObserverIfNotExist(this, observer)) + SocketNotifierPtr notifier = getNotifier(socket, true); + if (notifier->addObserverIfNotExist(observer)) { int mode = 0; - if (pNotifier->accepts(_pReadableNotification)) mode |= PollSet::POLL_READ; - if (pNotifier->accepts(_pWritableNotification)) mode |= PollSet::POLL_WRITE; - if (pNotifier->accepts(_pErrorNotification)) mode |= PollSet::POLL_ERROR; - if (mode) _pollSet.add(socket, mode); + if (notifier->accepts(*rnf)) + mode |= PollSet::POLL_READ; + if (notifier->accepts(*wnf)) + mode |= PollSet::POLL_WRITE; + if (notifier->accepts(*enf)) + mode |= PollSet::POLL_ERROR; + if (mode) + poll_set.add(socket, mode); } } -void SocketReactor::addEventHandlers(const Socket& socket, const std::vector& observers) +void SocketReactor::addEventHandlers(const Socket & socket, const std::vector & observers) { - NotifierPtr pNotifier = getNotifier(socket, true); + SocketNotifierPtr notifier = getNotifier(socket, true); int mode = 0; for (auto * observer : observers) { - pNotifier->addObserverIfNotExist(this, *observer); - - if (pNotifier->accepts(_pReadableNotification)) mode |= PollSet::POLL_READ; - if (pNotifier->accepts(_pWritableNotification)) mode |= PollSet::POLL_WRITE; - if (pNotifier->accepts(_pErrorNotification)) mode |= PollSet::POLL_ERROR; + notifier->addObserverIfNotExist(*observer); + + if (notifier->accepts(*rnf)) + mode |= PollSet::POLL_READ; + if (notifier->accepts(*wnf)) + mode |= PollSet::POLL_WRITE; + if (notifier->accepts(*enf)) + mode |= PollSet::POLL_ERROR; } - if (mode) _pollSet.add(socket, mode); + if (mode) + poll_set.add(socket, mode); } -bool SocketReactor::hasEventHandler(const Socket& socket, const Poco::AbstractObserver& observer) +[[maybe_unused]] bool SocketReactor::hasEventHandler(const Socket & socket, const AbstractObserver & observer) { - NotifierPtr pNotifier = getNotifier(socket); - if (!pNotifier) return false; - if (pNotifier->hasObserver(observer)) return true; + SocketNotifierPtr notifier = getNotifier(socket); + if (!notifier) + return false; + if (notifier->hasObserver(observer)) + return true; return false; } -SocketReactor::NotifierPtr SocketReactor::getNotifier(const Socket& socket, bool makeNew) +SocketNotifierPtr SocketReactor::getNotifier(const Socket & socket, bool makeNew) { - const SocketImpl* pImpl = socket.impl(); - if (pImpl == nullptr) return nullptr; - poco_socket_t sockfd = pImpl->sockfd(); - ScopedLock lock(_mutex); + const SocketImpl * impl = socket.impl(); + if (impl == nullptr) + return nullptr; + + poco_socket_t sock_fd = impl->sockfd(); + ScopedLock lock(mutex); - EventHandlerMap::iterator it = _handlers.find(sockfd); - if (it != _handlers.end()) return it->second; - else if (makeNew) return (_handlers[sockfd] = new SocketNotifier(socket)); + SocketNotifierMap::iterator it = notifiers.find(sock_fd); + if (it != notifiers.end()) + return it->second; + else if (makeNew) + return (notifiers[sock_fd] = std::make_shared(socket)); return nullptr; } -void SocketReactor::removeEventHandler(const Socket& socket, const Poco::AbstractObserver& observer) +void SocketReactor::removeEventHandler(const Socket & socket, const AbstractObserver & observer) { - const SocketImpl* pImpl = socket.impl(); - if (pImpl == nullptr) return; + const SocketImpl * impl = socket.impl(); + if (impl == nullptr) + return; - NotifierPtr pNotifier; + SocketNotifierPtr notifier; { - ScopedLock lock(_mutex); - EventHandlerMap::iterator it = _handlers.find(pImpl->sockfd()); - if (it != _handlers.end()) - pNotifier = it->second; + ScopedLock lock(mutex); + SocketNotifierMap::iterator it = notifiers.find(impl->sockfd()); + if (it != notifiers.end()) + notifier = it->second; - if (pNotifier && pNotifier->onlyHas(observer)) + if (notifier && notifier->onlyHas(observer)) { - _handlers.erase(pImpl->sockfd()); - _pollSet.remove(socket); + notifiers.erase(impl->sockfd()); + poll_set.remove(socket); } } - if (pNotifier) + if (notifier) { - pNotifier->removeObserverIfExist(this, observer); - if (pNotifier->countObservers() > 0 && socket.impl()->sockfd() > 0) + notifier->removeObserverIfExist(observer); + if (notifier->size() > 0 && socket.impl()->sockfd() > 0) { int mode = 0; - if (pNotifier->accepts(_pReadableNotification)) mode |= PollSet::POLL_READ; - if (pNotifier->accepts(_pWritableNotification)) mode |= PollSet::POLL_WRITE; - if (pNotifier->accepts(_pErrorNotification)) mode |= PollSet::POLL_ERROR; - _pollSet.update(socket, mode); + if (notifier->accepts(*rnf)) + mode |= PollSet::POLL_READ; + if (notifier->accepts(*wnf)) + mode |= PollSet::POLL_WRITE; + if (notifier->accepts(*enf)) + mode |= PollSet::POLL_ERROR; + poll_set.update(socket, mode); } } } -bool SocketReactor::has(const Socket& socket) const +bool SocketReactor::has(const Socket & socket) const { - return _pollSet.has(socket); + return poll_set.has(socket); } void SocketReactor::onTimeout() { - dispatch(_pTimeoutNotification); + dispatch(*tnf); } void SocketReactor::onIdle() { - dispatch(_pIdleNotification); + dispatch(*inf); } void SocketReactor::onShutdown() { - dispatch(_pShutdownNotification); + dispatch(*snf); } @@ -254,50 +274,84 @@ void SocketReactor::onBusy() { } - -void SocketReactor::dispatch(const Socket& socket, SocketNotification* pNotification) +void SocketReactor::dispatch(const Socket & socket, const Notification & notification) { - NotifierPtr pNotifier = getNotifier(socket); - if (!pNotifier) return; - dispatch(pNotifier, pNotification); + SocketNotifierPtr notifier = getNotifier(socket); + if (!notifier) + return; + dispatch(notifier, notification); } -void SocketReactor::dispatch(SocketNotification* pNotification) +void SocketReactor::dispatch(const Notification & notification) { - std::vector delegates; + std::vector copied; { - ScopedLock lock(_mutex); - delegates.reserve(_handlers.size()); - for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it) - delegates.push_back(it->second); + ScopedLock lock(mutex); + copied.reserve(notifiers.size()); + for (auto & notifier : notifiers) + copied.push_back(notifier.second); } - for (std::vector::iterator it = delegates.begin(); it != delegates.end(); ++it) + for (auto & notifier : copied) { - dispatch(*it, pNotification); + dispatch(notifier, notification); } } -void SocketReactor::dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification) +void SocketReactor::dispatch(SocketNotifierPtr & notifier, const Notification & notification) { try { - pNotifier->dispatch(pNotification); + const auto & sock = notifier->getSocket(); + const auto socket_name = sock.isStream() ? sock.address().toString() /// use local address for server socket + : sock.peerAddress().toString(); /// use remote address for server socket + LOG_TRACE(log, "Dispatch event {} for {} ", notification.name(), socket_name); + notifier->dispatch(notification); + } + catch (...) + { + tryLogCurrentException(log, "Failed to dispatch socket event " + notification.name()); } - catch (Exception& exc) +} + + +AsyncSocketReactor::AsyncSocketReactor(const Poco::Timespan & timeout, const std::string & name_) : SocketReactor(timeout), name(name_) +{ + startup(); +} + +void AsyncSocketReactor::startup() +{ + thread.start(*this); +} + +void AsyncSocketReactor::run() +{ + if (!name.empty()) { - ErrorHandler::handle(exc); + setThreadName(name.c_str()); + Poco::Thread::current()->setName(name); } - catch (std::exception& exc) + SocketReactor::run(); +} + +AsyncSocketReactor::~AsyncSocketReactor() +{ + try { - ErrorHandler::handle(exc); + this->stop(); + thread.join(); } catch (...) { - ErrorHandler::handle(); } } +void AsyncSocketReactor::onIdle() +{ + SocketReactor::onIdle(); + Poco::Thread::yield(); +} } diff --git a/src/Common/NIO/SocketReactor.h b/src/Common/NIO/SocketReactor.h index fff00904b3f..09de5e69e55 100644 --- a/src/Common/NIO/SocketReactor.h +++ b/src/Common/NIO/SocketReactor.h @@ -1,32 +1,32 @@ /** * Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH. and Contributors. * SPDX-License-Identifier: BSL-1.0 -* */ #pragma once -#include "Poco/Net/Net.h" -#include "Poco/Net/Socket.h" +#include +#include + +#include +#include +#include +#include +#include + +#include +#include #include #include -#include -#include "Poco/Runnable.h" -#include "Poco/Timespan.h" -#include "Poco/Observer.h" -#include "Poco/NObserver.h" -#include "Poco/AutoPtr.h" -#include "Poco/Thread.h" -#include -#include +#include +#include using Poco::Net::Socket; -using Poco::AutoPtr; -namespace RK { +namespace RK +{ -class SocketReactor: public Poco::Runnable -/// This class, which is part of the Reactor pattern, +/// This class, which is the core of the Reactor pattern, /// implements the "Initiation Dispatcher". /// /// The Reactor pattern has been described in the book @@ -40,192 +40,146 @@ class SocketReactor: public Poco::Runnable /// handler is responsible for servicing service-specific requests. /// The SocketReactor dispatches the event handlers. /// -/// Event handlers (any class can be an event handler - there -/// is no base class for event handlers) can be registered -/// with the addEventHandler() method and deregistered with -/// the removeEventHandler() method. -/// -/// An event handler is always registered for a certain socket, -/// which is given in the call to addEventHandler(). Any method -/// of the event handler class can be registered to handle the -/// event - the only requirement is that the method takes -/// a pointer to an instance of SocketNotification (or a subclass of it) -/// as argument. -/// /// Once started, the SocketReactor waits for events -/// on the registered sockets, using Socket::select(). +/// on the registered sockets, using PollSet. /// If an event is detected, the corresponding event handler /// is invoked. There are five event types (and corresponding /// notification classes) defined: ReadableNotification, WritableNotification, /// ErrorNotification, TimeoutNotification, IdleNotification and /// ShutdownNotification. -/// -/// The ReadableNotification will be dispatched if a socket becomes -/// readable. The WritableNotification will be dispatched if a socket -/// becomes writable. The ErrorNotification will be dispatched if -/// there is an error condition on a socket. -/// -/// If the timeout expires and no event has occurred, a -/// TimeoutNotification will be dispatched to all event handlers -/// registered for it. This is done in the onTimeout() method -/// which can be overridden by subclasses to perform custom -/// timeout processing. -/// -/// If there are no sockets for the SocketReactor to pass to -/// Socket::select(), an IdleNotification will be dispatched to -/// all event handlers registered for it. This is done in the -/// onIdle() method which can be overridden by subclasses -/// to perform custom idle processing. Since onIdle() will be -/// called repeatedly in a loop, it is recommended to do a -/// short sleep or yield in the event handler. -/// -/// Finally, when the SocketReactor is about to shut down (as a result -/// of stop() being called), it dispatches a ShutdownNotification -/// to all event handlers. This is done in the onShutdown() method -/// which can be overridden by subclasses to perform custom -/// shutdown processing. -/// -/// The SocketReactor is implemented so that it can -/// run in its own thread. It is also possible to run -/// multiple SocketReactors in parallel, as long as -/// they work on different sockets. -/// -/// It is safe to call addEventHandler() and removeEventHandler() -/// from another thread while the SocketReactor is running. Also, -/// it is safe to call addEventHandler() and removeEventHandler() -/// from event handlers. +class SocketReactor : public Poco::Runnable { public: SocketReactor(); - /// Creates the SocketReactor. - - explicit SocketReactor(const Poco::Timespan& timeout); - /// Creates the SocketReactor, using the given timeout. + explicit SocketReactor(const Poco::Timespan & timeout); virtual ~SocketReactor() override = default; - /// Destroys the SocketReactor. void run() override; - /// Runs the SocketReactor. The reactor will run - /// until stop() is called (in a separate thread). - void stop(); - /// Stops the SocketReactor. - /// - /// The reactor will be stopped when the next event - /// (including a timeout event) occurs. + /// Wake up the Reactor void wakeUp(); - /// Wakes up idle reactor. - void setTimeout(const Poco::Timespan& timeout); /// Sets the timeout. /// /// If no other event occurs for the given timeout /// interval, a timeout event is sent to all event listeners. /// /// The default timeout is 250 milliseconds; - /// - /// The timeout is passed to the Socket::select() - /// method. - - const Poco::Timespan& getTimeout() const; - /// Returns the timeout. + /// The timeout is passed to the PollSet. + void setTimeout(const Poco::Timespan & timeout); + const Poco::Timespan & getTimeout() const; - void addEventHandler(const Socket& socket, const Poco::AbstractObserver& observer); /// Deprecated for it has TSAN heap-use-after-free risk. /// Acceptor thread wants to add 3 events(read, error, shutdown), /// when the first event `read` added, the handler thread can trigger /// read event if the socket is not available, handler thread may destroy /// itself and the socket, so heap-use-after-free happens. + void addEventHandler(const Socket & socket, const AbstractObserver & observer); - void addEventHandlers(const Socket& socket, const std::vector& observers); /// Registers an event handler with the SocketReactor. /// /// Usage: - /// Poco::Observer obs(*this, &MyEventHandler::handleMyEvent); - /// reactor.addEventHandler(obs); + /// Observer obs(*this, &MyEventHandler::handleMyEvent); + /// getWorkerReactor.addEventHandler(obs); + void addEventHandlers(const Socket & socket, const std::vector & observers); - bool hasEventHandler(const Socket& socket, const Poco::AbstractObserver& observer); /// Returns true if the observer is registered with SocketReactor for the given socket. + [[maybe_unused]] bool hasEventHandler(const Socket & socket, const AbstractObserver & observer); - void removeEventHandler(const Socket& socket, const Poco::AbstractObserver& observer); /// Unregisters an event handler with the SocketReactor. /// /// Usage: - /// Poco::Observer obs(*this, &MyEventHandler::handleMyEvent); - /// reactor.removeEventHandler(obs); + /// Observer obs(*this, &MyEventHandler::handleMyEvent); + /// getWorkerReactor.removeEventHandler(obs); + void removeEventHandler(const Socket & socket, const AbstractObserver & observer); - bool has(const Socket& socket) const; /// Returns true if socket is registered with this rector. + bool has(const Socket & socket) const; protected: + /// Called if the timeout expires and no readable events are available. virtual void onTimeout(); - /// Called if the timeout expires and no other events are available. - /// - /// Can be overridden by subclasses. The default implementation - /// dispatches the TimeoutNotification and thus should be called by overriding - /// implementations. + /// Called if no sockets are available. virtual void onIdle(); - /// Called if no sockets are available to call select() on. - /// - /// Can be overridden by subclasses. The default implementation - /// dispatches the IdleNotification and thus should be called by overriding - /// implementations. - virtual void onShutdown(); /// Called when the SocketReactor is about to terminate. - /// - /// Can be overridden by subclasses. The default implementation - /// dispatches the ShutdownNotification and thus should be called by overriding - /// implementations. + virtual void onShutdown(); - virtual void onBusy(); /// Called when the SocketReactor is busy and at least one notification /// has been dispatched. - /// - /// Can be overridden by subclasses to perform additional - /// periodic tasks. The default implementation does nothing. + virtual void onBusy(); - void dispatch(const Socket& socket, SocketNotification* pNotification); - /// Dispatches the given notification to all observers - /// registered for the given socket. + /// Dispatches the given notification to observers which are registered for the given socket. + void dispatch(const Socket & socket, const Notification & notification); - void dispatch(SocketNotification* pNotification); /// Dispatches the given notification to all observers. + void dispatch(const Notification & notification); private: - using NotifierPtr = Poco::AutoPtr; - using NotificationPtr = Poco::AutoPtr; - typedef std::map EventHandlerMap; - typedef Poco::FastMutex MutexType; - typedef MutexType::ScopedLock ScopedLock; + using SocketNotifierMap = std::map; + + using MutexType = Poco::FastMutex; + using ScopedLock = MutexType::ScopedLock; + + void sleep(); + + void dispatch(SocketNotifierPtr & pNotifier, const Notification & notification); bool hasSocketHandlers(); - void dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification); - NotifierPtr getNotifier(const Socket& socket, bool makeNew = false); + + SocketNotifierPtr getNotifier(const Socket & socket, bool makeNew = false); enum { DEFAULT_TIMEOUT = 250000 }; - std::atomic _stop; - Poco::Timespan _timeout; - EventHandlerMap _handlers; - PollSet _pollSet; - NotificationPtr _pReadableNotification; - NotificationPtr _pWritableNotification; - NotificationPtr _pErrorNotification; - NotificationPtr _pTimeoutNotification; - NotificationPtr _pIdleNotification; - NotificationPtr _pShutdownNotification; - MutexType _mutex; - std::atomic _pThread; - - friend class SocketNotifier; + /// + Poco::Timespan timeout; + std::atomic stopped; + + SocketNotifierMap notifiers; + PollSet poll_set; + + /// Notifications which will dispatched to observers + NotificationPtr rnf; + NotificationPtr wnf; + NotificationPtr enf; + NotificationPtr tnf; + NotificationPtr inf; + NotificationPtr snf; + + MutexType mutex; + Poco::Event event; + + Poco::Logger * log; }; +/// SocketReactor which run asynchronously. +class AsyncSocketReactor : public SocketReactor +{ +public: + explicit AsyncSocketReactor(const Poco::Timespan & timeout, const std::string & name); + ~AsyncSocketReactor() override; + + void run() override; + +protected: + void onIdle() override; + +private: + void startup(); + + Poco::Thread thread; + const std::string name; +}; + + +using SocketReactorPtr = std::shared_ptr; +using AsyncSocketReactorPtr = std::shared_ptr; + } diff --git a/src/Common/NIO/SvsSocketAcceptor.h b/src/Common/NIO/SvsSocketAcceptor.h deleted file mode 100644 index 3850db4789d..00000000000 --- a/src/Common/NIO/SvsSocketAcceptor.h +++ /dev/null @@ -1,239 +0,0 @@ -/** -* Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH. and Contributors. -* SPDX-License-Identifier: BSL-1.0 -* -*/ -#pragma once - -#include -#include -#include -#include -#include -#include - -#include -#include - - -using Poco::Net::Socket; -using Poco::Net::ServerSocket; -using Poco::Net::StreamSocket; -using Poco::NObserver; -using Poco::AutoPtr; - - -namespace RK { - -template -class SvsSocketAcceptor -/// This class implements the Acceptor part of the Acceptor-Connector design pattern. -/// Only the difference from single-threaded version is documented here, For full -/// description see Poco::Net::SocketAcceptor documentation. -/// -/// This is a multi-threaded version of SocketAcceptor, it differs from the -/// single-threaded version in number of reactors (defaulting to number of processors) -/// that can be specified at construction time and is rotated in a round-robin fashion -/// by event handler. See ParallelSocketAcceptor::onAccept and -/// ParallelSocketAcceptor::createServiceHandler documentation and implementation for -/// details. -{ -public: - using ParallelReactor = SvsSocketReactor; - using Observer = Poco::Observer; - - explicit SvsSocketAcceptor( - const String& name, Context & keeper_context_, ServerSocket & socket, unsigned threads = Poco::Environment::processorCount()) - : name_(name), keeper_context(keeper_context_), socket_(socket), reactor_(nullptr), threads_(threads), next_(0) - /// Creates a ParallelSocketAcceptor using the given ServerSocket, - /// sets number of threads and populates the reactors vector. - { - init(); - } - - SvsSocketAcceptor( - const String& name, - Context & keeper_context_, - ServerSocket & socket, - SocketReactor & reactor, - const Poco::Timespan & timeout, - unsigned threads = Poco::Environment::processorCount()) - : name_(name) - , socket_(socket) - , reactor_(&reactor) - , threads_(threads) - , next_(0) - , keeper_context(keeper_context_) - , timeout_(timeout) - /// Creates a ParallelSocketAcceptor using the given ServerSocket, sets the - /// number of threads, populates the reactors vector and registers itself - /// with the given SocketReactor. - { - init(); - reactor_->addEventHandler(socket_, Observer(*this, &SvsSocketAcceptor::onAccept)); - /// It is necessary to wake up the reactor. - reactor_->wakeUp(); - } - - virtual ~SvsSocketAcceptor() - /// Destroys the ParallelSocketAcceptor. - { - try - { - if (reactor_) - { - reactor_->removeEventHandler(socket_, Observer(*this, &SvsSocketAcceptor::onAccept)); - } - } - catch (...) - { - } - } - - void setReactor(SocketReactor& reactor) - /// Sets the reactor for this acceptor. - { - registerAcceptor(reactor); - } - - virtual void registerAcceptor(SocketReactor& reactor) - /// Registers the ParallelSocketAcceptor with a SocketReactor. - /// - /// A subclass can override this function to e.g. - /// register an event handler for timeout event. - /// - /// The overriding method must either call the base class - /// implementation or register the accept handler on its own. - { - reactor_ = &reactor; - if (!reactor_->hasEventHandler(socket_, Observer(*this, &SvsSocketAcceptor::onAccept))) - { - reactor_->addEventHandler(socket_, Observer(*this, &SvsSocketAcceptor::onAccept)); - reactor_->wakeUp(); - } - } - - virtual void unregisterAcceptor() - /// Unregisters the ParallelSocketAcceptor. - /// - /// A subclass can override this function to e.g. - /// unregister its event handler for a timeout event. - /// - /// The overriding method must either call the base class - /// implementation or unregister the accept handler on its own. - { - if (reactor_) - { - reactor_->removeEventHandler(socket_, Observer(*this, &SvsSocketAcceptor::onAccept)); - } - } - - void onAccept(ReadableNotification* pNotification) - /// Accepts connection and creates event handler. - /// TODO why wait a moment? For when adding EventHandler it does not wake up register. - /// and need register event? no - { - pNotification->release(); - StreamSocket sock = socket_.acceptConnection(); - createServiceHandler(sock); - } - -protected: - using ReactorVec = std::vector; - - virtual ServiceHandler* createServiceHandler(StreamSocket& socket) - /// Create and initialize a new ServiceHandler instance. - /// If socket is already registered with a reactor, the new - /// ServiceHandler instance is given that reactor; otherwise, - /// the next reactor is used. Reactors are rotated in round-robin - /// fashion. - /// - /// Subclasses can override this method. - { - socket.setBlocking(false); - SocketReactor* pReactor = reactor(socket); - if (!pReactor) - { - std::size_t next = next_++; - if (next_ == reactors_.size()) next_ = 0; - pReactor = reactors_[next]; - } - auto* ret = new ServiceHandler(keeper_context, socket, *pReactor); - pReactor->wakeUp(); - return ret; - } - - SocketReactor* reactor(const Socket& socket) - /// Returns reactor where this socket is already registered - /// for polling, if found; otherwise returns null pointer. - { - typename ReactorVec::iterator it = reactors_.begin(); - typename ReactorVec::iterator end = reactors_.end(); - for (; it != end; ++it) - { - if ((*it)->has(socket)) return it->get(); - } - return nullptr; - } - - SocketReactor* reactor() - /// Returns a pointer to the SocketReactor where - /// this SocketAcceptor is registered. - /// - /// The pointer may be null. - { - return reactor_; - } - - Socket& socket() - /// Returns a reference to the SocketAcceptor's socket. - { - return socket_; - } - - void init() - /// Populates the reactors vector. - { - poco_assert (threads_ > 0); - - for (unsigned i = 0; i < threads_; ++i) - reactors_.push_back(new ParallelReactor(timeout_, name_ + "#" + std::to_string(i))); - } - - ReactorVec& reactors() - /// Returns reference to vector of reactors. - { - return reactors_; - } - - SocketReactor* reactor(std::size_t idx) - /// Returns reference to the reactor at position idx. - { - return reactors_.at(idx).get(); - } - - std::size_t next() - /// Returns the next reactor index. - { - return next_; - } - -private: - SvsSocketAcceptor() = delete; - SvsSocketAcceptor(const SvsSocketAcceptor &) = delete; - SvsSocketAcceptor & operator = (const SvsSocketAcceptor &) = delete; - - String name_; - - ServerSocket socket_; - SocketReactor* reactor_; - unsigned threads_; - ReactorVec reactors_; - std::size_t next_; - - Context & keeper_context; - Poco::Timespan timeout_; -}; - - -} diff --git a/src/Common/NIO/SvsSocketReactor.h b/src/Common/NIO/SvsSocketReactor.h deleted file mode 100644 index 431284ab9f3..00000000000 --- a/src/Common/NIO/SvsSocketReactor.h +++ /dev/null @@ -1,67 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - - - -namespace RK { - -template -class SvsSocketReactor : public SR -{ -public: - using Ptr = Poco::SharedPtr; - - SvsSocketReactor(const std::string& name = "") : _name(name) - { - _thread.start(*this); - } - - SvsSocketReactor(const Poco::Timespan& timeout, const std::string& name = ""): - SR(timeout), _name(name) - { - _thread.start(*this); - } - - void run() override - { - if (!_name.empty()) - { - _thread.setName(_name); - setThreadName(_name.c_str()); - } - SR::run(); - } - - ~SvsSocketReactor() override - { - try - { - this->stop(); - _thread.join(); - } - catch (...) - { - } - } - -protected: - void onIdle() override - { - SR::onIdle(); - Poco::Thread::yield(); - } - -private: - Poco::Thread _thread; - std::string _name; -}; - -} diff --git a/src/Service/ConnectionHandler.cpp b/src/Service/ConnectionHandler.cpp index bc9104e7ea9..49ace914122 100644 --- a/src/Service/ConnectionHandler.cpp +++ b/src/Service/ConnectionHandler.cpp @@ -81,11 +81,11 @@ ConnectionHandler::ConnectionHandler(Context & global_context_, StreamSocket & s LOG_DEBUG(log, "New connection from {}", peer); registerConnection(this); - auto read_handler = NObserver(*this, &ConnectionHandler::onSocketReadable); - auto error_handler = NObserver(*this, &ConnectionHandler::onSocketError); - auto shutdown_handler = NObserver(*this, &ConnectionHandler::onReactorShutdown); + auto read_handler = Observer(*this, &ConnectionHandler::onSocketReadable); + auto error_handler = Observer(*this, &ConnectionHandler::onSocketError); + auto shutdown_handler = Observer(*this, &ConnectionHandler::onReactorShutdown); - std::vector handlers; + std::vector handlers; handlers.push_back(&read_handler); handlers.push_back(&error_handler); handlers.push_back(&shutdown_handler); @@ -99,17 +99,17 @@ ConnectionHandler::~ConnectionHandler() LOG_INFO(log, "Disconnecting peer {}#{}", peer, toHexString(session_id.load())); unregisterConnection(this); - reactor.removeEventHandler(sock, NObserver(*this, &ConnectionHandler::onSocketReadable)); - reactor.removeEventHandler(sock, NObserver(*this, &ConnectionHandler::onSocketWritable)); - reactor.removeEventHandler(sock, NObserver(*this, &ConnectionHandler::onSocketError)); - reactor.removeEventHandler(sock, NObserver(*this, &ConnectionHandler::onReactorShutdown)); + reactor.removeEventHandler(sock, Observer(*this, &ConnectionHandler::onSocketReadable)); + reactor.removeEventHandler(sock, Observer(*this, &ConnectionHandler::onSocketWritable)); + reactor.removeEventHandler(sock, Observer(*this, &ConnectionHandler::onSocketError)); + reactor.removeEventHandler(sock, Observer(*this, &ConnectionHandler::onReactorShutdown)); } catch (...) { } } -void ConnectionHandler::onSocketReadable(const AutoPtr & /*pNf*/) +void ConnectionHandler::onSocketReadable(const Notification &) { try { @@ -248,7 +248,7 @@ void ConnectionHandler::onSocketReadable(const AutoPtr & / } } -void ConnectionHandler::onSocketWritable(const AutoPtr &) +void ConnectionHandler::onSocketWritable(const Notification &) { LOG_TRACE(log, "Peer {}#{} is writable", peer, toHexString(session_id.load())); @@ -264,7 +264,7 @@ void ConnectionHandler::onSocketWritable(const AutoPtr &) { LOG_TRACE(log, "Remove socket writable event handler for peer {}", peer); reactor.removeEventHandler( - sock, NObserver(*this, &ConnectionHandler::onSocketWritable)); + sock, Observer(*this, &ConnectionHandler::onSocketWritable)); } } } @@ -350,13 +350,13 @@ void ConnectionHandler::onSocketWritable(const AutoPtr &) } } -void ConnectionHandler::onReactorShutdown(const AutoPtr & /*pNf*/) +void ConnectionHandler::onReactorShutdown(const Notification &) { - LOG_INFO(log, "Reactor shutdown!"); + LOG_INFO(log, "Reactor of peer {} shutdown!", peer); destroyMe(); } -void ConnectionHandler::onSocketError(const AutoPtr & /*pNf*/) +void ConnectionHandler::onSocketError(const Notification &) { LOG_WARNING(log, "Socket error for peer {}#{}, errno {} !", peer, toHexString(session_id.load()), errno); destroyMe(); @@ -474,7 +474,7 @@ Coordination::OpNum ConnectionHandler::receiveHandshake(int32_t handshake_req_le Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); /// Generate id for new session request and use session_id for update session request - auto id = opnum == OpNum::NewSession ? keeper_dispatcher->getNewSessionInternalId() : previous_session_id; + auto id = opnum == OpNum::NewSession ? keeper_dispatcher->getAndAddInternalId() : previous_session_id; if (opnum == OpNum::NewSession) { @@ -661,10 +661,10 @@ void ConnectionHandler::pushUserResponseToSendingQueue(const Coordination::ZooKe } /// Trigger socket writable event - reactor.addEventHandler(sock, NObserver(*this, &ConnectionHandler::onSocketWritable)); + reactor.addEventHandler(sock, Observer(*this, &ConnectionHandler::onSocketWritable)); } - /// We must wake up reactor to interrupt it's sleeping. + /// We must wake up getWorkerReactor to interrupt it's sleeping. reactor.wakeUp(); } diff --git a/src/Service/ConnectionHandler.h b/src/Service/ConnectionHandler.h index b43a5addcb1..d1460655580 100644 --- a/src/Service/ConnectionHandler.h +++ b/src/Service/ConnectionHandler.h @@ -15,12 +15,10 @@ #include #include +#include #include #include -#include -#include -#include "ZooKeeper/ZooKeeperCommon.h" #include #include #include @@ -31,7 +29,6 @@ namespace RK { using Poco::Net::StreamSocket; -using Poco::AutoPtr; using Poco::FIFOBuffer; using Poco::Logger; using Poco::Thread; @@ -40,7 +37,7 @@ using Poco::Thread; * User connection handler with TCP protocol. It is a core class who process * Zookeeper network protocol and send it to dispatcher. * - * We utilize a reactor network programming model. We allocate a handler for + * We utilize a getWorkerReactor network programming model. We allocate a handler for * every connection and ensure that every handler run in the same network thread. * * So there is no multi-thread issues. @@ -65,11 +62,11 @@ class ConnectionHandler ~ConnectionHandler(); /// socket events: readable, writable, error - void onSocketReadable(const AutoPtr & pNf); - void onSocketWritable(const AutoPtr & pNf); + void onSocketReadable(const Notification &); + void onSocketWritable(const Notification &); - void onReactorShutdown(const AutoPtr & pNf); - void onSocketError(const AutoPtr & pNf); + void onReactorShutdown(const Notification &); + void onSocketError(const Notification &); /// current connection statistics ConnectionStats getConnectionStats() const; @@ -79,17 +76,6 @@ class ConnectionHandler void resetStats(); private: - /// client hand shake result - struct HandShakeResult - { - /// handshake result - bool connect_success{}; - bool session_expired{}; - - /// whether is reconnected request - bool is_reconnected{}; - }; - Coordination::OpNum receiveHandshake(int32_t handshake_length); static bool isHandShake(Int32 & handshake_length); diff --git a/src/Service/Context.h b/src/Service/Context.h index aeabb9a1852..d8157628017 100644 --- a/src/Service/Context.h +++ b/src/Service/Context.h @@ -7,7 +7,6 @@ #include #include #include -#include #include #include diff --git a/src/Service/ForwardConnection.cpp b/src/Service/ForwardConnection.cpp index 70259aaaa4f..89a14b2adf2 100644 --- a/src/Service/ForwardConnection.cpp +++ b/src/Service/ForwardConnection.cpp @@ -12,8 +12,8 @@ namespace ErrorCodes extern const int ALL_CONNECTION_TRIES_FAILED; extern const int NETWORK_ERROR; extern const int UNEXPECTED_FORWARD_PACKET; - extern const int RAFT_FORWARDING_ERROR; - extern const int FORWARDING_DISCONNECTED; + extern const int RAFT_FORWARD_ERROR; + extern const int FORWARD_NOT_CONNECTED; } void ForwardConnection::connect() @@ -44,7 +44,7 @@ void ForwardConnection::connect() LOG_TRACE(log, "Sent handshake to {}", endpoint); if (!receiveHandshake()) - throw Exception(ErrorCodes::RAFT_FORWARDING_ERROR, "Handshake with {} failed", endpoint); + throw Exception(ErrorCodes::RAFT_FORWARD_ERROR, "Handshake with {} failed", endpoint); connected = true; LOG_TRACE(log, "Connect to {} success", endpoint); @@ -76,7 +76,7 @@ void ForwardConnection::send(ForwardRequestPtr request) if (!connected) throw Exception("Connect to server failed", ErrorCodes::ALL_CONNECTION_TRIES_FAILED); - LOG_TRACE(log, "Forwarding request {} to endpoint {}", request->toString(), endpoint); + LOG_TRACE(log, "Forward request {} to endpoint {}", request->toString(), endpoint); try { @@ -99,7 +99,7 @@ bool ForwardConnection::poll(UInt64 timeout_microseconds) void ForwardConnection::receive(ForwardResponsePtr & response) { if (!connected) - throw Exception("Forwarding connection disconnected", ErrorCodes::FORWARDING_DISCONNECTED); + throw Exception(ErrorCodes::FORWARD_NOT_CONNECTED, "Forwarding connection disconnected"); /// There are two situations, /// 1. Feedback not accepted. diff --git a/src/Service/ForwardConnectionHandler.cpp b/src/Service/ForwardConnectionHandler.cpp index 5bd53707eb8..34b1e581f6e 100644 --- a/src/Service/ForwardConnectionHandler.cpp +++ b/src/Service/ForwardConnectionHandler.cpp @@ -12,8 +12,6 @@ namespace RK { -using Poco::NObserver; - ForwardConnectionHandler::ForwardConnectionHandler(Context & global_context_, StreamSocket & socket_, SocketReactor & reactor_) : log(&Logger::get("ForwardConnectionHandler")) @@ -22,21 +20,20 @@ ForwardConnectionHandler::ForwardConnectionHandler(Context & global_context_, St , global_context(global_context_) , keeper_dispatcher(global_context.getDispatcher()) , responses(std::make_unique()) + , need_destroy(false) { - LOG_INFO(log, "New forwarding connection from {}", sock.peerAddress().toString()); + LOG_INFO(log, "New forward connection from {}", sock.peerAddress().toString()); - auto read_handler = NObserver(*this, &ForwardConnectionHandler::onSocketReadable); - auto error_handler = NObserver(*this, &ForwardConnectionHandler::onSocketError); + auto read_handler = Observer(*this, &ForwardConnectionHandler::onSocketReadable); + auto error_handler = Observer(*this, &ForwardConnectionHandler::onSocketError); auto shutdown_handler - = NObserver(*this, &ForwardConnectionHandler::onReactorShutdown); + = Observer(*this, &ForwardConnectionHandler::onReactorShutdown); - std::vector handlers; + std::vector handlers; handlers.push_back(&read_handler); handlers.push_back(&error_handler); handlers.push_back(&shutdown_handler); reactor.addEventHandlers(sock, handlers); - - need_destroy = false; } ForwardConnectionHandler::~ForwardConnectionHandler() @@ -44,24 +41,24 @@ ForwardConnectionHandler::~ForwardConnectionHandler() try { reactor.removeEventHandler( - sock, NObserver(*this, &ForwardConnectionHandler::onSocketReadable)); + sock, Observer(*this, &ForwardConnectionHandler::onSocketReadable)); reactor.removeEventHandler( - sock, NObserver(*this, &ForwardConnectionHandler::onSocketWritable)); + sock, Observer(*this, &ForwardConnectionHandler::onSocketWritable)); reactor.removeEventHandler( - sock, NObserver(*this, &ForwardConnectionHandler::onSocketError)); + sock, Observer(*this, &ForwardConnectionHandler::onSocketError)); reactor.removeEventHandler( - sock, NObserver(*this, &ForwardConnectionHandler::onReactorShutdown)); + sock, Observer(*this, &ForwardConnectionHandler::onReactorShutdown)); } catch (...) { } } -void ForwardConnectionHandler::onSocketReadable(const AutoPtr & /*pNf*/) +void ForwardConnectionHandler::onSocketReadable(const Notification &) { try { - LOG_TRACE(log, "Forwarding handler socket readable"); + LOG_TRACE(log, "Forward handler socket readable"); if (!sock.available()) { LOG_INFO(log, "Client close connection!"); @@ -71,7 +68,7 @@ void ForwardConnectionHandler::onSocketReadable(const AutoPtrbegin(), req_body_buf->used()); request->readImpl(body); - keeper_dispatcher->pushForwardingRequest(server_id, client_id, request); + keeper_dispatcher->pushForwardRequest(server_id, client_id, request); } void ForwardConnectionHandler::processHandshake() @@ -261,7 +258,7 @@ void ForwardConnectionHandler::processHandshake() keeper_dispatcher->invokeForwardResponseCallBack({server_id, client_id}, response); } -void ForwardConnectionHandler::onSocketWritable(const AutoPtr &) +void ForwardConnectionHandler::onSocketWritable(const Notification &) { LOG_TRACE(log, "Forwarder socket writable"); @@ -278,7 +275,7 @@ void ForwardConnectionHandler::onSocketWritable(const AutoPtr( + Observer( *this, &ForwardConnectionHandler::onSocketWritable)); } } @@ -358,13 +355,13 @@ void ForwardConnectionHandler::onSocketWritable(const AutoPtr & /*pNf*/) +void ForwardConnectionHandler::onReactorShutdown(const Notification &) { LOG_INFO(log, "Reactor shutdown!"); destroyMe(); } -void ForwardConnectionHandler::onSocketError(const AutoPtr & /*pNf*/) +void ForwardConnectionHandler::onSocketError(const Notification &) { destroyMe(); } @@ -388,10 +385,10 @@ void ForwardConnectionHandler::sendResponse(ForwardResponsePtr response) responses->push(buf.getBuffer()); /// Trigger socket writable event reactor.addEventHandler( - sock, NObserver(*this, &ForwardConnectionHandler::onSocketWritable)); + sock, Observer(*this, &ForwardConnectionHandler::onSocketWritable)); } - /// We must wake up reactor to interrupt it's sleeping. + /// We must wake up getWorkerReactor to interrupt it's sleeping. reactor.wakeUp(); } diff --git a/src/Service/ForwardConnectionHandler.h b/src/Service/ForwardConnectionHandler.h index 93d79a7b869..e55d563d9e4 100644 --- a/src/Service/ForwardConnectionHandler.h +++ b/src/Service/ForwardConnectionHandler.h @@ -14,10 +14,9 @@ #include #include +#include #include #include -#include -#include #include #include @@ -28,13 +27,12 @@ namespace RK { using Poco::Net::StreamSocket; -using Poco::AutoPtr; using Poco::FIFOBuffer; using Poco::Logger; using Poco::Thread; /** - * Server endpoint for forwarding request. + * Server endpoint for forward request. */ class ForwardConnectionHandler { @@ -42,13 +40,12 @@ class ForwardConnectionHandler ForwardConnectionHandler(Context & global_context_, StreamSocket & socket_, SocketReactor & reactor_); ~ForwardConnectionHandler(); - void onSocketReadable(const AutoPtr & pNf); - void onSocketWritable(const AutoPtr & pNf); - void onReactorShutdown(const AutoPtr & pNf); - void onSocketError(const AutoPtr & pNf); + void onSocketReadable(const Notification &); + void onSocketWritable(const Notification &); + void onReactorShutdown(const Notification &); + void onSocketError(const Notification &); private: - void sendResponse(ForwardResponsePtr response); /// destroy connection @@ -93,8 +90,8 @@ class ForwardConnectionHandler void processUserOrSessionRequest(ForwardRequestPtr request); void processSyncSessionsRequest(ForwardRequestPtr request); - // The connection is stale and need destroyed - bool need_destroy; + /// The connection is stale and need destroyed + std::atomic need_destroy; }; } diff --git a/src/Service/KeeperDispatcher.cpp b/src/Service/KeeperDispatcher.cpp index c798ef50577..bcc4e060c26 100644 --- a/src/Service/KeeperDispatcher.cpp +++ b/src/Service/KeeperDispatcher.cpp @@ -25,7 +25,6 @@ KeeperDispatcher::KeeperDispatcher() , request_processor(std::make_shared(responses_queue)) , request_accumulator(request_processor) , request_forwarder(request_processor) - , new_session_internal_id_counter(1) { } @@ -140,7 +139,7 @@ void KeeperDispatcher::invokeResponseCallBack(int64_t session_id, const Coordina } } -void KeeperDispatcher::invokeForwardResponseCallBack(ForwardingClientId client_id, ForwardResponsePtr response) +void KeeperDispatcher::invokeForwardResponseCallBack(ForwardClientId client_id, ForwardResponsePtr response) { std::lock_guard lock(forward_response_callbacks_mutex); auto forward_response_writer = forward_response_callbacks.find(client_id); @@ -204,7 +203,7 @@ bool KeeperDispatcher::pushRequest(const Coordination::ZooKeeperRequestPtr & req } -bool KeeperDispatcher::pushForwardingRequest(size_t server_id, size_t client_id, ForwardRequestPtr request) +bool KeeperDispatcher::pushForwardRequest(size_t server_id, size_t client_id, ForwardRequestPtr request) { RequestForSession && request_info = request->requestForSession(); @@ -216,7 +215,7 @@ bool KeeperDispatcher::pushForwardingRequest(size_t server_id, size_t client_id, LOG_TRACE( log, - "Push forwarding request #{}#{}#{} which is from server {} client {}", + "Push forward request #{}#{}#{} which is from server {} client {}", toHexString(request_info.session_id), request_info.request->xid, Coordination::toString(request_info.request->getOpNum()), @@ -230,7 +229,7 @@ bool KeeperDispatcher::pushForwardingRequest(size_t server_id, size_t client_id, throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push request to queue"); } else if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->raft_settings->operation_timeout_ms)) - throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push forwarding request to queue within operation timeout"); + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push forward request to queue within operation timeout"); return true; } @@ -314,22 +313,16 @@ void KeeperDispatcher::shutdown() responses_thread->wait(); } - LOG_INFO(log, "Shutting down request forwarder"); request_forwarder.shutdown(); - - LOG_INFO(log, "Shutting down request accumulator"); request_accumulator.shutdown(); - - LOG_INFO(log, "Shutting down request processor"); request_processor->shutdown(); if (server) { - LOG_INFO(log, "Shutting down server"); server->shutdown(); } - LOG_INFO(log, "for unhandled requests sending session expired error to client."); + LOG_INFO(log, "Sending session expired error for unhandled request."); RequestForSession request_for_session; while (requests_queue->tryPopAny(request_for_session)) { @@ -398,7 +391,7 @@ void KeeperDispatcher::unregisterUserResponseCallBackWithoutLock(int64_t session user_response_callbacks.erase(it); } -void KeeperDispatcher::registerForwarderResponseCallBack(ForwardingClientId client_id, ForwardResponseCallback callback) +void KeeperDispatcher::registerForwarderResponseCallBack(ForwardClientId client_id, ForwardResponseCallback callback) { std::lock_guard lock(forward_response_callbacks_mutex); @@ -406,7 +399,7 @@ void KeeperDispatcher::registerForwarderResponseCallBack(ForwardingClientId clie { LOG_WARNING( log, - "Receive new forwarding connection from server_id {}, client_id {}, will destroy the older one", + "Receive new forward connection from server_id {}, client_id {}, will destroy the older one", client_id.first, client_id.second); auto & call_back = forward_response_callbacks[client_id]; @@ -418,7 +411,7 @@ void KeeperDispatcher::registerForwarderResponseCallBack(ForwardingClientId clie forward_response_callbacks.emplace(client_id, callback); } -void KeeperDispatcher::unRegisterForwarderResponseCallBack(ForwardingClientId client_id) +void KeeperDispatcher::unRegisterForwarderResponseCallBack(ForwardClientId client_id) { std::lock_guard lock(forward_response_callbacks_mutex); auto forward_response_writer = forward_response_callbacks.find(client_id); diff --git a/src/Service/KeeperDispatcher.h b/src/Service/KeeperDispatcher.h index 342640b559a..3dd65a0b9b5 100644 --- a/src/Service/KeeperDispatcher.h +++ b/src/Service/KeeperDispatcher.h @@ -60,8 +60,8 @@ class KeeperDispatcher : public std::enable_shared_from_this }; /// - using ForwardingClientId = std::pair; - using ForwardResponseCallbacks = std::unordered_map; + using ForwardClientId = std::pair; + using ForwardResponseCallbacks = std::unordered_map; ForwardResponseCallbacks forward_response_callbacks; std::mutex forward_response_callbacks_mutex; @@ -122,8 +122,8 @@ class KeeperDispatcher : public std::enable_shared_from_this /// Push new session or update session request bool pushSessionRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t internal_id); - /// Push forwarding request - bool pushForwardingRequest(size_t server_id, size_t client_id, ForwardRequestPtr request); + /// Push forward request + bool pushForwardRequest(size_t server_id, size_t client_id, ForwardRequestPtr request); /// TODO remove int64_t newSession(int64_t session_timeout_ms) { return server->newSession(session_timeout_ms); } @@ -133,8 +133,8 @@ class KeeperDispatcher : public std::enable_shared_from_this } /// Register response callback for forwarder - void registerForwarderResponseCallBack(ForwardingClientId client_id, ForwardResponseCallback callback); - void unRegisterForwarderResponseCallBack(ForwardingClientId client_id); + void registerForwarderResponseCallBack(ForwardClientId client_id, ForwardResponseCallback callback); + void unRegisterForwarderResponseCallBack(ForwardClientId client_id); /// Register response callback for user request [[maybe_unused]] void registerUserResponseCallBack(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected = false); @@ -163,8 +163,8 @@ class KeeperDispatcher : public std::enable_shared_from_this /// Invoked when a request completes. void updateKeeperStatLatency(uint64_t process_time_ms); - /// Send forwarding response - void invokeForwardResponseCallBack(ForwardingClientId client_id, ForwardResponsePtr response); + /// Send forward response + void invokeForwardResponseCallBack(ForwardClientId client_id, ForwardResponsePtr response); /// Are we leader bool isLeader() const { return server->isLeader(); } @@ -215,15 +215,15 @@ class KeeperDispatcher : public std::enable_shared_from_this /// Request to be leader bool requestLeader() { return server->requestLeader(); } - /// return process start time in us. + /// Return process start time in us. int64_t uptimeFromStartup() { return Poco::Timestamp() - uptime; } /// My server id int32_t myId() const { return server->myId(); } - /// When user create new session, we use this id as request id. - /// Note that the internal id for different nodes can be same. - int64_t getNewSessionInternalId() { return new_session_internal_id_counter.fetch_add(server->getClusterNodeCount()); } + /// When user creating new session, we use this id as fake session id. + /// Note that the internal id for different nodes can not be same. + int64_t getAndAddInternalId() { return new_session_internal_id_counter.fetch_add(server->getClusterNodeCount()); } }; } diff --git a/src/Service/KeeperServer.cpp b/src/Service/KeeperServer.cpp index 4a7deeaf8e3..4a7ca346261 100644 --- a/src/Service/KeeperServer.cpp +++ b/src/Service/KeeperServer.cpp @@ -108,8 +108,6 @@ void KeeperServer::shutdown() LOG_WARNING(log, "Log store flush error while server shutdown."); dynamic_cast(*state_manager->load_log_store()).shutdown(); - - LOG_INFO(log, "Shutting down state machine."); state_machine->shutdown(); LOG_INFO(log, "Shut down NuRaft core done!"); diff --git a/src/Service/NuRaftStateMachine.cpp b/src/Service/NuRaftStateMachine.cpp index 35eb7089359..b05eadaaad7 100644 --- a/src/Service/NuRaftStateMachine.cpp +++ b/src/Service/NuRaftStateMachine.cpp @@ -226,11 +226,11 @@ ptr NuRaftStateMachine::serializeRequest(RequestForSession & session_req ptr NuRaftStateMachine::pre_commit(const ulong log_idx, buffer & data) { - LOG_TRACE(log, "pre commit, log indx {}, data size {}", log_idx, data.size()); + LOG_TRACE(log, "pre commit, log index {}, data size {}", log_idx, data.size()); return nullptr; } -/// Do nothing, as this example doesn't do anything on pre-commit. +/// Do nothing, as doesn't do anything on pre-commit. void NuRaftStateMachine::rollback(const ulong log_idx, buffer & data) { LOG_TRACE(log, "pre commit, log index {}, data size {}", log_idx, data.size()); @@ -304,13 +304,7 @@ nuraft::ptr NuRaftStateMachine::commit(const ulong log_idx, nura auto request_for_session = parseRequest(data); KeeperStore::ResponsesForSessions responses_for_sessions; - LOG_TRACE( - log, - "Commit log index {}, session {}, xid {}, request {}", - log_idx, - toHexString(request_for_session.session_id), - request_for_session.request->xid, - request_for_session.request->toString()); + LOG_TRACE(log, "Commit log index {} fore request {}", log_idx, request_for_session.toSimpleString()); if (request_for_session.create_time > 0) { @@ -318,12 +312,10 @@ nuraft::ptr NuRaftStateMachine::commit(const ulong log_idx, nura if (elapsed > 1000) LOG_WARNING( log, - "Commit log {} request process time {}ms, session {} xid {} req type {}", + "When committing log {} for request {}, the time has passed {}ms, it is a little long.", log_idx, - elapsed, - toHexString(request_for_session.session_id), - request_for_session.request->xid, - Coordination::toString(request_for_session.request->getOpNum())); + request_for_session.toSimpleString(), + elapsed); } if (request_processor) @@ -343,7 +335,7 @@ nuraft::ptr NuRaftStateMachine::commit(const ulong log_idx, buff return commit(log_idx, data, false); } -void NuRaftStateMachine::processReadRequest(const RequestForSession & request_for_session) +[[maybe_unused]] void NuRaftStateMachine::processReadRequest(const RequestForSession & request_for_session) { store.processRequest(responses_queue, request_for_session); } diff --git a/src/Service/NuRaftStateMachine.h b/src/Service/NuRaftStateMachine.h index 456be51fb95..6597fe218be 100644 --- a/src/Service/NuRaftStateMachine.h +++ b/src/Service/NuRaftStateMachine.h @@ -231,7 +231,7 @@ class NuRaftStateMachine : public nuraft::state_machine KeeperStore & getStore() { return store; } /// process read request - void processReadRequest(const RequestForSession & request_for_session); + [[maybe_unused]] void processReadRequest(const RequestForSession & request_for_session); /// get expired session std::vector getDeadSessions(); diff --git a/src/Service/NuRaftStateManager.cpp b/src/Service/NuRaftStateManager.cpp index a8e940a44ca..a497093b586 100644 --- a/src/Service/NuRaftStateManager.cpp +++ b/src/Service/NuRaftStateManager.cpp @@ -209,6 +209,7 @@ ConfigUpdateActions NuRaftStateManager::getConfigurationDiff(const Poco::Util::A size_t NuRaftStateManager::getClusterNodeCount() const { + std::lock_guard lock(cluster_config_mutex); return cur_cluster_config->get_servers().size(); } diff --git a/src/Service/RequestForwarder.cpp b/src/Service/RequestForwarder.cpp index a94748dd45f..b4f7aa0153f 100644 --- a/src/Service/RequestForwarder.cpp +++ b/src/Service/RequestForwarder.cpp @@ -8,7 +8,7 @@ namespace RK namespace ErrorCodes { - extern const int RAFT_FORWARDING_ERROR; + extern const int RAFT_FORWARD_ERROR; extern const int RAFT_IS_LEADER; extern const int RAFT_NO_LEADER; extern const int RAFT_FWD_NO_CONN; @@ -23,7 +23,7 @@ void RequestForwarder::runSend(RunnerId runner_id) { setThreadName(("ReqFwdSend#" + toString(runner_id)).c_str()); - LOG_DEBUG(log, "Starting forwarding request sending thread."); + LOG_DEBUG(log, "Starting forward request sending thread."); while (!shutdown_called) { UInt64 max_wait = session_sync_period_ms; @@ -62,7 +62,7 @@ void RequestForwarder::runSend(RunnerId runner_id) forward_request->send_time = clock::now(); connection->send(forward_request); - forwarding_queues[runner_id]->push(std::move(forward_request)); + forward_request_queue[runner_id]->push(std::move(forward_request)); } catch (...) { @@ -104,14 +104,13 @@ void RequestForwarder::runSend(RunnerId runner_id) ForwardRequestPtr forward_request = std::make_shared(std::move(session_to_expiration_time)); forward_request->send_time = clock::now(); connection->send(forward_request); - forwarding_queues[runner_id]->push(std::move(forward_request)); + forward_request_queue[runner_id]->push(std::move(forward_request)); } } else { throw Exception( - "Not found connection when sending sessions for runner " + std::to_string(runner_id), - ErrorCodes::RAFT_FORWARDING_ERROR); + ErrorCodes::RAFT_FORWARD_ERROR, "Not found connection when sending sessions for runner {}", runner_id); } } catch (...) @@ -130,7 +129,7 @@ void RequestForwarder::runReceive(RunnerId runner_id) { setThreadName(("ReqFwdRecv#" + toString(runner_id)).c_str()); - LOG_DEBUG(log, "Starting forwarding response receiving thread."); + LOG_DEBUG(log, "Starting forward response receiving thread."); while (!shutdown_called) { try @@ -140,7 +139,7 @@ void RequestForwarder::runReceive(RunnerId runner_id) /// Check if the earliest request has timed out. And handle all timed out requests. ForwardRequestPtr earliest_request; - if (forwarding_queues[runner_id]->peek(earliest_request)) + if (forward_request_queue[runner_id]->peek(earliest_request)) { auto earliest_request_deadline = earliest_request->send_time + std::chrono::microseconds(operation_timeout.totalMicroseconds()); if (now > earliest_request_deadline) @@ -195,7 +194,7 @@ void RequestForwarder::runReceive(RunnerId runner_id) } catch (...) { - tryLogCurrentException(log, "Error when receiving forwarding response, runner " + std::to_string(runner_id)); + tryLogCurrentException(log, "Error when receiving forward response, runner " + std::to_string(runner_id)); std::this_thread::sleep_for(std::chrono::milliseconds(session_sync_period_ms)); } } @@ -203,7 +202,7 @@ void RequestForwarder::runReceive(RunnerId runner_id) bool RequestForwarder::processTimeoutRequest(RunnerId runner_id, ForwardRequestPtr newFront) { - LOG_INFO(log, "Process timeout request for runner {} queue size {}", runner_id, forwarding_queues[runner_id]->size()); + LOG_INFO(log, "Process timeout request for runner {} queue size {}", runner_id, forward_request_queue[runner_id]->size()); clock::time_point now = clock::now(); @@ -226,13 +225,13 @@ bool RequestForwarder::processTimeoutRequest(RunnerId runner_id, ForwardRequestP } }; - return forwarding_queues[runner_id]->removeFrontIf(func, newFront); + return forward_request_queue[runner_id]->removeFrontIf(func, newFront); } bool RequestForwarder::removeFromQueue(RunnerId runner_id, ForwardResponsePtr forward_response_ptr) { - return forwarding_queues[runner_id]->findAndRemove([forward_response_ptr](const ForwardRequestPtr & request) -> bool + return forward_request_queue[runner_id]->findAndRemove([forward_response_ptr](const ForwardRequestPtr & request) -> bool { if (request->forwardType() != forward_response_ptr->forwardType()) return false; @@ -266,9 +265,9 @@ void RequestForwarder::shutdown() request_thread->wait(); response_thread->wait(); - for (auto & forwarding_queue : forwarding_queues) + for (auto & queue : forward_request_queue) { - forwarding_queue->forEach([this](const ForwardRequestPtr & request) -> bool + queue->forEach([this](const ForwardRequestPtr & request) -> bool { ForwardResponsePtr response = request->makeResponse(); response->setAppendEntryResult(false, nuraft::cmd_result_code::FAILED); @@ -380,7 +379,7 @@ void RequestForwarder::initialize( for (RunnerId runner_id = 0; runner_id < thread_count; runner_id++) { - forwarding_queues.push_back(std::make_unique()); + forward_request_queue.push_back(std::make_unique()); } initConnections(); diff --git a/src/Service/RequestForwarder.h b/src/Service/RequestForwarder.h index 83078e34795..24888ae79aa 100644 --- a/src/Service/RequestForwarder.h +++ b/src/Service/RequestForwarder.h @@ -72,9 +72,9 @@ class RequestForwarder std::atomic session_sync_idx{0}; Stopwatch session_sync_time_watch; - using ForwardingQueue = ThreadSafeQueue>; - using ForwardingQueuePtr = std::unique_ptr; - std::vector forwarding_queues; + using ForwardRequestQueue = ThreadSafeQueue>; + using ForwardRequestQueuePtr = std::unique_ptr; + std::vector forward_request_queue; Poco::Timespan operation_timeout; diff --git a/src/Service/RequestProcessor.cpp b/src/Service/RequestProcessor.cpp index 15248337e33..1dc86a9d300 100644 --- a/src/Service/RequestProcessor.cpp +++ b/src/Service/RequestProcessor.cpp @@ -220,7 +220,7 @@ void RequestProcessor::processCommittedRequest(size_t count) LOG_WARNING( log, "Found session {} in pending_queue while it is not local, maybe because of connection disconnected. " - "Just delete from pending queue", + "Just delete from pending queue.", toHexString(committed_request.session_id)); my_pending_requests.erase(committed_request.session_id); } @@ -312,7 +312,7 @@ void RequestProcessor::processErrorRequest(size_t count) LOG_WARNING( log, "Found session {} in pending_queue while it is not local, maybe because of connection disconnected. " - "Just delete from pending queue", + "Just delete from pending queue.", toHexString(session_id)); my_pending_requests.erase(session_id); }