Cache Experiments #1

Draft: wants to merge 1 commit into develop
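This draft has no description; from the diff, it experiments with making Synapse's cache invalidation awaitable. `process_replication_rows` and the per-cache `invalidate` / `invalidate_all` / `prefill` calls become coroutines and are awaited up the whole call chain, and the CI test jobs stop depending on the linting step.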
.github/workflows/tests.yml (7 changes: 0 additions & 7 deletions)
@@ -63,7 +63,6 @@ jobs:
 
   trial:
     if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
-    needs: linting-done
     runs-on: ubuntu-latest
     strategy:
       matrix:
@@ -127,7 +126,6 @@ jobs:
   trial-olddeps:
     # Note: sqlite only; no postgres
     if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
-    needs: linting-done
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v2
@@ -155,7 +153,6 @@ jobs:
     # Very slow; only run if the branch name includes 'pypy'
     # Note: sqlite only; no postgres. Completely untested since poetry move.
     if: ${{ contains(github.ref, 'pypy') && !failure() && !cancelled() }}
-    needs: linting-done
     runs-on: ubuntu-latest
     strategy:
       matrix:
@@ -186,7 +183,6 @@ jobs:
 
   sytest:
     if: ${{ !failure() && !cancelled() }}
-    needs: linting-done
     runs-on: ubuntu-latest
     container:
       image: matrixdotorg/sytest-synapse:${{ matrix.sytest-tag }}
@@ -277,7 +273,6 @@ jobs:
 
   portdb:
     if: ${{ !failure() && !cancelled() }} # Allow previous steps to be skipped, but not fail
-    needs: linting-done
     runs-on: ubuntu-latest
     env:
       TOP: ${{ github.workspace }}
@@ -315,7 +310,6 @@ jobs:
 
   complement:
     if: "${{ !failure() && !cancelled() }}"
-    needs: linting-done
     runs-on: ubuntu-latest
 
     strategy:
@@ -349,7 +343,6 @@ jobs:
   # See https://github.com/matrix-org/synapse/issues/13161
   complement-workers:
     if: "${{ !failure() && !cancelled() }}"
-    needs: linting-done
     runs-on: ubuntu-latest
 
     steps:
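Each of these jobs previously declared `needs: linting-done`, so they only started once linting had finished; dropping the dependency lets every test job start immediately. The PR gives no rationale, but for an experimental branch this plausibly just gets test results back sooner.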
synapse/replication/slave/storage/devices.py (18 changes: 11 additions & 7 deletions)
@@ -49,19 +49,21 @@ def __init__(
     def get_device_stream_token(self) -> int:
         return self._device_list_id_gen.get_current_token()
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == DeviceListsStream.NAME:
             self._device_list_id_gen.advance(instance_name, token)
-            self._invalidate_caches_for_devices(token, rows)
+            await self._invalidate_caches_for_devices(token, rows)
         elif stream_name == UserSignatureStream.NAME:
             self._device_list_id_gen.advance(instance_name, token)
             for row in rows:
                 self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+        return await super().process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
 
-    def _invalidate_caches_for_devices(
+    async def _invalidate_caches_for_devices(
         self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow]
     ) -> None:
         for row in rows:
@@ -70,9 +72,11 @@ def _invalidate_caches_for_devices(
             # changes.
             if row.entity.startswith("@"):
                 self._device_list_stream_cache.entity_has_changed(row.entity, token)
-                self.get_cached_devices_for_user.invalidate((row.entity,))
-                self._get_cached_user_device.invalidate((row.entity,))
-                self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
+                await self.get_cached_devices_for_user.invalidate((row.entity,))
+                await self._get_cached_user_device.invalidate((row.entity,))
+                await self.get_device_list_last_stream_id_for_remote.invalidate(
+                    (row.entity,)
+                )
 
             else:
                 self._device_list_federation_stream_cache.entity_has_changed(
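The pattern across these storage files is mechanical: `invalidate` on a cached function is now a coroutine, so every caller becomes `async` and awaits it, up through `process_replication_rows`. Below is a minimal self-contained sketch of such a cache, assuming a hypothetical `on_invalidate` hook standing in for whatever cross-process propagation the experiment is after; Synapse's real `DeferredCache`/`@cached` machinery is more involved.

```python
# Sketch only: a cache whose invalidate() must be awaited. The on_invalidate
# callback is an assumption for illustration, not Synapse's actual API.
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple


class AwaitableCache:
    def __init__(
        self,
        on_invalidate: Optional[Callable[[Tuple[Any, ...]], Awaitable[None]]] = None,
    ) -> None:
        self._data: Dict[Tuple[Any, ...], Any] = {}
        self._on_invalidate = on_invalidate

    def prefill(self, key: Tuple[Any, ...], value: Any) -> None:
        # Seed the cache with a known value (synchronous in this sketch).
        self._data[key] = value

    def get(self, key: Tuple[Any, ...]) -> Any:
        return self._data.get(key)

    async def invalidate(self, key: Tuple[Any, ...]) -> None:
        # Drop the local entry, then (asynchronously) tell anyone else.
        self._data.pop(key, None)
        if self._on_invalidate is not None:
            await self._on_invalidate(key)

    async def invalidate_all(self) -> None:
        self._data.clear()


async def main() -> None:
    async def notify(key: Tuple[Any, ...]) -> None:
        print("propagating invalidation for", key)

    cache = AwaitableCache(on_invalidate=notify)
    cache.prefill(("@user:example.org",), ["DEVICE_A"])
    await cache.invalidate(("@user:example.org",))
    assert cache.get(("@user:example.org",)) is None


asyncio.run(main())
```

Once `invalidate` can suspend, a purely synchronous caller has nowhere to await it, which is why the change ripples upward through every method in this diff.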
synapse/replication/slave/storage/push_rule.py (10 changes: 6 additions & 4 deletions)
@@ -24,13 +24,15 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
     def get_max_push_rules_stream_id(self) -> int:
         return self._push_rules_stream_id_gen.get_current_token()
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == PushRulesStream.NAME:
             self._push_rules_stream_id_gen.advance(instance_name, token)
             for row in rows:
-                self.get_push_rules_for_user.invalidate((row.user_id,))
-                self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
+                await self.get_push_rules_for_user.invalidate((row.user_id,))
+                await self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
                 self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+        return await super().process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
synapse/replication/slave/storage/pushers.py (6 changes: 4 additions & 2 deletions)
@@ -40,9 +40,11 @@ def __init__(
     def get_pushers_stream_token(self) -> int:
         return self._pushers_id_gen.get_current_token()
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == PushersStream.NAME:
             self._pushers_id_gen.advance(instance_name, token)
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+        return await super().process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
synapse/replication/tcp/client.py (4 changes: 3 additions & 1 deletion)
@@ -144,7 +144,9 @@ async def on_rdata(
             token: stream token for this batch of rows
             rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row.
         """
-        self.store.process_replication_rows(stream_name, instance_name, token, rows)
+        await self.store.process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
 
         if self.send_handler:
             await self.send_handler.process_replication_rows(stream_name, token, rows)
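`on_rdata` is already a coroutine, so awaiting the store's `process_replication_rows` here is the root of the change: once the top-level call is awaited, every override in the store's MRO can be `async` and await `super()`. A toy sketch of that cooperative chain (class and stream names invented, not Synapse's):

```python
# Sketch of the cooperative-super pattern the slaved stores above use: each
# mixin handles its own stream, then awaits super() so the next class in the
# MRO also gets to invalidate its caches.
import asyncio
from typing import Any, Iterable


class BaseStore:
    async def process_replication_rows(
        self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
    ) -> None:
        pass  # end of the chain


class PushRuleStore(BaseStore):
    async def process_replication_rows(
        self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
    ) -> None:
        if stream_name == "push_rules":
            print("invalidating push-rule caches at token", token)
        await super().process_replication_rows(stream_name, instance_name, token, rows)


class DeviceStore(BaseStore):
    async def process_replication_rows(
        self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
    ) -> None:
        if stream_name == "device_lists":
            print("invalidating device caches at token", token)
        await super().process_replication_rows(stream_name, instance_name, token, rows)


class DataStore(PushRuleStore, DeviceStore):
    pass


# MRO: DataStore -> PushRuleStore -> DeviceStore -> BaseStore
asyncio.run(DataStore().process_replication_rows("device_lists", "master", 42, []))
```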
synapse/server_notices/server_notices_manager.py (2 changes: 1 addition & 1 deletion)
@@ -184,7 +184,7 @@ async def get_or_create_notice_room_for_user(self, user_id: str) -> str:
         )
         room_id = info["room_id"]
 
-        self.maybe_get_notice_room_for_user.invalidate((user_id,))
+        await self.maybe_get_notice_room_for_user.invalidate((user_id,))
 
         max_id = await self._account_data_handler.add_tag_to_room(
             user_id, room_id, SERVER_NOTICE_ROOM_TAG, {}
synapse/storage/_base.py (34 changes: 20 additions & 14 deletions)
@@ -47,7 +47,7 @@ def __init__(
         self.database_engine = database.engine
         self.db_pool = database
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self,
         stream_name: str,
         instance_name: str,
@@ -56,7 +56,7 @@ def process_replication_rows(
     ) -> None:
         pass
 
-    def _invalidate_state_caches(
+    async def _invalidate_state_caches(
         self, room_id: str, members_changed: Collection[str]
     ) -> None:
         """Invalidates caches that are based on the current state, but does
@@ -68,28 +68,34 @@ def _invalidate_state_caches(
         """
         # If there were any membership changes, purge the appropriate caches.
         for host in {get_domain_from_id(u) for u in members_changed}:
-            self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
+            await self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
         if members_changed:
-            self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
-            self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
-            self._attempt_to_invalidate_cache(
+            await self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
+            await self._attempt_to_invalidate_cache(
+                "get_current_hosts_in_room", (room_id,)
+            )
+            await self._attempt_to_invalidate_cache(
                 "get_users_in_room_with_profiles", (room_id,)
             )
-            self._attempt_to_invalidate_cache(
+            await self._attempt_to_invalidate_cache(
                 "get_number_joined_users_in_room", (room_id,)
            )
-            self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
+            await self._attempt_to_invalidate_cache(
+                "get_local_users_in_room", (room_id,)
+            )
 
         for user_id in members_changed:
-            self._attempt_to_invalidate_cache(
+            await self._attempt_to_invalidate_cache(
                 "get_user_in_room_with_profile", (room_id, user_id)
            )
 
         # Purge other caches based on room state.
-        self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
-        self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
+        await self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
+        await self._attempt_to_invalidate_cache(
+            "get_partial_current_state_ids", (room_id,)
+        )
 
-    def _attempt_to_invalidate_cache(
+    async def _attempt_to_invalidate_cache(
         self, cache_name: str, key: Optional[Collection[Any]]
     ) -> None:
         """Attempts to invalidate the cache of the given name, ignoring if the
@@ -110,9 +116,9 @@ def _attempt_to_invalidate_cache(
             return
 
         if key is None:
-            cache.invalidate_all()
+            await cache.invalidate_all()
         else:
-            cache.invalidate(tuple(key))
+            await cache.invalidate(tuple(key))
 
 
 def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
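`_attempt_to_invalidate_cache` looks the cache up by name precisely because not every store has every cache; with `invalidate`/`invalidate_all` now coroutines, the helper itself must be awaited. A self-contained sketch of that lookup-then-await shape (class and attribute names assumed for illustration):

```python
# Sketch of the _attempt_to_invalidate_cache pattern: resolve a cache
# attribute by name, tolerate stores that don't have it, await the result.
import asyncio
from typing import Any, Collection, Dict, Optional, Tuple


class SketchCache:
    def __init__(self) -> None:
        self._data: Dict[Tuple[Any, ...], Any] = {}

    async def invalidate(self, key: Tuple[Any, ...]) -> None:
        self._data.pop(key, None)

    async def invalidate_all(self) -> None:
        self._data.clear()


class SketchStore:
    def __init__(self) -> None:
        self.get_room_summary = SketchCache()

    async def _attempt_to_invalidate_cache(
        self, cache_name: str, key: Optional[Collection[Any]]
    ) -> None:
        cache = getattr(self, cache_name, None)
        if cache is None:
            # This store doesn't mix in that cache; nothing to do.
            return
        if key is None:
            await cache.invalidate_all()
        else:
            await cache.invalidate(tuple(key))


async def main() -> None:
    store = SketchStore()
    await store._attempt_to_invalidate_cache("get_room_summary", ("!room:example.org",))
    await store._attempt_to_invalidate_cache("no_such_cache", None)  # ignored


asyncio.run(main())
```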
synapse/storage/databases/main/account_data.py (24 changes: 13 additions & 11 deletions)
@@ -414,7 +414,7 @@ async def ignored_users(self, user_id: str) -> FrozenSet[str]:
             )
         )
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self,
         stream_name: str,
         instance_name: str,
@@ -427,17 +427,19 @@ def process_replication_rows(
             self._account_data_id_gen.advance(instance_name, token)
             for row in rows:
                 if not row.room_id:
-                    self.get_global_account_data_by_type_for_user.invalidate(
+                    await self.get_global_account_data_by_type_for_user.invalidate(
                         (row.user_id, row.data_type)
                     )
-                self.get_account_data_for_user.invalidate((row.user_id,))
-                self.get_account_data_for_room.invalidate((row.user_id, row.room_id))
-                self.get_account_data_for_room_and_type.invalidate(
+                await self.get_account_data_for_user.invalidate((row.user_id,))
+                await self.get_account_data_for_room.invalidate(
+                    (row.user_id, row.room_id)
+                )
+                await self.get_account_data_for_room_and_type.invalidate(
                     (row.user_id, row.room_id, row.data_type)
                 )
                 self._account_data_stream_cache.entity_has_changed(row.user_id, token)
 
-        super().process_replication_rows(stream_name, instance_name, token, rows)
+        await super().process_replication_rows(stream_name, instance_name, token, rows)
 
     async def add_account_data_to_room(
         self, user_id: str, room_id: str, account_data_type: str, content: JsonDict
@@ -475,9 +477,9 @@ async def add_account_data_to_room(
             )
 
             self._account_data_stream_cache.entity_has_changed(user_id, next_id)
-            self.get_account_data_for_user.invalidate((user_id,))
-            self.get_account_data_for_room.invalidate((user_id, room_id))
-            self.get_account_data_for_room_and_type.prefill(
+            await self.get_account_data_for_user.invalidate((user_id,))
+            await self.get_account_data_for_room.invalidate((user_id, room_id))
+            await self.get_account_data_for_room_and_type.prefill(
                 (user_id, room_id, account_data_type), content
            )
 
@@ -510,8 +512,8 @@ async def add_account_data_for_user(
            )
 
             self._account_data_stream_cache.entity_has_changed(user_id, next_id)
-            self.get_account_data_for_user.invalidate((user_id,))
-            self.get_global_account_data_by_type_for_user.invalidate(
+            await self.get_account_data_for_user.invalidate((user_id,))
+            await self.get_global_account_data_by_type_for_user.invalidate(
                 (user_id, account_data_type)
             )
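Note that `prefill` is awaited here too, not just `invalidate`: after a write, the broad caches are dropped while the precise cache is seeded with the value just written, so the next read needn't touch the database. A rough sketch of that write path, with helper names assumed and the database step elided:

```python
# Sketch of invalidate-the-coarse, prefill-the-precise after a write.
import asyncio
from typing import Any, Dict, Tuple


class SketchCache:
    def __init__(self) -> None:
        self._data: Dict[Tuple[Any, ...], Any] = {}

    async def invalidate(self, key: Tuple[Any, ...]) -> None:
        self._data.pop(key, None)

    async def prefill(self, key: Tuple[Any, ...], value: Any) -> None:
        self._data[key] = value

    def get(self, key: Tuple[Any, ...]) -> Any:
        return self._data.get(key)


async def add_account_data_to_room(
    cache_by_user: SketchCache,
    cache_by_room_and_type: SketchCache,
    user_id: str,
    room_id: str,
    data_type: str,
    content: Dict[str, Any],
) -> None:
    # ... persist to the database here ...
    await cache_by_user.invalidate((user_id,))  # coarse cache: just drop it
    # precise cache: we already know the new value, so seed it directly
    await cache_by_room_and_type.prefill((user_id, room_id, data_type), content)


async def main() -> None:
    by_user, by_room_type = SketchCache(), SketchCache()
    await add_account_data_to_room(
        by_user, by_room_type, "@alice:example.org", "!room:example.org", "m.tag", {}
    )
    print(by_room_type.get(("@alice:example.org", "!room:example.org", "m.tag")))


asyncio.run(main())
```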