Skip to content

Commit

Permalink
Merge pull request #1431 from joynr/uds_server_fix
Browse files Browse the repository at this point in the history
[C++] UdsServer TSAN fixes

Reviewed-by: mateuszszumilaspartner
Reviewed-by: guidoostkamppartner <[email protected]>
Reviewed-by: thomasreuterpartner <[email protected]>
  • Loading branch information
zuul[bot] authored and GitHub Enterprise committed Feb 16, 2023
2 parents 2eb064c + 5e201f2 commit 2586748
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 47 deletions.
128 changes: 95 additions & 33 deletions cpp/libjoynr/uds/UdsServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ UdsServer::UdsServer(const UdsSettings& settings)
_endpoint(settings.getSocketPath()),
_acceptor(*_ioContext),
_started{false},
_acceptorMutex()
_acceptorMutex(),
_connectionMap(),
_connectionIndex(0),
_workGuard(std::make_shared<boost::asio::io_service::work>(*_ioContext))
{
_remoteConfig._maxSendQueueSize = settings.getSendingQueueSize();
}
Expand All @@ -53,7 +56,29 @@ UdsServer::~UdsServer()
boost::system::error_code ignore;
std::unique_lock<std::mutex> acceptorLock(_acceptorMutex);
_acceptor.cancel(ignore); // Acceptor will not create further connections

// shutdown all still existing connections
while (!_connectionMap.empty()) {
auto it = _connectionMap.cbegin();
std::weak_ptr<Connection> connection = it->second;
_connectionMap.erase(it);
if (auto connectionSharedPtr = connection.lock()) {
_acceptorMutex.unlock();
// shutdown must be invoked without lock to avoid
// potential deadlock situation
connectionSharedPtr->shutdown();
connectionSharedPtr.reset();
// wait until the object has been destructed
while (connection.lock()) {
std::this_thread::sleep_for(std::chrono::milliseconds(25));
}
_acceptorMutex.lock();
}
}

_workGuard.reset();
acceptorLock.unlock();

_ioContext->stop();
// Wait for worker before destructing members
try {
Expand Down Expand Up @@ -134,22 +159,51 @@ void UdsServer::run()

void UdsServer::doAcceptClient() noexcept
{
_newConnection = std::make_shared<Connection>(_ioContext, _remoteConfig);
std::unique_lock<std::mutex> acceptorLock(_acceptorMutex);
if (!_started.load()) {
JOYNR_LOG_INFO(logger(),
"Stop accepting new clients because UdsServer destructor has been invoked.");
return;
}

// check if some of the existing entries can be removed
// which is the case when the weak_ptr has expired since the
// connection object got already destructed
std::unordered_set<std::uint64_t> expiredEntries;
for (const auto& connectionsPair : _connectionMap) {
auto connectionIndex = connectionsPair.first;
auto connection = connectionsPair.second;
if (connection.expired()) {
expiredEntries.insert(connectionIndex);
}
}
for (const auto& connectionIndex : expiredEntries) {
_connectionMap.erase(connectionIndex);
}

uint64_t index = _connectionIndex++;
auto newConnection = std::make_shared<Connection>(_ioContext, _remoteConfig, index);
_connectionMap[index] = newConnection;

_acceptor.async_accept(
_newConnection->getSocket(), [this](boost::system::error_code acceptFailure) {
newConnection->getSocket(),
[this, newConnection, index](boost::system::error_code acceptFailure) {
if (acceptFailure) {
JOYNR_LOG_ERROR(
logger(), "Failed to accept new client: {}", acceptFailure.message());
} else {
JOYNR_LOG_INFO(logger(), "Connection request received from new client.");
_newConnection->doReadInitHeader();
JOYNR_LOG_INFO(logger(),
"Connection index {} request received from new client.",
index);
newConnection->doReadInitHeader();
doAcceptClient();
}
});
}

UdsServer::Connection::Connection(std::shared_ptr<boost::asio::io_service>& ioContext,
const ConnectionConfig& config) noexcept
const ConnectionConfig& config,
std::uint64_t connectionIndex) noexcept
: _ioContext{ioContext},
_socket(*ioContext),
_connectedCallback{config._connectedCallback},
Expand All @@ -158,7 +212,8 @@ UdsServer::Connection::Connection(std::shared_ptr<boost::asio::io_service>& ioCo
_isClosed{false},
_username("connection not established"),
_sendQueue(std::make_unique<UdsSendQueue<UdsFrameBufferV1>>(config._maxSendQueueSize)),
_readBuffer(std::make_unique<UdsFrameBufferV1>())
_readBuffer(std::make_unique<UdsFrameBufferV1>()),
_connectionIndex(connectionIndex)
{
}

Expand Down Expand Up @@ -197,7 +252,10 @@ std::string UdsServer::Connection::getUserName()
username = std::string("anonymous");
int storedErrno = errno;
JOYNR_LOG_ERROR(
logger(), "Could not obtain peer credentials from socket, errno {}", storedErrno);
logger(),
"Connection index {} could not obtain peer credentials from socket, errno {}",
_connectionIndex,
storedErrno);
}
return username;
}
Expand All @@ -213,17 +271,9 @@ void UdsServer::Connection::send(const smrf::ByteArrayView& msg,
if (_isClosed.load()) {
throw std::runtime_error("Connection already closed.");
}
auto ioContext = _ioContext.lock();
if (!ioContext) {
JOYNR_LOG_WARN(logger(),
"Forced close of connection to {} ({}) since server shutting down.",
_address.getId(),
getUserName());
return;
}
try {
// UdsFrameBufferV1 first since it can cause exception
ioContext->post(
_ioContext->post(
[frame = UdsFrameBufferV1(msg), self = shared_from_this(), callback]() mutable {
try {
if (self->_sendQueue->pushBack(std::move(frame), callback)) {
Expand All @@ -235,7 +285,7 @@ void UdsServer::Connection::send(const smrf::ByteArrayView& msg,
});
} catch (const joynr::exceptions::JoynrRuntimeException& e) {
// In case generation of frame buffer failed, close connection
ioContext->post([self = shared_from_this(), e]() mutable {
_ioContext->post([self = shared_from_this(), e]() mutable {
self->doClose("Failed to construct message", e);
});
throw;
Expand All @@ -248,14 +298,15 @@ void UdsServer::Connection::shutdown()
return;
}
const std::string clientId = _address.getId().empty() ? "[unknown ID]" : _address.getId();
JOYNR_LOG_INFO(logger(), "Closing connection to {} ({}).", clientId, getUserName());
auto ioContext = _ioContext.lock();
if (ioContext) {
ioContext->dispatch([self = shared_from_this()]() { self->doClose(); });
JOYNR_LOG_INFO(logger(),
"Closing connection index {} to {} ({}).",
_connectionIndex,
clientId,
_username);
if (_ioContext) {
_ioContext->dispatch([self = shared_from_this()]() { self->doClose(); });
}
ioContext.reset();
// Wait till close is processed or the server is shutting down
while ((!_isClosed.load()) && (!_ioContext.expired())) {
while (!_isClosed.load()) {
std::this_thread::yield();
}
}
Expand Down Expand Up @@ -284,11 +335,12 @@ void UdsServer::Connection::doReadInitBody() noexcept
try {
self->_username = self->getUserName();
self->_address = self->_readBuffer->readInit();
JOYNR_LOG_INFO(
logger(),
"Initialize connection for client with User / ID: {} / {}",
self->_username,
self->_address.getId());
JOYNR_LOG_INFO(logger(),
"Initialize connection index {} for client with User / "
"ID: {} / {}",
self->_connectionIndex,
self->_username,
self->_address.getId());
self->_connectedCallback(self->_address,
std::make_unique<UdsServer::UdsSender>(
std::weak_ptr<Connection>(self)));
Expand Down Expand Up @@ -372,7 +424,11 @@ void UdsServer::Connection::doClose(const std::string& errorMessage) noexcept
{
if (!_isClosed.load()) {
const std::string clientId = _address.getId().empty() ? "[unknown ID]" : _address.getId();
JOYNR_LOG_FATAL(logger(), "Connection to {} corrupted: {}", clientId, errorMessage);
JOYNR_LOG_FATAL(logger(),
"Connection index {} to {} corrupted: {}",
_connectionIndex,
clientId,
errorMessage);
}
doClose();
}
Expand All @@ -384,7 +440,10 @@ void UdsServer::Connection::doClose() noexcept
try {
_disconnectedCallback(_address);
} catch (const std::exception& e) {
JOYNR_LOG_ERROR(logger(), "Failed to process disconnection: {}", e.what());
JOYNR_LOG_ERROR(logger(),
"Connection index {} failed to process disconnection: {}",
_connectionIndex,
e.what());
}
}
boost::system::error_code ignore;
Expand All @@ -393,7 +452,10 @@ void UdsServer::Connection::doClose() noexcept
try {
_sendQueue->emptyQueueAndNotify("Connection closed.");
} catch (const std::exception& e) {
JOYNR_LOG_ERROR(logger(), "Failed to process send-failure: {}", e.what());
JOYNR_LOG_ERROR(logger(),
"Connection index {} failed to process send-failure: {}",
_connectionIndex,
e.what());
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions cpp/libjoynr/uds/include/joynr/UdsServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ class UdsServer
void setConnectCallback(const Connected& callback);

/**
* @brief Sets callback for disconnection of the client (not called if server is stopped before
* client disconnection)
* @brief Sets callback for disconnection of the client
* @param callback Callback
*/
void setDisconnectCallback(const Disconnected& callback);
Expand Down Expand Up @@ -117,7 +116,8 @@ class UdsServer

public:
Connection(std::shared_ptr<boost::asio::io_service>& ioContext,
const ConnectionConfig& config) noexcept;
const ConnectionConfig& config,
std::uint64_t connectionIndex) noexcept;

/** Notifies sender if the connection got lost (error occured) */
~Connection() = default;
Expand Down Expand Up @@ -146,7 +146,7 @@ class UdsServer
void doClose(const std::string& errorMessage) noexcept;
void doClose() noexcept;

std::weak_ptr<boost::asio::io_service> _ioContext;
std::shared_ptr<boost::asio::io_service> _ioContext;
uds::socket _socket;
Connected _connectedCallback;
Disconnected _disconnectedCallback;
Expand All @@ -162,6 +162,8 @@ class UdsServer
std::unique_ptr<UdsSendQueue<UdsFrameBufferV1>> _sendQueue;
std::unique_ptr<UdsFrameBufferV1> _readBuffer;

std::uint64_t _connectionIndex;

ADD_LOGGER(Connection)
};

Expand Down Expand Up @@ -191,10 +193,12 @@ class UdsServer
std::chrono::milliseconds _openSleepTime;
uds::endpoint _endpoint;
uds::acceptor _acceptor;
std::shared_ptr<Connection> _newConnection;
std::future<void> _worker;
std::atomic_bool _started;
std::mutex _acceptorMutex;
std::unordered_map<std::uint64_t, std::weak_ptr<Connection>> _connectionMap;
std::atomic<uint64_t> _connectionIndex;
std::shared_ptr<boost::asio::io_service::work> _workGuard;
ADD_LOGGER(UdsServer)
};

Expand Down
32 changes: 23 additions & 9 deletions cpp/tests/unit-tests/uds/UdsServerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,42 @@ TEST_F(UdsServerTest, multipleServerReusingSocket)
{
std::shared_ptr<joynr::IUdsSender> sender1;
std::shared_ptr<joynr::IUdsSender> sender2;
auto semaphore1 = std::make_shared<Semaphore>();
auto semaphore2 = std::make_shared<Semaphore>();
MockUdsServerCallbacks mockUdsServerCallbacks1;
MockUdsServerCallbacks mockUdsServerCallbacks2;
EXPECT_CALL(mockUdsServerCallbacks1, connectedMock(_, _))
.Times(1)
.WillOnce(SaveArg<1>(&sender1));
.WillOnce(DoAll(SaveArg<1>(&sender1) , ReleaseSemaphore(semaphore1)));
EXPECT_CALL(mockUdsServerCallbacks1, disconnected(_))
.Times(1);
EXPECT_CALL(mockUdsServerCallbacks2, connectedMock(_, _))
.Times(1)
.WillOnce(SaveArg<1>(&sender2));
.WillOnce(DoAll(SaveArg<1>(&sender2), ReleaseSemaphore(semaphore2)));
EXPECT_CALL(mockUdsServerCallbacks2, disconnected(_))
.Times(1);
auto server1 = createServer(mockUdsServerCallbacks1);
server1->start();
// expect client to get connected to server1
ASSERT_EQ(waitClientConnected(true), true) << "Client is not connected to initial server.";
ASSERT_TRUE(semaphore1->waitFor(_waitPeriodForClientServerCommunication))
<< "Failed to invoke connected callback of server 1.";
auto server2 = createServer(mockUdsServerCallbacks2);
server2->start();
std::this_thread::yield();
// expect client to be still connected to server1
ASSERT_EQ(waitClientConnected(true), true)
<< "Client lost connection after second server reuses socket.";
server1.reset();
// expect client to now be disconnected since server1 got shutdown
ASSERT_EQ(waitClientConnected(false), false)
<< "Client is not disconnected after initial server stopped.";
restartClient();
// expect client to now be connected to server2 after restart
ASSERT_EQ(waitClientConnected(true), true)
<< "Client is not connected to second server after restart.";
ASSERT_TRUE(semaphore2->waitFor(_waitPeriodForClientServerCommunication))
<< "Failed to invoke connected callback of server 2.";
}

TEST_F(UdsServerTest, connectReceiveAndDisconnectFromClient_serverCallbacksCalled)
Expand Down Expand Up @@ -125,7 +139,7 @@ TEST_F(UdsServerTest, sendToClient)
.WillOnce(DoAll(SaveArg<1>(&sender), ReleaseSemaphore(semaphore)));
EXPECT_CALL(mockUdsServerCallbacks, receivedMock(_, _, _)).Times(0);
EXPECT_CALL(mockUdsServerCallbacks, disconnected(_))
.Times(0); // Server disconnects before client
.Times(1);
auto server = createServer(mockUdsServerCallbacks);
server->start();
ASSERT_TRUE(semaphore->waitFor(_waitPeriodForClientServerCommunication))
Expand All @@ -150,8 +164,9 @@ TEST_F(UdsServerTest, robustness_sendException_otherClientsNotAffected)
.Times(2)
.WillRepeatedly(DoAll(SaveArg<1>(&tmpSender), ReleaseSemaphore(connectionSemaphore)));
// disconnected called for erroneous sender, not for nominal one
EXPECT_CALL(mockUdsServerCallbacks, disconnected(_)).Times(1);
EXPECT_CALL(mockUdsServerCallbacks, disconnected(_))
.WillOnce(ReleaseSemaphore(disconnectionSemaphore));
.WillOnce(ReleaseSemaphore(disconnectionSemaphore)).RetiresOnSaturation();
auto server = createServer(mockUdsServerCallbacks);
server->start();
ASSERT_TRUE(connectionSemaphore->waitFor(_waitPeriodForClientServerCommunication))
Expand Down Expand Up @@ -194,10 +209,10 @@ TEST_F(UdsServerTest, robustness_disconnectsErroneousClients_goodClientsNotAffec
.Times(2)
.WillRepeatedly(DoAll(SaveArg<1>(&tmpSender), ReleaseSemaphore(connectSemaphore)));

// disconnected callback called for erroneous client, not for good client
// disconnected callback called first for erroneous client, later for good client
EXPECT_CALL(mockUdsServerCallbacks, disconnected(_)).Times(1);
EXPECT_CALL(mockUdsServerCallbacks, disconnected(_))
.Times(1)
.WillRepeatedly(ReleaseSemaphore(disconnectSemaphore));
.WillOnce(ReleaseSemaphore(disconnectSemaphore)).RetiresOnSaturation();

auto server = createServer(mockUdsServerCallbacks);
server->start();
Expand Down Expand Up @@ -306,8 +321,7 @@ TEST_F(UdsServerTest, robustness_nonblockingWrite)
DoAll(SaveArg<1>(&tmpClientSender), ReleaseSemaphore(connectionSemaphore)));
// disconnected not called, since no connection loss (UDS socket has per default no timeout)
EXPECT_CALL(mockUdsServerCallbacks, disconnected(_))
.Times(1)
.WillOnce(ReleaseSemaphore(disconnectSemaphore));
.WillRepeatedly(ReleaseSemaphore(disconnectSemaphore));
auto server = createServer(mockUdsServerCallbacks);
server->start();
ASSERT_TRUE(connectionSemaphore->waitFor(_waitPeriodForClientServerCommunication))
Expand Down

0 comments on commit 2586748

Please sign in to comment.