Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an Admin API endpoint to redact all a user's events #17506

Merged
merged 15 commits into from
Sep 18, 2024
70 changes: 56 additions & 14 deletions synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,15 +342,25 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->
return writer.finished()

async def start_redact_events(
self, user_id: str, rooms: list, requester: JsonMapping
self,
user_id: str,
rooms: list,
requester: JsonMapping,
reason: Optional[str],
limit: Optional[int],
) -> str:
"""
Start a task redacting the events of the given user in the givent rooms
Start a task redacting the events of the given user in the given rooms

Args:
user_id: the user ID of the user whose events should be redacted
rooms: the rooms in which to redact the user's events
requester: the user requesting the events
reason: reason for requesting the redaction, ie spam, etc
limit: limit on the number of events in each room to redact

Returns:
a unique ID which can be used to query the status of the task
"""
active_tasks = await self._task_scheduler.get_tasks(
actions=[REDACT_ALL_EVENTS_ACTION_NAME],
Expand All @@ -366,7 +376,13 @@ async def start_redact_events(
redact_id = await self._task_scheduler.schedule_task(
REDACT_ALL_EVENTS_ACTION_NAME,
resource_id=user_id,
params={"rooms": rooms, "requester": requester, "user_id": user_id},
params={
"rooms": rooms,
"requester": requester,
"user_id": user_id,
"reason": reason,
"limit": limit,
},
)

logger.info(
Expand All @@ -380,7 +396,7 @@ async def _redact_all_events(
self, task: ScheduledTask
) -> Tuple[TaskStatus, Optional[Mapping[str, Any]], Optional[str]]:
"""
Task to redact all a users events in the given rooms, tracking which, if any, events
Task to redact all of a users events in the given rooms, tracking which, if any, events
whose redaction failed
"""

Expand All @@ -395,30 +411,56 @@ async def _redact_all_events(
user_id = task.params.get("user_id")
assert user_id is not None

result: Dict[str, Any] = {"result": []}
reason = task.params.get("reason")
limit = task.params.get("limit")
H-Shay marked this conversation as resolved.
Show resolved Hide resolved

result: Mapping[str, Any] = (
task.result
if task.result
else {"failed_redactions": {}, "successful_redactions": []}
)
for room in rooms:
room_version = await self._store.get_room_version(room)
events = await self._store.get_events_sent_by_user(user_id, room)
events = await self._store.get_events_sent_by_user_in_room(
user_id, room, limit, "m.room.redaction"
)

if not events:
# there's nothing to redact
return TaskStatus.COMPLETE, result, None

for event in events:
# if we've already successfully redacted this event then skip processing it
if event in result["successful_redactions"]:
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
continue

event_dict = {
"type": EventTypes.Redaction,
"content": {},
"content": {"reason": reason} if reason else {},
"room_id": room,
"sender": requester.user.to_string(),
"sender": user_id,
}
if room_version.updated_redaction_rules:
event_dict["content"]["redacts"] = event[0]
event_dict["content"]["redacts"] = event
else:
event_dict["redacts"] = event[0]
event_dict["redacts"] = event

try:
await self.event_creation_handler.create_and_send_nonmember_event(
requester, event_dict
# set the prev event to the offending message to allow for redactions
# to be processed in the case where the user has been kicked/banned before
# redactions are requested
redaction, _ = (
await self.event_creation_handler.create_and_send_nonmember_event(
requester,
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
event_dict,
prev_event_ids=[event],
ratelimit=False,
)
)
result["successful_redactions"].append(event)
except Exception as ex:
logger.info(f"Redaction of event {event[0]} failed due to: {ex}")
result["result"].append(event[0])
logger.info(f"Redaction of event {event} failed due to: {ex}")
result["failed_redactions"][event] = str(ex)
await self._task_scheduler.update_task(task.id, result=result)

return TaskStatus.COMPLETE, result, None
Expand Down
73 changes: 58 additions & 15 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2440,23 +2440,66 @@ def mark_event_rejected_txn(

self.invalidate_get_event_cache_after_txn(txn, event_id)

async def get_events_sent_by_user(self, user_id: str, room_id: str) -> List[tuple]:
async def get_events_sent_by_user_in_room(
self, user_id: str, room_id: str, limit: Optional[int], filter: str = "none"
) -> Optional[List[str]]:
"""
Get a list of event ids of events sent by user in room
Get a list of event ids and event info of events sent by the user in the specified room
H-Shay marked this conversation as resolved.
Show resolved Hide resolved

Args:
user_id: user ID to search against
room_id: room ID of the room to search for events in
filter: type of event to filter out
limit: maximum number of event ids to return
"""

def _get_events_by_user_txn(
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
txn: LoggingTransaction, user_id: str, room_id: str
) -> List[tuple]:
return self.db_pool.simple_select_many_txn(
txn,
"events",
"sender",
[user_id],
{"room_id": room_id},
retcols=["event_id"],
)
txn: LoggingTransaction,
user_id: str,
room_id: str,
filter: Optional[str],
batch_size: int,
offset: int,
) -> Tuple[Optional[List[str]], int]:

return await self.db_pool.runInteraction(
"get_events_by_user", _get_events_by_user_txn, user_id, room_id
)
sql = """
SELECT event_id FROM events
WHERE sender = ? AND room_id = ?
AND type != ?
ORDER BY received_ts DESC
LIMIT ?
OFFSET ?
"""
txn.execute(sql, (user_id, room_id, filter, batch_size, offset))
res = txn.fetchall()
if res:
events = [row[0] for row in res]
else:
events = None

return events, offset + batch_size

if not limit:
limit = 1000

offset = 0
batch_size = 100
if batch_size < limit:
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
batch_size = limit

selected_ids: List[str] = []
while offset < limit:
res, offset = await self.db_pool.runInteraction(
"get_events_by_user",
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
_get_events_by_user_txn,
user_id,
room_id,
filter,
batch_size,
offset,
)
if res:
selected_ids = selected_ids + res
else:
return selected_ids
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
return selected_ids