-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: unsubscribe pub/sub connections after cluster migration #4529
Conversation
Signed-off-by: kostas <[email protected]>
Signed-off-by: kostas <[email protected]>
Needs a little bit of clean up which I will handle tomorrow. |
src/server/cluster/cluster_family.cc
Outdated
auto* channel_store = ServerState::tlocal()->channel_store(); | ||
auto deleted = SlotSet(deleted_slots); | ||
channel_store->UnsubscribeAfterClusterSlotMigration(deleted); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do this inside DeleteSlots
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure!
src/server/conn_context.cc
Outdated
@@ -223,6 +223,16 @@ size_t ConnectionContext::UsedMemory() const { | |||
return facade::ConnectionContext::UsedMemory() + dfly::HeapSize(conn_state); | |||
} | |||
|
|||
void ConnectionContext::Unsubscribe(std::string_view channel) { | |||
auto* sinfo = conn_state.subscribe_info.get(); | |||
sinfo->channels.erase(channel); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add DCHECK result > 0
cb.update_mu.lock(); | ||
auto* store = cb.most_recent.load(memory_order_relaxed); | ||
|
||
// Deep copy, we will remove channels | ||
auto* target = new ChannelStore::ChannelMap{*store->channels_}; | ||
|
||
for (auto key : ops_) { | ||
auto it = target->find(key); | ||
freelist_.push_back(it->second.Get()); | ||
target->erase(it); | ||
continue; | ||
} | ||
|
||
// Prepare replacement. | ||
auto* replacement = new ChannelStore{target, store->patterns_}; | ||
|
||
// Update control block and unlock it. | ||
cb.most_recent.store(replacement, memory_order_relaxed); | ||
cb.update_mu.unlock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use lock_guard
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I usually prefer using lock_guard
with a scope {}
. However, here I need to release the lock on line 367 but I need to use the variables defined in the lock scope store at the end of this function. Therefore, to avoid declaring those in an outerscope I rather do this manually via lock
and unlock
src/server/channel_store.cc
Outdated
// queued SubscribeMaps in the freelist are no longer in use. | ||
shard_set->pool()->AwaitFiberOnAll([this](unsigned idx, util::ProactorBase*) { | ||
ServerState::tlocal()->UnsubscribeSlotsAndUpdateChannelStore( | ||
ops_, ChannelStore::control_block.most_recent.load(memory_order_relaxed)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need to read ChannelStore::control_block.most_recent if you already have it a couple of lines above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe this code should also be executed under lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need to read ChannelStore::control_block.most_recent if you already have it a couple of lines above
This is how our channel store works -- via a ReadCopyUpdate operation. What that means in practice, is that a thread copies the channel store locally, updates its state and finally streamlines the new channel store to all proactors. That's why you see ChannelStore::control_block.most_recent.load(memory_order_relaxed)
.
We do not need any lock here. This is how it works. For more info see channel_store.h
, it has a bunch of comments there. I agree it looks weird, but these are the semantics 🤷
After a slot migration we should unsubscribe all connections from affected channels. This PR adds this support.
resolves #4517