Skip to content

Commit

Permalink
fix(wait_for): Support stop wait_for by event
Browse files Browse the repository at this point in the history
While preparing the new nemesis for the BootstrapStreaming error, it turned out that in some cases
the wait_for function needs to be stopped earlier instead of waiting for the whole timeout.

For example, when you expect the bootstrap to fail and can find the
appropriate error in the log, but do not need to raise any exception or critical
event, the wait for the DB to come up will keep waiting for the port to become available.

For such cases it is useful to have a flag that, when set,
stops the wait_for function.
  • Loading branch information
aleksbykov authored and fruch committed Oct 23, 2023
1 parent d60806e commit 54bc99f
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 8 deletions.
8 changes: 8 additions & 0 deletions sdcm/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,11 @@ class BootstrapStreamErrorFailure(Exception): # pylint: disable=too-few-public-

class KillNemesis(BaseException):
    """Raised when a nemesis thread is killed at teardown of the test.

    Derives from ``BaseException`` rather than ``Exception``, so generic
    ``except Exception`` handlers do not swallow it.
    """


class WaitForTimeoutError(Exception):
    """Raised when the timeout is exceeded in the wait.wait_for function."""


class ExitByEventError(Exception):
    """Raised when wait.wait_for is stopped by an event before it finished."""
20 changes: 13 additions & 7 deletions sdcm/wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,14 @@

import tenacity

from sdcm.exceptions import WaitForTimeoutError, ExitByEventError

LOGGER = logging.getLogger("sdcm.wait")

R = TypeVar("R") # pylint: disable=invalid-name


class WaitForTimeoutError(Exception):
pass


def wait_for(func, step=1, text=None, timeout=None, throw_exc=True, **kwargs):
def wait_for(func, step=1, text=None, timeout=None, throw_exc=True, stop_event=None, **kwargs): # pylint: disable=too-many-arguments
"""
Wrapper function to wait with timeout option.
Expand All @@ -42,6 +39,7 @@ def wait_for(func, step=1, text=None, timeout=None, throw_exc=True, **kwargs):
:param timeout: Timeout in seconds
:param throw_exc: Raise exception if timeout expired, but disrupt_func result is not True
:param kwargs: Keyword arguments to disrupt_func
:param stop_event: instance of threading.Event class to stop retrying
:return: Return value of disrupt_func.
"""
if not timeout:
Expand All @@ -57,11 +55,14 @@ def retry_logger(retry_state):
retry_state.attempt_number,
retry_state.outcome._exception or retry_state.outcome._result,
)
stops = [tenacity.stop_after_delay(timeout)]
if stop_event:
stops.append(tenacity.stop.stop_when_event_set(stop_event))

try:
retry = tenacity.Retrying(
reraise=throw_exc,
stop=tenacity.stop_after_delay(timeout),
stop=tenacity.stop_any(*stops),
wait=tenacity.wait_fixed(step),
before_sleep=retry_logger,
retry=(tenacity.retry_if_result(lambda value: not value) | tenacity.retry_if_exception_type())
Expand All @@ -70,14 +71,19 @@ def retry_logger(retry_state):

except Exception as ex: # pylint: disable=broad-except
err = f"Wait for: {text or func.__name__}: timeout - {timeout} seconds - expired"
raising_exc = WaitForTimeoutError(err)
if stop_event and stop_event.is_set():
err = f": {text or func.__name__}: stopped by Event"
raising_exc = ExitByEventError(err)

LOGGER.error(err)
if hasattr(ex, 'last_attempt') and ex.last_attempt.exception() is not None: # pylint: disable=no-member
LOGGER.error("last error: %r", ex.last_attempt.exception()) # pylint: disable=no-member
else:
LOGGER.error("last error: %r", ex)
if throw_exc:
if hasattr(ex, 'last_attempt') and not ex.last_attempt._result: # pylint: disable=protected-access,no-member
raise WaitForTimeoutError(err) from ex
raise raising_exc from ex
raise

return res
Expand Down
106 changes: 105 additions & 1 deletion unit_tests/test_wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import time
import unittest

from concurrent.futures import ThreadPoolExecutor

import pytest

from sdcm.cluster import BaseNode
from sdcm.wait import wait_for, wait_for_log_lines
from sdcm.wait import wait_for, wait_for_log_lines, WaitForTimeoutError, ExitByEventError

logging.basicConfig(level=logging.DEBUG)

Expand Down Expand Up @@ -80,6 +82,108 @@ def callback(arg1, arg2):
self.assertEqual(len(calls), 1)


from parameterized import parameterized


class TestSdcmWaitWithEventStop(unittest.TestCase):
    """Check how ``wait_for`` behaves when it is given a ``stop_event``.

    Each test drives ``wait_for`` with a callback that records its calls in
    ``self.calls`` and starts returning a truthy value on a configurable call
    number, while a helper sets the shared event once a given number of calls
    has been recorded.
    """

    def setUp(self):
        self.calls = []
        self.callback_return_true_after = 0
        self.ev = threading.Event()
        # Helper threads started by a test; joined in tearDown so none outlive the test.
        self._stoppers = []

    def tearDown(self):
        # Setting the event releases any helper that is still polling, so join() cannot hang.
        self.ev.set()
        for stopper in self._stoppers:
            stopper.join()
        self.calls = []
        self.callback_return_true_after = 0

    def callback(self, arg1, arg2):
        """Record the call; return a truthy value on the configured call number, else False."""
        self.calls.append((arg1, arg2))
        if len(self.calls) == self.callback_return_true_after:
            return "what ever"
        return False

    def set_stop_in_timeout(self, ev: threading.Event, set_after: int):
        """Set ``ev`` once at least ``set_after`` calls were recorded; return when ``ev`` is set.

        Polls with a short ``Event.wait`` instead of a hot loop, so the helper
        does not burn a CPU core or fight the waiting thread for the GIL.
        ``>=`` (rather than ``==``) guarantees termination even if the exact
        call count is never observed between two polls.
        """
        while not ev.wait(0.01):
            if len(self.calls) >= set_after:
                ev.set()

    def _start_stopper(self, set_after: int):
        """Run ``set_stop_in_timeout`` in a background thread that tearDown will join."""
        stopper = threading.Thread(target=self.set_stop_in_timeout,
                                   kwargs={"ev": self.ev, "set_after": set_after})
        self._stoppers.append(stopper)
        stopper.start()
        return stopper

    @parameterized.expand([(True, ), (False, )])
    def test_04_stop_by_event(self, throw_exc):
        self.callback_return_true_after = 3
        self._start_stopper(set_after=1)
        if throw_exc:
            with self.assertRaisesRegex(ExitByEventError, "callback: stopped by Event"):
                wait_for(self.callback, timeout=3, throw_exc=throw_exc, stop_event=self.ev,
                         step=0.5, arg1=1, arg2=3)
        else:
            res = wait_for(self.callback, timeout=3, step=.5, throw_exc=throw_exc,
                           stop_event=self.ev, arg1=1, arg2=3)
            self.assertFalse(res)

        self.assertLess(len(self.calls), 6, f"{len(self.calls)}")

    def test_04_stop_by_event_in_main_thread(self):
        self.callback_return_true_after = 3
        # The context manager shuts the pool down, so its worker thread is not leaked.
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(wait_for, func=self.callback,
                                     timeout=self.callback_return_true_after,
                                     step=.5, throw_exc=False, stop_event=self.ev, arg1=1, arg2=3)

            self.set_stop_in_timeout(self.ev, set_after=1)
            res = future.result()
            exc = future.exception()
        self.assertFalse(exc, f"{exc}")
        self.assertFalse(res, f"{res}")
        self.assertLess(len(self.calls), 5)

    def test_04_return_result_before_stop_event_and_wait_timeout(self):
        self.callback_return_true_after = 2
        self._start_stopper(set_after=4)
        res = wait_for(self.callback, timeout=3, step=.5, throw_exc=False,
                       stop_event=self.ev, arg1=1, arg2=3)
        self.assertEqual(res, "what ever")
        self.assertEqual(len(self.calls), 2)

    def test_04_raise_by_timeout_before_set_event(self):
        self.callback_return_true_after = 8

        self._start_stopper(set_after=7)
        with self.assertRaisesRegex(WaitForTimeoutError, "callback: timeout - 3 seconds - expired"):
            wait_for(self.callback, timeout=3, throw_exc=True, stop_event=self.ev,
                     step=0.5, arg1=1, arg2=3)
        self.assertEqual(len(self.calls), 7)

    @parameterized.expand([(True, ), (False, )])
    def test_04_raise_exception_in_func_before_set_event(self, throw_exc):

        def callback(arg1, arg2):
            self.calls.append((arg1, arg2))
            if len(self.calls) == 3:
                raise Exception("Raise before event")

            if len(self.calls) == 10:
                return "what ever"
            return False

        self._start_stopper(set_after=5)
        if throw_exc:
            with self.assertRaisesRegex(ExitByEventError, "callback: stopped by Event"):
                wait_for(callback, timeout=4, throw_exc=throw_exc, stop_event=self.ev,
                         step=0.5, arg1=1, arg2=3)
        else:
            res = wait_for(callback, timeout=4, throw_exc=throw_exc, stop_event=self.ev,
                           step=.5, arg1=1, arg2=3)
            self.assertFalse(res)
        self.assertEqual(len(self.calls), 6)

    def test_04_set_event_timeout_at_same_time(self):
        """If the event is set at the same time as the timeout expires and
        throw_exc is true, wait_for raises an exception whose message says
        that wait_for was stopped by the event."""
        self.callback_return_true_after = 8
        self._start_stopper(set_after=4)
        with self.assertRaisesRegex(ExitByEventError, "callback: stopped by Event"):
            wait_for(self.callback, timeout=4, throw_exc=True, stop_event=self.ev,
                     step=0.5, arg1=1, arg2=3)
        self.assertEqual(len(self.calls), 5)


class DummyNode(BaseNode):
name = "node_1"
system_log = ""
Expand Down

0 comments on commit 54bc99f

Please sign in to comment.