From 41453890d39c17e7723ca44d65152845daa47748 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Wed, 20 Apr 2022 23:15:43 +0300 Subject: [PATCH] Add timeout for executing seds --- sed.py | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/sed.py b/sed.py index e4bbb59..da0ae85 100644 --- a/sed.py +++ b/sed.py @@ -14,10 +14,13 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from typing import Tuple, NamedTuple, Pattern, Optional, Dict, Deque +from types import FrameType from collections import deque, defaultdict +from contextlib import contextmanager from difflib import SequenceMatcher from html import escape import string +import signal import time import re @@ -35,6 +38,20 @@ SedMatch = Tuple[str, str, str, str, str] +def raise_timeout(sig: signal.Signals, frame_type: FrameType) -> None: + raise TimeoutError() + + +@contextmanager +def timeout(max_time: float = 0.5) -> None: + signal.signal(signal.SIGALRM, raise_timeout) + signal.setitimer(signal.ITIMER_REAL, max_time) + try: + yield + finally: + signal.alarm(0) + + class SedBot(Plugin): prev_user_events: Dict[RoomID, Dict[UserID, MessageEvent]] prev_room_events: Dict[RoomID, Deque[MessageEvent]] @@ -120,7 +137,8 @@ def _compile_passive_statement(cls, match: SedMatch) -> Optional[SedStatement]: @staticmethod def _exec(stmt: SedStatement, body: str) -> str: - return stmt.find.sub(stmt.replace, body, count=0 if stmt.is_global else 1) + with timeout(): + return stmt.find.sub(stmt.replace, body, count=0 if stmt.is_global else 1) @staticmethod def op_to_str(tag: str, old_text: str, new_text: str) -> str: @@ -183,7 +201,14 @@ def _is_recent(evt: MessageEvent) -> bool: @command.passive(r"(?:^|[^a-zA-Z0-9])sed (s.+)") @command.passive(r"^(s[#/].+[#/].+)$") async def command_handler(self, evt: MessageEvent, match: SedMatch) -> None: - stmt = self._compile_passive_statement(match) + try: + await self._command_handler(evt, match) + except TimeoutError: + await evt.reply("3:<") + + async def _command_handler(self, evt: MessageEvent, match: SedMatch): + with timeout(): + stmt = self._compile_passive_statement(match) if not stmt: return try: