Skip to content

Commit

Permalink
SockerAcceptor should handle error event
Browse files Browse the repository at this point in the history
  • Loading branch information
JackyWoo committed Dec 22, 2023
1 parent e0d9bc0 commit a564f67
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 8 deletions.
9 changes: 7 additions & 2 deletions src/Common/NIO/PollSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,20 +203,25 @@ PollSet::SocketModeMap PollSetImpl::poll(const Poco::Timespan & timeout)
{
Poco::Timestamp start;
rc = epoll_wait(epoll_fd, &events[0], events.size(), remaining_time.totalMilliseconds());

if (rc == 0)
{
LOG_TRACE(log, "epoll_wait got 0 events");
return result;
}

if (rc < 0 && errno == POCO_EINTR)
{
Poco::Timestamp end;
Poco::Timespan waited = end - start;
if (waited < remaining_time)
remaining_time -= waited;
else
remaining_time = 0;
break; /// timeout
}
} while (rc < 0 && errno == POCO_EINTR);

if (rc < 0)
if (rc < 0 && errno != POCO_EINTR)
throwFromErrno("Error when epoll waiting", ErrorCodes::EPOLL_WAIT, errno);

LOG_TRACE(log, "Got {} events", rc);
Expand Down
16 changes: 16 additions & 0 deletions src/Common/NIO/SocketAcceptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
namespace RK
{

namespace ErrorCodes
{
extern const int EPOLL_ERROR;
}

using Poco::Net::ServerSocket;
using Poco::Net::Socket;
using Poco::Net::StreamSocket;
Expand All @@ -43,6 +48,7 @@ class SocketAcceptor
using WorkerReactorPtr = AsyncSocketReactorPtr;
using WorkerReactors = std::vector<WorkerReactorPtr>;
using AcceptorObserver = Observer<SocketAcceptor, ReadableNotification>;
using ErrorObserver = Observer<SocketAcceptor, ErrorNotification>;

SocketAcceptor() = delete;
SocketAcceptor(const SocketAcceptor &) = delete;
Expand Down Expand Up @@ -71,7 +77,10 @@ class SocketAcceptor
try
{
if (main_reactor)
{
main_reactor->removeEventHandler(socket, AcceptorObserver(*this, &SocketAcceptor::onAccept));
main_reactor->removeEventHandler(socket, ErrorObserver(*this, &SocketAcceptor::onError));
}
}
catch (...)
{
Expand All @@ -96,6 +105,12 @@ class SocketAcceptor
}
}

virtual void onError(const Notification & enf)
{
auto error_no = dynamic_cast<const ErrorNotification *>(&enf)->getErrorNo();
throwFromErrno("Error when accepting connection for " + socket.address().toString(), ErrorCodes::EPOLL_ERROR, error_no);
}

/// Returns a reference to the listening socket.
[[maybe_unused]] Socket & getSocket() { return socket; }

Expand All @@ -109,6 +124,7 @@ class SocketAcceptor

/// Register accept event handler to main reactor
main_reactor->addEventHandler(socket, AcceptorObserver(*this, &SocketAcceptor::onAccept));
main_reactor->addEventHandler(socket, ErrorObserver(*this, &SocketAcceptor::onError));

/// It is necessary to wake up the getWorkerReactor.
main_reactor->wakeUp();
Expand Down
17 changes: 11 additions & 6 deletions src/Common/NIO/SocketNotification.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ReadableNotification : public SocketNotification
explicit ReadableNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { }
~ReadableNotification() override = default;

std::string name() const override { return "read";}
std::string name() const override { return "read"; }
};

/// This notification is sent if a socket has become writable.
Expand All @@ -52,7 +52,7 @@ class WritableNotification : public SocketNotification
explicit WritableNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { }
~WritableNotification() override = default;

std::string name() const override { return "write";}
std::string name() const override { return "write"; }
};

/// This notification is sent if a socket has signalled an error.
Expand All @@ -62,7 +62,12 @@ class ErrorNotification : public SocketNotification
explicit ErrorNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { }
~ErrorNotification() override = default;

std::string name() const override { return "error";}
std::string name() const override { return "error"; }
int getErrorNo() const { return error_no; }
void setErrorNo(int error_no_) { error_no = error_no_; }

private:
int error_no;
};

/// This notification is sent if no other event has occurred
Expand All @@ -73,7 +78,7 @@ class TimeoutNotification : public SocketNotification
explicit TimeoutNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { }
~TimeoutNotification() override = default;

std::string name() const override { return "timeout";}
std::string name() const override { return "timeout"; }
};

/// This notification is sent when the SocketReactor does
Expand All @@ -84,7 +89,7 @@ class IdleNotification : public SocketNotification
explicit IdleNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { }
~IdleNotification() override = default;

std::string name() const override { return "idle";}
std::string name() const override { return "idle"; }
};

/// This notification is sent when the SocketReactor is
Expand All @@ -95,7 +100,7 @@ class ShutdownNotification : public SocketNotification
explicit ShutdownNotification(SocketReactor * reactor_) : SocketNotification(reactor_) { }
~ShutdownNotification() override = default;

std::string name() const override { return "shutdown";}
std::string name() const override { return "shutdown"; }
};

using SocketNotificationPtr [[maybe_unused]] = std::shared_ptr<SocketNotification>;
Expand Down
1 change: 1 addition & 0 deletions src/Common/NIO/SocketReactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ void SocketReactor::run()
}
if (socket_and_events.second & PollSet::POLL_ERROR)
{
dynamic_cast<ErrorNotification *>(enf.get())->setErrorNo(errno);
dispatch(socket_and_events.first, *enf);
}
}
Expand Down

0 comments on commit a564f67

Please sign in to comment.