Skip to content

Commit

Permalink
Atomically persit push actions when we persist the event
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Feb 9, 2016
1 parent f28cc45 commit 7b0d846
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 65 deletions.
1 change: 1 addition & 0 deletions synapse/events/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ def __init__(self, current_state=None):
self.current_state = current_state
self.state_group = None
self.rejected = False
self.push_actions = []
10 changes: 5 additions & 5 deletions synapse/handlers/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,13 @@ def handle_new_client_event(self, event, context, extra_users=[]):
"You don't have permission to redact events"
)

(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)

action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
event, self, context.current_state
event, context, self
)

(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)

destinations = set()
Expand Down
12 changes: 6 additions & 6 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,6 @@ def on_receive_pdu(self, origin, pdu, backfilled, state=None,
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)

if not backfilled and not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
event, self
)

@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
event_to_state = yield self.store.get_state_for_events(
Expand Down Expand Up @@ -1073,6 +1067,12 @@ def _handle_new_event(self, origin, event, state=None, backfilled=False,
auth_events=auth_events,
)

if not backfilled and not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
event, context, self
)

event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
Expand Down
20 changes: 5 additions & 15 deletions synapse/push/action_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import logging

from synapse.api.constants import EventTypes

logger = logging.getLogger(__name__)


Expand All @@ -36,23 +34,15 @@ def __init__(self, hs):
# tag (ie. we just need all the users).

@defer.inlineCallbacks
def handle_push_actions_for_event(self, event, handler, current_state):
if event.type == EventTypes.Redaction and event.redacts is not None:
yield self.store.remove_push_actions_for_event_id(
event.room_id, event.redacts
)

def handle_push_actions_for_event(self, event, context, handler):
bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
event.room_id, self.hs, self.store
)

actions_by_user = yield bulk_evaluator.action_for_event_by_user(
event, handler, current_state
event, handler, context.current_state
)

yield self.store.set_push_actions_for_event_and_users(
event,
[
(uid, None, actions) for uid, actions in actions_by_user.items()
]
)
context.push_actions = [
(uid, None, actions) for uid, actions in actions_by_user.items()
]
45 changes: 16 additions & 29 deletions synapse/storage/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@


class EventPushActionsStore(SQLBaseStore):
@defer.inlineCallbacks
def set_push_actions_for_event_and_users(self, event, tuples):
def _set_push_actions_for_event_and_users(self, txn, event, tuples):
"""
:param event: the event set actions for
:param tuples: list of tuples of (user_id, profile_tag, actions)
Expand All @@ -44,18 +43,12 @@ def set_push_actions_for_event_and_users(self, event, tuples):
'highlight': 1 if _action_has_highlight(actions) else 0,
})

def f(txn):
for uid, _, __ in tuples:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(event.room_id, uid)
)
return self._simple_insert_many_txn(txn, "event_push_actions", values)

yield self.runInteraction(
"set_actions_for_event_and_users",
f,
)
for uid, _, __ in tuples:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(event.room_id, uid)
)
self._simple_insert_many_txn(txn, "event_push_actions", values)

@cachedInlineCallbacks(num_args=3, lru=True, tree=True)
def get_unread_event_push_actions_by_room_for_user(
Expand Down Expand Up @@ -107,21 +100,15 @@ def _get_unread_event_push_actions_by_room(txn):
)
defer.returnValue(ret)

@defer.inlineCallbacks
def remove_push_actions_for_event_id(self, room_id, event_id):
def f(txn):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id,)
)
txn.execute(
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
(room_id, event_id)
)
yield self.runInteraction(
"remove_push_actions_for_event_id",
f
def _remove_push_actions_for_event_id(self, txn, room_id, event_id):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id,)
)
txn.execute(
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
(room_id, event_id)
)


Expand Down
26 changes: 16 additions & 10 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,29 @@ def _persist_event_txn(self, txn, event, context, backfilled,
@log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
is_new_state=True):

# Remove the any existing cache entries for the event_ids
for event, _ in events_and_contexts:
depth_updates = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
txn.call_after(self._invalidate_get_event_cache, event.event_id)

if not backfilled:
txn.call_after(
self._events_stream_cache.entity_has_changed,
event.room_id, event.internal_metadata.stream_ordering,
)

depth_updates = {}
for event, _ in events_and_contexts:
if event.internal_metadata.is_outlier():
continue
depth_updates[event.room_id] = max(
event.depth, depth_updates.get(event.room_id, event.depth)
if not event.internal_metadata.is_outlier():
depth_updates[event.room_id] = max(
event.depth, depth_updates.get(event.room_id, event.depth)
)

if context.push_actions:
self._set_push_actions_for_event_and_users(
txn, event, context.push_actions
)

if event.type == EventTypes.Redaction and event.redacts is not None:
self._remove_push_actions_for_event_id(
txn, event.room_id, event.redacts
)

for room_id, depth in depth_updates.items():
Expand Down

0 comments on commit 7b0d846

Please sign in to comment.