feat: replace worker connection's mutex with tbb::concurrent_hash_map #2461

Open

LindaSummer wants to merge 30 commits into unstable from LindaSummer:feature/tbb_worker_with_mutex

Changes from 9 commits

Commits (30)
3d495ef
remove related mutex with tbb hashmap
LindaSummer Jul 28, 2024
f6192d4
add mutex for erase and iteration
LindaSummer Jul 30, 2024
4416da7
remove useless code
LindaSummer Aug 1, 2024
fe89194
use tbb_parallel function rather than legacy for loop
LindaSummer Aug 4, 2024
1104241
remove useless headers
LindaSummer Aug 4, 2024
effbcf9
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 4, 2024
c0d9efb
refactor duplicated code
LindaSummer Aug 7, 2024
3c4205a
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 7, 2024
c17ae9a
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 8, 2024
9f1b707
remove redundant concurrency protection in dtor
LindaSummer Aug 9, 2024
8385b41
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 9, 2024
14a0deb
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 9, 2024
f4bbded
fix sonar cloud analysis issue
LindaSummer Aug 9, 2024
797c9d9
Merge branch 'feature/tbb_worker_with_mutex' of github.com:LindaSumme…
LindaSummer Aug 9, 2024
4eff1e8
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 9, 2024
2027b03
make value delete after accessor erased
LindaSummer Aug 9, 2024
e3d992d
Merge branch 'feature/tbb_worker_with_mutex' of github.com:LindaSumme…
LindaSummer Aug 9, 2024
49d6792
make tbb header including use `<>`
LindaSummer Aug 9, 2024
8011d59
fix `monitor_conns` inserting behavior and refactor useless code.
LindaSummer Aug 11, 2024
20678a5
make deleting item before accessor erasing
LindaSummer Aug 11, 2024
b17dcec
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 11, 2024
0f3f8bd
fix sonar issue: `Worker::~Worker()` can keep the original code
LindaSummer Aug 12, 2024
afccc88
Merge branch 'feature/tbb_worker_with_mutex' of github.com:LindaSumme…
LindaSummer Aug 12, 2024
9714558
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 14, 2024
1665dab
fix typo and remove useless debug comment
LindaSummer Aug 15, 2024
65648bf
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 20, 2024
261812d
add thread constraint for tbb parallel function.
LindaSummer Sep 2, 2024
0b64679
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Sep 2, 2024
ee88850
fix free connection issue.
LindaSummer Sep 3, 2024
1484808
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Sep 4, 2024
264 changes: 161 additions & 103 deletions src/server/worker.cc
@@ -47,6 +47,8 @@
#include <list>
#include <utility>

#include "oneapi/tbb/parallel_for.h"
#include "oneapi/tbb/parallel_reduce.h"
#include "redis_connection.h"
#include "redis_request.h"
#include "server.h"
@@ -76,17 +78,29 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new())
}

Worker::~Worker() {
std::vector<redis::Connection *> conns;
conns.reserve(conns_.size() + monitor_conns_.size());

for (const auto &iter : conns_) {
conns.emplace_back(iter.second);
}
for (const auto &iter : monitor_conns_) {
conns.emplace_back(iter.second);
auto collect_conns_fn = [](ConnMap &conns) {
return parallel_reduce(
Member:
Is parallel_reduce necessary here?

Contributor Author (LindaSummer, Aug 8, 2024):

Hi @PragmaTwice ,

I use a reduce action before the iteration because parallel_for acquires a range rather than a single item.
So, I want to move the time-costly Close() call outside of the parallel_for.

But since this function is inside the dtor, if ownership is managed correctly, it should be the only function that uses the internal state of the Worker object.
I think the parallel_reduce can be replaced by parallel_for or even removed.

I will refactor it and run tests before pushing to the current PR.

Best Regards,
Edward

Member:

I mean, maybe a single-threaded for loop is enough here? I didn't have a deep look yet. cc @mapleFU

Contributor Author (LindaSummer):

Hi @PragmaTwice ,

I have removed the parallel_reduce from the dtor and all tests pass in my repo's GitHub Actions.
I think concurrency protection of the internal state may be redundant in the dtor, since all other references should already be released by the time the dtor runs.

Best Regards,
Edward

Member:

I mean maybe a single-thread for is enough here? didn't have a deep look yet

Yeah, a parallel task also has a DOP (degree of parallelism), and I guess the default for parallel_for would be high and geared toward CPU-bound tasks, like OpenMP.

I think maybe a lock is preferred here.

conns.range(), std::vector<redis::Connection *>{},
[](const ConnMap::range_type &range, std::vector<redis::Connection *> &&result) {
std::transform(range.begin(), range.end(), std::back_inserter(result),
[](const auto &it) { return it.second; });
return result;
},
[](const std::vector<redis::Connection *> &lhs, const std::vector<redis::Connection *> &rhs) {
std::vector<redis::Connection *> result = lhs;
result.insert(result.end(), rhs.begin(), rhs.end());
return result;
});
};
auto conns = collect_conns_fn(conns_);
auto monitor_conns = collect_conns_fn(monitor_conns_);

for (auto conn : conns) {
conn->Close();
}

for (auto conn : monitor_conns) {
conn->Close();
}

timer_.reset();
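
A minimal, self-contained sketch of the single-threaded teardown the reviewers converge on above (FakeConn and the literal fds are stand-ins, not code from this PR): once nothing else can touch the maps, a plain serial walk is sufficient.

#include <oneapi/tbb/concurrent_hash_map.h>

#include <cstdio>
#include <vector>

// Stand-in for redis::Connection, only so the sketch compiles on its own.
struct FakeConn {
  int fd;
  void Close() const { std::printf("closing fd %d\n", fd); }
};

using ConnMap = tbb::concurrent_hash_map<int, FakeConn *>;

int main() {
  ConnMap conns, monitor_conns;
  ConnMap::accessor a;
  conns.insert(a, {1, new FakeConn{1}});
  a.release();
  monitor_conns.insert(a, {2, new FakeConn{2}});
  a.release();

  // Destructor-style teardown: no other thread is running, so a plain serial
  // walk over both maps is enough -- no parallel_reduce, no extra locking.
  std::vector<FakeConn *> all;
  all.reserve(conns.size() + monitor_conns.size());
  for (const auto &[fd, conn] : conns) all.push_back(conn);
  for (const auto &[fd, conn] : monitor_conns) all.push_back(conn);
  for (auto *conn : all) {
    conn->Close();
    delete conn;
  }
  return 0;
}
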
@@ -311,9 +325,7 @@ void Worker::Stop(uint32_t wait_seconds) {
}

Status Worker::AddConnection(redis::Connection *c) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(c->GetFD());
if (iter != conns_.end()) {
if (ConnMap::const_accessor accessor; conns_.find(accessor, c->GetFD())) {
return {Status::NotOK, "connection was exists"};
}

@@ -323,7 +335,8 @@ Status Worker::AddConnection(redis::Connection *c) {
return {Status::NotOK, "max number of clients reached"};
}

conns_.emplace(c->GetFD(), c);
ConnMap::accessor accessor;
conns_.insert(accessor, std::make_pair(c->GetFD(), c));
uint64_t id = srv->GetClientID();
c->SetID(id);

@@ -333,18 +346,17 @@ Status Worker::AddConnection(redis::Connection *c) {
redis::Connection *Worker::removeConnection(int fd) {
redis::Connection *conn = nullptr;

std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
conn = iter->second;
conns_.erase(iter);
if (ConnMap::accessor accessor; conns_.find(accessor, fd)) {
{
conn = accessor->second;
conns_.erase(accessor);
}
srv->DecrClientNum();
}

iter = monitor_conns_.find(fd);
if (iter != monitor_conns_.end()) {
conn = iter->second;
monitor_conns_.erase(iter);
if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd)) {
conn = accessor->second;
monitor_conns_.erase(accessor);
srv->DecrClientNum();
srv->DecrMonitorClientNum();
}
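
For readers unfamiliar with TBB's map API, a small self-contained sketch of the accessor pattern this diff relies on (toy key/value types, not code from the PR): an accessor holds an exclusive per-element lock, a const_accessor a shared one, and erase(accessor) removes the element the accessor currently holds.

#include <oneapi/tbb/concurrent_hash_map.h>

#include <cstdio>
#include <string>

using Map = tbb::concurrent_hash_map<int, std::string>;

int main() {
  Map m;

  {
    Map::accessor a;                    // exclusive (write) lock on the element
    m.insert(a, std::make_pair(42, std::string("conn-42")));
    a->second = "conn-42-updated";      // safe to mutate while the accessor is held
  }                                     // lock released when the accessor leaves scope

  {
    Map::const_accessor ca;             // shared (read) lock
    if (m.find(ca, 42)) std::printf("found: %s\n", ca->second.c_str());
  }

  {
    Map::accessor a;
    if (m.find(a, 42)) m.erase(a);      // erase the element the accessor points at
  }

  return 0;
}
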
@@ -409,31 +421,27 @@ void Worker::FreeConnection(redis::Connection *conn) {
}

void Worker::FreeConnectionByID(int fd, uint64_t id) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end() && iter->second->GetID() == id) {
if (ConnMap::accessor accessor; conns_.find(accessor, fd)) {
if (rate_limit_group_ != nullptr) {
bufferevent_remove_from_rate_limit_group(iter->second->GetBufferEvent());
bufferevent_remove_from_rate_limit_group(accessor->second->GetBufferEvent());
}
delete iter->second;
conns_.erase(iter);

delete accessor->second;
conns_.erase(accessor);

srv->DecrClientNum();
}

iter = monitor_conns_.find(fd);
if (iter != monitor_conns_.end() && iter->second->GetID() == id) {
delete iter->second;
monitor_conns_.erase(iter);
if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd)) {
delete accessor->second;
monitor_conns_.erase(accessor);
srv->DecrClientNum();
srv->DecrMonitorClientNum();
}
}

Status Worker::EnableWriteEvent(int fd) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
auto bev = iter->second->GetBufferEvent();
if (ConnMap::const_accessor accessor; conns_.find(accessor, fd)) {
auto bev = accessor->second->GetBufferEvent();
bufferevent_enable(bev, EV_WRITE);
return Status::OK();
}
@@ -442,115 +450,146 @@ Status Worker::EnableWriteEvent(int fd) {
}

Status Worker::Reply(int fd, const std::string &reply) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
iter->second->SetLastInteraction();
redis::Reply(iter->second->Output(), reply);
if (ConnMap::accessor accessor; conns_.find(accessor, fd)) {
accessor->second->SetLastInteraction();
redis::Reply(accessor->second->Output(), reply);
return Status::OK();
}

return {Status::NotOK, "connection doesn't exist"};
}

void Worker::BecomeMonitorConn(redis::Connection *conn) {
Contributor Author (LindaSummer):

Hi @mapleFU ,

Sorry for the delayed reply.
I'd like to use this pattern for all accessors to mitigate potential inconsistency between monitor_conns_ and conns_.

Suggested change:

void Worker::BecomeMonitorConn(redis::Connection *conn) {
ConnMap::accessor accessor;
ConnMap::accessor monitor_accessor;
bool find_conn = conns_.find(accessor, conn->GetFD());
bool find_monitor = monitor_conns_.find(monitor_accessor, conn->GetFD());
if (find_conn) {
conns_.erase(accessor);
}
if (find_monitor) {
monitor_accessor->second = conn;
} else {
monitor_conns_.insert(monitor_accessor, std::make_pair(conn->GetFD(), conn));
}
srv->IncrMonitorClientNum();
conn->EnableFlag(redis::Connection::kMonitor);
}

Best Regards,
Edward

{
std::lock_guard<std::mutex> guard(conns_mu_);
conns_.erase(conn->GetFD());
monitor_conns_[conn->GetFD()] = conn;
if (ConnMap::accessor accessor; conns_.find(accessor, conn->GetFD())) {
conns_.erase(accessor);
accessor.release();

if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) {
accessor->second = conn;
} else {
monitor_conns_.insert(accessor, std::make_pair(conn->GetFD(), conn));
}
}
srv->IncrMonitorClientNum();
conn->EnableFlag(redis::Connection::kMonitor);
}

void Worker::QuitMonitorConn(redis::Connection *conn) {
{
std::lock_guard<std::mutex> guard(conns_mu_);
monitor_conns_.erase(conn->GetFD());
conns_[conn->GetFD()] = conn;
if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) {
{
monitor_conns_.erase(accessor);
accessor.release();
}
if (ConnMap::accessor accessor; conns_.find(accessor, conn->GetFD())) {
accessor->second = conn;
} else {
conns_.insert(accessor, std::make_pair(conn->GetFD(), conn));
}
}
srv->DecrMonitorClientNum();
conn->DisableFlag(redis::Connection::kMonitor);
}

void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &response) {
std::unique_lock<std::mutex> lock(conns_mu_);

for (const auto &iter : monitor_conns_) {
if (conn == iter.second) continue; // skip the monitor command

if (conn->GetNamespace() == iter.second->GetNamespace() || iter.second->GetNamespace() == kDefaultNamespace) {
iter.second->Reply(response);
tbb::parallel_for(monitor_conns_.range(), [conn, response](const ConnMap::range_type &range) {
for (auto &it : range) {
const auto &value = it.second;
if (conn == value) continue; // skip the monitor command
if (conn->GetNamespace() == value->GetNamespace() || value->GetNamespace() == kDefaultNamespace) {
value->Reply(response);
}
}
}
});
}

std::string Worker::GetClientsStr() {
std::unique_lock<std::mutex> lock(conns_mu_);

std::string output;
for (const auto &iter : conns_) {
redis::Connection *conn = iter.second;
output.append(conn->ToString());
}

return output;
return tbb::parallel_reduce(
conns_.range(), std::string{},
[](const ConnMap::range_type &range, std::string &&result) {
for (auto &it : range) {
result.append(it.second->ToString());
}
return result;
},
[](const std::string &lhs, const std::string &rhs) {
std::string result = lhs;
result.append(rhs);
return result;
});
}
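
The functional form of tbb::parallel_reduce used above takes a range, an identity value, a per-chunk body, and a combiner. A self-contained toy with the same shape (the vector-sum task is illustrative only, not from the PR):

#include <oneapi/tbb/blocked_range.h>
#include <oneapi/tbb/parallel_reduce.h>

#include <numeric>
#include <vector>

int main() {
  std::vector<int> values(1000, 1);
  int total = tbb::parallel_reduce(
      tbb::blocked_range<size_t>(0, values.size()), 0,
      [&](const tbb::blocked_range<size_t> &r, int partial) {
        // Each task folds its sub-range into its running partial result.
        return std::accumulate(values.begin() + r.begin(), values.begin() + r.end(), partial);
      },
      [](int lhs, int rhs) { return lhs + rhs; });  // combine partial results
  return total == 1000 ? 0 : 1;
}
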

void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string &addr, uint64_t type, bool skipme,
int64_t *killed) {
std::lock_guard<std::mutex> guard(conns_mu_);

for (const auto &iter : conns_) {
redis::Connection *conn = iter.second;
if (skipme && self == conn) continue;

// no need to kill the client again if the kCloseAfterReply flag is set
if (conn->IsFlagEnabled(redis::Connection::kCloseAfterReply)) {
continue;
}

if ((type & conn->GetClientType()) ||
(!addr.empty() && (conn->GetAddr() == addr || conn->GetAnnounceAddr() == addr)) ||
(id != 0 && conn->GetID() == id)) {
conn->EnableFlag(redis::Connection::kCloseAfterReply);
// enable write event to notify worker wake up ASAP, and remove the connection
if (!conn->IsFlagEnabled(redis::Connection::kSlave)) { // don't enable any event in slave connection
auto bev = conn->GetBufferEvent();
bufferevent_enable(bev, EV_WRITE);
for (const auto key : getConnFds()) {
if (ConnMap::accessor accessor; conns_.find(accessor, key)) {
auto conn = accessor->second;
if (skipme && self == conn) continue;

// no need to kill the client again if the kCloseAfterReply flag is set
if (conn->IsFlagEnabled(redis::Connection::kCloseAfterReply)) {
continue;
}
if ((type & conn->GetClientType()) ||
(!addr.empty() && (conn->GetAddr() == addr || conn->GetAnnounceAddr() == addr)) ||
(id != 0 && conn->GetID() == id)) {
conn->EnableFlag(redis::Connection::kCloseAfterReply);
// enable write event to notify worker wake up ASAP, and remove the connection
if (!conn->IsFlagEnabled(redis::Connection::kSlave)) { // don't enable any event in slave connection
auto bev = conn->GetBufferEvent();
bufferevent_enable(bev, EV_WRITE);
}
(*killed)++;
}
(*killed)++;
}
}
}

void Worker::KickoutIdleClients(int timeout) {
std::vector<std::pair<int, uint64_t>> to_be_killed_conns;

{
std::lock_guard<std::mutex> guard(conns_mu_);
if (conns_.empty()) {
return;
}
auto fd_list = getConnFds();
if (fd_list.empty()) {
return;
}

int iterations = std::min(static_cast<int>(conns_.size()), 50);
auto iter = conns_.upper_bound(last_iter_conn_fd_);
while (iterations--) {
if (iter == conns_.end()) iter = conns_.begin();
if (static_cast<int>(iter->second->GetIdleTime()) >= timeout) {
to_be_killed_conns.emplace_back(iter->first, iter->second->GetID());
}
iter++;
std::set<int> fds(fd_list.cbegin(), fd_list.cend());

int iterations = std::min(static_cast<int>(conns_.size()), 50);
auto iter = fds.upper_bound(last_iter_conn_fd_);
while (iterations--) {
if (iter == fds.end()) {
iter = fds.begin();
}
if (ConnMap::const_accessor accessor;
conns_.find(accessor, *iter) && static_cast<int>(accessor->second->GetIdleTime()) >= timeout) {
to_be_killed_conns.emplace_back(accessor->first, accessor->second->GetID());
}
iter--;
last_iter_conn_fd_ = iter->first;
iter++;
}
iter--;
last_iter_conn_fd_ = *iter;

for (const auto &conn : to_be_killed_conns) {
FreeConnectionByID(conn.first, conn.second);
}
}

std::vector<int> Worker::getConnFds() const {
return tbb::parallel_reduce(
Member:

IMO parallel_for looks like it targets CPU-bound operations; would it occupy more threads than expected here?

Contributor Author (LindaSummer):

Hi @mapleFU ,
Thanks for your review suggestion.
I read TBB's official documentation and found that the scheduler uses all cores by default.
Maybe we should set a limit on the TBB scheduler.
In fact, I'm not sure whether we really need the TBB hash map to replace the current mutex now. 🤔

Best Regards,
Edward

Member:

In fact, I'm not sure whether we really need the TBB hash map to replace the current mutex now.

I think it's worth doing if we don't traverse all conns_ frequently.

conns_.range(), std::vector<int>{},
[](const ConnMap::const_range_type &range, std::vector<int> result) {
for (const auto &fd : range) {
result.emplace_back(fd.first);
}
return result;
},
[](const std::vector<int> &lhs, const std::vector<int> &rhs) {
std::vector<int> result = lhs;
result.insert(result.end(), rhs.begin(), rhs.end());
return result;
});
}
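
On the thread-constraint point raised above (and the later "add thread constraint for tbb parallel function" commit): TBB's scheduler uses all cores by default, but parallelism can be capped process-wide or per call site. A minimal sketch; the limits of 4 and 2 are arbitrary illustration values, not values from this PR.

#include <oneapi/tbb/global_control.h>
#include <oneapi/tbb/parallel_for.h>
#include <oneapi/tbb/task_arena.h>

#include <atomic>
#include <cstdio>

int main() {
  // Process-wide cap: no TBB algorithm may use more than 4 worker threads.
  tbb::global_control cap(tbb::global_control::max_allowed_parallelism, 4);

  // Per-call-site cap: run one particular loop inside an arena limited to 2 threads.
  std::atomic<int> sum{0};
  tbb::task_arena arena(/*max_concurrency=*/2);
  arena.execute([&] {
    tbb::parallel_for(0, 1000, [&](int i) { sum += i; });
  });

  std::printf("sum = %d\n", sum.load());
  return 0;
}
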

void WorkerThread::Start() {
auto s = util::CreateThread("worker", [this] { this->worker_->Run(std::this_thread::get_id()); });

@@ -564,6 +603,25 @@ void WorkerThread::Start() {
LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started";
}

std::map<int, redis::Connection *> Worker::GetConnections() const {
std::map<int, redis::Connection *> result;
result = tbb::parallel_reduce(
conns_.range(), result,
[](const ConnMap::const_range_type &range, std::map<int, redis::Connection *> &&tmp_result) {
// std::map<int, redis::Connection *> tmp_result;
for (auto &it : range) {
tmp_result.emplace(it.first, it.second);
}
return tmp_result;
},
[](const std::map<int, redis::Connection *> &lhs, const std::map<int, redis::Connection *> &rhs) {
std::map<int, redis::Connection *> result = lhs;
result.insert(rhs.cbegin(), rhs.cend());
return result;
});
return result;
}

void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); }

void WorkerThread::Join() {