From 06d0c16571044928bb084ee8d6070b5caa1f7168 Mon Sep 17 00:00:00 2001 From: mernmic Date: Tue, 17 Jan 2023 12:30:00 -0600 Subject: [PATCH 01/12] extend RedisSettings retry settings --- arq/connections.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/arq/connections.py b/arq/connections.py index d4fc4434..01e86576 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -9,6 +9,7 @@ from uuid import uuid4 from redis.asyncio import ConnectionPool, Redis +from redis.asyncio.retry import Retry from redis.asyncio.sentinel import Sentinel from redis.exceptions import RedisError, WatchError @@ -47,6 +48,10 @@ class RedisSettings: sentinel: bool = False sentinel_master: str = 'mymaster' + retry_on_timeout: bool = False + retry_on_error: Optional[List[Any]] = None + retry: Optional[Retry] = None + @classmethod def from_dsn(cls, dsn: str) -> 'RedisSettings': conf = urlparse(dsn) @@ -250,6 +255,9 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis: ssl_ca_certs=settings.ssl_ca_certs, ssl_ca_data=settings.ssl_ca_data, ssl_check_hostname=settings.ssl_check_hostname, + retry=settings.retry, + retry_on_timeout=settings.retry_on_timeout, + retry_on_error=settings.retry_on_error, ) while True: From 4cb9d78b9c21c1c01490c128d54263a63a2bca56 Mon Sep 17 00:00:00 2001 From: mernmic Date: Tue, 17 Jan 2023 13:09:25 -0600 Subject: [PATCH 02/12] fix type and settings test --- arq/connections.py | 3 +-- tests/test_utils.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/arq/connections.py b/arq/connections.py index 01e86576..68a7bb70 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -9,7 +9,6 @@ from uuid import uuid4 from redis.asyncio import ConnectionPool, Redis -from redis.asyncio.retry import Retry from redis.asyncio.sentinel import Sentinel from redis.exceptions import RedisError, WatchError @@ -50,7 +49,7 @@ class RedisSettings: retry_on_timeout: bool = False retry_on_error: Optional[List[Any]] = None - retry: Optional[Retry] = None + retry: Optional[Any] = None @classmethod def from_dsn(cls, dsn: str) -> 'RedisSettings': diff --git a/tests/test_utils.py b/tests/test_utils.py index e499d85f..5dbddfd8 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -21,7 +21,7 @@ def test_settings_changed(): "RedisSettings(host='localhost', port=123, unix_socket_path=None, database=0, username=None, password=None, " "ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs='required', ssl_ca_certs=None, " 'ssl_ca_data=None, ssl_check_hostname=False, conn_timeout=1, conn_retries=5, conn_retry_delay=1, ' - "sentinel=False, sentinel_master='mymaster')" + "sentinel=False, sentinel_master='mymaster', retry_on_timeout=False, retry_on_error=None, retry=None)" ) == str(settings) From cf654152d1148cb621584d4ee3ce563c0fd93150 Mon Sep 17 00:00:00 2001 From: mernmic Date: Wed, 18 Jan 2023 09:07:00 -0600 Subject: [PATCH 03/12] add redis.Retry type --- arq/connections.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/arq/connections.py b/arq/connections.py index 68a7bb70..017e1fd5 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -8,6 +8,7 @@ from urllib.parse import parse_qs, urlparse from uuid import uuid4 +import redis.retry from redis.asyncio import ConnectionPool, Redis from redis.asyncio.sentinel import Sentinel from redis.exceptions import RedisError, WatchError @@ -49,7 +50,7 @@ class RedisSettings: retry_on_timeout: bool = False retry_on_error: Optional[List[Any]] = None - retry: Optional[Any] = None + retry: Optional[redis.retry.Retry] = None @classmethod def from_dsn(cls, dsn: str) -> 'RedisSettings': From 94c4f74846806cb814593c0fb963b236e4b48162 Mon Sep 17 00:00:00 2001 From: mernmic Date: Wed, 18 Jan 2023 09:15:14 -0600 Subject: [PATCH 04/12] fix test to allow arbitrary types --- tests/test_utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_utils.py b/tests/test_utils.py index 5dbddfd8..1b471019 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -112,6 +112,9 @@ def test_redis_settings_validation(): class Settings(BaseModel): redis_settings: RedisSettings + class Config: + arbitrary_types_allowed = True + @validator('redis_settings', always=True, pre=True) def parse_redis_settings(cls, v): if isinstance(v, str): From ccf6bcb23c993b5254de583bbf739b240a62770c Mon Sep 17 00:00:00 2001 From: mernmic Date: Wed, 18 Jan 2023 16:13:28 -0600 Subject: [PATCH 05/12] add testing for retry settings --- tests/conftest.py | 40 +++++++++++++++++++++++++ tests/test_worker.py | 70 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 755aeec6..65289455 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,6 +5,9 @@ import msgpack import pytest +import redis.exceptions +from redis.asyncio.retry import Retry +from redis.backoff import NoBackoff from redislite import Redis from arq.connections import ArqRedis, create_pool @@ -52,6 +55,21 @@ async def arq_redis_msgpack(loop): await redis_.close(close_connection_pool=True) +@pytest.fixture +async def arq_redis_retry(loop): + redis_ = ArqRedis( + host='localhost', + port=6379, + encoding='utf-8', + retry=Retry(backoff=NoBackoff(), retries=3), + retry_on_timeout=True, + retry_on_error=[redis.exceptions.ConnectionError], + ) + await redis_.flushall() + yield redis_ + await redis_.close(close_connection_pool=True) + + @pytest.fixture async def worker(arq_redis): worker_: Worker = None @@ -69,6 +87,28 @@ def create(functions=[], burst=True, poll_delay=0, max_jobs=10, arq_redis=arq_re await worker_.close() +@pytest.fixture +async def worker_retry(arq_redis_retry): + worker_retry_: Worker = None + + def create(functions=[], burst=True, poll_delay=0, max_jobs=10, arq_redis=arq_redis_retry, **kwargs): + nonlocal worker_retry_ + worker_retry_ = Worker( + functions=functions, + redis_pool=arq_redis, + burst=burst, + poll_delay=poll_delay, + max_jobs=max_jobs, + **kwargs, + ) + return worker_retry_ + + yield create + + if worker_retry_: + await worker_retry_.close() + + @pytest.fixture(name='create_pool') async def fix_create_pool(loop): pools = [] diff --git a/tests/test_worker.py b/tests/test_worker.py index aa56085b..6379b97a 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -9,6 +9,7 @@ import msgpack import pytest +import redis.exceptions from arq.connections import ArqRedis, RedisSettings from arq.constants import abort_jobs_ss, default_queue_name, expires_extra_ms, health_check_key_suffix, job_key_prefix @@ -988,3 +989,72 @@ async def test_worker_timezone_defaults_to_system_timezone(worker): worker = worker(functions=[func(foobar)]) assert worker.timezone is not None assert worker.timezone == datetime.now().astimezone().tzinfo + + +@pytest.mark.parametrize( + 'exception_thrown', + [ + redis.exceptions.ConnectionError('Error while reading from host'), + redis.exceptions.TimeoutError('Timeout reading from host'), + ], +) +async def test_worker_retry(mocker, worker_retry, exception_thrown): + # Testing redis exceptions, with retry settings specified + try: + worker_retry = worker_retry(functions=[func(foobar)]) + + # baseline + assert await worker_retry.pool.ping() + + # patch db read_response + mocker.patch.object( + worker_retry.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown + ) + + # spy method handling call_with_retry failure + retry_spy = mocker.spy(worker_retry.pool, '_disconnect_raise') + + # assert exception thrown + with pytest.raises(type(exception_thrown)): + await worker_retry.pool.ping() + + # assert retry counts and no exception thrown during '_disconnect_raise' + assert retry_spy.call_count == 4 # retries setting + 1 + assert retry_spy.spy_exception is None + finally: + # cleanup patch for post test flushall + mocker.patch.object( + worker_retry.pool.connection_pool.connection_class, + 'read_response', + return_value=redis.asyncio.connection.Connection.read_response, + ) + + +@pytest.mark.parametrize( + 'exception_thrown', + [ + redis.exceptions.ConnectionError('Error while reading from host'), + redis.exceptions.TimeoutError('Timeout reading from host'), + ], +) +async def test_worker_crash(mocker, worker, exception_thrown): + # Testing redis exceptions, no retry settings specified + try: + worker = worker(functions=[func(foobar)]) + assert await worker.pool.ping() + + mocker.patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) + + spy = mocker.spy(worker.pool, '_disconnect_raise') + + with pytest.raises(type(exception_thrown)): + await worker.pool.ping() + + assert spy.call_count == 1 + assert spy.spy_exception == exception_thrown + finally: + mocker.patch.object( + worker.pool.connection_pool.connection_class, + 'read_response', + return_value=redis.asyncio.connection.Connection.read_response, + ) From 7947759b248bfd9b632cc4674d5692c1ed7b3471 Mon Sep 17 00:00:00 2001 From: mernmic Date: Wed, 18 Jan 2023 17:14:54 -0600 Subject: [PATCH 06/12] update tests --- tests/test_worker.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 6379b97a..7ef8e57b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1001,30 +1001,29 @@ async def test_worker_timezone_defaults_to_system_timezone(worker): async def test_worker_retry(mocker, worker_retry, exception_thrown): # Testing redis exceptions, with retry settings specified try: - worker_retry = worker_retry(functions=[func(foobar)]) + worker = worker_retry(functions=[func(foobar)]) # baseline - assert await worker_retry.pool.ping() + await worker.main() + await worker._poll_iteration() # patch db read_response - mocker.patch.object( - worker_retry.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown - ) + mocker.patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) # spy method handling call_with_retry failure - retry_spy = mocker.spy(worker_retry.pool, '_disconnect_raise') + spy = mocker.spy(worker.pool, '_disconnect_raise') # assert exception thrown with pytest.raises(type(exception_thrown)): - await worker_retry.pool.ping() + await worker._poll_iteration() # assert retry counts and no exception thrown during '_disconnect_raise' - assert retry_spy.call_count == 4 # retries setting + 1 - assert retry_spy.spy_exception is None + assert spy.call_count == 4 # retries setting + 1 + assert spy.spy_exception is None finally: # cleanup patch for post test flushall mocker.patch.object( - worker_retry.pool.connection_pool.connection_class, + worker.pool.connection_pool.connection_class, 'read_response', return_value=redis.asyncio.connection.Connection.read_response, ) @@ -1041,14 +1040,15 @@ async def test_worker_crash(mocker, worker, exception_thrown): # Testing redis exceptions, no retry settings specified try: worker = worker(functions=[func(foobar)]) - assert await worker.pool.ping() + await worker.main() + await worker._poll_iteration() mocker.patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) spy = mocker.spy(worker.pool, '_disconnect_raise') with pytest.raises(type(exception_thrown)): - await worker.pool.ping() + await worker._poll_iteration() assert spy.call_count == 1 assert spy.spy_exception == exception_thrown From 734295a16ecc11d3415cb151bf8ab1da4dcf22da Mon Sep 17 00:00:00 2001 From: mernmic Date: Wed, 18 Jan 2023 18:08:43 -0600 Subject: [PATCH 07/12] granular patch handling --- tests/test_worker.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 7ef8e57b..a31cba9f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -5,7 +5,7 @@ import signal import sys from datetime import datetime, timedelta -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import msgpack import pytest @@ -1008,7 +1008,8 @@ async def test_worker_retry(mocker, worker_retry, exception_thrown): await worker._poll_iteration() # patch db read_response - mocker.patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) + p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) + p.start() # spy method handling call_with_retry failure spy = mocker.spy(worker.pool, '_disconnect_raise') @@ -1021,12 +1022,8 @@ async def test_worker_retry(mocker, worker_retry, exception_thrown): assert spy.call_count == 4 # retries setting + 1 assert spy.spy_exception is None finally: - # cleanup patch for post test flushall - mocker.patch.object( - worker.pool.connection_pool.connection_class, - 'read_response', - return_value=redis.asyncio.connection.Connection.read_response, - ) + # stop patch to allow worker cleanup + p.stop() @pytest.mark.parametrize( @@ -1043,7 +1040,9 @@ async def test_worker_crash(mocker, worker, exception_thrown): await worker.main() await worker._poll_iteration() - mocker.patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) + # patch db read_response + p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) + p.start() spy = mocker.spy(worker.pool, '_disconnect_raise') @@ -1053,8 +1052,4 @@ async def test_worker_crash(mocker, worker, exception_thrown): assert spy.call_count == 1 assert spy.spy_exception == exception_thrown finally: - mocker.patch.object( - worker.pool.connection_pool.connection_class, - 'read_response', - return_value=redis.asyncio.connection.Connection.read_response, - ) + p.stop() From 2616be30eb44f53fd33ca842c3f5fd23440bcc4d Mon Sep 17 00:00:00 2001 From: mernmic Date: Wed, 18 Jan 2023 18:30:02 -0600 Subject: [PATCH 08/12] update comment --- tests/test_worker.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_worker.py b/tests/test_worker.py index a31cba9f..29b92933 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1044,12 +1044,16 @@ async def test_worker_crash(mocker, worker, exception_thrown): p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) p.start() + # spy method handling call_with_retry failure spy = mocker.spy(worker.pool, '_disconnect_raise') + # assert exception thrown with pytest.raises(type(exception_thrown)): await worker._poll_iteration() + # assert no retry counts and exception thrown during '_disconnect_raise' assert spy.call_count == 1 assert spy.spy_exception == exception_thrown finally: + # stop patch to allow worker cleanup p.stop() From 60ed45c27815447e0d72aa4bdb181a0a708cb060 Mon Sep 17 00:00:00 2001 From: mernmic Date: Wed, 18 Jan 2023 19:01:30 -0600 Subject: [PATCH 09/12] stop patch when exists --- tests/test_worker.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 29b92933..8e0e331f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1000,6 +1000,7 @@ async def test_worker_timezone_defaults_to_system_timezone(worker): ) async def test_worker_retry(mocker, worker_retry, exception_thrown): # Testing redis exceptions, with retry settings specified + p = None try: worker = worker_retry(functions=[func(foobar)]) @@ -1023,7 +1024,8 @@ async def test_worker_retry(mocker, worker_retry, exception_thrown): assert spy.spy_exception is None finally: # stop patch to allow worker cleanup - p.stop() + if p is not None: + p.stop() @pytest.mark.parametrize( @@ -1035,8 +1037,11 @@ async def test_worker_retry(mocker, worker_retry, exception_thrown): ) async def test_worker_crash(mocker, worker, exception_thrown): # Testing redis exceptions, no retry settings specified + p = None try: worker = worker(functions=[func(foobar)]) + + # baseline await worker.main() await worker._poll_iteration() @@ -1056,4 +1061,5 @@ async def test_worker_crash(mocker, worker, exception_thrown): assert spy.spy_exception == exception_thrown finally: # stop patch to allow worker cleanup - p.stop() + if p is not None: + p.stop() From 2df197075a3a6d2b256d91b5a298dcdfa30d58d4 Mon Sep 17 00:00:00 2001 From: mernmic Date: Thu, 19 Jan 2023 12:28:02 -0600 Subject: [PATCH 10/12] update retry type to asyncio --- arq/connections.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arq/connections.py b/arq/connections.py index 017e1fd5..01e86576 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -8,8 +8,8 @@ from urllib.parse import parse_qs, urlparse from uuid import uuid4 -import redis.retry from redis.asyncio import ConnectionPool, Redis +from redis.asyncio.retry import Retry from redis.asyncio.sentinel import Sentinel from redis.exceptions import RedisError, WatchError @@ -50,7 +50,7 @@ class RedisSettings: retry_on_timeout: bool = False retry_on_error: Optional[List[Any]] = None - retry: Optional[redis.retry.Retry] = None + retry: Optional[Retry] = None @classmethod def from_dsn(cls, dsn: str) -> 'RedisSettings': From ff233260401d28c963afc1a1cefb4d462e956f52 Mon Sep 17 00:00:00 2001 From: mernmic Date: Fri, 20 Jan 2023 10:19:04 -0600 Subject: [PATCH 11/12] chore: test cleanup --- tests/test_worker.py | 54 +++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 8e0e331f..0e6cd3c3 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1000,20 +1000,21 @@ async def test_worker_timezone_defaults_to_system_timezone(worker): ) async def test_worker_retry(mocker, worker_retry, exception_thrown): # Testing redis exceptions, with retry settings specified - p = None - try: - worker = worker_retry(functions=[func(foobar)]) + worker = worker_retry(functions=[func(foobar)]) - # baseline - await worker.main() - await worker._poll_iteration() + # patch db read_response to mimic connection exceptions + p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) - # patch db read_response - p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) - p.start() + # baseline + await worker.main() + await worker._poll_iteration() - # spy method handling call_with_retry failure - spy = mocker.spy(worker.pool, '_disconnect_raise') + # spy method handling call_with_retry failure + spy = mocker.spy(worker.pool, '_disconnect_raise') + + try: + # start patch + p.start() # assert exception thrown with pytest.raises(type(exception_thrown)): @@ -1022,10 +1023,10 @@ async def test_worker_retry(mocker, worker_retry, exception_thrown): # assert retry counts and no exception thrown during '_disconnect_raise' assert spy.call_count == 4 # retries setting + 1 assert spy.spy_exception is None + finally: # stop patch to allow worker cleanup - if p is not None: - p.stop() + p.stop() @pytest.mark.parametrize( @@ -1037,20 +1038,21 @@ async def test_worker_retry(mocker, worker_retry, exception_thrown): ) async def test_worker_crash(mocker, worker, exception_thrown): # Testing redis exceptions, no retry settings specified - p = None - try: - worker = worker(functions=[func(foobar)]) + worker = worker(functions=[func(foobar)]) - # baseline - await worker.main() - await worker._poll_iteration() + # patch db read_response to mimic connection exceptions + p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) - # patch db read_response - p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) - p.start() + # baseline + await worker.main() + await worker._poll_iteration() - # spy method handling call_with_retry failure - spy = mocker.spy(worker.pool, '_disconnect_raise') + # spy method handling call_with_retry failure + spy = mocker.spy(worker.pool, '_disconnect_raise') + + try: + # start patch + p.start() # assert exception thrown with pytest.raises(type(exception_thrown)): @@ -1059,7 +1061,7 @@ async def test_worker_crash(mocker, worker, exception_thrown): # assert no retry counts and exception thrown during '_disconnect_raise' assert spy.call_count == 1 assert spy.spy_exception == exception_thrown + finally: # stop patch to allow worker cleanup - if p is not None: - p.stop() + p.stop() From edd6014f77335755caf4d22ddfbdf4a785aaf0e4 Mon Sep 17 00:00:00 2001 From: mernmic Date: Wed, 25 Jan 2023 14:38:20 -0600 Subject: [PATCH 12/12] fix exception type --- arq/connections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arq/connections.py b/arq/connections.py index 01e86576..a4d2f325 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -49,7 +49,7 @@ class RedisSettings: sentinel_master: str = 'mymaster' retry_on_timeout: bool = False - retry_on_error: Optional[List[Any]] = None + retry_on_error: Optional[List[Exception]] = None retry: Optional[Retry] = None @classmethod