Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the NIO framework #135

Merged
merged 11 commits into from
Jan 4, 2024
23 changes: 11 additions & 12 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

#include <Common/Config/ConfigReloader.h>
#include <Common/CurrentMetrics.h>
#include <Common/NIO/SvsSocketAcceptor.h>
#include <Common/NIO/SvsSocketReactor.h>
#include <Common/NIO/SocketAcceptor.h>
#include <Common/config_version.h>
#include <Common/getExecutablePath.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
Expand Down Expand Up @@ -151,8 +150,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
= global_context.getConfigRef().getUInt("keeper.raft_settings.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS);

/// start server
std::shared_ptr<SvsSocketReactor<SocketReactor>> server;
std::shared_ptr<SvsSocketAcceptor<ConnectionHandler, SocketReactor>> conn_acceptor;
AsyncSocketReactorPtr server;
std::shared_ptr<SocketAcceptor<ConnectionHandler>> conn_acceptor;
int32_t port = config().getInt("keeper.port", 8101);

auto cpu_core_size = getNumberOfPhysicalCPUCores();
Expand All @@ -167,17 +166,17 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setBlocking(false);

Poco::Timespan timeout(operation_timeout_ms * 1000);
server = std::make_shared<SvsSocketReactor<SocketReactor>>(timeout, "IO-Acptr");
server = std::make_shared<AsyncSocketReactor>(timeout, "IO-Acptr");

/// TODO add io thread count to config
conn_acceptor = std::make_shared<SvsSocketAcceptor<ConnectionHandler, SocketReactor>>(
"IO-Hdlr", global_context, socket, *server, timeout, cpu_core_size);
conn_acceptor
= std::make_shared<SocketAcceptor<ConnectionHandler>>("IO-Hdlr", global_context, socket, server, timeout, cpu_core_size);
LOG_INFO(log, "Listening for user connections on {}", socket.address().toString());
});

/// start forwarding server
std::shared_ptr<SvsSocketReactor<SocketReactor>> forwarding_server;
std::shared_ptr<SvsSocketAcceptor<ForwardConnectionHandler, SocketReactor>> forwarding_conn_acceptor;
AsyncSocketReactorPtr forwarding_server;
std::shared_ptr<SocketAcceptor<ForwardConnectionHandler>> forwarding_conn_acceptor;
int32_t forwarding_port = config().getInt("keeper.forwarding_port", 8102);

createServer(
Expand All @@ -190,11 +189,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setBlocking(false);

Poco::Timespan timeout(operation_timeout_ms * 1000);
forwarding_server = std::make_shared<SvsSocketReactor<SocketReactor>>(timeout, "IO-FwdAcptr");
forwarding_server = std::make_shared<AsyncSocketReactor>(timeout, "IO-FwdAcptr");

/// TODO add io thread count to config
forwarding_conn_acceptor = std::make_shared<SvsSocketAcceptor<ForwardConnectionHandler, SocketReactor>>(
"IO-FwdHdlr", global_context, socket, *forwarding_server, timeout, cpu_core_size);
forwarding_conn_acceptor = std::make_shared<SocketAcceptor<ForwardConnectionHandler>>(
"IO-FwdHdlr", global_context, socket, forwarding_server, timeout, cpu_core_size);
LOG_INFO(log, "Listening for forwarding connections on {}", socket.address().toString());
});

Expand Down
9 changes: 6 additions & 3 deletions src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@
M(74, CANNOT_PARSE_ELF) \
M(75, CANNOT_STAT) \
M(76, NETLINK_ERROR) \
M(78, EPOLL_CTL) \
M(79, EPOLL_CREATE) \
M(80, EPOLL_WAIT) \
\
M(102, KEEPER_EXCEPTION) \
M(103, POCO_EXCEPTION) \
Expand All @@ -94,13 +97,13 @@
M(106, INVALID_LOG_LEVEL) \
M(107, UNEXPECTED_ZOOKEEPER_ERROR) \
M(108, UNEXPECTED_NODE_IN_ZOOKEEPER) \
M(109, RAFT_FORWARDING_ERROR) \
M(109, RAFT_FORWARD_ERROR) \
M(110, CANNOT_PTHREAD_ATTR) \
M(111, UNEXPECTED_FORWARD_PACKET) \
M(112, RAFT_IS_LEADER) \
M(113, RAFT_NO_LEADER) \
M(114, RAFT_FWD_NO_CONN) \
M(115, FORWARDING_DISCONNECTED) \
M(115, FORWARD_NOT_CONNECTED) \
M(116, ILLEGAL_SETTING_VALUE) \
/* See END */

Expand All @@ -126,7 +129,7 @@ namespace ErrorCodes
}
} error_codes_names;

std::string_view getName(ErrorCode error_code)
[[maybe_unused]] std::string_view getName(ErrorCode error_code)
{
if (error_code >= END)
return std::string_view();
Expand Down
2 changes: 1 addition & 1 deletion src/Common/ErrorCodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace ErrorCodes

/// Get name of error_code by identifier.
/// Returns statically allocated string.
std::string_view getName(ErrorCode error_code);
[[maybe_unused]] std::string_view getName(ErrorCode error_code);

/// ErrorCode identifier -> current value of error_code.
extern std::atomic<Value> values[];
Expand Down
27 changes: 27 additions & 0 deletions src/Common/NIO/Notification.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH. and Contributors.
* SPDX-License-Identifier: BSL-1.0
*/
#pragma once

#include <Poco/Foundation.h>
#include <Poco/Mutex.h>


namespace RK
{

/// The base class for all notification classes.
class Notification
{
public:
Notification() = default;
virtual std::string name() const { return typeid(*this).name(); }

protected:
virtual ~Notification() = default;
};

using NotificationPtr = std::shared_ptr<Notification>;

}
58 changes: 30 additions & 28 deletions src/Common/NIO/NotificationCenter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,24 @@ namespace RK

bool NotificationCenter::addObserverIfNotExist(const AbstractObserver & observer)
{
Mutex::ScopedLock lock(_mutex);
for (const auto & p : _observers)
Mutex::ScopedLock lock(mutex);
for (const auto & p : observers)
if (observer.equals(*p))
return false;
_observers.push_back(observer.clone());
observers.push_back(observer.clone());
return true;
}


bool NotificationCenter::removeObserverIfExist(const AbstractObserver & observer)
{
Mutex::ScopedLock lock(_mutex);
for (auto it = _observers.begin(); it != _observers.end(); ++it)
Mutex::ScopedLock lock(mutex);
for (auto it = observers.begin(); it != observers.end(); ++it)
{
if (observer.equals(**it))
{
(*it)->disable();
_observers.erase(it);
observers.erase(it);
return true;
}
}
Expand All @@ -45,8 +45,8 @@ bool NotificationCenter::removeObserverIfExist(const AbstractObserver & observer

bool NotificationCenter::hasObserver(const AbstractObserver & observer) const
{
Mutex::ScopedLock lock(_mutex);
for (const auto & p : _observers)
Mutex::ScopedLock lock(mutex);
for (const auto & p : observers)
if (observer.equals(*p))
return true;

Expand All @@ -56,49 +56,51 @@ bool NotificationCenter::hasObserver(const AbstractObserver & observer) const

bool NotificationCenter::onlyHas(const AbstractObserver & observer) const
{
Mutex::ScopedLock lock(_mutex);
return _observers.size() == 1 && observer.equals(*_observers[0]);
Mutex::ScopedLock lock(mutex);
return observers.size() == 1 && observer.equals(*observers[0]);
}


bool NotificationCenter::accept(Poco::Notification * pNotification) const
bool NotificationCenter::accept(const Notification & notification) const
{
Mutex::ScopedLock lock(_mutex);
for (const auto & observer : _observers)
Mutex::ScopedLock lock(mutex);
for (const auto & observer : observers)
{
if (observer->accepts(pNotification))
if (observer->accepts(notification))
return true;
}
return false;
}


void NotificationCenter::postNotification(Notification::Ptr pNotification)
void NotificationCenter::postNotification(const Notification & notification)
{
poco_check_ptr(pNotification);

Poco::ScopedLockWithUnlock<Mutex> lock(_mutex);
ObserverList copied(_observers);
lock.unlock();

for (auto & p : copied)
Observers copied;
{
p->notify(pNotification);
Mutex::ScopedLock lock(mutex);
for (auto & observer : observers)
{
if (observer->accepts(notification))
copied.push_back(observer);
}
}

for (auto & observer : copied)
observer->notify(notification);
}


bool NotificationCenter::hasObservers() const
{
Mutex::ScopedLock lock(_mutex);
return !_observers.empty();
Mutex::ScopedLock lock(mutex);
return !observers.empty();
}


std::size_t NotificationCenter::countObservers() const
std::size_t NotificationCenter::size() const
{
Mutex::ScopedLock lock(_mutex);
return _observers.size();
Mutex::ScopedLock lock(mutex);
return observers.size();
}

}
87 changes: 14 additions & 73 deletions src/Common/NIO/NotificationCenter.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,3 @@
//
// NotificationCenter.h
//
// Library: Foundation
// Package: Notifications
// Module: NotificationCenter
//
// Definition of the NotificationCenter class.
//
// Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
Expand All @@ -18,20 +9,16 @@
#include <set>
#include <vector>

#include <Poco/AbstractObserver.h>
#include <Poco/Foundation.h>
#include <Poco/Mutex.h>
#include <Poco/Notification.h>
#include <Poco/SharedPtr.h>

#include <Common/NIO/SocketNotification.h>
#include <Common/NIO/Observer.h>


namespace RK
{


class Foundation_API NotificationCenter
/// A NotificationCenter is essentially a notification dispatcher.
/// It notifies all observers of notifications meeting specific criteria.
/// This information is encapsulated in Notification objects.
Expand All @@ -40,102 +27,56 @@ class Foundation_API NotificationCenter
/// posts an appropriate notification to the notification center. The notification
/// center invokes the registered method on each matching observer, passing the notification
/// as argument.
///
/// The order in which observers receive notifications is undefined.
/// It is possible for the posting object and the observing object to be the same.
/// The NotificationCenter delivers notifications to observers synchronously.
/// In other words the postNotification() method does not return until all observers have
/// received and processed the notification.
/// If an observer throws an exception while handling a notification, the NotificationCenter
/// stops dispatching the notification and postNotification() rethrows the exception.
///
/// In a multithreaded scenario, notifications are always delivered in the thread in which the
/// notification was posted, which may not be the same thread in which an observer registered itself.
///
/// The NotificationCenter class is basically a C++ implementation of the NSNotificationCenter class
/// found in Apple's Cocoa (or OpenStep).
///
/// While handling a notification, an observer can unregister itself from the notification center,
/// or it can register or unregister other observers. Observers added during a dispatch cycle
/// will not receive the current notification.
///
/// The method receiving the notification must be implemented as
/// void handleNotification(MyNotification* pNf);
/// The handler method gets co-ownership of the Notification object
/// and must release it when done. This is best done with an AutoPtr:
/// void MyClass::handleNotification(MyNotification* pNf)
/// {
/// AutoPtr<MyNotification> nf(pNf);
/// ...
/// }
///
/// Alternatively, the NObserver class template can be used to register a callback
/// method. In this case, the callback method receives the Notification in an
/// AutoPtr and thus does not have to deal with object ownership issues:
/// void MyClass::handleNotification(const AutoPtr<MyNotification>& pNf)
/// {
/// ...
/// }
class NotificationCenter
{
public:
using Mutex = Poco::Mutex;
using Notification = Poco::Notification;
using AbstractObserver = Poco::AbstractObserver;

NotificationCenter() = default;
/// Creates the NotificationCenter.

~NotificationCenter() = default;
/// Destroys the NotificationCenter.

bool addObserverIfNotExist(const AbstractObserver & observer);
/// Registers an observer with the NotificationCenter.
/// Usage:
/// Observer<MyClass, MyNotification> obs(*this, &MyClass::handleNotification);
/// notificationCenter.addObserver(obs);
///
/// Alternatively, the NObserver template class can be used instead of Observer.
/// Alternatively, the Observer template class can be used instead of Observer.
bool addObserverIfNotExist(const AbstractObserver & observer);

bool removeObserverIfExist(const AbstractObserver & observer);
/// Unregisters an observer with the NotificationCenter.
bool removeObserverIfExist(const AbstractObserver & observer);

bool hasObserver(const AbstractObserver & observer) const;
/// Returns true if the observer is registered with this NotificationCenter.
bool hasObserver(const AbstractObserver & observer) const;

bool onlyHas(const AbstractObserver & observer) const;
/// Returns true if only has the given observer
bool onlyHas(const AbstractObserver & observer) const;

void postNotification(Notification::Ptr pNotification);
/// Posts a notification to the NotificationCenter.
/// The NotificationCenter then delivers the notification
/// to all interested observers.
/// If an observer throws an exception, dispatching terminates
/// and the exception is rethrown to the caller.
/// Ownership of the notification object is claimed and the
/// notification is released before returning. Therefore,
/// a call like
/// notificationCenter.postNotification(new MyNotification);
/// does not result in a memory leak.
void postNotification(const Notification & notification);

bool hasObservers() const;
/// Returns true iff there is at least one registered observer.
///
/// Can be used to improve performance if an expensive notification
/// shall only be created and posted if there are any observers.
bool hasObservers() const;

std::size_t countObservers() const;
/// Returns the number of registered observers.
std::size_t size() const;

bool accept(Poco::Notification * pNotification) const;
bool accept(const Notification & notification) const;

private:
using AbstractObserverPtr = Poco::SharedPtr<AbstractObserver>;
using ObserverList = std::vector<AbstractObserverPtr>;
using EventSet = std::multiset<Poco::Notification *>;
using Observers = std::vector<AbstractObserverPtr>;

mutable Poco::Mutex _mutex;
/// All observers
ObserverList _observers;
mutable Poco::Mutex mutex;
Observers observers; /// All observers
};


Expand Down
Loading