Skip to content

Commit

Permalink
Remove obsolete send pump thread
Browse files Browse the repository at this point in the history
  • Loading branch information
ungive committed Jul 19, 2024
1 parent 53c5256 commit ef6d648
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 71 deletions.
63 changes: 3 additions & 60 deletions clients/cpp/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,19 @@ ClientImpl::ClientImpl(std::string const& address, ClientOptions options)
&ClientImpl::on_websocket_message, this, std::placeholders::_1));
// Logging
loon::init_logging();
// Start the send pump thread
m_send_pump_thread = std::thread(&ClientImpl::send_pump, this);
// Start the manager loop thread
m_manager_loop_thread = std::thread(&ClientImpl::manager_loop, this);
}

ClientImpl::~ClientImpl()
{
stop();
// Stop the send pump thread
{
const std::lock_guard<std::mutex> lock(m_send_pump_comm_mutex);
m_stop_send_pump = true;
m_cv_send_pump.notify_all();
}
// Stop the manager loop thread
{
const std::lock_guard<std::mutex> lock(m_mutex);
m_stop_manager_loop = true;
m_cv_manager.notify_all();
}
// Join all running threads
m_send_pump_thread.join();
m_manager_loop_thread.join();
}

Expand Down Expand Up @@ -499,60 +489,12 @@ void ClientImpl::handle_message(ServerMessage const& message)

bool ClientImpl::send(ClientMessage const& message)
{
const std::lock_guard<std::mutex> send_lock(m_send_pump_send_mutex);
std::unique_lock<std::mutex> comm_lock(m_send_pump_comm_mutex);
m_send_pump_message = &message;
m_cv_send_pump.notify_one(); // notify message
m_cv_send_pump.wait(comm_lock, [this] { // wait until sent
return m_stop_send_pump || m_send_pump_message == nullptr;
});
if (m_stop_send_pump) {
return false;
}
return m_send_pump_result;
}

void ClientImpl::send_pump()
{
std::unique_lock<std::mutex> lock(m_send_pump_comm_mutex);
while (true) {
m_cv_send_pump.wait(
lock, [this] { // wait for a message or until stopped
return m_stop_send_pump || m_send_pump_message != nullptr;
});
if (m_stop_send_pump) {
return;
}
bool is_sent = internal_send(*m_send_pump_message);
m_send_pump_result = is_sent;
m_send_pump_message = nullptr;
m_cv_send_pump.notify_one(); // notify done
{
// While restarting, the mutex has to be unlocked,
// so that it can be locked by the send method again,
// once it receives the above condition variable signal.
lock.unlock();
if (!is_sent) {
// If the send operation failed, restart the connection.
// Do this after (!) the send operation has finished,
// such that the request handler can unlock its mutex
// and the restart operation can close the request handler.
// Otherwise a deadlock would occur.

// FIXME: no locking
restart();
}
lock.lock();
}
}
}

bool ClientImpl::internal_send(ClientMessage const& message)
{
const std::lock_guard<std::mutex> send_lock(m_write_mutex);
auto result = message.SerializeAsString();
if (result.empty()) {
log(Error) << "failed to serialize client message"
<< var("data_case", message.data_case());
restart();
return false;
}
#ifdef LOON_TEST
Expand All @@ -565,6 +507,7 @@ bool ClientImpl::internal_send(ClientMessage const& message)
#endif
if (n <= 0) {
log(Error) << "failed to send message" << var("retval", n);
restart();
return false;
}
return true;
Expand Down
13 changes: 2 additions & 11 deletions clients/cpp/src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,6 @@ class ClientImpl : public IClient

std::string make_url(std::string const& path);

void send_pump();
bool internal_send(ClientMessage const& message);

std::thread m_send_pump_thread{};
std::mutex m_send_pump_send_mutex{};
std::mutex m_send_pump_comm_mutex{};
bool m_stop_send_pump{ false };
std::condition_variable m_cv_send_pump{};
ClientMessage const* m_send_pump_message{ nullptr };
bool m_send_pump_result{ false };

// clang-format off
struct ManagerAction
{
Expand Down Expand Up @@ -278,6 +267,8 @@ class ClientImpl : public IClient
// which would call close and would trigger the close callback,
// which locks this mutex again.
std::mutex m_mutex{};
// Mutex for writing to the connection.
std::mutex m_write_mutex{};
// Requests mutex. For reads/writes from/to m_requests.
std::mutex m_request_mutex{};
// Use an "any" condition variable, so it works with a recursive mutex.
Expand Down

0 comments on commit ef6d648

Please sign in to comment.