Skip to content

Commit

Permalink
Handle errors in poll set
Browse files Browse the repository at this point in the history
  • Loading branch information
JackyWoo committed Dec 21, 2023
1 parent a6261e7 commit e0d9bc0
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 22 deletions.
3 changes: 3 additions & 0 deletions src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@
M(74, CANNOT_PARSE_ELF) \
M(75, CANNOT_STAT) \
M(76, NETLINK_ERROR) \
M(78, EPOLL_CTL) \
M(79, EPOLL_CREATE) \
M(80, EPOLL_WAIT) \
\
M(102, KEEPER_EXCEPTION) \
M(103, POCO_EXCEPTION) \
Expand Down
55 changes: 39 additions & 16 deletions src/Common/NIO/PollSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <Poco/Net/SocketImpl.h>
#include <Poco/Thread.h>

#include <Common/Exception.h>
#include <Common/NIO/PollSet.h>
#include <common/logger_useful.h>

Expand All @@ -21,6 +22,24 @@ using Poco::Net::SocketImpl;
namespace RK
{

namespace ErrorCodes
{
extern const int EPOLL_ERROR;
extern const int EPOLL_CTL;
extern const int EPOLL_CREATE;
extern const int EPOLL_WAIT;
extern const int LOGICAL_ERROR;
}

namespace
{
/// Return local address string for server socket, or return remote address for stream socket.
std::string getAddressName(const Socket & sock)
{
return sock.isStream() ? sock.peerAddress().toString() : sock.address().toString();
}
}

/// PollSet implementation with epoll
class PollSetImpl
{
Expand Down Expand Up @@ -70,7 +89,7 @@ PollSetImpl::PollSetImpl()
int err = addImpl(waking_up_fd, PollSet::POLL_READ, this);
if ((err) || (epoll_fd < 0))
{
errno;
throwFromErrno("Error when initializing poll set", ErrorCodes::EPOLL_ERROR, errno);
}
}

Expand All @@ -92,7 +111,7 @@ void PollSetImpl::add(const Socket & socket, int mode)
if (errno == EEXIST)
update(socket, mode);
else
errno;
throwFromErrno("Error when updating epoll event to " + getAddressName(socket), ErrorCodes::EPOLL_CTL, errno);
}

if (socket_map.find(socket_impl) == socket_map.end())
Expand Down Expand Up @@ -123,7 +142,7 @@ void PollSetImpl::remove(const Socket & socket)
ev.data.ptr = nullptr;
int err = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &ev);
if (err)
errno;
throwFromErrno("Error when updating epoll event to " + getAddressName(socket), ErrorCodes::EPOLL_CTL, errno);

socket_map.erase(socket.impl());
}
Expand All @@ -146,18 +165,19 @@ void PollSetImpl::update(const Socket & socket, int mode)
poco_socket_t fd = socket.impl()->sockfd();
struct epoll_event ev;
ev.events = 0;

if (mode & PollSet::POLL_READ)
ev.events |= EPOLLIN;
if (mode & PollSet::POLL_WRITE)
ev.events |= EPOLLOUT;
if (mode & PollSet::POLL_ERROR)
ev.events |= EPOLLERR;

ev.data.ptr = socket.impl();
int err = epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev);

if (err)
{
errno;
}
throwFromErrno("Error when updating epoll event to " + getAddressName(socket), ErrorCodes::EPOLL_CTL, errno);
}

void PollSetImpl::clear()
Expand All @@ -169,14 +189,15 @@ void PollSetImpl::clear()
epoll_fd = epoll_create(1);
if (epoll_fd < 0)
{
errno;
throwFromErrno("Error when creating epoll fd", ErrorCodes::EPOLL_CREATE, errno);
}
}

PollSet::SocketModeMap PollSetImpl::poll(const Poco::Timespan & timeout)
{
PollSet::SocketModeMap result;
Poco::Timespan remaining_time(timeout);

int rc;
do
{
Expand All @@ -192,25 +213,27 @@ PollSet::SocketModeMap PollSetImpl::poll(const Poco::Timespan & timeout)
remaining_time -= waited;
else
remaining_time = 0;
LOG_TRACE(log, "Poll wait encounter error EINTR {}ms", remaining_time.totalMilliseconds());
}
} while (rc < 0 && errno == POCO_EINTR);

if (rc < 0)
errno;
throwFromErrno("Error when epoll waiting", ErrorCodes::EPOLL_WAIT, errno);

LOG_TRACE(log, "Got {} events", rc);

Poco::FastMutex::ScopedLock lock(mutex);

for (int i = 0; i < rc; i++)
{
/// Read data from 'wakeUp' method
if (events[i].data.ptr == this)
{
uint64_t val;
auto n = ::read(waking_up_fd, &val, sizeof(val));
LOG_TRACE(
log, "Poll wakeup {} {} {} {}", Poco::Thread::current() ? Poco::Thread::current()->name() : "main", waking_up_fd, n, errno);
if (n < 0)
errno;
throwFromErrno("Error when reading data from 'wakeUp' method", ErrorCodes::EPOLL_CREATE, errno);
}
/// Handle IO events
else if (events[i].data.ptr)
{
std::map<SocketImpl *, Socket>::iterator it = socket_map.find(static_cast<SocketImpl *>(events[i].data.ptr));
Expand All @@ -226,7 +249,7 @@ PollSet::SocketModeMap PollSetImpl::poll(const Poco::Timespan & timeout)
}
else
{
LOG_ERROR(log, "Poll receive null data socket event {}", static_cast<unsigned int>(events[i].events));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Should never reach here.");
}
}

Expand All @@ -235,11 +258,11 @@ PollSet::SocketModeMap PollSetImpl::poll(const Poco::Timespan & timeout)

void PollSetImpl::wakeUp()
{
uint64_t val = 1;
LOG_TRACE(log, "Try to wakeup poll set");
uint64_t val = 0;
int n = ::write(waking_up_fd, &val, sizeof(val));
LOG_TRACE(log, "Poll trigger wakeup {} {} {}", Poco::Thread::current() ? Poco::Thread::current()->name() : "main", waking_up_fd, n);
if (n < 0)
errno;
throwFromErrno("Error when trying to wakeup poll set", ErrorCodes::EPOLL_CREATE, errno);
}

int PollSetImpl::count() const
Expand Down
2 changes: 1 addition & 1 deletion src/Common/NIO/SocketAcceptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class SocketAcceptor
StreamSocket sock;
try
{
LOG_TRACE(log, "Try to accept connection");
LOG_TRACE(log, "{} tries to accept connection", socket.address().toString());
sock = socket.acceptConnection();
LOG_TRACE(log, "Successfully accepted {}", sock.peerAddress().toString());
createServiceHandler(sock);
Expand Down
5 changes: 4 additions & 1 deletion src/Common/NIO/SocketReactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,10 @@ void SocketReactor::dispatch(SocketNotifierPtr & notifier, const Notification &
{
try
{
LOG_TRACE(log, "Dispatch event {} for {} ", notification.name(), notifier->getSocket().peerAddress().toString());
const auto & sock = notifier->getSocket();
const auto socket_name = sock.isStream() ? sock.address().toString() /// use local address for server socket
: sock.peerAddress().toString(); /// use remote address for server socket
LOG_TRACE(log, "Dispatch event {} for {} ", notification.name(), socket_name);
notifier->dispatch(notification);
}
catch (...)
Expand Down
4 changes: 2 additions & 2 deletions src/Service/KeeperServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ int64_t KeeperServer::getSessionID(int64_t session_timeout_ms)
new_session_id_callback.erase(sid);
if (status == std::cv_status::timeout)
{
throw Exception(ErrorCodes::RAFT_ERROR, "Time out, can not allocate session {}", sid);
throw Exception(ErrorCodes::RAFT_ERROR, "Time out, can not allocate session {}", toHexString(sid));
}
}
}

LOG_DEBUG(log, "Got session {}", sid);
LOG_DEBUG(log, "Got session {}", toHexString(sid));
return sid;
}

Expand Down
4 changes: 2 additions & 2 deletions src/Service/NuRaftStateMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ nuraft::ptr<nuraft::buffer> NuRaftStateMachine::commit(const ulong log_idx, nura
else
LOG_DEBUG(
log,
"Not found callback for session id {}, maybe time out or before wait or not allocate from local",
"Not found callback for session id {}, maybe timeout or before wait or not allocate from local",
toHexString(session_id));
}

Expand Down Expand Up @@ -284,7 +284,7 @@ nuraft::ptr<nuraft::buffer> NuRaftStateMachine::commit(const ulong log_idx, nura
else
LOG_DEBUG(
log,
"Not found callback for session id {}, maybe time out or before wait or not allocate from local",
"Not found callback for session id {}, maybe timeout or before wait or not allocate from local",
toHexString(session_id));
}

Expand Down

0 comments on commit e0d9bc0

Please sign in to comment.