Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kostasrim committed Feb 20, 2025
1 parent 6b0ad67 commit aea2394
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
7 changes: 4 additions & 3 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,10 @@ void DeleteSlots(const SlotRanges& slots_ranges) {
namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).FlushSlots(slots_ranges);
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));

auto* channel_store = ServerState::tlocal()->channel_store();
auto deleted = SlotSet(slots_ranges);
channel_store->UnsubscribeAfterClusterSlotMigration(deleted);
}

void WriteFlushSlotsToJournal(const SlotRanges& slot_ranges) {
Expand Down Expand Up @@ -626,9 +630,6 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder
auto deleted_slots = (before.GetRemovedSlots(after)).ToSlotRanges();
deleted_slots.Merge(outgoing_migrations.slot_ranges);
DeleteSlots(deleted_slots);
auto* channel_store = ServerState::tlocal()->channel_store();
auto deleted = SlotSet(deleted_slots);
channel_store->UnsubscribeAfterClusterSlotMigration(deleted);
LOG_IF(INFO, !deleted_slots.Empty())
<< "Flushing newly unowned slots: " << deleted_slots.ToString();
WriteFlushSlotsToJournal(deleted_slots);
Expand Down
4 changes: 3 additions & 1 deletion src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ size_t ConnectionContext::UsedMemory() const {

void ConnectionContext::Unsubscribe(std::string_view channel) {
auto* sinfo = conn_state.subscribe_info.get();
sinfo->channels.erase(channel);
DCHECK(sinfo);
auto erased = sinfo->channels.erase(channel);
DCHECK(erased);
if (sinfo->IsEmpty()) {
conn_state.subscribe_info.reset();
DCHECK_GE(subscriptions, 1u);
Expand Down

0 comments on commit aea2394

Please sign in to comment.