Skip to content

Commit

Permalink
cache experiments
Browse files Browse the repository at this point in the history
  • Loading branch information
Fizzadar committed Jul 17, 2022
1 parent 96cf81e commit 5016d9e
Show file tree
Hide file tree
Showing 29 changed files with 1,242 additions and 1,847 deletions.
7 changes: 0 additions & 7 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ jobs:

trial:
if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
needs: linting-done
runs-on: ubuntu-latest
strategy:
matrix:
Expand Down Expand Up @@ -127,7 +126,6 @@ jobs:
trial-olddeps:
# Note: sqlite only; no postgres
if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
needs: linting-done
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -155,7 +153,6 @@ jobs:
# Very slow; only run if the branch name includes 'pypy'
# Note: sqlite only; no postgres. Completely untested since poetry move.
if: ${{ contains(github.ref, 'pypy') && !failure() && !cancelled() }}
needs: linting-done
runs-on: ubuntu-latest
strategy:
matrix:
Expand Down Expand Up @@ -186,7 +183,6 @@ jobs:
sytest:
if: ${{ !failure() && !cancelled() }}
needs: linting-done
runs-on: ubuntu-latest
container:
image: matrixdotorg/sytest-synapse:${{ matrix.sytest-tag }}
Expand Down Expand Up @@ -277,7 +273,6 @@ jobs:

portdb:
if: ${{ !failure() && !cancelled() }} # Allow previous steps to be skipped, but not fail
needs: linting-done
runs-on: ubuntu-latest
env:
TOP: ${{ github.workspace }}
Expand Down Expand Up @@ -315,7 +310,6 @@ jobs:

complement:
if: "${{ !failure() && !cancelled() }}"
needs: linting-done
runs-on: ubuntu-latest

strategy:
Expand Down Expand Up @@ -349,7 +343,6 @@ jobs:
# See https://github.com/matrix-org/synapse/issues/13161
complement-workers:
if: "${{ !failure() && !cancelled() }}"
needs: linting-done
runs-on: ubuntu-latest

steps:
Expand Down
18 changes: 11 additions & 7 deletions synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,21 @@ def __init__(
def get_device_stream_token(self) -> int:
return self._device_list_id_gen.get_current_token()

def process_replication_rows(
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
) -> None:
if stream_name == DeviceListsStream.NAME:
self._device_list_id_gen.advance(instance_name, token)
self._invalidate_caches_for_devices(token, rows)
await self._invalidate_caches_for_devices(token, rows)
elif stream_name == UserSignatureStream.NAME:
self._device_list_id_gen.advance(instance_name, token)
for row in rows:
self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
return super().process_replication_rows(stream_name, instance_name, token, rows)
return await super().process_replication_rows(
stream_name, instance_name, token, rows
)

def _invalidate_caches_for_devices(
async def _invalidate_caches_for_devices(
self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow]
) -> None:
for row in rows:
Expand All @@ -70,9 +72,11 @@ def _invalidate_caches_for_devices(
# changes.
if row.entity.startswith("@"):
self._device_list_stream_cache.entity_has_changed(row.entity, token)
self.get_cached_devices_for_user.invalidate((row.entity,))
self._get_cached_user_device.invalidate((row.entity,))
self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
await self.get_cached_devices_for_user.invalidate((row.entity,))
await self._get_cached_user_device.invalidate((row.entity,))
await self.get_device_list_last_stream_id_for_remote.invalidate(
(row.entity,)
)

else:
self._device_list_federation_stream_cache.entity_has_changed(
Expand Down
10 changes: 6 additions & 4 deletions synapse/replication/slave/storage/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
def get_max_push_rules_stream_id(self) -> int:
return self._push_rules_stream_id_gen.get_current_token()

def process_replication_rows(
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
) -> None:
if stream_name == PushRulesStream.NAME:
self._push_rules_stream_id_gen.advance(instance_name, token)
for row in rows:
self.get_push_rules_for_user.invalidate((row.user_id,))
self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
await self.get_push_rules_for_user.invalidate((row.user_id,))
await self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
return super().process_replication_rows(stream_name, instance_name, token, rows)
return await super().process_replication_rows(
stream_name, instance_name, token, rows
)
6 changes: 4 additions & 2 deletions synapse/replication/slave/storage/pushers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ def __init__(
def get_pushers_stream_token(self) -> int:
return self._pushers_id_gen.get_current_token()

def process_replication_rows(
async def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
) -> None:
if stream_name == PushersStream.NAME:
self._pushers_id_gen.advance(instance_name, token)
return super().process_replication_rows(stream_name, instance_name, token, rows)
return await super().process_replication_rows(
stream_name, instance_name, token, rows
)
4 changes: 3 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ async def on_rdata(
token: stream token for this batch of rows
rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row.
"""
self.store.process_replication_rows(stream_name, instance_name, token, rows)
await self.store.process_replication_rows(
stream_name, instance_name, token, rows
)

if self.send_handler:
await self.send_handler.process_replication_rows(stream_name, token, rows)
Expand Down
2 changes: 1 addition & 1 deletion synapse/server_notices/server_notices_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async def get_or_create_notice_room_for_user(self, user_id: str) -> str:
)
room_id = info["room_id"]

self.maybe_get_notice_room_for_user.invalidate((user_id,))
await self.maybe_get_notice_room_for_user.invalidate((user_id,))

max_id = await self._account_data_handler.add_tag_to_room(
user_id, room_id, SERVER_NOTICE_ROOM_TAG, {}
Expand Down
34 changes: 20 additions & 14 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(
self.database_engine = database.engine
self.db_pool = database

def process_replication_rows(
async def process_replication_rows(
self,
stream_name: str,
instance_name: str,
Expand All @@ -56,7 +56,7 @@ def process_replication_rows(
) -> None:
pass

def _invalidate_state_caches(
async def _invalidate_state_caches(
self, room_id: str, members_changed: Collection[str]
) -> None:
"""Invalidates caches that are based on the current state, but does
Expand All @@ -68,28 +68,34 @@ def _invalidate_state_caches(
"""
# If there were any membership changes, purge the appropriate caches.
for host in {get_domain_from_id(u) for u in members_changed}:
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
await self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
if members_changed:
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
self._attempt_to_invalidate_cache(
await self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
await self._attempt_to_invalidate_cache(
"get_current_hosts_in_room", (room_id,)
)
await self._attempt_to_invalidate_cache(
"get_users_in_room_with_profiles", (room_id,)
)
self._attempt_to_invalidate_cache(
await self._attempt_to_invalidate_cache(
"get_number_joined_users_in_room", (room_id,)
)
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
await self._attempt_to_invalidate_cache(
"get_local_users_in_room", (room_id,)
)

for user_id in members_changed:
self._attempt_to_invalidate_cache(
await self._attempt_to_invalidate_cache(
"get_user_in_room_with_profile", (room_id, user_id)
)

# Purge other caches based on room state.
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
await self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
await self._attempt_to_invalidate_cache(
"get_partial_current_state_ids", (room_id,)
)

def _attempt_to_invalidate_cache(
async def _attempt_to_invalidate_cache(
self, cache_name: str, key: Optional[Collection[Any]]
) -> None:
"""Attempts to invalidate the cache of the given name, ignoring if the
Expand All @@ -110,9 +116,9 @@ def _attempt_to_invalidate_cache(
return

if key is None:
cache.invalidate_all()
await cache.invalidate_all()
else:
cache.invalidate(tuple(key))
await cache.invalidate(tuple(key))


def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
Expand Down
24 changes: 13 additions & 11 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ async def ignored_users(self, user_id: str) -> FrozenSet[str]:
)
)

def process_replication_rows(
async def process_replication_rows(
self,
stream_name: str,
instance_name: str,
Expand All @@ -427,17 +427,19 @@ def process_replication_rows(
self._account_data_id_gen.advance(instance_name, token)
for row in rows:
if not row.room_id:
self.get_global_account_data_by_type_for_user.invalidate(
await self.get_global_account_data_by_type_for_user.invalidate(
(row.user_id, row.data_type)
)
self.get_account_data_for_user.invalidate((row.user_id,))
self.get_account_data_for_room.invalidate((row.user_id, row.room_id))
self.get_account_data_for_room_and_type.invalidate(
await self.get_account_data_for_user.invalidate((row.user_id,))
await self.get_account_data_for_room.invalidate(
(row.user_id, row.room_id)
)
await self.get_account_data_for_room_and_type.invalidate(
(row.user_id, row.room_id, row.data_type)
)
self._account_data_stream_cache.entity_has_changed(row.user_id, token)

super().process_replication_rows(stream_name, instance_name, token, rows)
await super().process_replication_rows(stream_name, instance_name, token, rows)

async def add_account_data_to_room(
self, user_id: str, room_id: str, account_data_type: str, content: JsonDict
Expand Down Expand Up @@ -475,9 +477,9 @@ async def add_account_data_to_room(
)

self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
self.get_account_data_for_room.invalidate((user_id, room_id))
self.get_account_data_for_room_and_type.prefill(
await self.get_account_data_for_user.invalidate((user_id,))
await self.get_account_data_for_room.invalidate((user_id, room_id))
await self.get_account_data_for_room_and_type.prefill(
(user_id, room_id, account_data_type), content
)

Expand Down Expand Up @@ -510,8 +512,8 @@ async def add_account_data_for_user(
)

self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
self.get_global_account_data_by_type_for_user.invalidate(
await self.get_account_data_for_user.invalidate((user_id,))
await self.get_global_account_data_by_type_for_user.invalidate(
(user_id, account_data_type)
)

Expand Down
Loading

0 comments on commit 5016d9e

Please sign in to comment.