From 5e201f224cac18d2a1c91e4288b7013e3ff5e2d7 Mon Sep 17 00:00:00 2001 From: Guido Ostkamp Date: Mon, 28 Nov 2022 10:20:04 +0100 Subject: [PATCH] [C++] UdsServer TSAN fixes * Connection now holds ioContext as shared_ptr because using weak_ptr is insufficient, since it allows destruction of ioContext prior to the socket which requires it. This case results in a heap-used-after-free problem reported by TSAN. * Every created connection object gets stored as weak_ptr into a _connectionMap indexed by an increasing _connectionIndex. When UdsServer destructor is invoked, shutdown() is invoked on any connection that still exists and destructor waits until connections got destructed prior to invoking ioContext->stop() and afterwards waiting for the worker thread to be joined after it has returned from ioContext->run(). ioContext is thus still available and running when the connections are destructed incluing their socket members. * A workGuard has been introduced to keep ioContext->run() from exiting prematurely when there is otherwise no further work to do. * Expired connection entries are purged from _connectionMap whenever the UdsServer is preparing to accept the next connection. * disconnect callback will be invoked for connections even when the shutdown is initiated by the server side. Doxygen comment and tests have been adjusted accordingly. * keep acceptorLock over _acceptor.async_accept(...) to avoid race condition. * doAcceptClient(...) immediately returns when UdsServer destruction is in progress to prevent calling _acceptor.async_accept(...) after _acceptor.cancel() has already been done. * Test case multipleServerReusingSocket got additional semaphore handling since just waiting for client side to invoke connect callback does not make sure that connect callback has also been invoked on server side before reaching end of test case which auto destructs the servers and sporadically caused test failures since expectations for server callbacks had not been met. --- cpp/libjoynr/uds/UdsServer.cpp | 128 +++++++++++++++------ cpp/libjoynr/uds/include/joynr/UdsServer.h | 14 ++- cpp/tests/unit-tests/uds/UdsServerTest.cpp | 32 ++++-- 3 files changed, 127 insertions(+), 47 deletions(-) diff --git a/cpp/libjoynr/uds/UdsServer.cpp b/cpp/libjoynr/uds/UdsServer.cpp index af26ad72f1..9f692d9e4b 100644 --- a/cpp/libjoynr/uds/UdsServer.cpp +++ b/cpp/libjoynr/uds/UdsServer.cpp @@ -41,7 +41,10 @@ UdsServer::UdsServer(const UdsSettings& settings) _endpoint(settings.getSocketPath()), _acceptor(*_ioContext), _started{false}, - _acceptorMutex() + _acceptorMutex(), + _connectionMap(), + _connectionIndex(0), + _workGuard(std::make_shared(*_ioContext)) { _remoteConfig._maxSendQueueSize = settings.getSendingQueueSize(); } @@ -53,7 +56,29 @@ UdsServer::~UdsServer() boost::system::error_code ignore; std::unique_lock acceptorLock(_acceptorMutex); _acceptor.cancel(ignore); // Acceptor will not create further connections + + // shutdown all still existing connections + while (!_connectionMap.empty()) { + auto it = _connectionMap.cbegin(); + std::weak_ptr connection = it->second; + _connectionMap.erase(it); + if (auto connectionSharedPtr = connection.lock()) { + _acceptorMutex.unlock(); + // shutdown must be invoked without lock to avoid + // potential deadlock situation + connectionSharedPtr->shutdown(); + connectionSharedPtr.reset(); + // wait until the object has been destructed + while (connection.lock()) { + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + } + _acceptorMutex.lock(); + } + } + + _workGuard.reset(); acceptorLock.unlock(); + _ioContext->stop(); // Wait for worker before destructing members try { @@ -134,22 +159,51 @@ void UdsServer::run() void UdsServer::doAcceptClient() noexcept { - _newConnection = std::make_shared(_ioContext, _remoteConfig); + std::unique_lock acceptorLock(_acceptorMutex); + if (!_started.load()) { + JOYNR_LOG_INFO(logger(), + "Stop accepting new clients because UdsServer destructor has been invoked."); + return; + } + + // check if some of the existing entries can be removed + // which is the case when the weak_ptr has expired since the + // connection object got already destructed + std::unordered_set expiredEntries; + for (const auto& connectionsPair : _connectionMap) { + auto connectionIndex = connectionsPair.first; + auto connection = connectionsPair.second; + if (connection.expired()) { + expiredEntries.insert(connectionIndex); + } + } + for (const auto& connectionIndex : expiredEntries) { + _connectionMap.erase(connectionIndex); + } + + uint64_t index = _connectionIndex++; + auto newConnection = std::make_shared(_ioContext, _remoteConfig, index); + _connectionMap[index] = newConnection; + _acceptor.async_accept( - _newConnection->getSocket(), [this](boost::system::error_code acceptFailure) { + newConnection->getSocket(), + [this, newConnection, index](boost::system::error_code acceptFailure) { if (acceptFailure) { JOYNR_LOG_ERROR( logger(), "Failed to accept new client: {}", acceptFailure.message()); } else { - JOYNR_LOG_INFO(logger(), "Connection request received from new client."); - _newConnection->doReadInitHeader(); + JOYNR_LOG_INFO(logger(), + "Connection index {} request received from new client.", + index); + newConnection->doReadInitHeader(); doAcceptClient(); } }); } UdsServer::Connection::Connection(std::shared_ptr& ioContext, - const ConnectionConfig& config) noexcept + const ConnectionConfig& config, + std::uint64_t connectionIndex) noexcept : _ioContext{ioContext}, _socket(*ioContext), _connectedCallback{config._connectedCallback}, @@ -158,7 +212,8 @@ UdsServer::Connection::Connection(std::shared_ptr& ioCo _isClosed{false}, _username("connection not established"), _sendQueue(std::make_unique>(config._maxSendQueueSize)), - _readBuffer(std::make_unique()) + _readBuffer(std::make_unique()), + _connectionIndex(connectionIndex) { } @@ -197,7 +252,10 @@ std::string UdsServer::Connection::getUserName() username = std::string("anonymous"); int storedErrno = errno; JOYNR_LOG_ERROR( - logger(), "Could not obtain peer credentials from socket, errno {}", storedErrno); + logger(), + "Connection index {} could not obtain peer credentials from socket, errno {}", + _connectionIndex, + storedErrno); } return username; } @@ -213,17 +271,9 @@ void UdsServer::Connection::send(const smrf::ByteArrayView& msg, if (_isClosed.load()) { throw std::runtime_error("Connection already closed."); } - auto ioContext = _ioContext.lock(); - if (!ioContext) { - JOYNR_LOG_WARN(logger(), - "Forced close of connection to {} ({}) since server shutting down.", - _address.getId(), - getUserName()); - return; - } try { // UdsFrameBufferV1 first since it can cause exception - ioContext->post( + _ioContext->post( [frame = UdsFrameBufferV1(msg), self = shared_from_this(), callback]() mutable { try { if (self->_sendQueue->pushBack(std::move(frame), callback)) { @@ -235,7 +285,7 @@ void UdsServer::Connection::send(const smrf::ByteArrayView& msg, }); } catch (const joynr::exceptions::JoynrRuntimeException& e) { // In case generation of frame buffer failed, close connection - ioContext->post([self = shared_from_this(), e]() mutable { + _ioContext->post([self = shared_from_this(), e]() mutable { self->doClose("Failed to construct message", e); }); throw; @@ -248,14 +298,15 @@ void UdsServer::Connection::shutdown() return; } const std::string clientId = _address.getId().empty() ? "[unknown ID]" : _address.getId(); - JOYNR_LOG_INFO(logger(), "Closing connection to {} ({}).", clientId, getUserName()); - auto ioContext = _ioContext.lock(); - if (ioContext) { - ioContext->dispatch([self = shared_from_this()]() { self->doClose(); }); + JOYNR_LOG_INFO(logger(), + "Closing connection index {} to {} ({}).", + _connectionIndex, + clientId, + _username); + if (_ioContext) { + _ioContext->dispatch([self = shared_from_this()]() { self->doClose(); }); } - ioContext.reset(); - // Wait till close is processed or the server is shutting down - while ((!_isClosed.load()) && (!_ioContext.expired())) { + while (!_isClosed.load()) { std::this_thread::yield(); } } @@ -284,11 +335,12 @@ void UdsServer::Connection::doReadInitBody() noexcept try { self->_username = self->getUserName(); self->_address = self->_readBuffer->readInit(); - JOYNR_LOG_INFO( - logger(), - "Initialize connection for client with User / ID: {} / {}", - self->_username, - self->_address.getId()); + JOYNR_LOG_INFO(logger(), + "Initialize connection index {} for client with User / " + "ID: {} / {}", + self->_connectionIndex, + self->_username, + self->_address.getId()); self->_connectedCallback(self->_address, std::make_unique( std::weak_ptr(self))); @@ -372,7 +424,11 @@ void UdsServer::Connection::doClose(const std::string& errorMessage) noexcept { if (!_isClosed.load()) { const std::string clientId = _address.getId().empty() ? "[unknown ID]" : _address.getId(); - JOYNR_LOG_FATAL(logger(), "Connection to {} corrupted: {}", clientId, errorMessage); + JOYNR_LOG_FATAL(logger(), + "Connection index {} to {} corrupted: {}", + _connectionIndex, + clientId, + errorMessage); } doClose(); } @@ -384,7 +440,10 @@ void UdsServer::Connection::doClose() noexcept try { _disconnectedCallback(_address); } catch (const std::exception& e) { - JOYNR_LOG_ERROR(logger(), "Failed to process disconnection: {}", e.what()); + JOYNR_LOG_ERROR(logger(), + "Connection index {} failed to process disconnection: {}", + _connectionIndex, + e.what()); } } boost::system::error_code ignore; @@ -393,7 +452,10 @@ void UdsServer::Connection::doClose() noexcept try { _sendQueue->emptyQueueAndNotify("Connection closed."); } catch (const std::exception& e) { - JOYNR_LOG_ERROR(logger(), "Failed to process send-failure: {}", e.what()); + JOYNR_LOG_ERROR(logger(), + "Connection index {} failed to process send-failure: {}", + _connectionIndex, + e.what()); } } } diff --git a/cpp/libjoynr/uds/include/joynr/UdsServer.h b/cpp/libjoynr/uds/include/joynr/UdsServer.h index 3593169c82..4aabe816f3 100644 --- a/cpp/libjoynr/uds/include/joynr/UdsServer.h +++ b/cpp/libjoynr/uds/include/joynr/UdsServer.h @@ -82,8 +82,7 @@ class UdsServer void setConnectCallback(const Connected& callback); /** - * @brief Sets callback for disconnection of the client (not called if server is stopped before - * client disconnection) + * @brief Sets callback for disconnection of the client * @param callback Callback */ void setDisconnectCallback(const Disconnected& callback); @@ -117,7 +116,8 @@ class UdsServer public: Connection(std::shared_ptr& ioContext, - const ConnectionConfig& config) noexcept; + const ConnectionConfig& config, + std::uint64_t connectionIndex) noexcept; /** Notifies sender if the connection got lost (error occured) */ ~Connection() = default; @@ -146,7 +146,7 @@ class UdsServer void doClose(const std::string& errorMessage) noexcept; void doClose() noexcept; - std::weak_ptr _ioContext; + std::shared_ptr _ioContext; uds::socket _socket; Connected _connectedCallback; Disconnected _disconnectedCallback; @@ -162,6 +162,8 @@ class UdsServer std::unique_ptr> _sendQueue; std::unique_ptr _readBuffer; + std::uint64_t _connectionIndex; + ADD_LOGGER(Connection) }; @@ -191,10 +193,12 @@ class UdsServer std::chrono::milliseconds _openSleepTime; uds::endpoint _endpoint; uds::acceptor _acceptor; - std::shared_ptr _newConnection; std::future _worker; std::atomic_bool _started; std::mutex _acceptorMutex; + std::unordered_map> _connectionMap; + std::atomic _connectionIndex; + std::shared_ptr _workGuard; ADD_LOGGER(UdsServer) }; diff --git a/cpp/tests/unit-tests/uds/UdsServerTest.cpp b/cpp/tests/unit-tests/uds/UdsServerTest.cpp index a1586b8ce2..ec24da8727 100644 --- a/cpp/tests/unit-tests/uds/UdsServerTest.cpp +++ b/cpp/tests/unit-tests/uds/UdsServerTest.cpp @@ -41,28 +41,42 @@ TEST_F(UdsServerTest, multipleServerReusingSocket) { std::shared_ptr sender1; std::shared_ptr sender2; + auto semaphore1 = std::make_shared(); + auto semaphore2 = std::make_shared(); MockUdsServerCallbacks mockUdsServerCallbacks1; MockUdsServerCallbacks mockUdsServerCallbacks2; EXPECT_CALL(mockUdsServerCallbacks1, connectedMock(_, _)) .Times(1) - .WillOnce(SaveArg<1>(&sender1)); + .WillOnce(DoAll(SaveArg<1>(&sender1) , ReleaseSemaphore(semaphore1))); + EXPECT_CALL(mockUdsServerCallbacks1, disconnected(_)) + .Times(1); EXPECT_CALL(mockUdsServerCallbacks2, connectedMock(_, _)) .Times(1) - .WillOnce(SaveArg<1>(&sender2)); + .WillOnce(DoAll(SaveArg<1>(&sender2), ReleaseSemaphore(semaphore2))); + EXPECT_CALL(mockUdsServerCallbacks2, disconnected(_)) + .Times(1); auto server1 = createServer(mockUdsServerCallbacks1); server1->start(); + // expect client to get connected to server1 ASSERT_EQ(waitClientConnected(true), true) << "Client is not connected to initial server."; + ASSERT_TRUE(semaphore1->waitFor(_waitPeriodForClientServerCommunication)) + << "Failed to invoke connected callback of server 1."; auto server2 = createServer(mockUdsServerCallbacks2); server2->start(); std::this_thread::yield(); + // expect client to be still connected to server1 ASSERT_EQ(waitClientConnected(true), true) << "Client lost connection after second server reuses socket."; server1.reset(); + // expect client to now be disconnected since server1 got shutdown ASSERT_EQ(waitClientConnected(false), false) << "Client is not disconnected after initial server stopped."; restartClient(); + // expect client to now be connected to server2 after restart ASSERT_EQ(waitClientConnected(true), true) << "Client is not connected to second server after restart."; + ASSERT_TRUE(semaphore2->waitFor(_waitPeriodForClientServerCommunication)) + << "Failed to invoke connected callback of server 2."; } TEST_F(UdsServerTest, connectReceiveAndDisconnectFromClient_serverCallbacksCalled) @@ -125,7 +139,7 @@ TEST_F(UdsServerTest, sendToClient) .WillOnce(DoAll(SaveArg<1>(&sender), ReleaseSemaphore(semaphore))); EXPECT_CALL(mockUdsServerCallbacks, receivedMock(_, _, _)).Times(0); EXPECT_CALL(mockUdsServerCallbacks, disconnected(_)) - .Times(0); // Server disconnects before client + .Times(1); auto server = createServer(mockUdsServerCallbacks); server->start(); ASSERT_TRUE(semaphore->waitFor(_waitPeriodForClientServerCommunication)) @@ -150,8 +164,9 @@ TEST_F(UdsServerTest, robustness_sendException_otherClientsNotAffected) .Times(2) .WillRepeatedly(DoAll(SaveArg<1>(&tmpSender), ReleaseSemaphore(connectionSemaphore))); // disconnected called for erroneous sender, not for nominal one + EXPECT_CALL(mockUdsServerCallbacks, disconnected(_)).Times(1); EXPECT_CALL(mockUdsServerCallbacks, disconnected(_)) - .WillOnce(ReleaseSemaphore(disconnectionSemaphore)); + .WillOnce(ReleaseSemaphore(disconnectionSemaphore)).RetiresOnSaturation(); auto server = createServer(mockUdsServerCallbacks); server->start(); ASSERT_TRUE(connectionSemaphore->waitFor(_waitPeriodForClientServerCommunication)) @@ -194,10 +209,10 @@ TEST_F(UdsServerTest, robustness_disconnectsErroneousClients_goodClientsNotAffec .Times(2) .WillRepeatedly(DoAll(SaveArg<1>(&tmpSender), ReleaseSemaphore(connectSemaphore))); - // disconnected callback called for erroneous client, not for good client + // disconnected callback called first for erroneous client, later for good client + EXPECT_CALL(mockUdsServerCallbacks, disconnected(_)).Times(1); EXPECT_CALL(mockUdsServerCallbacks, disconnected(_)) - .Times(1) - .WillRepeatedly(ReleaseSemaphore(disconnectSemaphore)); + .WillOnce(ReleaseSemaphore(disconnectSemaphore)).RetiresOnSaturation(); auto server = createServer(mockUdsServerCallbacks); server->start(); @@ -306,8 +321,7 @@ TEST_F(UdsServerTest, robustness_nonblockingWrite) DoAll(SaveArg<1>(&tmpClientSender), ReleaseSemaphore(connectionSemaphore))); // disconnected not called, since no connection loss (UDS socket has per default no timeout) EXPECT_CALL(mockUdsServerCallbacks, disconnected(_)) - .Times(1) - .WillOnce(ReleaseSemaphore(disconnectSemaphore)); + .WillRepeatedly(ReleaseSemaphore(disconnectSemaphore)); auto server = createServer(mockUdsServerCallbacks); server->start(); ASSERT_TRUE(connectionSemaphore->waitFor(_waitPeriodForClientServerCommunication))