Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding Sync: Avoid fetching left rooms and add back newly_left rooms #17725

Merged
merged 17 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17725.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
More efficiently fetch rooms for Sliding Sync.
26 changes: 19 additions & 7 deletions synapse/handlers/sliding_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,24 @@ async def get_room_sync_data(
room_sync_config.timeline_limit,
)

# Handle state resets. For example, if we see
# `room_membership_for_user_at_to_token.event_id=None and
# room_membership_for_user_at_to_token.membership is not None`, we should
# indicate to the client that a state reset happened. Perhaps we should indicate
# this by setting `initial: True` and empty `required_state: []`.
state_reset_out_of_room = False
if (
room_membership_for_user_at_to_token.event_id is None
and room_membership_for_user_at_to_token.membership is not None
):
# We only expect the `event_id` to be `None` if you've been state reset out
# of the room (meaning you're no longer in the room). We could put this as
# part of the if-statement above but we want to handle every case where
# `event_id` is `None`.
assert room_membership_for_user_at_to_token.membership is Membership.LEAVE

state_reset_out_of_room = True

# Determine whether we should limit the timeline to the token range.
#
# We should return historical messages (before token range) in the
Expand Down Expand Up @@ -527,7 +545,7 @@ async def get_room_sync_data(
from_bound = None
initial = True
ignore_timeline_bound = False
if from_token and not newly_joined:
if from_token and not newly_joined and not state_reset_out_of_room:
room_status = previous_connection_state.rooms.have_sent_room(room_id)
if room_status.status == HaveSentRoomFlag.LIVE:
from_bound = from_token.stream_token.room_key
Expand Down Expand Up @@ -732,12 +750,6 @@ async def get_room_sync_data(

stripped_state.append(strip_event(invite_or_knock_event))

# TODO: Handle state resets. For example, if we see
# `room_membership_for_user_at_to_token.event_id=None and
# room_membership_for_user_at_to_token.membership is not None`, we should
# indicate to the client that a state reset happened. Perhaps we should indicate
# this by setting `initial: True` and empty `required_state`.

# Get the changes to current state in the token range from the
# `current_state_delta_stream` table.
#
Expand Down
281 changes: 183 additions & 98 deletions synapse/handlers/sliding_sync/room_lists.py

Large diffs are not rendered by default.

46 changes: 45 additions & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -1403,7 +1403,7 @@ async def get_sliding_sync_rooms_for_user(
) -> Mapping[str, RoomsForUserSlidingSync]:
"""Get all the rooms for a user to handle a sliding sync request.

Ignores forgotten rooms and rooms that the user has been kicked from.
Ignores forgotten rooms and rooms that the user has left themselves.

Returns:
Map from room ID to membership info
Expand All @@ -1428,6 +1428,7 @@ def get_sliding_sync_rooms_for_user_txn(
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
WHERE user_id = ?
AND m.forgotten = 0
AND (m.membership != 'leave' OR m.user_id != m.sender)
"""
txn.execute(sql, (user_id,))
return {
Expand All @@ -1450,6 +1451,49 @@ def get_sliding_sync_rooms_for_user_txn(
get_sliding_sync_rooms_for_user_txn,
)

async def get_sliding_sync_room_for_user(
self, user_id: str, room_id: str
) -> Optional[RoomsForUserSlidingSync]:
"""Get the sliding sync room entry for the given user and room."""

def get_sliding_sync_room_for_user_txn(
txn: LoggingTransaction,
) -> Optional[RoomsForUserSlidingSync]:
sql = """
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
m.event_instance_name, m.event_stream_ordering,
m.has_known_state,
COALESCE(j.room_type, m.room_type),
COALESCE(j.is_encrypted, m.is_encrypted)
FROM sliding_sync_membership_snapshots AS m
INNER JOIN rooms AS r USING (room_id)
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
WHERE user_id = ?
AND m.forgotten = 0
AND m.room_id = ?
"""
txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
if not row:
return None

return RoomsForUserSlidingSync(
room_id=row[0],
sender=row[1],
membership=row[2],
event_id=row[3],
room_version_id=row[4],
event_pos=PersistedEventPosition(row[5], row[6]),
has_known_state=bool(row[7]),
room_type=row[8],
is_encrypted=row[9],
)

return await self.db_pool.runInteraction(
"get_sliding_sync_room_for_user", get_sliding_sync_room_for_user_txn
)


class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(
Expand Down
20 changes: 18 additions & 2 deletions synapse/storage/databases/main/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,24 @@ async def get_create_event_for_room(self, room_id: str) -> EventBase:
return create_event

@cached(max_entries=10000)
async def get_room_type(self, room_id: str) -> Optional[str]:
raise NotImplementedError()
async def get_room_type(self, room_id: str) -> Union[Optional[str], Sentinel]:
"""Fetch room type for given room.

Since this function is cached, any missing values would be cached as
`None`. In order to distinguish between an unencrypted room that has
`None` encryption and a room that is unknown to the server where we
might want to omit the value (which would make it cached as `None`),
instead we use the sentinel value `ROOM_UNKNOWN_SENTINEL`.
"""

try:
create_event = await self.get_create_event_for_room(room_id)
return create_event.content.get(EventContentFields.ROOM_TYPE)
except NotFoundError:
# We use the sentinel value to distinguish between `None` which is a
# valid room type and a room that is unknown to the server so the value
# is just unset.
return ROOM_UNKNOWN_SENTINEL

@cachedList(cached_method_name="get_room_type", list_name="room_ids")
async def bulk_get_room_type(
Expand Down
7 changes: 7 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,12 @@ async def get_current_state_delta_membership_changes_for_user(
Returns:
All membership changes to the current state in the token range. Events are
sorted by `stream_ordering` ascending.

`event_id`/`sender` can be `None` when the server leaves a room (meaning
everyone locally left) or a state reset which removed the person from the
room. We can't tell the difference between the two cases with what's
available in the `current_state_delta_stream` table. To actually check for a
state reset, you need to check if a membership still exists in the room.
"""
# Start by ruling out cases where a DB query is not necessary.
if from_key == to_key:
Expand Down Expand Up @@ -1052,6 +1058,7 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
membership=(
membership if membership is not None else Membership.LEAVE
),
# This will also be null for the same reasons if `s.event_id = null`
sender=sender,
# Prev event
prev_event_id=prev_event_id,
Expand Down
Loading
Loading