-
Notifications
You must be signed in to change notification settings - Fork 204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bust _membership_stream_cache
cache when current state changes
#17732
base: develop
Are you sure you want to change the base?
Changes from all commits
2bd0e63
d327c62
64eb71f
dd87620
7c53716
bceb4e0
944d1e0
20ff239
f40e649
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix membership caches not updating in state reset scenarios. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -219,6 +219,11 @@ def process_replication_rows( | |
room_id = row.keys[0] | ||
members_changed = set(row.keys[1:]) | ||
self._invalidate_state_caches(room_id, members_changed) | ||
self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined] | ||
room_id, token | ||
) | ||
for user_id in members_changed: | ||
self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined] | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+222
to
+226
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wherever we are busting We've forgotten to bust |
||
elif row.cache_func == PURGE_HISTORY_CACHE_NAME: | ||
if row.keys is None: | ||
raise Exception( | ||
|
@@ -236,6 +241,10 @@ def process_replication_rows( | |
room_id = row.keys[0] | ||
self._invalidate_caches_for_room_events(room_id) | ||
self._invalidate_caches_for_room(room_id) | ||
self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined] | ||
room_id, token | ||
) | ||
self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] | ||
else: | ||
self._attempt_to_invalidate_cache(row.cache_func, row.keys) | ||
|
||
|
@@ -275,6 +284,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: | |
self._attempt_to_invalidate_cache( | ||
"get_sliding_sync_rooms_for_user", None | ||
) | ||
self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined] | ||
elif data.type == EventTypes.RoomEncryption: | ||
self._attempt_to_invalidate_cache( | ||
"get_room_encryption", (data.room_id,) | ||
|
@@ -291,6 +301,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: | |
# Similar to the above, but the entire caches are invalidated. This is | ||
# unfortunate for the membership caches, but should recover quickly. | ||
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] | ||
self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] | ||
self._attempt_to_invalidate_cache("get_rooms_for_user", None) | ||
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,)) | ||
self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1605,7 +1605,13 @@ def _update_current_state_txn( | |
room_id | ||
delta_state: Deltas that are going to be used to update the | ||
`current_state_events` table. Changes to the current state of the room. | ||
stream_id: TODO | ||
stream_id: This is expected to be the minimum `stream_ordering` for the | ||
batch of events that we are persisting; which means we do not end up in a | ||
situation where workers see events before the `current_state_delta` updates. | ||
FIXME: However, this function also gets called with next upcoming | ||
`stream_ordering` when we re-sync the state of a partial stated room (see | ||
`update_current_state(...)`) which may be "correct" but it would be good to | ||
nail down what exactly is the expected value here. | ||
Comment on lines
+1608
to
+1614
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previous conversation: #17512 (comment) I decided to define it in some way given we're using it for cache busting below and was curious if it is actually correct. Still not confident whether it's perfect for cache busting but might be good enough. |
||
sliding_sync_table_changes: Changes to the | ||
`sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables | ||
derived from the given `delta_state` (see | ||
|
@@ -1908,6 +1914,13 @@ def _update_current_state_txn( | |
stream_id, | ||
) | ||
|
||
for user_id in members_to_cache_bust: | ||
txn.call_after( | ||
self.store._membership_stream_cache.entity_has_changed, | ||
user_id, | ||
stream_id, | ||
) | ||
Comment on lines
+1917
to
+1922
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This matches what we do for
Comment on lines
+1917
to
+1922
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the actual call that busts the the membership cache for the tests. I assume that is because this is what busts in monolith mode vs the other calls I've added are more for workers over replication |
||
|
||
# Invalidate the various caches | ||
self.store._invalidate_state_caches_and_stream( | ||
txn, room_id, members_to_cache_bust | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -314,6 +314,17 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None: | |
self._entity_to_key[entity] = stream_pos | ||
self._evict() | ||
|
||
def all_entities_changed(self, stream_pos: int) -> None: | ||
""" | ||
Mark all entities as changed. This is useful when the cache is invalidated and | ||
there may be some potential change for all of the entities. | ||
""" | ||
Comment on lines
+317
to
+321
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Re: I don't think it makes sense to drop all of the keys as we're essentially not sure if something has changed so we have to update them to say "something might have changed but we don't know for sure". I think this is the way and is just "unfortunate for the membership caches" |
||
# All entities are at the same stream position now. | ||
self._cache = SortedDict({stream_pos: set(self._entity_to_key.keys())}) | ||
self._entity_to_key = { | ||
entity: stream_pos for entity in self._entity_to_key.keys() | ||
} | ||
|
||
def _evict(self) -> None: | ||
""" | ||
Ensure the cache has not exceeded the maximum size. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -255,3 +255,19 @@ def test_max_pos(self) -> None: | |
|
||
# Unknown entities will return None | ||
self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), None) | ||
|
||
def test_all_entities_changed(self) -> None: | ||
""" | ||
`StreamChangeCache.all_entities_changed(...)` will mark all entites as changed. | ||
""" | ||
cache = StreamChangeCache("#test", 1, max_size=10) | ||
|
||
cache.entity_has_changed("[email protected]", 2) | ||
cache.entity_has_changed("[email protected]", 3) | ||
cache.entity_has_changed("[email protected]", 4) | ||
|
||
cache.all_entities_changed(5) | ||
|
||
self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), 5) | ||
self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), 5) | ||
self.assertEqual(cache.get_max_pos_of_last_change("[email protected]"), 5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kinda weird to just stick this here (same with the others in
process_replication_rows
). Better way to organize this?