From 26213f94aeb4643f572c05f0a643a72e46acd364 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 12 Mar 2024 17:35:15 +0000 Subject: [PATCH 01/29] Add prometheus histograms for room shutdown/delete/purge # Conflicts: # synapse/rest/admin/rooms.py --- synapse/handlers/pagination.py | 8 ++++++++ synapse/handlers/room.py | 19 +++++++++++++++++++ synapse/rest/admin/rooms.py | 15 ++++++++++++++- 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index cd3a9088cd..9a3d277254 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -19,8 +19,11 @@ # # import logging +import time from typing import TYPE_CHECKING, List, Optional, Set, Tuple, cast +from prometheus_client import Histogram + from twisted.python.failure import Failure from synapse.api.constants import Direction, EventTypes, Membership @@ -51,6 +54,8 @@ logger = logging.getLogger(__name__) +purge_time = Histogram("room_purge_time", "Time taken to purge rooms (sec)") + # How many single event gaps we tolerate returning in a `/messages` response before we # backfill and try to fill in the history. This is an arbitrarily picked number so feel # free to tune it in the future. @@ -385,6 +390,7 @@ async def purge_room( room_id: room to be purged force: set true to skip checking for joined users. """ + purge_start = time.time() logger.info("starting purge room_id=%s force=%s", room_id, force) async with self._worker_locks.acquire_multi_read_write_lock( @@ -407,6 +413,8 @@ async def purge_room( await self._storage_controllers.purge_events.purge_room(room_id) + purge_end = time.time() + purge_time.observe(purge_end - purge_start) logger.info("purge complete for room_id %s", room_id) @trace diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5e81a51638..fbe6becf8f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -25,6 +25,7 @@ import math import random import string +import time from collections import OrderedDict from http import HTTPStatus from typing import ( @@ -40,6 +41,7 @@ ) import attr +from prometheus_client import Histogram from typing_extensions import TypedDict import synapse.events.snapshot @@ -104,6 +106,17 @@ FIVE_MINUTES_IN_MS = 5 * 60 * 1000 +shutdown_time = Histogram("room_shutdown_time", "Time taken to shutdown rooms (sec)") +shutdown_kick_count = Histogram( + "room_shutdown_kick_count", + "Number of users successfully kicked while shutting down a room", +) +shutdown_failed_kick_count = Histogram( + "room_shutdown_failed_kick_count", + "Number of users that were failed to be kicked while shutting down a room", +) + + @attr.s(slots=True, frozen=True, auto_attribs=True) class EventContext: events_before: List[EventBase] @@ -1969,6 +1982,7 @@ async def shutdown_room( else: logger.info("Shutting down room %r", room_id) + shutdown_start = time.time() users = await self.store.get_local_users_related_to_room(room_id) for user_id, membership in users: # If the user is not in the room (or is banned), nothing to do. @@ -2055,4 +2069,9 @@ async def shutdown_room( else: result["local_aliases"] = [] + shutdown_end = time.time() + shutdown_kick_count.observe(len(result["kicked_users"])) + shutdown_failed_kick_count.observe(len(result["failed_to_kick_users"])) + shutdown_time.observe(shutdown_end - shutdown_start) + return result diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 4252f98a6c..7f3b151a0a 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -19,11 +19,13 @@ # # import logging +import time from http import HTTPStatus from typing import TYPE_CHECKING, List, Optional, Tuple, cast from urllib import parse as urlparse import attr +from prometheus_client import Histogram from synapse.api.constants import Direction, EventTypes, JoinRules, Membership from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError @@ -61,6 +63,11 @@ logger = logging.getLogger(__name__) +delete_time = Histogram( + "admin_room_delete_time", + "Time taken to delete rooms via the admin API (sec)", +) + class RoomRestV2Servlet(RestServlet): """Delete a room from server asynchronously with a background task. @@ -326,13 +333,19 @@ async def on_GET( async def on_DELETE( self, request: SynapseRequest, room_id: str ) -> Tuple[int, JsonDict]: - return await self._delete_room( + logger.info(f"[admin/rooms] deleting {room_id}") + start = time.time() + response = await self._delete_room( request, room_id, self.auth, self.room_shutdown_handler, self.pagination_handler, ) + end = time.time() + logger.info(f"[admin/rooms] deleting {room_id} took {end - start} seconds") + delete_time.observe(end - start) + return response async def _delete_room( self, From f5a909d4f4c3447245f6d92fae5cacef6dc4caf3 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 16:53:28 +0000 Subject: [PATCH 02/29] Install shared secret authenticator in Docker image --- docker/Dockerfile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docker/Dockerfile b/docker/Dockerfile index d4cb9414ff..1d512f0442 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -134,6 +134,10 @@ COPY --from=requirements /synapse/requirements.txt /synapse/ RUN --mount=type=cache,target=/root/.cache/pip \ pip install --prefix="/install" --no-deps --no-warn-script-location -r /synapse/requirements.txt +# Beeper: install shared secret authenticator +RUN pip install --prefix="/install" --no-deps --no-warn-script-location \ + 'git+https://github.com/devture/matrix-synapse-shared-secret-auth@e178353ec87c56e0169dd04466d4769da5ed9c46#egg=shared_secret_authenticator' + # Copy over the rest of the synapse source code. COPY synapse /synapse/synapse/ COPY rust /synapse/rust/ From d909e2d80469cbe06da6aae34f613fbc6439c5a4 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 16:53:36 +0000 Subject: [PATCH 03/29] Update Beeper base rules --- rust/src/push/base_rules.rs | 116 +++++++++++++++++++++++++++++++----- 1 file changed, 100 insertions(+), 16 deletions(-) diff --git a/rust/src/push/base_rules.rs b/rust/src/push/base_rules.rs index 74f02d6001..1739acfb22 100644 --- a/rust/src/push/base_rules.rs +++ b/rust/src/push/base_rules.rs @@ -83,19 +83,76 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[ default: true, default_enabled: false, }, + // Disable notifications for auto-accepted room invites + // NOTE: this rule must be a higher prio than .m.rule.invite_for_me because + // that will also match the same events. PushRule { - rule_id: Cow::Borrowed("global/override/.m.rule.suppress_notices"), + rule_id: Cow::Borrowed("global/override/.com.beeper.suppress_auto_invite"), + priority_class: 5, + conditions: Cow::Borrowed(&[ + Condition::Known(KnownCondition::EventMatch(EventMatchCondition { + key: Cow::Borrowed("type"), + pattern: Cow::Borrowed("m.room.member"), + })), + Condition::Known(KnownCondition::EventMatch(EventMatchCondition { + key: Cow::Borrowed("content.membership"), + pattern: Cow::Borrowed("invite"), + })), + Condition::Known(KnownCondition::EventMatchType(EventMatchTypeCondition { + key: Cow::Borrowed("state_key"), + pattern_type: Cow::Borrowed(&EventMatchPatternType::UserId), + })), + Condition::Known(KnownCondition::EventPropertyIs(EventPropertyIsCondition { + key: Cow::Borrowed("content.fi\\.mau\\.will_auto_accept"), + value: Cow::Borrowed(&SimpleJsonValue::Bool(true)), + })), + ]), + actions: Cow::Borrowed(&[Action::DontNotify]), + default: true, + default_enabled: true, + }, + // We don't want to notify on edits. Not only can this be confusing in real + // time (2 notifications, one message) but it's especially confusing + // if a bridge needs to edit a previously backfilled message. + PushRule { + rule_id: Cow::Borrowed("global/override/.com.beeper.suppress_edits"), priority_class: 5, conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventMatch( EventMatchCondition { - key: Cow::Borrowed("content.msgtype"), - pattern: Cow::Borrowed("m.notice"), + key: Cow::Borrowed("content.m\\.relates_to.rel_type"), + pattern: Cow::Borrowed("m.replace"), }, ))]), actions: Cow::Borrowed(&[]), default: true, default_enabled: true, }, + PushRule { + rule_id: Cow::Borrowed("global/override/.com.beeper.suppress_send_message_status"), + priority_class: 5, + conditions: Cow::Borrowed(&[ + Condition::Known(KnownCondition::EventMatch(EventMatchCondition { + key: Cow::Borrowed("type"), + pattern: Cow::Borrowed("com.beeper.message_send_status"), + })), + ]), + actions: Cow::Borrowed(&[Action::DontNotify]), + default: true, + default_enabled: true, + }, + PushRule { + rule_id: Cow::Borrowed("global/override/.com.beeper.suppress_power_levels"), + priority_class: 5, + conditions: Cow::Borrowed(&[ + Condition::Known(KnownCondition::EventMatch(EventMatchCondition { + key: Cow::Borrowed("type"), + pattern: Cow::Borrowed("cm.room.power_levels"), + })), + ]), + actions: Cow::Borrowed(&[Action::DontNotify]), + default: true, + default_enabled: true, + }, PushRule { rule_id: Cow::Borrowed("global/override/.m.rule.invite_for_me"), priority_class: 5, @@ -215,19 +272,6 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[ default: true, default_enabled: true, }, - PushRule { - rule_id: Cow::Borrowed("global/override/.m.rule.reaction"), - priority_class: 5, - conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventMatch( - EventMatchCondition { - key: Cow::Borrowed("type"), - pattern: Cow::Borrowed("m.reaction"), - }, - ))]), - actions: Cow::Borrowed(&[]), - default: true, - default_enabled: true, - }, PushRule { rule_id: Cow::Borrowed("global/override/.m.rule.room.server_acl"), priority_class: 5, @@ -290,6 +334,22 @@ pub const BASE_APPEND_CONTENT_RULES: &[PushRule] = &[PushRule { }]; pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[ + // Beeper change: this rule is moved down from override. This means room + // rules take precedence, so if you enable bot notifications (by modifying + // this rule) notifications will not be sent for muted rooms. + PushRule { + rule_id: Cow::Borrowed("global/underride/.m.rule.suppress_notices"), + priority_class: 1, + conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventMatch( + EventMatchCondition { + key: Cow::Borrowed("content.msgtype"), + pattern: Cow::Borrowed("m.notice"), + }, + ))]), + actions: Cow::Borrowed(&[Action::DontNotify]), + default: true, + default_enabled: true, + }, PushRule { rule_id: Cow::Borrowed("global/underride/.m.rule.call"), priority_class: 1, @@ -640,6 +700,30 @@ pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[ default: true, default_enabled: true, }, + // Enable notifications for reactions to your own messages *in rooms with less + // than 20 members*. + PushRule { + rule_id: Cow::Borrowed("global/underride/.com.beeper.reaction"), + priority_class: 1, + conditions: Cow::Borrowed(&[ + Condition::Known(KnownCondition::EventMatch(EventMatchCondition { + key: Cow::Borrowed("type"), + pattern: Cow::Borrowed("m.reaction"), + })), + Condition::Known(KnownCondition::RoomMemberCount { + is: Some(Cow::Borrowed("<20")), + }), + Condition::Known(KnownCondition::RelatedEventMatchType(RelatedEventMatchTypeCondition { + key: Cow::Borrowed("sender"), + pattern_type: Cow::Borrowed(&EventMatchPatternType::UserId), + rel_type: Cow::Borrowed("m.annotation"), + include_fallbacks: None, + })), + ]), + actions: Cow::Borrowed(&[Action::Notify, HIGHLIGHT_FALSE_ACTION]), + default: true, + default_enabled: true, + }, PushRule { rule_id: Cow::Borrowed("global/underride/.org.matrix.msc3930.rule.poll_start_one_to_one"), priority_class: 1, From 032fa4cd6fe89831f4a7f50b56ff45d2f3e57128 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 16:54:15 +0000 Subject: [PATCH 04/29] Add JWT UI auth flow --- synapse/api/constants.py | 1 + synapse/handlers/auth.py | 4 ++ synapse/handlers/ui_auth/checkers.py | 83 ++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 98884b4967..52982e2d15 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -88,6 +88,7 @@ class LoginType: SSO: Final = "m.login.sso" DUMMY: Final = "m.login.dummy" REGISTRATION_TOKEN: Final = "m.login.registration_token" + JWT: Final = "org.matrix.login.jwt" # This is used in the `type` parameter for /register when called by diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index a1fab99f6b..33620dbb35 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -424,6 +424,10 @@ async def _get_available_ui_auth_types(self, user: UserID) -> Iterable[str]: ): ui_auth_types.add(LoginType.SSO) + # If JWT is enabled, allow user to re-authenticate with one + if self.hs.config.jwt.jwt_enabled: + ui_auth_types.add(LoginType.JWT) + return ui_auth_types def get_enabled_auth_types(self) -> Iterable[str]: diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py index 32dca8c43b..c133fa3e31 100644 --- a/synapse/handlers/ui_auth/checkers.py +++ b/synapse/handlers/ui_auth/checkers.py @@ -27,6 +27,7 @@ from synapse.api.constants import LoginType from synapse.api.errors import Codes, LoginError, SynapseError +from synapse.types import UserID from synapse.util import json_decoder if TYPE_CHECKING: @@ -321,6 +322,87 @@ async def check_auth(self, authdict: dict, clientip: str) -> Any: ) +class JwtAuthChecker(UserInteractiveAuthChecker): + AUTH_TYPE = LoginType.JWT + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + self.hs = hs + + def is_enabled(self) -> bool: + return bool(self.hs.config.jwt.jwt_enabled) + + async def check_auth(self, authdict: dict, clientip: str) -> Any: + token = authdict.get("token", None) + if token is None: + raise LoginError( + 403, "Token field for JWT is missing", errcode=Codes.FORBIDDEN + ) + + from authlib.jose import JsonWebToken, JWTClaims + from authlib.jose.errors import BadSignatureError, InvalidClaimError, JoseError + + jwt = JsonWebToken([self.hs.config.jwt.jwt_algorithm]) + claim_options = {} + if self.hs.config.jwt.jwt_issuer is not None: + claim_options["iss"] = { + "value": self.hs.config.jwt.jwt_issuer, + "essential": True, + } + if self.hs.config.jwt.jwt_audiences is not None: + claim_options["aud"] = { + "values": self.hs.config.jwt.jwt_audiences, + "essential": True, + } + + try: + claims = jwt.decode( + token, + key=self.hs.config.jwt.jwt_secret, + claims_cls=JWTClaims, + claims_options=claim_options, + ) + except BadSignatureError: + # We handle this case separately to provide a better error message + raise LoginError( + 403, + "JWT validation failed: Signature verification failed", + errcode=Codes.FORBIDDEN, + ) + except JoseError as e: + # A JWT error occurred, return some info back to the client. + raise LoginError( + 403, + "JWT validation failed: %s" % (str(e),), + errcode=Codes.FORBIDDEN, + ) + + try: + claims.validate(leeway=120) # allows 2 min of clock skew + + # Enforce the old behavior which is rolled out in productive + # servers: if the JWT contains an 'aud' claim but none is + # configured, the login attempt will fail + if claims.get("aud") is not None: + if ( + self.hs.config.jwt.jwt_audiences is None + or len(self.hs.config.jwt.jwt_audiences) == 0 + ): + raise InvalidClaimError("aud") + except JoseError as e: + raise LoginError( + 403, + "JWT validation failed: %s" % (str(e),), + errcode=Codes.FORBIDDEN, + ) + + user = claims.get(self.hs.config.jwt.jwt_subject_claim, None) + if user is None: + raise LoginError(403, "Invalid JWT", errcode=Codes.FORBIDDEN) + + return UserID(user, self.hs.hostname).to_string() + + INTERACTIVE_AUTH_CHECKERS: Sequence[Type[UserInteractiveAuthChecker]] = [ DummyAuthChecker, TermsAuthChecker, @@ -328,5 +410,6 @@ async def check_auth(self, authdict: dict, clientip: str) -> Any: EmailIdentityAuthChecker, MsisdnAuthChecker, RegistrationTokenAuthChecker, + JwtAuthChecker, ] """A list of UserInteractiveAuthChecker classes""" From b46d1b733e4ec4ccc63138b671b0f59f31c32c4c Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 16:54:23 +0000 Subject: [PATCH 05/29] Update email copy for Beeper --- synapse/res/templates/password_reset.html | 4 ++-- synapse/res/templates/password_reset_confirmation.html | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/res/templates/password_reset.html b/synapse/res/templates/password_reset.html index 1f267946c8..0b0c969554 100644 --- a/synapse/res/templates/password_reset.html +++ b/synapse/res/templates/password_reset.html @@ -2,9 +2,9 @@ {% block title %}Password reset{% endblock %} {% block body %} -

A password reset request has been received for your Matrix account. If this was you, please click the link below to confirm resetting your password:

+

A password reset request has been received for your Beeper account. If this was you, please click the link below to confirm resetting your password:

{{ link }} -

If this was not you, do not click the link above and instead contact your server administrator. Thank you.

+

If this was not you, do not click the link above and instead contact the Beeper Support team. Thank you.

{% endblock %} diff --git a/synapse/res/templates/password_reset_confirmation.html b/synapse/res/templates/password_reset_confirmation.html index fabb9a6ed5..6af2d5aa7c 100644 --- a/synapse/res/templates/password_reset_confirmation.html +++ b/synapse/res/templates/password_reset_confirmation.html @@ -8,7 +8,7 @@ -

You have requested to reset your Matrix account password. Click the link below to confirm this action.

+

You have requested to reset your Beeper account password. Click the link below to confirm this action.

If you did not mean to do this, please close this page and your password will not be changed.

From 037ab5de99d409d4d3f21e7901976253ecd28357 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 16:54:31 +0000 Subject: [PATCH 06/29] Add Beeper log formatter --- synapse/logging/__init__.py | 13 +++++++++++-- synapse/logging/_terse_json.py | 12 ++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/synapse/logging/__init__.py b/synapse/logging/__init__.py index 15b92d7ef3..6268607087 100644 --- a/synapse/logging/__init__.py +++ b/synapse/logging/__init__.py @@ -22,10 +22,19 @@ import logging from synapse.logging._remote import RemoteHandler -from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter +from synapse.logging._terse_json import ( + BeeperTerseJsonFormatter, + JsonFormatter, + TerseJsonFormatter, +) # These are imported to allow for nicer logging configuration files. -__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"] +__all__ = [ + "RemoteHandler", + "JsonFormatter", + "TerseJsonFormatter", + "BeeperTerseJsonFormatter", +] # Debug logger for https://github.com/matrix-org/synapse/issues/9533 etc issue9533_logger = logging.getLogger("synapse.9533_debug") diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 6a6afbfc0b..da044e7773 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -92,3 +92,15 @@ def format(self, record: logging.LogRecord) -> str: } return self._format(record, event) + + +class BeeperTerseJsonFormatter(JsonFormatter): + def format(self, record: logging.LogRecord) -> str: + event = { + "message": record.getMessage(), + "namespace": record.name, + "level": record.levelname.lower(), + "time": round(record.created, 2), + } + + return self._format(record, event) From a96e080b1403dc66876a04f4042bb27adf9350c7 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 16:57:42 +0000 Subject: [PATCH 07/29] Add Beeper previews --- synapse/app/generic_worker.py | 2 + synapse/handlers/sync.py | 37 ++- synapse/rest/client/sync.py | 17 +- synapse/storage/databases/main/__init__.py | 2 + synapse/storage/databases/main/beeper.py | 102 ++++++ tests/rest/client/test_sync.py | 345 +++++++++++++++++++++ 6 files changed, 503 insertions(+), 2 deletions(-) create mode 100644 synapse/storage/databases/main/beeper.py diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 8c2a74a723..af46935ee5 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -63,6 +63,7 @@ ApplicationServiceTransactionWorkerStore, ApplicationServiceWorkerStore, ) +from synapse.storage.databases.main.beeper import BeeperStore from synapse.storage.databases.main.censor_events import CensorEventsStore from synapse.storage.databases.main.client_ips import ClientIpWorkerStore from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore @@ -155,6 +156,7 @@ class GenericWorkerStore( LockStore, SessionStore, TaskSchedulerWorkerStore, + BeeperStore, ): # Properties that multiple storage classes define. Tell mypy what the # expected type is. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a6d54ee4b8..7a0f81d401 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -119,6 +119,7 @@ class SyncConfig: is_guest: bool request_key: SyncRequestKey device_id: Optional[str] + beeper_previews: bool = False @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -153,6 +154,7 @@ class JoinedSyncResult: unread_thread_notifications: JsonDict summary: Optional[JsonDict] unread_count: int + preview: Optional[JsonDict] def __bool__(self) -> bool: """Make the result appear empty if there are no updates. This is used @@ -163,6 +165,7 @@ def __bool__(self) -> bool: or self.state or self.ephemeral or self.account_data + or self.preview # nb the notification count does not, er, count: if there's nothing # else in the result, we don't need to send it. ) @@ -2589,6 +2592,8 @@ async def _generate_room_entry( } ) + user_id = sync_result_builder.sync_config.user.to_string() + # Note: `batch` can be both empty and limited here in the case where # `_load_filtered_recents` can't find any events the user should see # (e.g. due to having ignored the sender of the last 50 events). @@ -2598,7 +2603,6 @@ async def _generate_room_entry( # newly joined room, unless either a) they've joined before or b) the # tag was added by synapse e.g. for server notice rooms. if full_state: - user_id = sync_result_builder.sync_config.user.to_string() tags = await self.store.get_tags_for_room(user_id, room_id) # If there aren't any tags, don't send the empty tags list down @@ -2688,8 +2692,39 @@ async def _generate_room_entry( unread_thread_notifications={}, summary=summary, unread_count=0, + preview=None, ) + # Only generate previews if we have new events that would change it + if batch.events and sync_config.beeper_previews: + preview = ( + await self.store.beeper_preview_event_for_room_id_and_user_id( + room_id=room_id, user_id=user_id, to_key=now_token.room_key + ) + ) + + if preview: + preview_event_id, preview_origin_server_ts = preview + room_sync.preview = { + "event_id": preview_event_id, + "origin_server_ts": preview_origin_server_ts, + } + + # Check if we already have the event in the batch, in which + # case we needn't add it here. No point in checking state as + # we don't preview state events. + for ev in batch.events: + if ev.event_id == preview_event_id: + break + else: + room_sync.preview["event"] = await self.store.get_event( + preview_event_id, + allow_none=True, + ) + else: + # This should never happen! + logger.warning("Beeper preview is missing! roomID=%s", room_id) + if room_sync or always_include: notifs = await self.unread_notifs_for_room_id(room_id, sync_config) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 2b103ca6a8..98298c8093 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -135,16 +135,18 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: ) filter_id = parse_string(request, "filter") full_state = parse_boolean(request, "full_state", default=False) + beeper_previews = parse_boolean(request, "beeper_previews", default=False) logger.debug( "/sync: user=%r, timeout=%r, since=%r, " - "set_presence=%r, filter_id=%r, device_id=%r", + "set_presence=%r, filter_id=%r, device_id=%r, beeper_previews=%r", user, timeout, since, set_presence, filter_id, device_id, + beeper_previews, ) # Stream position of the last ignored users account data event for this user, @@ -168,6 +170,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: full_state, device_id, last_ignore_accdata_streampos, + beeper_previews, ) if filter_id is None: @@ -199,6 +202,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: is_guest=requester.is_guest, request_key=request_key, device_id=device_id, + beeper_previews=beeper_previews, ) since_token = None @@ -550,6 +554,17 @@ async def encode_room( if self._msc2654_enabled: result["org.matrix.msc2654.unread_count"] = room.unread_count + if room.preview: + if "event" in room.preview: + room.preview["event"] = ( + await self._event_serializer.serialize_events( + [room.preview["event"]], + time_now, + config=serialize_options, + ) + )[0] + result["com.beeper.inbox.preview"] = room.preview + return result diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 586e84f2a4..45bb1154f5 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -40,6 +40,7 @@ from .account_data import AccountDataStore from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore +from .beeper import BeeperStore from .cache import CacheInvalidationWorkerStore from .censor_events import CensorEventsStore from .client_ips import ClientIpWorkerStore @@ -156,6 +157,7 @@ class DataStore( LockStore, SessionStore, TaskSchedulerWorkerStore, + BeeperStore, ): def __init__( self, diff --git a/synapse/storage/databases/main/beeper.py b/synapse/storage/databases/main/beeper.py new file mode 100644 index 0000000000..8495390cd2 --- /dev/null +++ b/synapse/storage/databases/main/beeper.py @@ -0,0 +1,102 @@ +# Beep beep! + +import logging +from typing import TYPE_CHECKING, Optional, Tuple, cast + +from synapse.storage._base import SQLBaseStore +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) +from synapse.types import RoomStreamToken + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class BeeperStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self.database = database + + async def beeper_preview_event_for_room_id_and_user_id( + self, room_id: str, user_id: str, to_key: RoomStreamToken + ) -> Optional[Tuple[str, int]]: + def beeper_preview_txn(txn: LoggingTransaction) -> Optional[Tuple[str, int]]: + sql = """ + WITH latest_event AS ( + SELECT e.event_id, e.origin_server_ts + FROM events AS e + LEFT JOIN redactions as r + ON e.event_id = r.redacts + -- Look to see if this event itself is an edit, as we don't want to + -- use edits ever as the "latest event" + LEFT JOIN event_relations as is_edit + ON e.event_id = is_edit.event_id AND is_edit.relation_type = 'm.replace' + WHERE + e.stream_ordering <= ? + AND e.room_id = ? + AND is_edit.event_id IS NULL + AND r.redacts IS NULL + AND e.type IN ( + 'm.room.message', + 'm.room.encrypted', + 'm.reaction', + 'm.sticker' + ) + AND CASE + -- Only find non-redacted reactions to our own messages + WHEN (e.type = 'm.reaction') THEN ( + SELECT ? = ee.sender AND ee.event_id NOT IN ( + SELECT redacts FROM redactions WHERE redacts = ee.event_id + ) FROM events as ee + WHERE ee.event_id = ( + SELECT eer.relates_to_id FROM event_relations AS eer + WHERE eer.event_id = e.event_id + ) + ) + ELSE (true) END + ORDER BY e.stream_ordering DESC + LIMIT 1 + ), + latest_edit_for_latest_event AS ( + SELECT e.event_id, e_replacement.event_id as replacement_event_id + FROM latest_event e + -- Find any events that edit this event, as we'll want to use the new content from + -- the edit as the preview + LEFT JOIN event_relations as er + ON e.event_id = er.relates_to_id AND er.relation_type = 'm.replace' + LEFT JOIN events as e_replacement + ON er.event_id = e_replacement.event_id + ORDER BY e_replacement.origin_server_ts DESC + LIMIT 1 + ) + SELECT COALESCE(lefle.replacement_event_id, le.event_id), le.origin_server_ts + FROM latest_event le + LEFT JOIN latest_edit_for_latest_event lefle ON le.event_id = lefle.event_id + """ + + txn.execute( + sql, + ( + to_key.stream, + room_id, + user_id, + ), + ) + + return cast(Optional[Tuple[str, int]], txn.fetchone()) + + return await self.db_pool.runInteraction( + "beeper_preview_for_room_id_and_user_id", + beeper_preview_txn, + ) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 417a87feb2..c2af094a65 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -825,3 +825,348 @@ def test_incremental_sync(self) -> None: self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["join"]) self.assertIn(self.included_room_id, channel.json_body["rooms"]["join"]) + + +class BeeperRoomPreviewTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + read_marker.register_servlets, + room.register_servlets, + sync.register_servlets, + receipts.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.url = "/sync?beeper_previews=true&since=%s" + self.next_batches = {} + + # Register the first user (used to check the unread counts). + self.user_id = self.register_user("kermit", "monkey") + self.tok = self.login("kermit", "monkey") + self.next_batches[self.tok] = "s0" + + # Create the room we'll check unread counts for. + self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok) + self.room_id_2 = self.helper.create_room_as(self.user_id, tok=self.tok) + self.room_id_3 = self.helper.create_room_as(self.user_id, tok=self.tok) + self.room_id_4 = self.helper.create_room_as(self.user_id, tok=self.tok) + + # Register the second user (used to send events to the room). + self.user2 = self.register_user("kermit2", "monkey") + self.tok2 = self.login("kermit2", "monkey") + self.next_batches[self.tok2] = "s0" + + # Change the power levels of the room so that the second user can send state + # events. + self.helper.send_state( + self.room_id, + EventTypes.PowerLevels, + { + "users": {self.user_id: 100, self.user2: 100}, + "users_default": 0, + "events": { + "m.room.name": 50, + "m.room.power_levels": 100, + "m.room.history_visibility": 100, + "m.room.canonical_alias": 50, + "m.room.avatar": 50, + "m.room.tombstone": 100, + "m.room.server_acl": 100, + "m.room.encryption": 100, + }, + "events_default": 0, + "state_default": 50, + "ban": 50, + "kick": 50, + "redact": 50, + "invite": 0, + }, + tok=self.tok, + ) + + def _check_preview_event_ids(self, auth_token: str, expected: dict) -> None: + """Checks the populated preview value against the expected value provided""" + + channel = self.make_request( + "GET", + self.url % self.next_batches[auth_token], + access_token=auth_token, + ) + + self.assertEqual(channel.code, 200, channel.json_body) + + for room_id, expected_entry in expected.items(): + room_entry = ( + channel.json_body.get("rooms", {}).get("join", {}).get(room_id, {}) + ) + + preview = room_entry.get("com.beeper.inbox.preview") + if preview: + preview_id = preview.get("event_id") + self.assertEqual( + preview_id, + expected_entry, + room_entry, + ) + else: + self.assertIsNone(expected_entry, room_entry) + + # Store the next batch for the next request. + self.next_batches[auth_token] = channel.json_body["next_batch"] + + def _redact_event( + self, + access_token: str, + room_id: str, + event_id: str, + expect_code: int = 200, + with_relations: Optional[List[str]] = None, + ) -> JsonDict: + """Helper function to send a redaction event. + + Returns the json body. + """ + path = "/_matrix/client/r0/rooms/%s/redact/%s" % (room_id, event_id) + + request_content = {} + if with_relations: + request_content["org.matrix.msc3912.with_relations"] = with_relations + + channel = self.make_request( + "POST", path, request_content, access_token=access_token + ) + self.assertEqual(channel.code, expect_code) + return channel.json_body + + def test_room_previews(self) -> None: + """Tests that /sync returns all room previews on first sync.""" + + # Multiple events in rooms for first sync. + self.helper.join(room=self.room_id, user=self.user2, tok=self.tok2) + self.helper.join(room=self.room_id_2, user=self.user2, tok=self.tok2) + self.helper.join(room=self.room_id_3, user=self.user2, tok=self.tok2) + self.helper.join(room=self.room_id_4, user=self.user2, tok=self.tok2) + + send_body = self.helper.send(self.room_id, "hello", tok=self.tok2) + send_body2 = self.helper.send(self.room_id_2, "hello 2", tok=self.tok2) + send_body3 = self.helper.send(self.room_id_3, "hello 3", tok=self.tok2) + send_body4 = self.helper.send(self.room_id_4, "hello 4", tok=self.tok2) + + # Should have previews for all rooms on first sync. + self._check_preview_event_ids( + auth_token=self.tok, + expected={ + self.room_id: send_body["event_id"], + self.room_id_2: send_body2["event_id"], + self.room_id_3: send_body3["event_id"], + self.room_id_4: send_body4["event_id"], + }, + ) + + # Subsequent - update preview for only room 2" + send_body5 = self.helper.send(self.room_id_2, "Sup!", tok=self.tok2) + + self._check_preview_event_ids( + auth_token=self.tok, expected={self.room_id_2: send_body5["event_id"]} + ) + + def test_room_preview(self) -> None: + """Tests that /sync returns a room preview with the latest message for room.""" + + # One user says hello. + # Check that a message we send returns a preview in the room (i.e. have multiple clients?) + send_body = self.helper.send(self.room_id, "hello", tok=self.tok) + self._check_preview_event_ids( + auth_token=self.tok, expected={self.room_id: send_body["event_id"]} + ) + + # Join new user. Should not show updated preview. + self.helper.join(room=self.room_id, user=self.user2, tok=self.tok2) + self._check_preview_event_ids( + auth_token=self.tok, expected={self.room_id: send_body["event_id"]} + ) + + # Second user says hello + # Check that the new user sending a message updates our preview + send_2_body = self.helper.send(self.room_id, "hello again!", tok=self.tok2) + self._check_preview_event_ids(self.tok, {self.room_id: send_2_body["event_id"]}) + + # Encrypted messages 1 + # Beeper: ensure encrypted messages are treated the same. + enc_1_body = self.helper.send_event( + self.room_id, EventTypes.Encrypted, {}, tok=self.tok2 + ) + self._check_preview_event_ids( + auth_token=self.tok, expected={self.room_id: enc_1_body["event_id"]} + ) + + # Encrypted messages 2 + enc_2_body = self.helper.send_event( + self.room_id, EventTypes.Encrypted, {}, tok=self.tok2 + ) + self._check_preview_event_ids( + auth_token=self.tok, expected={self.room_id: enc_2_body["event_id"]} + ) + + # Redact encrypted message 2 + self._redact_event(self.tok2, self.room_id, enc_2_body["event_id"]) + self._check_preview_event_ids( + auth_token=self.tok, expected={self.room_id: enc_1_body["event_id"]} + ) + + # User 2 react to user 1 message + # Someone else reacted to my message, update preview. + reaction_1 = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Reaction, + content={ + "m.relates_to": { + "rel_type": RelationTypes.ANNOTATION, + "event_id": send_body["event_id"], + "key": "👍", + } + }, + tok=self.tok2, + ) + self._check_preview_event_ids( + auth_token=self.tok, expected={self.room_id: reaction_1["event_id"]} + ) + + # User 1 react to User 2 message. + # Not a reaction to my message, don't update preview. + reaction_2 = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Reaction, + content={ + "m.relates_to": { + "rel_type": RelationTypes.ANNOTATION, + "event_id": send_2_body["event_id"], + "key": "👍", + } + }, + tok=self.tok, + ) + self._check_preview_event_ids( + auth_token=self.tok, expected={self.room_id: reaction_1["event_id"]} + ) + self._check_preview_event_ids( + auth_token=self.tok2, expected={self.room_id: reaction_2["event_id"]} + ) + + # Redact user 2 message with reactions. + # Remove redactions as well as reactions from user 2's preview. + self._redact_event(self.tok2, self.room_id, send_2_body["event_id"]) + + self._check_preview_event_ids( + auth_token=self.tok, expected={self.room_id: reaction_1["event_id"]} + ) + self._check_preview_event_ids( + auth_token=self.tok2, expected={self.room_id: enc_1_body["event_id"]} + ) + + def test_room_preview_edits(self) -> None: + """Tests that /sync returns a room preview with the latest message for room.""" + + # One user says hello. + # Check that a message we send returns a preview in the room (i.e. have multiple clients?) + send_body = self.helper.send(self.room_id, "hello", tok=self.tok) + self._check_preview_event_ids( + auth_token=self.tok, expected={self.room_id: send_body["event_id"]} + ) + + # Join new user. Should not show updated preview. + self.helper.join(room=self.room_id, user=self.user2, tok=self.tok2) + self._check_preview_event_ids( + auth_token=self.tok, expected={self.room_id: send_body["event_id"]} + ) + + # Second user says hello + # Check that the new user sending a message updates our preview + send_2_body = self.helper.send(self.room_id, "hello again!", tok=self.tok2) + self._check_preview_event_ids(self.tok, {self.room_id: send_2_body["event_id"]}) + + # First user edits their old message + # Check that this doesn't alter the preview + self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={ + "body": "hello edit", + "msgtype": "m.text", + "m.relates_to": { + "rel_type": RelationTypes.REPLACE, + "event_id": send_body["event_id"], + }, + }, + tok=self.tok, + ) + self._check_preview_event_ids(self.tok, {self.room_id: send_2_body["event_id"]}) + + # Now second user edits their (currently preview) message + # Check that this does become the preview + send_3_body = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={ + "body": "hello edit", + "msgtype": "m.text", + "m.relates_to": { + "rel_type": RelationTypes.REPLACE, + "event_id": send_2_body["event_id"], + }, + }, + tok=self.tok2, + ) + self._check_preview_event_ids(self.tok, {self.room_id: send_3_body["event_id"]}) + + # Now second user edits their (currently preview) message again + # Check that this does become the preview, over the previous edit + send_4_body = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={ + "body": "hello edit 2", + "msgtype": "m.text", + "m.relates_to": { + "rel_type": RelationTypes.REPLACE, + "event_id": send_2_body["event_id"], + }, + }, + tok=self.tok2, + ) + self._check_preview_event_ids(self.tok, {self.room_id: send_4_body["event_id"]}) + + # Finally, first user sends a message and this should become the preview + send_5_body = self.helper.send(self.room_id, "hello", tok=self.tok) + self._check_preview_event_ids( + auth_token=self.tok, expected={self.room_id: send_5_body["event_id"]} + ) + + def test_room_preview_no_change(self) -> None: + """Tests that /sync only includes previews when we have new events.""" + + self.helper.join(room=self.room_id, user=self.user_id, tok=self.tok) + + send_body = self.helper.send(self.room_id, "hello", tok=self.tok) + + # Should have preview on first sync + self._check_preview_event_ids( + auth_token=self.tok, + expected={self.room_id: send_body["event_id"]}, + ) + + # Should have no preview on second sync (no timeline changes) + self._check_preview_event_ids( + auth_token=self.tok, + expected={self.room_id: None}, + ) + + # Send a join event, this isn't previewed but will be in the timeline + self.helper.join(room=self.room_id, user=self.user2, tok=self.tok2) + + # Should have preview because we have timeline, but preview is unchanged + self._check_preview_event_ids( + auth_token=self.tok, + expected={self.room_id: send_body["event_id"]}, + ) From 64f23de355c616d1adcfe4960f5297af81857d66 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 16:59:50 +0000 Subject: [PATCH 08/29] Add Beeper inbox endpoints --- synapse/api/constants.py | 1 + synapse/rest/client/account_data.py | 102 +++++++++++++++++++ synapse/rest/client/read_marker.py | 15 ++- tests/rest/client/test_account_data.py | 135 +++++++++++++++++++++++++ 4 files changed, 249 insertions(+), 4 deletions(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 52982e2d15..ed9dc260ca 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -270,6 +270,7 @@ class ReceiptTypes: READ: Final = "m.read" READ_PRIVATE: Final = "m.read.private" FULLY_READ: Final = "m.fully_read" + BEEPER_INBOX_DONE: Final = "com.beeper.inbox.done" class PublicRoomsFilterFields: diff --git a/synapse/rest/client/account_data.py b/synapse/rest/client/account_data.py index 0ee24081fa..a51a5ea08a 100644 --- a/synapse/rest/client/account_data.py +++ b/synapse/rest/client/account_data.py @@ -19,6 +19,7 @@ # # +import json import logging from typing import TYPE_CHECKING, Optional, Tuple @@ -27,6 +28,7 @@ from synapse.http.server import HttpServer from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseRequest +from synapse.rest.client.read_marker import ReadMarkerRestServlet from synapse.types import JsonDict, JsonMapping, RoomID from ._base import client_patterns @@ -310,9 +312,109 @@ async def on_DELETE( return 200, {} +class RoomBeeperInboxStateServlet(RestServlet): + """ + PUT /user/{user_id}/rooms/{room_id}/beeper_inbox_state HTTP/1.1 + """ + + PATTERNS = list( + client_patterns( + "/com.beeper.inbox/user/(?P[^/]*)/rooms/(?P[^/]*)/inbox_state", + releases=(), # not in the matrix spec, only include under /unstable + ) + ) + + def __init__(self, hs: "HomeServer"): + super().__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.store = hs.get_datastores().main + self.handler = hs.get_account_data_handler() + self.read_marker_client = ReadMarkerRestServlet(hs) + + async def on_PUT( + self, request: SynapseRequest, user_id: str, room_id: str + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + if user_id != requester.user.to_string(): + raise AuthError(403, "Cannot add beeper inbox state for other users.") + + if not RoomID.is_valid(room_id): + raise SynapseError( + 400, + f"{room_id} is not a valid room ID", + Codes.INVALID_PARAM, + ) + + ts = self.clock.time_msec() + + body = parse_json_object_from_request(request) + + if "done" in body: + delta_ms = body["done"].get("at_delta") or 0 + done = {"updated_ts": ts, "at_ts": ts + delta_ms} + await self.handler.add_account_data_to_room( + user_id, room_id, "com.beeper.inbox.done", done + ) + logger.info(f"SetBeeperDone done_delta_ms={delta_ms}") + + if "marked_unread" in body: + marked_unread = {"unread": body["marked_unread"], "ts": ts} + await self.handler.add_account_data_to_room( + user_id, room_id, "m.marked_unread", marked_unread + ) + logger.info(f"SetBeeperMarkedUnread marked_unread={body['marked_unread']}") + + if "read_markers" in body: + await self.read_marker_client.handle_read_marker( + room_id, body["read_markers"], requester + ) + logger.info( + f"SetBeeperReadMarkers read_markers={json.dumps(body['read_markers'])}" + ) + + return 200, {} + + +class BeeperInboxBatchArchiveServlet(RestServlet): + """ + PUT /com.beeper.inbox/batch_archive HTTP/1.1 + """ + + PATTERNS = list( + client_patterns( + "/com.beeper.inbox/batch_archive", + releases=(), # not in the matrix spec, only include under /unstable + ) + ) + + def __init__(self, hs: "HomeServer"): + super().__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.store = hs.get_datastores().main + self.handler = hs.get_account_data_handler() + + async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + ts = self.clock.time_msec() + body = parse_json_object_from_request(request) + + done = {"updated_ts": ts, "at_ts": ts} + for room_id in body["room_ids"]: + # TODO in transaction + await self.handler.add_account_data_to_room( + requester.user.to_string(), room_id, "com.beeper.inbox.done", done + ) + + return 200, {} + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: AccountDataServlet(hs).register(http_server) RoomAccountDataServlet(hs).register(http_server) + RoomBeeperInboxStateServlet(hs).register(http_server) + BeeperInboxBatchArchiveServlet(hs).register(http_server) if hs.config.experimental.msc3391_enabled: UnstableAccountDataServlet(hs).register(http_server) diff --git a/synapse/rest/client/read_marker.py b/synapse/rest/client/read_marker.py index d3d3c7c41d..bde5d94483 100644 --- a/synapse/rest/client/read_marker.py +++ b/synapse/rest/client/read_marker.py @@ -26,7 +26,7 @@ from synapse.http.server import HttpServer from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseRequest -from synapse.types import JsonDict +from synapse.types import JsonDict, Requester from ._base import client_patterns @@ -55,16 +55,23 @@ def __init__(self, hs: "HomeServer"): } async def on_POST( - self, request: SynapseRequest, room_id: str + self, + request: SynapseRequest, + room_id: str, ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) + body = parse_json_object_from_request(request) + return await self.handle_read_marker(room_id, body, requester) + # Beeper: The endpoint and underlying method are separated here so `inbox_state` + # can use the same function. + async def handle_read_marker( + self, room_id: str, body: dict, requester: Requester + ) -> Tuple[int, JsonDict]: await self.presence_handler.bump_presence_active_time( requester.user, requester.device_id ) - body = parse_json_object_from_request(request) - unrecognized_types = set(body.keys()) - self._known_receipt_types if unrecognized_types: # It's fine if there are unrecognized receipt types, but let's log diff --git a/tests/rest/client/test_account_data.py b/tests/rest/client/test_account_data.py index be6d7af2fc..aed7f325c3 100644 --- a/tests/rest/client/test_account_data.py +++ b/tests/rest/client/test_account_data.py @@ -20,6 +20,7 @@ # from unittest.mock import AsyncMock +from synapse.api.constants import ReceiptTypes from synapse.rest import admin from synapse.rest.client import account_data, login, room @@ -79,3 +80,137 @@ def test_on_account_data_updated_callback(self) -> None: mocked_callback.assert_called_with( user_id, room_id, account_data_type, account_data_content ) + + def test_beeper_inbox_state_endpoint(self) -> None: + store = self.hs.get_datastores().main + + user_id = self.register_user("user", "password") + tok = self.login("user", "password") + + room_id = self.helper.create_room_as(user_id, tok=tok) + channel = self.make_request( + "PUT", + f"/_matrix/client/unstable/com.beeper.inbox/user/{user_id}/rooms/{room_id}/inbox_state", + {}, + access_token=tok, + ) + + self.assertEqual(channel.code, 200, channel.result) + self.assertIsNone( + self.get_success( + store.get_account_data_for_room_and_type( + user_id, room_id, "com.beeper.inbox.done" + ) + ) + ) + self.assertIsNone( + self.get_success( + store.get_account_data_for_room_and_type( + user_id, room_id, "m.marked_unread" + ) + ) + ) + + channel = self.make_request( + "PUT", + f"/_matrix/client/unstable/com.beeper.inbox/user/{user_id}/rooms/{room_id}/inbox_state", + {"done": {"at_delta": 1000 * 60 * 5}, "marked_unread": True}, + access_token=tok, + ) + + self.assertEqual(channel.code, 200, channel.result) + + # FIXME: I give up, I don't know how to mock time in tests + # ts = self.clock.time_msec() + ts = 500 + + done = self.get_success( + store.get_account_data_for_room_and_type( + user_id, room_id, "com.beeper.inbox.done" + ) + ) + assert done is not None + self.assertEqual(done["updated_ts"], ts) + self.assertEqual(done["at_ts"], ts + (1000 * 60 * 5)) + + marked_unread = self.get_success( + store.get_account_data_for_room_and_type( + user_id, room_id, "m.marked_unread" + ) + ) + assert marked_unread is not None + self.assertEqual(marked_unread["unread"], True) + self.assertEqual(marked_unread["ts"], ts) + + def test_beeper_inbox_state_endpoint_can_clear_unread(self) -> None: + store = self.hs.get_datastores().main + + user_id = self.register_user("user", "password") + tok = self.login("user", "password") + + room_id = self.helper.create_room_as(user_id, tok=tok) + channel = self.make_request( + "PUT", + f"/_matrix/client/unstable/com.beeper.inbox/user/{user_id}/rooms/{room_id}/inbox_state", + {"marked_unread": False}, + access_token=tok, + ) + + self.assertEqual(channel.code, 200, channel.result) + + # FIXME: I give up, I don't know how to mock time in tests + # ts = self.clock.time_msec() + ts = 400 + + self.assertEqual(channel.code, 200, channel.result) + self.assertIsNone( + self.get_success( + store.get_account_data_for_room_and_type( + user_id, room_id, "com.beeper.inbox.done" + ) + ) + ) + + marked_unread = self.get_success( + store.get_account_data_for_room_and_type( + user_id, room_id, "m.marked_unread" + ) + ) + assert marked_unread is not None + self.assertEqual(marked_unread["unread"], False) + self.assertEqual(marked_unread["ts"], ts) + + def test_beeper_inbox_state_endpoint_can_set_read_marker(self) -> None: + store = self.hs.get_datastores().main + + user_id = self.register_user("user", "password") + tok = self.login("user", "password") + + room_id = self.helper.create_room_as(user_id, tok=tok) + + res = self.helper.send(room_id, "hello", tok=tok) + + existing_read_marker = self.get_success( + store.get_account_data_for_room_and_type( + user_id, room_id, ReceiptTypes.FULLY_READ + ) + ) + + channel = self.make_request( + "PUT", + f"/_matrix/client/unstable/com.beeper.inbox/user/{user_id}/rooms/{room_id}/inbox_state", + { + "read_markers": { + ReceiptTypes.FULLY_READ: res["event_id"], + }, + }, + access_token=tok, + ) + self.assertEqual(channel.code, 200) + + new_read_marker = self.get_success( + store.get_account_data_for_room_and_type( + user_id, room_id, ReceiptTypes.FULLY_READ + ) + ) + self.assertNotEqual(existing_read_marker, new_read_marker) From 2817fc8ad41512f9e788abe4253a64d83befdd31 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 17:00:12 +0000 Subject: [PATCH 09/29] Remove any bridge bot users from ignored user account data --- synapse/storage/databases/main/account_data.py | 17 +++++++++++++++++ tests/storage/test_account_data.py | 16 ++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 563450a97e..d816867583 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -20,6 +20,7 @@ # import logging +import re from typing import ( TYPE_CHECKING, Any, @@ -59,6 +60,10 @@ logger = logging.getLogger(__name__) +# Regex pattern for detecting a bridge bot (cached here for performance) +SYNAPSE_BOT_PATTERN = re.compile(r"^@_.*_bot\:*") +HUNGRYSERV_BOT_PATTERN = re.compile(r"^@[a-z]+bot\:beeper.local") + class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore): def __init__( @@ -701,6 +706,18 @@ def _add_account_data_for_user( ) -> None: content_json = json_encoder.encode(content) + # If we're ignoring users, silently filter out any bots that may be ignored + if account_data_type == AccountDataTypes.IGNORED_USER_LIST: + ignored_users = content.get("ignored_users", {}) + if isinstance(ignored_users, dict): + content["ignored_users"] = { + u: v + for u, v in ignored_users.items() + if not ( + SYNAPSE_BOT_PATTERN.match(u) or HUNGRYSERV_BOT_PATTERN.match(u) + ) + } + self.db_pool.simple_upsert_txn( txn, table="account_data", diff --git a/tests/storage/test_account_data.py b/tests/storage/test_account_data.py index 2859bcf4bd..d56516641c 100644 --- a/tests/storage/test_account_data.py +++ b/tests/storage/test_account_data.py @@ -93,6 +93,22 @@ def test_ignoring_users(self) -> None: # Check the removed user. self.assert_ignorers("@another:remote", {self.user}) + def test_ignoring_bot_users(self) -> None: + self._update_ignore_list("@other:test", "@another:remote") + self.assert_ignored(self.user, {"@other:test", "@another:remote"}) + + self._update_ignore_list("@other:test", "@another:remote", "@_other_bot:test") + self.assert_ignored(self.user, {"@other:test", "@another:remote"}) + + self._update_ignore_list("@iamnotabot:beeper.com") + self.assert_ignored(self.user, {"@iamnotabot:beeper.com"}) + + self._update_ignore_list("@_other_bot:beeper.com") + self.assert_ignored(self.user, set()) + + self._update_ignore_list("@whatsappbot:beeper.local") + self.assert_ignored(self.user, set()) + def test_caching(self) -> None: """Ensure that caching works properly between different users.""" # The first user ignores a user. From a8f94925a66c857759fec982351cdf66434d2b8d Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 12 Mar 2024 17:49:31 +0000 Subject: [PATCH 10/29] Add Beeper notification counts --- synapse/config/experimental.py | 5 + synapse/handlers/room.py | 3 + synapse/push/bulk_push_rule_evaluator.py | 71 +++---- synapse/storage/databases/main/beeper.py | 190 +++++++++++++++++- .../databases/main/event_push_actions.py | 4 +- synapse/storage/databases/main/events.py | 2 + synapse/storage/databases/main/receipts.py | 5 + .../73/99_beeper_user_notification_counts.sql | 19 ++ tests/push/test_bulk_push_rule_evaluator.py | 2 +- tests/rest/client/test_sync.py | 14 +- 10 files changed, 274 insertions(+), 41 deletions(-) create mode 100644 synapse/storage/schema/main/delta/73/99_beeper_user_notification_counts.sql diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index fcc78d2d81..152e22d719 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -411,3 +411,8 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: self.msc4069_profile_inhibit_propagation = experimental.get( "msc4069_profile_inhibit_propagation", False ) + + self.beeper_user_notification_counts_enabled = experimental.get( + "beeper_user_notification_counts_enabled", + False, + ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index fbe6becf8f..f5fa0c0723 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -374,6 +374,9 @@ async def _upgrade_room( old_room_state, ) + # Beeper: clear out any push actions and summaries for this room + await self.store.beeper_cleanup_tombstoned_room(old_room_id) + return new_room_id async def _update_upgraded_room_pls( diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 34ab637c3d..dda9cd785f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -86,36 +86,36 @@ SENTINEL = object() -def _should_count_as_unread(event: EventBase, context: EventContext) -> bool: +def _should_count_as_unread( + event: EventBase, + context: EventContext, + non_bot_room_members_count: int, + current_user: str, + related_events: Dict[str, Dict[str, Any]], +) -> bool: # Exclude rejected and soft-failed events. if context.rejected or event.internal_metadata.is_soft_failed(): return False - # Exclude notices. - if ( - not event.is_state() - and event.type == EventTypes.Message - and event.content.get("msgtype") == "m.notice" - ): - return False - # Exclude edits. relates_to = relation_from_event(event) if relates_to and relates_to.rel_type == RelationTypes.REPLACE: return False - # Mark events that have a non-empty string body as unread. - body = event.content.get("body") - if isinstance(body, str) and body: - return True - - # Mark some state events as unread. - if event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD: - return True - - # Mark encrypted events as unread. - if not event.is_state() and event.type == EventTypes.Encrypted: - return True + # Mark encrypted and plain text messages events as unread. + if not event.is_state(): + if event.type == EventTypes.Encrypted: + return True + elif event.type == EventTypes.Message: + body = event.content.get("body") + return isinstance(body, str) and bool(body) + # Beeper: We want reactions to only count as unread if they're reactions to the current user in rooms that + # have fewer than 20 users. + elif event.type == "m.reaction" and related_events.get("m.annotation"): + return ( + related_events["m.annotation"]["sender"] == current_user + and non_bot_room_members_count < 20 + ) return False @@ -345,15 +345,9 @@ async def _action_for_event_by_user( # (historical messages persisted in reverse-chronological order). return - # Disable counting as unread unless the experimental configuration is - # enabled, as it can cause additional (unwanted) rows to be added to the - # event_push_actions table. - count_as_unread = False - if self.hs.config.experimental.msc2654_enabled: - count_as_unread = _should_count_as_unread(event, context) - rules_by_user = await self._get_rules_for_event(event) actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {} + count_as_unread_by_user: Dict[str, bool] = {} # Gather a bunch of info in parallel. # @@ -454,12 +448,19 @@ async def _action_for_event_by_user( if not isinstance(display_name, str): display_name = None - if count_as_unread: - # Add an element for the current user if the event needs to be marked as - # unread, so that add_push_actions_to_staging iterates over it. - # If the event shouldn't be marked as unread but should notify the - # current user, it'll be added to the dict later. - actions_by_user[uid] = [] + # Beeper: Need to calculate this per user as whether it should count as unread or not + # depends on who the current user is. + if self.hs.config.experimental.msc2654_enabled: + count_as_unread_by_user[uid] = _should_count_as_unread( + event, context, room_member_count, uid, related_events + ) + + if count_as_unread_by_user[uid]: + # Add an element for the current user if the event needs to be marked as + # unread, so that add_push_actions_to_staging iterates over it. + # If the event shouldn't be marked as unread but should notify the + # current user, it'll be added to the dict later. + actions_by_user[uid] = [] actions = evaluator.run(rules, uid, display_name) if "notify" in actions: @@ -492,7 +493,7 @@ async def _action_for_event_by_user( await self.store.add_push_actions_to_staging( event.event_id, actions_by_user, - count_as_unread, + count_as_unread_by_user, thread_id, ) diff --git a/synapse/storage/databases/main/beeper.py b/synapse/storage/databases/main/beeper.py index 8495390cd2..68e6d020f9 100644 --- a/synapse/storage/databases/main/beeper.py +++ b/synapse/storage/databases/main/beeper.py @@ -1,8 +1,10 @@ # Beep beep! import logging -from typing import TYPE_CHECKING, Optional, Tuple, cast +from typing import TYPE_CHECKING, List, Optional, Tuple, cast +from synapse.events import EventBase +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -28,6 +30,19 @@ def __init__( self.database = database + self.user_notification_counts_enabled: bool = ( + hs.config.experimental.beeper_user_notification_counts_enabled + ) + + if ( + self.user_notification_counts_enabled + and hs.config.worker.run_background_tasks + ): + self.aggregate_notification_counts_loop = self._clock.looping_call( + self.beeper_aggregate_notification_counts, 30 * 1000 + ) + self.is_aggregating_notification_counts = False + async def beeper_preview_event_for_room_id_and_user_id( self, room_id: str, user_id: str, to_key: RoomStreamToken ) -> Optional[Tuple[str, int]]: @@ -100,3 +115,176 @@ def beeper_preview_txn(txn: LoggingTransaction) -> Optional[Tuple[str, int]]: "beeper_preview_for_room_id_and_user_id", beeper_preview_txn, ) + + async def beeper_cleanup_tombstoned_room(self, room_id: str) -> None: + def beeper_cleanup_tombstoned_room_txn(txn: LoggingTransaction) -> None: + self.db_pool.simple_delete_txn( + txn, table="event_push_actions", keyvalues={"room_id": room_id} + ) + self.db_pool.simple_delete_txn( + txn, table="event_push_summary", keyvalues={"room_id": room_id} + ) + + await self.db_pool.runInteraction( + "beeper_cleanup_tombstoned_room", + beeper_cleanup_tombstoned_room_txn, + ) + + def beeper_add_notification_counts_txn( + self, + txn: LoggingTransaction, + notifiable_events: List[EventBase], + ) -> None: + if not self.user_notification_counts_enabled: + return + + sql = """ + INSERT INTO beeper_user_notification_counts ( + room_id, event_stream_ordering, + user_id, thread_id, notifs, unreads, highlights + ) + SELECT ?, ?, user_id, thread_id, notif, unread, highlight + FROM event_push_actions_staging + WHERE event_id = ? + """ + + txn.execute_batch( + sql, + ( + ( + event.room_id, + event.internal_metadata.stream_ordering, + event.event_id, + ) + for event in notifiable_events + ), + ) + + def beeper_clear_notification_counts_txn( + self, + txn: LoggingTransaction, + user_id: str, + room_id: str, + stream_ordering: int, + ) -> None: + if not self.user_notification_counts_enabled: + return + + sql = """ + DELETE FROM beeper_user_notification_counts + WHERE + user_id = ? + AND room_id = ? + AND event_stream_ordering <= ? + """ + + txn.execute(sql, (user_id, room_id, stream_ordering)) + + @wrap_as_background_process("beeper_aggregate_notification_counts") + async def beeper_aggregate_notification_counts(self) -> None: + if not self.user_notification_counts_enabled: + return + + def aggregate_txn(txn: LoggingTransaction, limit: int) -> int: + sql = """ + WITH recent_rows AS ( -- Aggregate the tables, flag aggregated rows for deletion + SELECT + user_id, + room_id + FROM + beeper_user_notification_counts + WHERE + event_stream_ordering > ( + SELECT event_stream_ordering FROM beeper_user_notification_counts_stream_ordering + ) + AND event_stream_ordering < ( + -- Select highest stream ordering from events over one hour, + -- this is to avoid serialization issues with the most + -- recent events/receipts + SELECT stream_ordering FROM events + WHERE origin_server_ts < ( + (EXTRACT(EPOCH from NOW()) - 3600) * 1000 + ) + ORDER BY stream_ordering DESC + LIMIT 1 + ) + -- Oldest first, to reduce serialization issues + ORDER BY event_stream_ordering ASC + LIMIT {limit} + ) + UPDATE + beeper_user_notification_counts AS epc + SET + unreads = CASE WHEN epc.event_stream_ordering = agg.max_eso THEN agg.unreads ELSE 0 END, + notifs = CASE WHEN epc.event_stream_ordering = agg.max_eso THEN agg.notifs ELSE 0 END, + highlights = CASE WHEN epc.event_stream_ordering = agg.max_eso THEN agg.highlights ELSE 0 END, + aggregated = epc.event_stream_ordering != agg.max_eso + FROM ( + SELECT + user_id, + room_id, + SUM(unreads) AS unreads, + SUM(notifs) AS notifs, + SUM(highlights) AS highlights, + MAX(event_stream_ordering) AS max_eso + FROM + beeper_user_notification_counts + WHERE + user_id IN(SELECT user_id FROM recent_rows) + AND room_id IN(SELECT room_id FROM recent_rows) + GROUP BY + user_id, + room_id + ) AS agg + WHERE + epc.room_id = agg.room_id + AND epc.user_id = agg.user_id + RETURNING + event_stream_ordering; + """.format( + limit=limit + ) + + txn.execute(sql) + orders = list(txn) + if not orders: + logger.info("No user counts aggregated") + return 0 + + max_stream_ordering = max(orders) + txn.execute( + """ + UPDATE beeper_user_notification_counts_stream_ordering + SET event_stream_ordering = ? + """, + (max_stream_ordering,), + ) + txn.execute("DELETE FROM beeper_user_notification_counts WHERE aggregated") + + logger.info(f"Aggregated {len(orders)} notification count rows") + + return txn.rowcount + + if self.is_aggregating_notification_counts: + return + + self.is_aggregating_notification_counts = True + limit = 1000 + + try: + logger.info("Aggregating notification counts") + + last_batch = limit + 1 + while last_batch > limit: + last_batch = await self.db_pool.runInteraction( + "beeper_aggregate_notification_counts", + aggregate_txn, + limit=limit, + ) + await self._clock.sleep(1.0) + + except self.database.engine.module.OperationalError: + logger.exception("Failed to aggregate notifications") + + finally: + self.is_aggregating_notification_counts = False diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 40bf000e9c..1668c44bb4 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1165,7 +1165,7 @@ async def add_push_actions_to_staging( self, event_id: str, user_id_actions: Dict[str, Collection[Union[Mapping, str]]], - count_as_unread: bool, + count_as_unread_by_user: Dict[str, bool], thread_id: str, ) -> None: """Add the push actions for the event to the push action staging area. @@ -1193,7 +1193,7 @@ def _gen_entry( _serialize_action(actions, bool(is_highlight)), # actions column notif, # notif column is_highlight, # highlight column - int(count_as_unread), # unread column + int(count_as_unread_by_user.get(user_id, 0)), # unread column thread_id, # thread_id column self._clock.time_msec(), # inserted_ts column ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a6fda3f43c..706238b8ad 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2134,6 +2134,8 @@ def _set_push_actions_for_event_and_users_txn( ), ) + self.store.beeper_add_notification_counts_txn(txn, notifiable_events) + # Now we delete the staging area for *all* events that were being # persisted. txn.execute_batch( diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index d513c42530..69dfe4dac8 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -793,6 +793,11 @@ def _insert_linearized_receipt_txn( where_clause=where_clause, ) + if self.hs.is_mine_id(user_id): + self.beeper_clear_notification_counts_txn( # type: ignore[attr-defined] + txn, user_id, room_id, stream_ordering + ) + return rx_ts def _graph_to_linear( diff --git a/synapse/storage/schema/main/delta/73/99_beeper_user_notification_counts.sql b/synapse/storage/schema/main/delta/73/99_beeper_user_notification_counts.sql new file mode 100644 index 0000000000..674cc4d3ec --- /dev/null +++ b/synapse/storage/schema/main/delta/73/99_beeper_user_notification_counts.sql @@ -0,0 +1,19 @@ +CREATE TABLE beeper_user_notification_counts ( + user_id TEXT, + room_id TEXT, + thread_id TEXT, + event_stream_ordering BIGINT, + notifs BIGINT, + unreads BIGINT, + highlights BIGINT, + aggregated BOOLEAN, + UNIQUE (user_id, room_id, thread_id, event_stream_ordering) +); + +CREATE TABLE beeper_user_notification_counts_stream_ordering ( + lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + event_stream_ordering BIGINT NOT NULL, + CHECK (lock='X') +); + +INSERT INTO beeper_user_notification_counts_stream_ordering (event_stream_ordering) VALUES (0); diff --git a/tests/push/test_bulk_push_rule_evaluator.py b/tests/push/test_bulk_push_rule_evaluator.py index fc73f3dc2a..9d9c8a7a0b 100644 --- a/tests/push/test_bulk_push_rule_evaluator.py +++ b/tests/push/test_bulk_push_rule_evaluator.py @@ -429,7 +429,7 @@ def test_suppress_edits(self) -> None: ) # An edit which is a mention will cause a notification. - self.assertTrue( + self.assertFalse( # Beeper: changed from true per our base rule changes self._create_and_process( bulk_evaluator, { diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index c2af094a65..17328e1509 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -19,7 +19,7 @@ # # import json -from typing import List +from typing import List, Optional from parameterized import parameterized @@ -483,6 +483,9 @@ def test_unread_counts(self) -> None: # Check that the unread counter is back to 0. self._check_unread_count(0) + # Beeper: we don't count name as unread, so send this to increase the counter + self.helper.send_event(self.room_id, EventTypes.Encrypted, {}, tok=self.tok2) + # Check that room name changes increase the unread counter. self.helper.send_state( self.room_id, @@ -492,6 +495,9 @@ def test_unread_counts(self) -> None: ) self._check_unread_count(1) + # Beeper: we don't count topic as unread, so send this to increase the counter + self.helper.send_event(self.room_id, EventTypes.Encrypted, {}, tok=self.tok2) + # Check that room topic changes increase the unread counter. self.helper.send_state( self.room_id, @@ -505,6 +511,10 @@ def test_unread_counts(self) -> None: self.helper.send_event(self.room_id, EventTypes.Encrypted, {}, tok=self.tok2) self._check_unread_count(3) + # Beeper: fake event to bump event count, we don't count custom events + # as unread currently. + self.helper.send_event(self.room_id, EventTypes.Encrypted, {}, tok=self.tok2) + # Check that custom events with a body increase the unread counter. result = self.helper.send_event( self.room_id, @@ -538,7 +548,7 @@ def test_unread_counts(self) -> None: content={"body": "hello", "msgtype": "m.notice"}, tok=self.tok2, ) - self._check_unread_count(4) + self._check_unread_count(5) # Beep: notices count as unread # Check that tombstone events changes increase the unread counter. res1 = self.helper.send_state( From 07b65271f185a59c3059e591804e3e9de0ba0373 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 17:01:27 +0000 Subject: [PATCH 11/29] Include stream ordering in the unsigned event dict --- synapse/events/utils.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index e0613d0dbc..2734409b4b 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -485,6 +485,11 @@ def serialize_event( ): d["unsigned"]["transaction_id"] = txn_id + # Beeper: include internal stream ordering as HS order unsigned hint + stream_ordering = getattr(e.internal_metadata, "stream_ordering", None) + if stream_ordering: + d["unsigned"]["com.beeper.hs.order"] = stream_ordering + # invite_room_state and knock_room_state are a list of stripped room state events # that are meant to provide metadata about a room to an invitee/knocker. They are # intended to only be included in specific circumstances, such as down sync, and From d78b137b8f2d29a67974c137a551616d826f858d Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 17:01:56 +0000 Subject: [PATCH 12/29] Allow clients to add extra content to receipts --- synapse/handlers/read_marker.py | 11 ++++++++--- synapse/handlers/receipts.py | 3 ++- synapse/rest/client/notifications.py | 1 + synapse/rest/client/read_marker.py | 2 ++ synapse/rest/client/receipts.py | 5 ++++- 5 files changed, 17 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index fb39c8e04b..1df418d3b2 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -20,10 +20,11 @@ # import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from synapse.api.constants import ReceiptTypes from synapse.api.errors import SynapseError +from synapse.types import JsonDict from synapse.util.async_helpers import Linearizer if TYPE_CHECKING: @@ -39,7 +40,11 @@ def __init__(self, hs: "HomeServer"): self.read_marker_linearizer = Linearizer(name="read_marker") async def received_client_read_marker( - self, room_id: str, user_id: str, event_id: str + self, + room_id: str, + user_id: str, + event_id: str, + extra_content: Optional[JsonDict] = None, ) -> None: """Updates the read marker for a given user in a given room if the event ID given is ahead in the stream relative to the current read marker. @@ -71,7 +76,7 @@ async def received_client_read_marker( should_update = event_ordering > old_event_ordering if should_update: - content = {"event_id": event_id} + content = content = {"event_id": event_id, **(extra_content or {})} await self.account_data_handler.add_account_data_to_room( user_id, room_id, ReceiptTypes.FULLY_READ, content ) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 8674a8fcdd..cc2f49efde 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -181,6 +181,7 @@ async def received_client_receipt( user_id: UserID, event_id: str, thread_id: Optional[str], + extra_content: Optional[JsonDict] = None, ) -> None: """Called when a client tells us a local user has read up to the given event_id in the room. @@ -197,7 +198,7 @@ async def received_client_receipt( user_id=user_id.to_string(), event_ids=[event_id], thread_id=thread_id, - data={"ts": int(self.clock.time_msec())}, + data={"ts": int(self.clock.time_msec()), **(extra_content or {})}, ) is_new = await self._handle_new_receipts([receipt]) diff --git a/synapse/rest/client/notifications.py b/synapse/rest/client/notifications.py index be9b584748..de529bdd54 100644 --- a/synapse/rest/client/notifications.py +++ b/synapse/rest/client/notifications.py @@ -71,6 +71,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: [ ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE, + ReceiptTypes.BEEPER_INBOX_DONE, ], ) diff --git a/synapse/rest/client/read_marker.py b/synapse/rest/client/read_marker.py index bde5d94483..e92903cd8f 100644 --- a/synapse/rest/client/read_marker.py +++ b/synapse/rest/client/read_marker.py @@ -93,6 +93,7 @@ async def handle_read_marker( room_id, user_id=requester.user.to_string(), event_id=event_id, + extra_content=body.get("com.beeper.fully_read.extra", None), ) else: await self.receipts_handler.received_client_receipt( @@ -102,6 +103,7 @@ async def handle_read_marker( event_id=event_id, # Setting the thread ID is not possible with the /read_markers endpoint. thread_id=None, + extra_content=body.get("com.beeper.read.extra", None), ) return 200, {} diff --git a/synapse/rest/client/receipts.py b/synapse/rest/client/receipts.py index 89203dc45a..cb8b54e7f3 100644 --- a/synapse/rest/client/receipts.py +++ b/synapse/rest/client/receipts.py @@ -57,6 +57,7 @@ def __init__(self, hs: "HomeServer"): ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE, ReceiptTypes.FULLY_READ, + ReceiptTypes.BEEPER_INBOX_DONE, } async def on_POST( @@ -73,7 +74,7 @@ async def on_POST( f"Receipt type must be {', '.join(self._known_receipt_types)}", ) - body = parse_json_object_from_request(request) + body = parse_json_object_from_request(request, allow_empty_body=False) # Pull the thread ID, if one exists. thread_id = None @@ -110,6 +111,7 @@ async def on_POST( room_id, user_id=requester.user.to_string(), event_id=event_id, + extra_content=body, ) else: await self.receipts_handler.received_client_receipt( @@ -118,6 +120,7 @@ async def on_POST( user_id=requester.user, event_id=event_id, thread_id=thread_id, + extra_content=body, ) return 200, {} From 1da9744fb2b9ed5a5c59a7a98bf00afcc92a91d4 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 17:02:09 +0000 Subject: [PATCH 13/29] Include empty identity server in well known --- synapse/rest/well_known.py | 3 +++ tests/rest/test_well_known.py | 2 ++ 2 files changed, 5 insertions(+) diff --git a/synapse/rest/well_known.py b/synapse/rest/well_known.py index d0ca8ca46b..0a5e0b4d66 100644 --- a/synapse/rest/well_known.py +++ b/synapse/rest/well_known.py @@ -49,6 +49,9 @@ def get_well_known(self) -> Optional[JsonDict]: result["m.identity_server"] = { "base_url": self._config.registration.default_identity_server } + else: + # Workaround for iOS expecting some value here + result["m.identity_server"] = {"base_url": ""} # We use the MSC3861 values as they are used by multiple MSCs if self._config.experimental.msc3861.enabled: diff --git a/tests/rest/test_well_known.py b/tests/rest/test_well_known.py index e166c13bc1..9432bb70e6 100644 --- a/tests/rest/test_well_known.py +++ b/tests/rest/test_well_known.py @@ -135,5 +135,7 @@ def test_client_well_known_msc3861_oauth_delegation(self) -> None: "issuer": "https://issuer", "account": "https://my-account.issuer", }, + # Beep: added because iOS crashed without + "m.identity_server": {"base_url": ""}, }, ) From 0daae39b9a43197f623331adbf133d6cbe9a66b5 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 17:02:36 +0000 Subject: [PATCH 14/29] Allow checking username availability when registration disabled --- synapse/rest/client/register.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py index 634ebed2be..dd9ffdafc4 100644 --- a/synapse/rest/client/register.py +++ b/synapse/rest/client/register.py @@ -337,10 +337,11 @@ def __init__(self, hs: "HomeServer"): ) async def on_GET(self, request: Request) -> Tuple[int, JsonDict]: - if not self.hs.config.registration.enable_registration: - raise SynapseError( - 403, "Registration has been disabled", errcode=Codes.FORBIDDEN - ) + # Beeper: We should be able to check availability of usernames even though public registration is disabled + # if not self.hs.config.registration.enable_registration: + # raise SynapseError( + # 403, "Registration has been disabled", errcode=Codes.FORBIDDEN + # ) if self.inhibit_user_in_use_error: return 200, {"available": True} From d25c9f3e3dd14b72ce0a5f388b2f50dea2a41ecf Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 17:02:53 +0000 Subject: [PATCH 15/29] Change default DM room redaction power level to 100 --- synapse/handlers/room.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f5fa0c0723..19b3451a15 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -157,7 +157,8 @@ def __init__(self, hs: "HomeServer"): "history_visibility": HistoryVisibility.SHARED, "original_invitees_have_ops": True, "guest_can_join": True, - "power_level_content_override": {"invite": 0}, + # Beeper change: don't allow redactions by anyone in DM chats + "power_level_content_override": {"invite": 0, "redact": 1000}, }, RoomCreationPreset.PUBLIC_CHAT: { "join_rules": JoinRules.PUBLIC, From 93398a08b0565513e584c9f373676bc03e3a28a9 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 17:03:05 +0000 Subject: [PATCH 16/29] Disable transaction timeout when purging rooms --- synapse/storage/databases/main/purge_events.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 3b81ed943c..c4797cd975 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -376,6 +376,10 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]: (room_id,), ) + # Disable statement timeouts for this transaction; purging rooms can + # take a while! + txn.execute("SET LOCAL statement_timeout = 0") + # First, fetch all the state groups that should be deleted, before # we delete that information. txn.execute( From b129163b8f4213ddc92b7f920e6d5c591bfca99c Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 17:44:02 +0000 Subject: [PATCH 17/29] Send all notifications as high priority --- synapse/push/httppusher.py | 18 +++++++++++++++--- tests/push/test_http.py | 12 ++++++------ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index dd9b64d6ef..78c57ef9a4 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -132,6 +132,8 @@ def __init__(self, hs: "HomeServer", pusher_config: PusherConfig): pusher_config.app_id, pusher_config.pushkey, ) + # Beeper: Save this so we can pass this on to Sygnal as well + self.user_name = pusher_config.user_name # Validate that there's a URL and it is of the proper form. if "url" not in self.data: @@ -445,7 +447,7 @@ async def dispatch_push_event( rejected push keys otherwise. If this array is empty, the push fully succeeded. """ - priority = "low" + priority = "high" # Beep: always use high priority if ( event.type == EventTypes.Encrypted or tweaks.get("highlight") @@ -461,7 +463,11 @@ async def dispatch_push_event( content: JsonDict = { "event_id": event.event_id, "room_id": event.room_id, - "counts": {"unread": badge}, + "counts": { + "unread": badge, + "com.beeper.server_type": "synapse", + }, + "com.beeper.user_id": self.user_id, "prio": priority, } # event_id_only doesn't include the tweaks, so override them. @@ -480,8 +486,10 @@ async def dispatch_push_event( "prio": priority, "counts": { "unread": badge, + "com.beeper.server_type": "synapse", # 'missed_calls': 2 }, + "com.beeper.user_id": self.user_id, } if event.type == "m.room.member" and event.is_state(): content["membership"] = event.content["membership"] @@ -515,7 +523,11 @@ async def _send_badge(self, badge: int) -> None: "id": "", "type": None, "sender": "", - "counts": {"unread": badge}, + "counts": { + "unread": badge, + "com.beeper.server_type": "synapse", + }, + "com.beeper.user_id": self.user_id, "devices": [ { "app_id": self.app_id, diff --git a/tests/push/test_http.py b/tests/push/test_http.py index dce00d8b7f..c609adc589 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -415,8 +415,8 @@ def test_sends_high_priority_for_one_to_one_only(self) -> None: self.push_attempts[1][1], "http://example.com/_matrix/push/v1/notify" ) - # check that this is low-priority - self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "low") + # Beeper: all notifications are high priority + self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "high") def test_sends_high_priority_for_mention(self) -> None: """ @@ -492,8 +492,8 @@ def test_sends_high_priority_for_mention(self) -> None: self.push_attempts[1][1], "http://example.com/_matrix/push/v1/notify" ) - # check that this is low-priority - self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "low") + # Beeper: all notifications are high priority + self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "high") def test_sends_high_priority_for_atroom(self) -> None: """ @@ -576,8 +576,8 @@ def test_sends_high_priority_for_atroom(self) -> None: self.push_attempts[1][1], "http://example.com/_matrix/push/v1/notify" ) - # check that this is low-priority - self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "low") + # Beeper: all notifications are high priority + self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "high") def test_push_unread_count_group_by_room(self) -> None: """ From ed9e0390ef90e2bb57c60e701ddf7f2ffba7eb12 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 17:44:19 +0000 Subject: [PATCH 18/29] Add Beeper specific test cases --- tests/push/test_push_rule_evaluator.py | 127 ++++++++++++++++++++++- tests/replication/storage/test_events.py | 2 +- tests/rest/client/test_sync.py | 34 +++++- tests/rest/client/test_upgrade_room.py | 37 ++++++- 4 files changed, 196 insertions(+), 4 deletions(-) diff --git a/tests/push/test_push_rule_evaluator.py b/tests/push/test_push_rule_evaluator.py index 420fbea998..eb040e982d 100644 --- a/tests/push/test_push_rule_evaluator.py +++ b/tests/push/test_push_rule_evaluator.py @@ -34,7 +34,7 @@ from synapse.rest.client import login, register, room from synapse.server import HomeServer from synapse.storage.databases.main.appservice import _make_exclusive_regex -from synapse.synapse_rust.push import PushRuleEvaluator +from synapse.synapse_rust.push import FilteredPushRules, PushRuleEvaluator, PushRules from synapse.types import JsonDict, JsonMapping, UserID from synapse.util import Clock from synapse.util.frozenutils import freeze @@ -955,3 +955,128 @@ def test_delayed_message(self) -> None: # user2 should not be notified about it, because they can't see it. self.assertEqual(self.get_notif_count(self.user_id2), 0) + + +class PushRuleEvaluatorBaseRulesTestCase(unittest.TestCase): + def test_reactions(self) -> None: + message_event = FrozenEvent( + { + "event_id": "$event_id", + "room_id": "!room_id:beeper.com", + "content": { + "body": "Looks like Nick is way ahead of me on this one", + "msgtype": "m.text", + }, + "sender": "@brad:beeper.com", + "type": "m.room.message", + }, + RoomVersions.V1, + ) + + reaction_event = FrozenEvent( + { + "event_id": "$reaction_id", + "room_id": "!room_id:beeper.com", + "content": { + "m.relates_to": { + "event_id": "$event_id", + "key": "\U0001F44D", + "rel_type": "m.annotation", + } + }, + "sender": "@nick:beeper.com", + "type": "m.reaction", + }, + RoomVersions.V1, + ) + + dm_evaluator = PushRuleEvaluator( + _flatten_dict(reaction_event), + False, + 2, + 0, + {}, + {"m.annotation": _flatten_dict(message_event)}, + True, + reaction_event.room_version.msc3931_push_features, + True, + ) + + # Reaction to Brad's message, should be an action for Brad + actions = dm_evaluator.run( + FilteredPushRules(PushRules([]), {}, True, True, True, True), + "@brad:beeper.com", + "Brad", + ) + self.assertTrue("notify" in actions) + + # Reaction to Brad's message, should not be an action for Nick + actions = dm_evaluator.run( + FilteredPushRules(PushRules([]), {}, True, True, True, True), + "@nick:beeper.com", + "Nick", + ) + self.assertEqual(actions, []) + + large_room_evaluator = PushRuleEvaluator( + _flatten_dict(reaction_event), + False, + 30, + 0, + {}, + {"m.annotation": _flatten_dict(message_event)}, + True, + reaction_event.room_version.msc3931_push_features, + True, + ) + + # Large rooms should never have emoji reaction notifications + actions = large_room_evaluator.run( + FilteredPushRules(PushRules([]), {}, True, True, True, True), + "@brad:beeper.com", + "Brad", + ) + self.assertEqual(actions, []) + actions = large_room_evaluator.run( + FilteredPushRules(PushRules([]), {}, True, True, True, True), + "@nick:beeper.com", + "Nick", + ) + self.assertEqual(actions, []) + + def test_supress_auto_accept_invite(self) -> None: + event = FrozenEvent( + { + "event_id": "$event_id", + "room_id": "!wFyjEwanOaElpGOaLW:beeper.com", + "content": { + "displayname": "Brad Murray", + "fi.mau.will_auto_accept": True, + "is_direct": True, + "membership": "invite", + }, + "sender": "@_brad_imessagecloud_83372:beeper.com", + "state_key": "@brad:beeper.com", + "type": "m.room.member", + }, + RoomVersions.V1, + ) + + evaluator = PushRuleEvaluator( + _flatten_dict(event), + False, + 0, + 0, + {}, + {}, + True, + event.room_version.msc3931_push_features, + True, + ) + + actions = evaluator.run( + FilteredPushRules(PushRules([]), {}, True, True, True, True), + "@brad:beeper.com", + "Brad Murray", + ) + self.assertEqual(actions, []) diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py index 86c8f14d1b..eb10226a30 100644 --- a/tests/replication/storage/test_events.py +++ b/tests/replication/storage/test_events.py @@ -407,7 +407,7 @@ def build_event( self.master_store.add_push_actions_to_staging( event.event_id, dict(push_actions), - False, + {user_id: False for user_id, _ in push_actions}, "main", ) ) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 17328e1509..5c94e1a747 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -32,7 +32,16 @@ ReceiptTypes, RelationTypes, ) -from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync +from synapse.rest.client import ( + account_data, + devices, + knock, + login, + read_marker, + receipts, + room, + sync, +) from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock @@ -382,6 +391,7 @@ def test_knock_room_state(self) -> None: class UnreadMessagesTestCase(unittest.HomeserverTestCase): servlets = [ + account_data.register_servlets, synapse.rest.admin.register_servlets, login.register_servlets, read_marker.register_servlets, @@ -579,6 +589,28 @@ def test_unread_counts(self) -> None: self.assertEqual(channel.code, 200, channel.json_body) self._check_unread_count(0) + def test_beeper_inbox_state_can_update_unread_count(self) -> None: + # increase unread count + self.helper.join(room=self.room_id, user=self.user2, tok=self.tok2) + res = self.helper.send(self.room_id, "hello", tok=self.tok2) + self._check_unread_count(1) + + # Beeper: inbox_state should be able to send read receipts + res = self.helper.send(self.room_id, "hello", tok=self.tok2) + + channel = self.make_request( + "PUT", + f"/_matrix/client/unstable/com.beeper.inbox/user/{self.user_id}/rooms/{self.room_id}/inbox_state", + { + "read_markers": { + ReceiptTypes.READ: res["event_id"], + }, + }, + access_token=self.tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + self._check_unread_count(0) + # We test for all three receipt types that influence notification counts @parameterized.expand( [ diff --git a/tests/rest/client/test_upgrade_room.py b/tests/rest/client/test_upgrade_room.py index c4b15c5ae7..0bf5bcd3a5 100644 --- a/tests/rest/client/test_upgrade_room.py +++ b/tests/rest/client/test_upgrade_room.py @@ -26,7 +26,7 @@ from synapse.api.constants import EventContentFields, EventTypes, RoomTypes from synapse.config.server import DEFAULT_ROOM_VERSION from synapse.rest import admin -from synapse.rest.client import login, room, room_upgrade_rest_servlet +from synapse.rest.client import login, notifications, room, room_upgrade_rest_servlet from synapse.server import HomeServer from synapse.util import Clock @@ -40,6 +40,7 @@ class UpgradeRoomTest(unittest.HomeserverTestCase): login.register_servlets, room.register_servlets, room_upgrade_rest_servlet.register_servlets, + notifications.register_servlets, ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: @@ -411,3 +412,37 @@ def test_first_upgrade_does_not_block_second(self) -> None: channel = self._upgrade_room(expire_cache=False) self.assertEqual(200, channel.code, channel.result) + + def test_upgrade_clears_push_actions(self) -> None: + """ + Beeper specific test: ensure that when upgrading a room any notification/unread counts + in the old room are removed. + """ + self.helper.send_event( + self.room_id, + "m.room.message", + content={"body": "hi", "msgtype": "text"}, + tok=self.other_token, + ) + + # Check we have a notification pre-upgrade + channel = self.make_request( + "GET", + "/notifications", + access_token=self.creator_token, + ) + self.assertEqual(channel.code, 200, channel.result) + self.assertEqual(len(channel.json_body["notifications"]), 1, channel.json_body) + + channel = self._upgrade_room() + self.assertEqual(200, channel.code, channel.result) + self.assertIn("replacement_room", channel.json_body) + + # Check we have no notification pre-upgrade + channel = self.make_request( + "GET", + "/notifications", + access_token=self.creator_token, + ) + self.assertEqual(channel.code, 200, channel.result) + self.assertEqual(len(channel.json_body["notifications"]), 0, channel.json_body) From 7b4c741c70d431c37ae48cd83c1230fc731930c4 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 17:44:32 +0000 Subject: [PATCH 19/29] Add CI --- .github/workflows/beeper-ci.yaml | 138 +++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 .github/workflows/beeper-ci.yaml diff --git a/.github/workflows/beeper-ci.yaml b/.github/workflows/beeper-ci.yaml new file mode 100644 index 0000000000..61ba8dda25 --- /dev/null +++ b/.github/workflows/beeper-ci.yaml @@ -0,0 +1,138 @@ +name: Beep + +on: + push: + branches: ["beeper", "beeper-*"] + pull_request: + + +jobs: + lint-style: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + - run: pip install poetry + - run: poetry install + - run: poetry run isort --check synapse + - run: poetry run black --check synapse + + lint-types: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + - run: pip install poetry + - run: poetry install --extras all + - run: poetry run mypy + + # Tests + + test-trial: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + - run: pip install poetry + - run: poetry install --extras all + - run: poetry run trial -j4 tests + + test-sytest: + runs-on: ubuntu-latest + container: + image: matrixdotorg/sytest-synapse:focal + volumes: + - ${{ github.workspace }}:/src + env: + SYTEST_BRANCH: 845764081c890180e3842f135f87e0aa778557a6 + TOP: ${{ github.workspace }} + POSTGRES: 1 + MULTI_POSTGRES: 1 + WOKRERS: 1 + steps: + - uses: actions/checkout@v2 + - name: Run SyTest + run: /bootstrap.sh synapse + working-directory: /src + - name: Summarise results.tap + if: ${{ always() }} + run: /sytest/scripts/tap_to_gha.pl /logs/results.tap + - name: Upload SyTest logs + uses: actions/upload-artifact@v2 + if: ${{ always() }} + with: + name: Sytest Logs - ${{ job.status }} - (${{ join(matrix.*, ', ') }}) + path: | + /logs/results.tap + /logs/**/*.log* + + test-complement: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/checkout@v2 + with: + repository: matrix-org/complement + path: complement + ref: 39b3b9dbb2df96ca1df6b6ae86bdbc63fc96f39e + - name: Install complement dependencies + run: |- + sudo apt-get -qq update + sudo apt-get install -qqy libolm3 libolm-dev + go install -v github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest + - name: Run Complement + run: ./scripts-dev/complement.sh + env: + COMPLEMENT_DIR: complement + + # Builds + + build-python: + runs-on: ubuntu-latest + env: + DOCKER_BUILDKIT: 1 + steps: + - uses: actions/checkout@v2 + - uses: docker/setup-qemu-action@v2 + - uses: docker/setup-buildx-action@v2 + with: + # TEMPORARY, see: https://github.com/docker/build-push-action/issues/761 + driver-opts: | + image=moby/buildkit:v0.10.6 + - uses: docker/login-action@v2 + with: + registry: ${{ secrets.CI_REGISTRY }} + username: ${{ secrets.CI_REGISTRY_USER }} + password: ${{ secrets.CI_REGISTRY_PASSWORD }} + - run: |- + if [ "${{ github.ref_name }}" = "beeper" ]; then + tag=$(cat pyproject.toml | grep -E "^version =" | sed -E 's/^version = "(.+)"$/\1/') + else + tag="${{ github.head_ref || github.ref_name }}" + fi + + docker buildx build \ + --push \ + --platform linux/amd64 \ + --tag ${{ secrets.CI_REGISTRY }}/synapse:$tag-${{ github.sha }} \ + -f docker/Dockerfile \ + . + + if [ "${{ github.ref_name }}" = "beeper" ]; then + docker pull ${{ secrets.CI_REGISTRY }}/synapse:$tag-${{ github.sha }} + docker tag \ + ${{ secrets.CI_REGISTRY }}/synapse:$tag-${{ github.sha }} \ + ${{ secrets.CI_REGISTRY }}/synapse:latest + docker push ${{ secrets.CI_REGISTRY }}/synapse:latest + fi + + # Ensure the image works properly + docker run \ + --entrypoint '' \ + ${{ secrets.CI_REGISTRY }}/synapse:$tag-${{ github.sha }} \ + python -m synapse.app.homeserver --help + + echo "Pushed image: synapse:$tag-${{ github.sha }}" + if [ "${{ github.ref_name }}" = "beeper" ]; then + echo "Pushed image: synapse:latest" + fi From 274edb9f6d6884ba03cea9eeac81768de1039249 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 18:07:19 +0000 Subject: [PATCH 20/29] Remove dependabot config Annoying! Will lave that to upstream and we'll get updates as we rebase. --- .github/dependabot.yml | 23 ----------------------- 1 file changed, 23 deletions(-) delete mode 100644 .github/dependabot.yml diff --git a/.github/dependabot.yml b/.github/dependabot.yml deleted file mode 100644 index 7ce353ed64..0000000000 --- a/.github/dependabot.yml +++ /dev/null @@ -1,23 +0,0 @@ -version: 2 -updates: - - # "pip" is the correct setting for poetry, per https://docs.github.com/en/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file#package-ecosystem - package-ecosystem: "pip" - directory: "/" - schedule: - interval: "weekly" - - - package-ecosystem: "docker" - directory: "/docker" - schedule: - interval: "weekly" - - - package-ecosystem: "github-actions" - directory: "/" - schedule: - interval: "weekly" - - - package-ecosystem: "cargo" - directory: "/" - versioning-strategy: "lockfile-only" - schedule: - interval: "weekly" From 9afd59bb9ab10a01c4f6c96b5523fe5ad68f4e1b Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 8 Mar 2024 18:44:34 +0000 Subject: [PATCH 21/29] Add Beeper readme --- README.md | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000000..a6489d1195 --- /dev/null +++ b/README.md @@ -0,0 +1,38 @@ +# Synapse: Beeper Edition + + +## Rebase flow + +### Create PR + +Here we're upgrading to `v1.96.1`: + +``` +# Make a new branch from the upstream release, we do this so we can create a PR +# of Beeper -> upstream to run tests/confirm we're happy. +git checkout -f v1.96.1 +git checkout -b upstream-1.96.1 +git push -u beeper upstream-1.96.1 + +# Check out the base branch, pull any changes +git checkout beeper +git pull + +# Now create a new branch to rebase +git checkout -b beeper-1.96.1 +# And do the rebase +git rebase v1.96.1 +# fix any conflicts... + +# Push and make a PR from this branch to the upstream one created above +git push -u beeper beeper-1.96.1 +``` + +### Make release + +Once it's ready we just overwrite the `beeper` branch with the new one: + +``` +git checkout beeper-1.96.1 +git push --force beeper beeper +``` From b34199134a3f3a5404cf3f511119d89bc8071265 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 25 Mar 2024 16:02:34 +0000 Subject: [PATCH 22/29] Add Beeper release flow scripts --- beeper/complete_release.sh | 31 +++++++++++++++++++++++++++ beeper/prepare_release.sh | 43 ++++++++++++++++++++++++++++++++++++++ beeper/utils.sh | 23 ++++++++++++++++++++ 3 files changed, 97 insertions(+) create mode 100755 beeper/complete_release.sh create mode 100755 beeper/prepare_release.sh create mode 100644 beeper/utils.sh diff --git a/beeper/complete_release.sh b/beeper/complete_release.sh new file mode 100755 index 0000000000..1d612ff350 --- /dev/null +++ b/beeper/complete_release.sh @@ -0,0 +1,31 @@ +#!/usr/bin/env bash + +set -euo pipefail +source $(realpath $(dirname $0))/utils.sh + +BEEPER_REMOTE=$(get_beeper_remote) + +VERSION=${1:-} + +if [ -z "$VERSION" ]; then + echo >&2 "Must specify version!" + exit 1 +fi + +echo "Completing Synapse: Beeper Edition version $VERSION" +echo "WARNING: this script will DELETE the branch called: beeper" +read -p "Press enter to continue" + +UPSTREAM_BRANCH=upstream-$VERSION +BEEPER_BRANCH=beeper-$VERSION + +git checkout $BEEPER_BRANCH +git branch -D beeper +git checkout -b beeper +git push --force $BEEPER_REMOTE beeper + +# Cleanup +git branch -D $BEEPER_BRANCH +git push $BEEPER_REMOTE --delete $BEEPER_BRANCH +git branch -D $UPSTREAM_BRANCH +git push $BEEPER_REMOTE --delete $UPSTREAM_BRANCH diff --git a/beeper/prepare_release.sh b/beeper/prepare_release.sh new file mode 100755 index 0000000000..ebd99537c3 --- /dev/null +++ b/beeper/prepare_release.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash + +set -euo pipefail +source $(realpath $(dirname $0))/utils.sh + +BEEPER_REMOTE=$(get_beeper_remote) + +VERSION=${1:-} + +if [ -z "$VERSION" ]; then + echo >&2 "Must specify version!" + exit 1 +fi + +STARTING_BRANCH=$(git branch --show-current) + +echo "Preparing Synapse: Beeper Edition version $VERSION" +echo "WARNING: this script will rebase on top of the CURRENT BRANCH: $STARTING_BRANCH" +read -p "Press enter to continue" + +TAG=v$VERSION +UPSTREAM_BRANCH=upstream-$VERSION +BEEPER_BRANCH=beeper-$VERSION + +# Checkout the tag, create upstream branch, push it +echo "Setup branch $UPSTREAM_BRANCH" +git checkout -f $TAG +git checkout -b $UPSTREAM_BRANCH +git push -u $BEEPER_REMOTE $UPSTREAM_BRANCH + +# Switch back to our starting branch, create new version branch from it +echo "Setup branch $BEEPER_BRANCH" +git checkout $STARTING_BRANCH +git checkout -b $BEEPER_BRANCH + +# And rebase against upstream, applying only our Beeper commits +echo "Initiate rebase..." +git rebase $UPSTREAM_BRANCH || read -p "Rebase was a mess, press enter once you fix it" + +git push -u $BEEPER_REMOTE $BEEPER_BRANCH + +echo "OK we done!" +echo "Go HERE and make the PR: https://github.com/beeper/synapse/compare/upstream-$VERSION...beeper-$VERSION?expand=1" diff --git a/beeper/utils.sh b/beeper/utils.sh new file mode 100644 index 0000000000..ba7573dc16 --- /dev/null +++ b/beeper/utils.sh @@ -0,0 +1,23 @@ +function get_upstream_remote() { + for remote in $(git remote); do + url=$(git remote get-url $remote) + if [ "$url" = "git@github.com:element-hq/synapse.git" ]; then + echo $remote + return 0 + fi + done + echo >&2 "No upstream remote found (looking for URL: git@github.com:element-hq/synapse.git)" + return 1 +} + +function get_beeper_remote() { + for remote in $(git remote); do + url=$(git remote get-url $remote) + if [ "$url" = "git@github.com:beeper/synapse.git" ]; then + echo $remote + return 0 + fi + done + echo >&2 "No upstream remote found (looking for URL: git@github.com:beeper/synapse.git)" + return 1 +} From b2af452dc8aef374a63bcd5d6decbfa8d1200d6e Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Thu, 28 Mar 2024 13:24:35 +0000 Subject: [PATCH 23/29] Add env var to disable writing to the `user_ips` table --- synapse/storage/databases/main/client_ips.py | 22 +++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 4b66247640..db29659afe 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -20,6 +20,7 @@ # import logging +from os import environ from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union, cast import attr @@ -49,6 +50,8 @@ # 120 seconds == 2 minutes LAST_SEEN_GRANULARITY = 120 * 1000 +DISABLE_CLIENT_IP_STORAGE = environ.get("SYNAPSE_DISABLE_CLIENT_IP_STORAGE") == "true" + @attr.s(slots=True, frozen=True, auto_attribs=True) class DeviceLastConnectionInfo: @@ -614,6 +617,7 @@ async def insert_client_ip( requests are not directly driven by end-users. This is a hack and we're not very proud of it. """ + # The sync proxy continuously triggers /sync even if the user is not # present so should be excluded from user_ips entries. if user_agent == "sync-v3-proxy-": @@ -689,14 +693,16 @@ def _update_client_ips_batch_txn( devices_keys.append((user_id, device_id)) devices_values.append((user_agent, last_seen, ip)) - self.db_pool.simple_upsert_many_txn( - txn, - table="user_ips", - key_names=("user_id", "access_token", "ip"), - key_values=user_ips_keys, - value_names=("user_agent", "device_id", "last_seen"), - value_values=user_ips_values, - ) + # Beep: only store user_ips if not disabled + if not DISABLE_CLIENT_IP_STORAGE: + self.db_pool.simple_upsert_many_txn( + txn, + table="user_ips", + key_names=("user_id", "access_token", "ip"), + key_values=user_ips_keys, + value_names=("user_agent", "device_id", "last_seen"), + value_values=user_ips_values, + ) if devices_values: self.db_pool.simple_update_many_txn( From 7dd73ca0b878fd3d1a3cafc8877dc539f01ea236 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Thu, 28 Mar 2024 13:32:25 +0000 Subject: [PATCH 24/29] Increase last seen granularity to 1h --- synapse/storage/databases/main/client_ips.py | 4 ++-- tests/storage/test_client_ips.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index db29659afe..ecc28e8bc0 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -47,8 +47,8 @@ # Number of msec of granularity to store the user IP 'last seen' time. Smaller # times give more inserts into the database even for readonly API hits -# 120 seconds == 2 minutes -LAST_SEEN_GRANULARITY = 120 * 1000 +# 120 seconds == 2 minutes, Beep: updated to 1h +LAST_SEEN_GRANULARITY = 3600 * 1000 DISABLE_CLIENT_IP_STORAGE = environ.get("SYNAPSE_DISABLE_CLIENT_IP_STORAGE") == "true" diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py index 13f78ee2d2..b722fe5567 100644 --- a/tests/storage/test_client_ips.py +++ b/tests/storage/test_client_ips.py @@ -100,7 +100,7 @@ def test_insert_new_client_ip_none_device_id(self) -> None: user_id, "access_token", "ip", "user_agent", None ) ) - self.reactor.advance(200) + self.reactor.advance(3600) self.pump(0) result = cast( @@ -153,7 +153,7 @@ def test_insert_new_client_ip_none_device_id(self) -> None: ) # Only one result, has been upserted. self.assertEqual( - result, [("access_token", "ip", "user_agent", None, 12345878000)] + result, [("access_token", "ip", "user_agent", None, 12349278000)] ) @parameterized.expand([(False,), (True,)]) From 63b773d1bac8d54cd655dd9f61807842aa6cc32e Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 21 May 2024 12:04:18 +0100 Subject: [PATCH 25/29] Bump sytest commit --- .github/workflows/beeper-ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/beeper-ci.yaml b/.github/workflows/beeper-ci.yaml index 61ba8dda25..331a4550c6 100644 --- a/.github/workflows/beeper-ci.yaml +++ b/.github/workflows/beeper-ci.yaml @@ -44,7 +44,7 @@ jobs: volumes: - ${{ github.workspace }}:/src env: - SYTEST_BRANCH: 845764081c890180e3842f135f87e0aa778557a6 + SYTEST_BRANCH: 9b80feaa947bce9e181b47362879b0e94d3a9f1d TOP: ${{ github.workspace }} POSTGRES: 1 MULTI_POSTGRES: 1 From 2f7e55c77b0335957e6d023d02d9d80fc1a25d2c Mon Sep 17 00:00:00 2001 From: Adam Van Ymeren Date: Wed, 22 May 2024 11:15:14 -0700 Subject: [PATCH 26/29] Add com.beeper.hs.order to initial and incremental sync responses. --- synapse/storage/databases/main/receipts.py | 43 +++++++++++++--------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 69dfe4dac8..30068d66ec 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -416,13 +416,13 @@ async def _get_linearized_receipts_for_rooms( def f( txn: LoggingTransaction, - ) -> List[Tuple[str, str, str, str, Optional[str], str]]: + ) -> List[Tuple[str, str, str, str, Optional[str], str, str]]: if from_key: sql = """ SELECT stream_id, instance_name, room_id, receipt_type, - user_id, event_id, thread_id, data - FROM receipts_linearized WHERE - stream_id > ? AND stream_id <= ? AND + user_id, event_id, thread_id, event_stream_ordering, data + FROM receipts_linearized + WHERE stream_id > ? AND stream_id <= ? AND """ clause, args = make_in_list_sql_clause( self.database_engine, "room_id", room_ids @@ -435,9 +435,9 @@ def f( else: sql = """ SELECT stream_id, instance_name, room_id, receipt_type, - user_id, event_id, thread_id, data - FROM receipts_linearized WHERE - stream_id <= ? AND + user_id, event_id, thread_id, event_stream_ordering, data + FROM receipts_linearized + WHERE stream_id > ? AND stream_id <= ? AND """ clause, args = make_in_list_sql_clause( @@ -447,8 +447,8 @@ def f( txn.execute(sql + clause, [to_key.get_max_stream_pos()] + list(args)) return [ - (room_id, receipt_type, user_id, event_id, thread_id, data) - for stream_id, instance_name, room_id, receipt_type, user_id, event_id, thread_id, data in txn + (room_id, receipt_type, user_id, event_id, thread_id, event_stream_ordering, data) + for stream_id, instance_name, room_id, receipt_type, user_id, event_id, thread_id, event_stream_ordering, data in txn if MultiWriterStreamToken.is_stream_position_in_range( from_key, to_key, instance_name, stream_id ) @@ -459,7 +459,7 @@ def f( ) results: JsonDict = {} - for room_id, receipt_type, user_id, event_id, thread_id, data in txn_results: + for room_id, receipt_type, user_id, event_id, thread_id, event_stream_ordering, data in txn_results: # We want a single event per room, since we want to batch the # receipts by room, event and type. room_event = results.setdefault( @@ -480,6 +480,10 @@ def f( # This means we will drop some receipts, but MSC4102 is designed to drop semantically # meaningless receipts, so this is okay. Previously, we would drop meaningful data! receipt_data = db_to_json(data) + + # MSC4033: inject event order into receipt + receipt_data["com.beeper.hs.order"] = event_stream_ordering + if user_id in receipt_type_dict: # existing receipt # is the existing receipt threaded and we are currently processing an unthreaded one? if "thread_id" in receipt_type_dict[user_id] and not thread_id: @@ -517,19 +521,19 @@ async def get_linearized_receipts_for_all_rooms( A dictionary of roomids to a list of receipts. """ - def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]: + def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str, str]]: if from_key: sql = """ - SELECT stream_id, instance_name, room_id, receipt_type, user_id, event_id, data - FROM receipts_linearized WHERE - stream_id > ? AND stream_id <= ? + SELECT stream_id, instance_name, room_id, receipt_type, user_id, event_id, event_stream_ordering, data + FROM receipts_linearized + WHERE stream_id > ? AND stream_id <= ? ORDER BY stream_id DESC LIMIT 100 """ txn.execute(sql, [from_key.stream, to_key.get_max_stream_pos()]) else: sql = """ - SELECT stream_id, instance_name, room_id, receipt_type, user_id, event_id, data + SELECT stream_id, instance_name, room_id, receipt_type, user_id, event_id, event_stream_ordering, data FROM receipts_linearized WHERE stream_id <= ? ORDER BY stream_id DESC @@ -539,8 +543,8 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]: txn.execute(sql, [to_key.get_max_stream_pos()]) return [ - (room_id, receipt_type, user_id, event_id, data) - for stream_id, instance_name, room_id, receipt_type, user_id, event_id, data in txn + (room_id, receipt_type, user_id, event_id, event_stream_ordering, data) + for stream_id, instance_name, room_id, receipt_type, user_id, event_id, event_stream_ordering, data in txn if MultiWriterStreamToken.is_stream_position_in_range( from_key, to_key, instance_name, stream_id ) @@ -551,7 +555,7 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]: ) results: JsonDict = {} - for room_id, receipt_type, user_id, event_id, data in txn_results: + for room_id, receipt_type, user_id, event_id, event_stream_ordering, data in txn_results: # We want a single event per room, since we want to batch the # receipts by room, event and type. room_event = results.setdefault( @@ -566,6 +570,9 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]: receipt_type_dict[user_id] = db_to_json(data) + # MSC4033: inject event order into receipt + receipt_type_dict[user_id]["com.beeper.hs.order"] = event_stream_ordering + return results async def get_users_sent_receipts_between( From a039f07fc969ef0f9e80a2686673b8902dc9e4f1 Mon Sep 17 00:00:00 2001 From: Adam Van Ymeren Date: Wed, 22 May 2024 11:21:05 -0700 Subject: [PATCH 27/29] fix linter --- synapse/storage/databases/main/receipts.py | 29 +++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 30068d66ec..6920dc944c 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -447,7 +447,15 @@ def f( txn.execute(sql + clause, [to_key.get_max_stream_pos()] + list(args)) return [ - (room_id, receipt_type, user_id, event_id, thread_id, event_stream_ordering, data) + ( + room_id, + receipt_type, + user_id, + event_id, + thread_id, + event_stream_ordering, + data, + ) for stream_id, instance_name, room_id, receipt_type, user_id, event_id, thread_id, event_stream_ordering, data in txn if MultiWriterStreamToken.is_stream_position_in_range( from_key, to_key, instance_name, stream_id @@ -459,7 +467,15 @@ def f( ) results: JsonDict = {} - for room_id, receipt_type, user_id, event_id, thread_id, event_stream_ordering, data in txn_results: + for ( + room_id, + receipt_type, + user_id, + event_id, + thread_id, + event_stream_ordering, + data, + ) in txn_results: # We want a single event per room, since we want to batch the # receipts by room, event and type. room_event = results.setdefault( @@ -555,7 +571,14 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str, str]]: ) results: JsonDict = {} - for room_id, receipt_type, user_id, event_id, event_stream_ordering, data in txn_results: + for ( + room_id, + receipt_type, + user_id, + event_id, + event_stream_ordering, + data, + ) in txn_results: # We want a single event per room, since we want to batch the # receipts by room, event and type. room_event = results.setdefault( From edddcd414515286ad8da237b36f0ed5b3ace0940 Mon Sep 17 00:00:00 2001 From: Adam Van Ymeren Date: Wed, 22 May 2024 12:36:26 -0700 Subject: [PATCH 28/29] shorten delta --- synapse/storage/databases/main/receipts.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 6920dc944c..21a3a2e138 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -421,8 +421,8 @@ def f( sql = """ SELECT stream_id, instance_name, room_id, receipt_type, user_id, event_id, thread_id, event_stream_ordering, data - FROM receipts_linearized - WHERE stream_id > ? AND stream_id <= ? AND + FROM receipts_linearized WHERE + stream_id > ? AND stream_id <= ? AND """ clause, args = make_in_list_sql_clause( self.database_engine, "room_id", room_ids @@ -436,8 +436,8 @@ def f( sql = """ SELECT stream_id, instance_name, room_id, receipt_type, user_id, event_id, thread_id, event_stream_ordering, data - FROM receipts_linearized - WHERE stream_id > ? AND stream_id <= ? AND + FROM receipts_linearized WHERE + stream_id > ? AND stream_id <= ? AND """ clause, args = make_in_list_sql_clause( @@ -541,8 +541,8 @@ def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str, str]]: if from_key: sql = """ SELECT stream_id, instance_name, room_id, receipt_type, user_id, event_id, event_stream_ordering, data - FROM receipts_linearized - WHERE stream_id > ? AND stream_id <= ? + FROM receipts_linearized WHERE + stream_id > ? AND stream_id <= ? ORDER BY stream_id DESC LIMIT 100 """ From 6633d023385de0c7f52d7f7f0236e121be393229 Mon Sep 17 00:00:00 2001 From: Adam Van Ymeren Date: Wed, 22 May 2024 12:37:19 -0700 Subject: [PATCH 29/29] fix weird copy paste --- synapse/storage/databases/main/receipts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 21a3a2e138..ebf5d7106a 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -437,7 +437,7 @@ def f( SELECT stream_id, instance_name, room_id, receipt_type, user_id, event_id, thread_id, event_stream_ordering, data FROM receipts_linearized WHERE - stream_id > ? AND stream_id <= ? AND + stream_id <= ? AND """ clause, args = make_in_list_sql_clause(