diff --git a/src/Common/NIO/PollSet.cpp b/src/Common/NIO/PollSet.cpp index e8193e2d35..a5916f2bfe 100644 --- a/src/Common/NIO/PollSet.cpp +++ b/src/Common/NIO/PollSet.cpp @@ -203,8 +203,13 @@ PollSet::SocketModeMap PollSetImpl::poll(const Poco::Timespan & timeout) { Poco::Timestamp start; rc = epoll_wait(epoll_fd, &events[0], events.size(), remaining_time.totalMilliseconds()); + if (rc == 0) + { + LOG_TRACE(log, "epoll_wait got 0 events"); return result; + } + if (rc < 0 && errno == POCO_EINTR) { Poco::Timestamp end; @@ -212,11 +217,11 @@ PollSet::SocketModeMap PollSetImpl::poll(const Poco::Timespan & timeout) if (waited < remaining_time) remaining_time -= waited; else - remaining_time = 0; + break; /// timeout } } while (rc < 0 && errno == POCO_EINTR); - if (rc < 0) + if (rc < 0 && errno != POCO_EINTR) throwFromErrno("Error when epoll waiting", ErrorCodes::EPOLL_WAIT, errno); LOG_TRACE(log, "Got {} events", rc); diff --git a/src/Common/NIO/SocketAcceptor.h b/src/Common/NIO/SocketAcceptor.h index a30d5c7d38..36816e7412 100644 --- a/src/Common/NIO/SocketAcceptor.h +++ b/src/Common/NIO/SocketAcceptor.h @@ -22,6 +22,11 @@ namespace RK { +namespace ErrorCodes +{ + extern const int EPOLL_ERROR; +} + using Poco::Net::ServerSocket; using Poco::Net::Socket; using Poco::Net::StreamSocket; @@ -43,6 +48,7 @@ class SocketAcceptor using WorkerReactorPtr = AsyncSocketReactorPtr; using WorkerReactors = std::vector; using AcceptorObserver = Observer; + using ErrorObserver = Observer; SocketAcceptor() = delete; SocketAcceptor(const SocketAcceptor &) = delete; @@ -71,7 +77,10 @@ class SocketAcceptor try { if (main_reactor) + { main_reactor->removeEventHandler(socket, AcceptorObserver(*this, &SocketAcceptor::onAccept)); + main_reactor->removeEventHandler(socket, ErrorObserver(*this, &SocketAcceptor::onError)); + } } catch (...) { @@ -96,6 +105,12 @@ class SocketAcceptor } } + virtual void onError(const Notification & enf) + { + auto error_no = dynamic_cast(&enf)->getErrorNo(); + throwFromErrno("Error when accepting connection for " + socket.address().toString(), ErrorCodes::EPOLL_ERROR, error_no); + } + /// Returns a reference to the listening socket. [[maybe_unused]] Socket & getSocket() { return socket; } @@ -109,6 +124,7 @@ class SocketAcceptor /// Register accept event handler to main reactor main_reactor->addEventHandler(socket, AcceptorObserver(*this, &SocketAcceptor::onAccept)); + main_reactor->addEventHandler(socket, ErrorObserver(*this, &SocketAcceptor::onError)); /// It is necessary to wake up the getWorkerReactor. main_reactor->wakeUp(); diff --git a/src/Common/NIO/SocketNotification.h b/src/Common/NIO/SocketNotification.h index c71c13fbd1..3403f3b2ad 100644 --- a/src/Common/NIO/SocketNotification.h +++ b/src/Common/NIO/SocketNotification.h @@ -42,7 +42,7 @@ class ReadableNotification : public SocketNotification explicit ReadableNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { } ~ReadableNotification() override = default; - std::string name() const override { return "read";} + std::string name() const override { return "read"; } }; /// This notification is sent if a socket has become writable. @@ -52,7 +52,7 @@ class WritableNotification : public SocketNotification explicit WritableNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { } ~WritableNotification() override = default; - std::string name() const override { return "write";} + std::string name() const override { return "write"; } }; /// This notification is sent if a socket has signalled an error. @@ -62,7 +62,12 @@ class ErrorNotification : public SocketNotification explicit ErrorNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { } ~ErrorNotification() override = default; - std::string name() const override { return "error";} + std::string name() const override { return "error"; } + int getErrorNo() const { return error_no; } + void setErrorNo(int error_no_) { error_no = error_no_; } + +private: + int error_no; }; /// This notification is sent if no other event has occurred @@ -73,7 +78,7 @@ class TimeoutNotification : public SocketNotification explicit TimeoutNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { } ~TimeoutNotification() override = default; - std::string name() const override { return "timeout";} + std::string name() const override { return "timeout"; } }; /// This notification is sent when the SocketReactor does @@ -84,7 +89,7 @@ class IdleNotification : public SocketNotification explicit IdleNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { } ~IdleNotification() override = default; - std::string name() const override { return "idle";} + std::string name() const override { return "idle"; } }; /// This notification is sent when the SocketReactor is @@ -95,7 +100,7 @@ class ShutdownNotification : public SocketNotification explicit ShutdownNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { } ~ShutdownNotification() override = default; - std::string name() const override { return "shutdown";} + std::string name() const override { return "shutdown"; } }; using SocketNotificationPtr [[maybe_unused]] = std::shared_ptr; diff --git a/src/Common/NIO/SocketReactor.cpp b/src/Common/NIO/SocketReactor.cpp index 13a5e538a2..4113014089 100644 --- a/src/Common/NIO/SocketReactor.cpp +++ b/src/Common/NIO/SocketReactor.cpp @@ -73,6 +73,7 @@ void SocketReactor::run() } if (socket_and_events.second & PollSet::POLL_ERROR) { + dynamic_cast(enf.get())->setErrorNo(errno); dispatch(socket_and_events.first, *enf); } }