Commit

cache experiments

Fizzadar committed Jul 17, 2022
1 parent 96cf81e commit b878cf9

Showing 28 changed files with 1,188 additions and 1,845 deletions.
7 changes: 0 additions & 7 deletions .github/workflows/tests.yml
@@ -63,7 +63,6 @@ jobs:
 
   trial:
     if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
-    needs: linting-done
     runs-on: ubuntu-latest
     strategy:
       matrix:
@@ -127,7 +126,6 @@ jobs:
   trial-olddeps:
     # Note: sqlite only; no postgres
     if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
-    needs: linting-done
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v2
@@ -155,7 +153,6 @@ jobs:
     # Very slow; only run if the branch name includes 'pypy'
     # Note: sqlite only; no postgres. Completely untested since poetry move.
     if: ${{ contains(github.ref, 'pypy') && !failure() && !cancelled() }}
-    needs: linting-done
     runs-on: ubuntu-latest
     strategy:
       matrix:
@@ -186,7 +183,6 @@ jobs:
   sytest:
     if: ${{ !failure() && !cancelled() }}
-    needs: linting-done
     runs-on: ubuntu-latest
     container:
       image: matrixdotorg/sytest-synapse:${{ matrix.sytest-tag }}
@@ -277,7 +273,6 @@ jobs:
 
   portdb:
     if: ${{ !failure() && !cancelled() }} # Allow previous steps to be skipped, but not fail
-    needs: linting-done
     runs-on: ubuntu-latest
     env:
       TOP: ${{ github.workspace }}
@@ -315,7 +310,6 @@ jobs:
 
   complement:
     if: "${{ !failure() && !cancelled() }}"
-    needs: linting-done
     runs-on: ubuntu-latest
 
     strategy:
@@ -349,7 +343,6 @@ jobs:
   # See https://github.com/matrix-org/synapse/issues/13161
   complement-workers:
     if: "${{ !failure() && !cancelled() }}"
-    needs: linting-done
     runs-on: ubuntu-latest
 
     steps:
14 changes: 7 additions & 7 deletions synapse/replication/slave/storage/devices.py
@@ -49,19 +49,19 @@ def __init__(
     def get_device_stream_token(self) -> int:
         return self._device_list_id_gen.get_current_token()
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == DeviceListsStream.NAME:
             self._device_list_id_gen.advance(instance_name, token)
-            self._invalidate_caches_for_devices(token, rows)
+            await self._invalidate_caches_for_devices(token, rows)
         elif stream_name == UserSignatureStream.NAME:
             self._device_list_id_gen.advance(instance_name, token)
             for row in rows:
                 self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+        return await super().process_replication_rows(stream_name, instance_name, token, rows)
 
-    def _invalidate_caches_for_devices(
+    async def _invalidate_caches_for_devices(
         self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow]
     ) -> None:
         for row in rows:
@@ -70,9 +70,9 @@ def _invalidate_caches_for_devices(
             # changes.
             if row.entity.startswith("@"):
                 self._device_list_stream_cache.entity_has_changed(row.entity, token)
-                self.get_cached_devices_for_user.invalidate((row.entity,))
-                self._get_cached_user_device.invalidate((row.entity,))
-                self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
+                await self.get_cached_devices_for_user.invalidate((row.entity,))
+                await self._get_cached_user_device.invalidate((row.entity,))
+                await self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
 
             else:
                 self._device_list_federation_stream_cache.entity_has_changed(
8 changes: 4 additions & 4 deletions synapse/replication/slave/storage/push_rule.py
@@ -24,13 +24,13 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
     def get_max_push_rules_stream_id(self) -> int:
         return self._push_rules_stream_id_gen.get_current_token()
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == PushRulesStream.NAME:
             self._push_rules_stream_id_gen.advance(instance_name, token)
             for row in rows:
-                self.get_push_rules_for_user.invalidate((row.user_id,))
-                self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
+                await self.get_push_rules_for_user.invalidate((row.user_id,))
+                await self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
                 self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+        return await super().process_replication_rows(stream_name, instance_name, token, rows)
4 changes: 2 additions & 2 deletions synapse/replication/slave/storage/pushers.py
@@ -40,9 +40,9 @@ def __init__(
     def get_pushers_stream_token(self) -> int:
         return self._pushers_id_gen.get_current_token()
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == PushersStream.NAME:
             self._pushers_id_gen.advance(instance_name, token)
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+        return await super().process_replication_rows(stream_name, instance_name, token, rows)
2 changes: 1 addition & 1 deletion synapse/replication/tcp/client.py
@@ -144,7 +144,7 @@ async def on_rdata(
             token: stream token for this batch of rows
             rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row.
         """
-        self.store.process_replication_rows(stream_name, instance_name, token, rows)
+        await self.store.process_replication_rows(stream_name, instance_name, token, rows)
 
         if self.send_handler:
             await self.send_handler.process_replication_rows(stream_name, token, rows)
2 changes: 1 addition & 1 deletion synapse/server_notices/server_notices_manager.py
@@ -184,7 +184,7 @@ async def get_or_create_notice_room_for_user(self, user_id: str) -> str:
         )
         room_id = info["room_id"]
 
-        self.maybe_get_notice_room_for_user.invalidate((user_id,))
+        await self.maybe_get_notice_room_for_user.invalidate((user_id,))
 
         max_id = await self._account_data_handler.add_tag_to_room(
             user_id, room_id, SERVER_NOTICE_ROOM_TAG, {}
28 changes: 14 additions & 14 deletions synapse/storage/_base.py
@@ -47,7 +47,7 @@ def __init__(
         self.database_engine = database.engine
         self.db_pool = database
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self,
         stream_name: str,
         instance_name: str,
@@ -56,7 +56,7 @@ def process_replication_rows(
     ) -> None:
         pass
 
-    def _invalidate_state_caches(
+    async def _invalidate_state_caches(
        self, room_id: str, members_changed: Collection[str]
    ) -> None:
        """Invalidates caches that are based on the current state, but does
@@ -68,28 +68,28 @@ def _invalidate_state_caches(
         """
         # If there were any membership changes, purge the appropriate caches.
         for host in {get_domain_from_id(u) for u in members_changed}:
-            self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
+            await self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
         if members_changed:
-            self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
-            self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
-            self._attempt_to_invalidate_cache(
+            await self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
+            await self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
+            await self._attempt_to_invalidate_cache(
                 "get_users_in_room_with_profiles", (room_id,)
             )
-            self._attempt_to_invalidate_cache(
+            await self._attempt_to_invalidate_cache(
                 "get_number_joined_users_in_room", (room_id,)
             )
-            self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
+            await self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
 
         for user_id in members_changed:
-            self._attempt_to_invalidate_cache(
+            await self._attempt_to_invalidate_cache(
                 "get_user_in_room_with_profile", (room_id, user_id)
             )
 
         # Purge other caches based on room state.
-        self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
-        self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
+        await self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
+        await self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
 
-    def _attempt_to_invalidate_cache(
+    async def _attempt_to_invalidate_cache(
         self, cache_name: str, key: Optional[Collection[Any]]
     ) -> None:
         """Attempts to invalidate the cache of the given name, ignoring if the
@@ -110,9 +110,9 @@ def _attempt_to_invalidate_cache(
             return
 
         if key is None:
-            cache.invalidate_all()
+            await cache.invalidate_all()
         else:
-            cache.invalidate(tuple(key))
+            await cache.invalidate(tuple(key))
 
 
 def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
22 changes: 11 additions & 11 deletions synapse/storage/databases/main/account_data.py
@@ -414,7 +414,7 @@ async def ignored_users(self, user_id: str) -> FrozenSet[str]:
             )
         )
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self,
         stream_name: str,
         instance_name: str,
@@ -427,17 +427,17 @@ def process_replication_rows(
             self._account_data_id_gen.advance(instance_name, token)
             for row in rows:
                 if not row.room_id:
-                    self.get_global_account_data_by_type_for_user.invalidate(
+                    await self.get_global_account_data_by_type_for_user.invalidate(
                         (row.user_id, row.data_type)
                     )
-                self.get_account_data_for_user.invalidate((row.user_id,))
-                self.get_account_data_for_room.invalidate((row.user_id, row.room_id))
-                self.get_account_data_for_room_and_type.invalidate(
+                await self.get_account_data_for_user.invalidate((row.user_id,))
+                await self.get_account_data_for_room.invalidate((row.user_id, row.room_id))
+                await self.get_account_data_for_room_and_type.invalidate(
                     (row.user_id, row.room_id, row.data_type)
                 )
                 self._account_data_stream_cache.entity_has_changed(row.user_id, token)
 
-        super().process_replication_rows(stream_name, instance_name, token, rows)
+        await super().process_replication_rows(stream_name, instance_name, token, rows)
 
     async def add_account_data_to_room(
         self, user_id: str, room_id: str, account_data_type: str, content: JsonDict
@@ -475,9 +475,9 @@ async def add_account_data_to_room(
         )
 
         self._account_data_stream_cache.entity_has_changed(user_id, next_id)
-        self.get_account_data_for_user.invalidate((user_id,))
-        self.get_account_data_for_room.invalidate((user_id, room_id))
-        self.get_account_data_for_room_and_type.prefill(
+        await self.get_account_data_for_user.invalidate((user_id,))
+        await self.get_account_data_for_room.invalidate((user_id, room_id))
+        await self.get_account_data_for_room_and_type.prefill(
             (user_id, room_id, account_data_type), content
         )
 
@@ -510,8 +510,8 @@ async def add_account_data_for_user(
         )
 
         self._account_data_stream_cache.entity_has_changed(user_id, next_id)
-        self.get_account_data_for_user.invalidate((user_id,))
-        self.get_global_account_data_by_type_for_user.invalidate(
+        await self.get_account_data_for_user.invalidate((user_id,))
+        await self.get_global_account_data_by_type_for_user.invalidate(
             (user_id, account_data_type)
         )
 
50 changes: 25 additions & 25 deletions synapse/storage/databases/main/cache.py
@@ -119,15 +119,15 @@ def get_all_updated_caches_txn(
             "get_all_updated_caches", get_all_updated_caches_txn
         )
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == EventsStream.NAME:
             for row in rows:
-                self._process_event_stream_row(token, row)
+                await self._process_event_stream_row(token, row)
         elif stream_name == BackfillStream.NAME:
             for row in rows:
-                self._invalidate_caches_for_event(
+                await self._invalidate_caches_for_event(
                     -token,
                     row.event_id,
                     row.room_id,
@@ -150,18 +150,18 @@ def process_replication_rows(
 
                 room_id = row.keys[0]
                 members_changed = set(row.keys[1:])
-                self._invalidate_state_caches(room_id, members_changed)
+                await self._invalidate_state_caches(room_id, members_changed)
             else:
-                self._attempt_to_invalidate_cache(row.cache_func, row.keys)
+                await self._attempt_to_invalidate_cache(row.cache_func, row.keys)
 
-        super().process_replication_rows(stream_name, instance_name, token, rows)
+        await super().process_replication_rows(stream_name, instance_name, token, rows)
 
-    def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
+    async def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
         data = row.data
 
         if row.type == EventsStreamEventRow.TypeId:
             assert isinstance(data, EventsStreamEventRow)
-            self._invalidate_caches_for_event(
+            await self._invalidate_caches_for_event(
                 token,
                 data.event_id,
                 data.room_id,
@@ -176,13 +176,13 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
             self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token)
 
             if data.type == EventTypes.Member:
-                self.get_rooms_for_user_with_stream_ordering.invalidate(
+                await self.get_rooms_for_user_with_stream_ordering.invalidate(
                     (data.state_key,)
                 )
         else:
             raise Exception("Unknown events stream row type %s" % (row.type,))
 
-    def _invalidate_caches_for_event(
+    async def _invalidate_caches_for_event(
         self,
         stream_ordering: int,
         event_id: str,
@@ -196,37 +196,37 @@ def _invalidate_caches_for_event(
         # This invalidates any local in-memory cached event objects, the original
         # process triggering the invalidation is responsible for clearing any external
         # cached objects.
-        self._invalidate_local_get_event_cache(event_id)
-        self.have_seen_event.invalidate((room_id, event_id))
+        await self._invalidate_local_get_event_cache(event_id)
+        await self.have_seen_event.invalidate((room_id, event_id))
 
-        self.get_latest_event_ids_in_room.invalidate((room_id,))
+        await self.get_latest_event_ids_in_room.invalidate((room_id,))
 
-        self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))
+        await self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))
 
         # The `_get_membership_from_event_id` is immutable, except for the
         # case where we look up an event *before* persisting it.
-        self._get_membership_from_event_id.invalidate((event_id,))
+        await self._get_membership_from_event_id.invalidate((event_id,))
 
         if not backfilled:
             self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
 
         if redacts:
-            self._invalidate_local_get_event_cache(redacts)
+            await self._invalidate_local_get_event_cache(redacts)
             # Caches which might leak edits must be invalidated for the event being
             # redacted.
-            self.get_relations_for_event.invalidate((redacts,))
-            self.get_applicable_edit.invalidate((redacts,))
+            await self.get_relations_for_event.invalidate((redacts,))
+            await self.get_applicable_edit.invalidate((redacts,))
 
         if etype == EventTypes.Member:
             self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
-            self.get_invited_rooms_for_local_user.invalidate((state_key,))
+            await self.get_invited_rooms_for_local_user.invalidate((state_key,))
 
         if relates_to:
-            self.get_relations_for_event.invalidate((relates_to,))
-            self.get_aggregation_groups_for_event.invalidate((relates_to,))
-            self.get_applicable_edit.invalidate((relates_to,))
-            self.get_thread_summary.invalidate((relates_to,))
-            self.get_thread_participated.invalidate((relates_to,))
+            await self.get_relations_for_event.invalidate((relates_to,))
+            await self.get_aggregation_groups_for_event.invalidate((relates_to,))
+            await self.get_applicable_edit.invalidate((relates_to,))
+            await self.get_thread_summary.invalidate((relates_to,))
+            await self.get_thread_participated.invalidate((relates_to,))
 
     async def invalidate_cache_and_stream(
         self, cache_name: str, keys: Tuple[Any, ...]
@@ -242,7 +242,7 @@ async def invalidate_cache_and_stream(
         if not cache_func:
             return
 
-        cache_func.invalidate(keys)
+        await cache_func.invalidate(keys)
         await self.db_pool.runInteraction(
             "invalidate_cache_and_stream",
             self._send_invalidation_to_replication,