Skip to content

Commit

Permalink
Only calculate initial sync for 10 rooms at a time
Browse files Browse the repository at this point in the history
This helps to ensure we don't completely starve other requests.
  • Loading branch information
erikjohnston committed Feb 10, 2016
1 parent e557dc8 commit 8e49892
Showing 1 changed file with 29 additions and 24 deletions.
53 changes: 29 additions & 24 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes
from synapse.util import unwrapFirstError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
from synapse.util.metrics import Measure

from twisted.internet import defer
Expand Down Expand Up @@ -228,10 +228,14 @@ def full_state_sync(self, sync_config, timeline_since_token):
invited = []
archived = []
deferreds = []
for event in room_list:
if event.membership == Membership.JOIN:
with PreserveLoggingContext(LoggingContext.current_context()):
room_sync_deferred = self.full_state_sync_for_joined_room(

room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
for room_list_chunk in room_list_chunks:
for event in room_list_chunk:
if event.membership == Membership.JOIN:
room_sync_deferred = preserve_fn(
self.full_state_sync_for_joined_room
)(
room_id=event.room_id,
sync_config=sync_config,
now_token=now_token,
Expand All @@ -240,20 +244,21 @@ def full_state_sync(self, sync_config, timeline_since_token):
tags_by_room=tags_by_room,
account_data_by_room=account_data_by_room,
)
room_sync_deferred.addCallback(joined.append)
deferreds.append(room_sync_deferred)
elif event.membership == Membership.INVITE:
invite = yield self.store.get_event(event.event_id)
invited.append(InvitedSyncResult(
room_id=event.room_id,
invite=invite,
))
elif event.membership in (Membership.LEAVE, Membership.BAN):
leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,)
)
with PreserveLoggingContext(LoggingContext.current_context()):
room_sync_deferred = self.full_state_sync_for_archived_room(
room_sync_deferred.addCallback(joined.append)
deferreds.append(room_sync_deferred)
elif event.membership == Membership.INVITE:
invite = yield self.store.get_event(event.event_id)
invited.append(InvitedSyncResult(
room_id=event.room_id,
invite=invite,
))
elif event.membership in (Membership.LEAVE, Membership.BAN):
leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,)
)
room_sync_deferred = preserve_fn(
self.full_state_sync_for_archived_room
)(
sync_config=sync_config,
room_id=event.room_id,
leave_event_id=event.event_id,
Expand All @@ -262,12 +267,12 @@ def full_state_sync(self, sync_config, timeline_since_token):
tags_by_room=tags_by_room,
account_data_by_room=account_data_by_room,
)
room_sync_deferred.addCallback(archived.append)
deferreds.append(room_sync_deferred)
room_sync_deferred.addCallback(archived.append)
deferreds.append(room_sync_deferred)

yield defer.gatherResults(
deferreds, consumeErrors=True
).addErrback(unwrapFirstError)
yield defer.gatherResults(
deferreds, consumeErrors=True
).addErrback(unwrapFirstError)

account_data_for_user = sync_config.filter_collection.filter_account_data(
self.account_data_for_user(account_data)
Expand Down

0 comments on commit 8e49892

Please sign in to comment.