
Commit 253867e
Merge branch 'main' into patch-1
samuelcolvin authored Apr 1, 2024
2 parents 305a003 + 9731f54
Showing 24 changed files with 557 additions and 363 deletions.
.github/workflows/ci.yml (42 additions & 32 deletions)
@@ -4,7 +4,6 @@ on:
   push:
     branches:
       - main
-      - v0.23-branch
     tags:
       - '**'
   pull_request: {}
@@ -14,12 +13,12 @@ jobs:
     runs-on: ubuntu-latest

     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v4

       - name: set up python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
-          python-version: '3.10'
+          python-version: '3.11'

       - run: pip install -r requirements/linting.txt -r requirements/pyproject.txt pre-commit

@@ -29,20 +28,20 @@ jobs:
     runs-on: ubuntu-latest

     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v4

       - name: set up python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
-          python-version: '3.10'
+          python-version: '3.11'

       - run: pip install -r requirements/docs.txt -r requirements/pyproject.txt
       - run: pip install .

       - run: make docs

       - name: Store docs site
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
           name: docs
           path: docs/_build/
@@ -53,21 +52,21 @@ jobs:
       fail-fast: false
       matrix:
         os: [ubuntu]
-        python: ['3.7', '3.8', '3.9', '3.10', '3.11']
+        python: ['3.8', '3.9', '3.10', '3.11', '3.12']
         redis: ['5']
         include:
-          - python: '3.10'
+          - python: '3.11'
             redis: '6'
             os: 'ubuntu'
-          - python: '3.10'
+          - python: '3.11'
             redis: '7'
             os: 'ubuntu'

     env:
       PYTHON: ${{ matrix.python }}
       OS: ${{ matrix.os }}

-    runs-on: ${{ format('{0}-latest', matrix.os) }}
+    runs-on: ${{ matrix.os }}-latest

     services:
       redis:
@@ -77,10 +76,10 @@ jobs:
         options: --entrypoint redis-server

     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v4

       - name: set up python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python }}

@@ -90,50 +89,61 @@ jobs:

       - run: coverage xml

-      - uses: codecov/codecov-action@v2
+      - uses: codecov/codecov-action@v4
         with:
           file: ./coverage.xml
           env_vars: PYTHON,OS

-  deploy:
-    name: Deploy
+  check:
+    if: always()
     needs: [lint, docs, test]
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Decide whether the needed jobs succeeded or failed
+        uses: re-actors/alls-green@release/v1
+        id: all-green
+        with:
+          jobs: ${{ toJSON(needs) }}
+
+  release:
+    name: Release
+    needs: [check]
     if: "success() && startsWith(github.ref, 'refs/tags/')"
     runs-on: ubuntu-latest
+    environment: release
+
+    permissions:
+      id-token: write

     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v4

       - name: get docs
-        uses: actions/download-artifact@v3
+        uses: actions/download-artifact@v4
         with:
           name: docs
           path: docs/_build/

       - name: set up python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
-          python-version: '3.10'
+          python-version: '3.11'

       - name: install
-        run: pip install -U twine build packaging
+        run: pip install -U build

       - name: check version
         id: check-version
-        run: python <(curl -Ls https://gist.githubusercontent.com/samuelcolvin/4e1ad439c5489e8d6478cdee3eb952ef/raw/check_version.py)
-        env:
-          VERSION_PATH: 'arq/version.py'
+        uses: samuelcolvin/check-python[email protected]
+        with:
+          version_file_path: 'arq/version.py'

       - name: build
         run: python -m build

-      - run: twine check dist/*
-
-      - name: upload to pypi
-        run: twine upload dist/*
-        env:
-          TWINE_USERNAME: __token__
-          TWINE_PASSWORD: ${{ secrets.pypi_token }}
+      - name: Upload package to PyPI
+        uses: pypa/gh-action-pypi-publish@release/v1

       - name: publish docs
         if: '!fromJSON(steps.check-version.outputs.IS_PRERELEASE)'
.pre-commit-config.yaml (3 additions & 3 deletions)
@@ -11,9 +11,9 @@ repos:

   - repo: local
     hooks:
-      - id: lint
-        name: Lint
-        entry: make lint
+      - id: format
+        name: Format
+        entry: make format
         types: [python]
         language: system
         pass_filenames: false
Makefile (20 additions & 8 deletions)
@@ -1,24 +1,36 @@
 .DEFAULT_GOAL := all
-isort = isort arq tests
-black = black arq tests
+sources = arq tests

 .PHONY: install
 install:
-	pip install -U pip pre-commit
+	pip install -U pip pre-commit pip-tools
 	pip install -r requirements/all.txt
 	pip install -e .[watch]
 	pre-commit install

+.PHONY: refresh-lockfiles
+refresh-lockfiles:
+	find requirements/ -name '*.txt' ! -name 'all.txt' -type f -delete
+	make update-lockfiles
+
+.PHONY: update-lockfiles
+update-lockfiles:
+	@echo "Updating requirements/*.txt files using pip-compile"
+	pip-compile -q --strip-extras -o requirements/linting.txt requirements/linting.in
+	pip-compile -q --strip-extras -o requirements/testing.txt requirements/testing.in
+	pip-compile -q --strip-extras -o requirements/docs.txt requirements/docs.in
+	pip-compile -q --strip-extras -o requirements/pyproject.txt pyproject.toml --all-extras
+	pip install --dry-run -r requirements/all.txt
+
 .PHONY: format
 format:
-	$(isort)
-	$(black)
+	ruff check --fix $(sources)
+	ruff format $(sources)

 .PHONY: lint
 lint:
-	flake8 --max-complexity 10 --max-line-length 120 --ignore E203,W503 arq/ tests/
-	$(isort) --check-only --df
-	$(black) --check
+	ruff check $(sources)
+	ruff format --check $(sources)

 .PHONY: test
 test:
arq/cli.py (1 addition & 1 deletion)
@@ -60,7 +60,7 @@ async def watch_reload(path: str, worker_settings: 'WorkerSettingsType') -> None
     except ImportError as e:  # pragma: no cover
         raise ImportError('watchfiles not installed, use `pip install watchfiles`') from e

-    loop = asyncio.get_event_loop()
+    loop = asyncio.get_running_loop()
     stop_event = asyncio.Event()

     def worker_on_stop(s: Signals) -> None:
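
A minimal sketch (not part of the diff) of why this change is safe: inside a coroutine there is always a running event loop, so asyncio.get_running_loop() returns it directly and avoids the deprecation warning asyncio.get_event_loop() emits when no loop is set:

    import asyncio

    async def main() -> None:
        # a coroutine always runs on a live loop, so this never creates one
        loop = asyncio.get_running_loop()
        stop_event = asyncio.Event()
        # stand-in for the signal handler that watch_reload registers
        loop.call_later(0.1, stop_event.set)
        await stop_event.wait()

    asyncio.run(main())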
arq/connections.py (33 additions & 17 deletions)
@@ -4,11 +4,12 @@
 from dataclasses import dataclass
 from datetime import datetime, timedelta
 from operator import attrgetter
-from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Union, cast
 from urllib.parse import parse_qs, urlparse
 from uuid import uuid4

 from redis.asyncio import ConnectionPool, Redis
+from redis.asyncio.retry import Retry
 from redis.asyncio.sentinel import Sentinel
 from redis.exceptions import RedisError, WatchError

@@ -43,20 +44,28 @@ class RedisSettings:
     conn_timeout: int = 1
     conn_retries: int = 5
     conn_retry_delay: int = 1
+    max_connections: Optional[int] = None

     sentinel: bool = False
     sentinel_master: str = 'mymaster'

+    retry_on_timeout: bool = False
+    retry_on_error: Optional[List[Exception]] = None
+    retry: Optional[Retry] = None
+
     @classmethod
     def from_dsn(cls, dsn: str) -> 'RedisSettings':
         conf = urlparse(dsn)
-        assert conf.scheme in {'redis', 'rediss', 'unix'}, 'invalid DSN scheme'
+        if conf.scheme not in {'redis', 'rediss', 'unix'}:
+            raise RuntimeError('invalid DSN scheme')
         query_db = parse_qs(conf.query).get('db')
         if query_db:
             # e.g. redis://localhost:6379?db=1
             database = int(query_db[0])
-        else:
+        elif conf.scheme != 'unix':
             database = int(conf.path.lstrip('/')) if conf.path else 0
+        else:
+            database = 0
         return RedisSettings(
             host=conf.hostname or 'localhost',
             port=conf.port or 6379,
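
A rough illustration of the new DSN handling (example URLs are hypothetical; behaviour follows the branches above): the database index comes from the db query parameter when given, from the path for redis/rediss schemes, and falls back to 0 for unix sockets, whose path is the socket file rather than a database number:

    from arq.connections import RedisSettings

    s1 = RedisSettings.from_dsn('redis://localhost:6379?db=1')  # db=1 from the query string
    s2 = RedisSettings.from_dsn('redis://localhost:6379/2')     # db=2 from the path
    s3 = RedisSettings.from_dsn('unix:///run/redis.sock')       # unix scheme, db defaults to 0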
@@ -138,7 +147,8 @@ async def enqueue_job(
         _queue_name = self.default_queue_name
         job_id = _job_id or uuid4().hex
         job_key = job_key_prefix + job_id
-        assert not (_defer_until and _defer_by), "use either 'defer_until' or 'defer_by' or neither, not both"
+        if _defer_until and _defer_by:
+            raise RuntimeError("use either 'defer_until' or 'defer_by' or neither, not both")

         defer_by_ms = to_ms(_defer_by)
         expires_ms = to_ms(_expires)
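
In practice the new guard affects calls like this sketch (hypothetical task name, assuming a reachable Redis; pass one of the two defer options, or neither):

    from datetime import datetime, timedelta

    from arq.connections import RedisSettings, create_pool

    async def main() -> None:
        redis = await create_pool(RedisSettings())
        # fine: defer by a relative delay
        await redis.enqueue_job('download_content', _defer_by=timedelta(seconds=30))
        # now raises RuntimeError (previously AssertionError): both options at once
        await redis.enqueue_job(
            'download_content',
            _defer_by=timedelta(seconds=30),
            _defer_until=datetime(2030, 1, 1),
        )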
@@ -161,8 +171,8 @@

         job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer)
         pipe.multi()
-        pipe.psetex(job_key, expires_ms, job)  # type: ignore[no-untyped-call]
-        pipe.zadd(_queue_name, {job_id: score})  # type: ignore[unused-coroutine]
+        pipe.psetex(job_key, expires_ms, job)
+        pipe.zadd(_queue_name, {job_id: score})
         try:
             await pipe.execute()
         except WatchError:
@@ -190,9 +200,11 @@ async def all_job_results(self) -> List[JobResult]:
     async def _get_job_def(self, job_id: bytes, score: int) -> JobDef:
         key = job_key_prefix + job_id.decode()
         v = await self.get(key)
-        assert v is not None, f'job "{key}" not found'
+        if v is None:
+            raise RuntimeError(f'job "{key}" not found')
         jd = deserialize_job(v, deserializer=self.job_deserializer)
         jd.score = score
+        jd.job_id = job_id.decode()
         return jd

     async def queued_jobs(self, *, queue_name: Optional[str] = None) -> List[JobDef]:
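
One visible effect of the added jd.job_id line, sketched under the same assumptions as above: job definitions returned by queued_jobs() now report which job they belong to.

    async def show_queue(redis) -> None:
        for jd in await redis.queued_jobs():
            # job_id was previously only available on Job, not JobDef
            print(jd.job_id, jd.function, jd.score)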
@@ -206,7 +218,7 @@ async def queued_jobs(self, *, queue_name: Optional[str] = None) -> List[JobDef]


 async def create_pool(
-    settings_: RedisSettings = None,
+    settings_: Optional[RedisSettings] = None,
     *,
     retry: int = 0,
     job_serializer: Optional[Serializer] = None,
@@ -221,9 +233,8 @@ async def create_pool(
     """
     settings: RedisSettings = RedisSettings() if settings_ is None else settings_

-    assert not (
-        type(settings.host) is str and settings.sentinel
-    ), "str provided for 'host' but 'sentinel' is true; list of sentinels expected"
+    if isinstance(settings.host, str) and settings.sentinel:
+        raise RuntimeError("str provided for 'host' but 'sentinel' is true; list of sentinels expected")

     if settings.sentinel:
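
The check above now raises a RuntimeError instead of failing an assert (which would be skipped entirely under python -O). For reference, a sentinel configuration that passes the check supplies a list of (host, port) pairs rather than a plain string (placeholder hostnames):

    from arq.connections import RedisSettings

    settings = RedisSettings(
        host=[('sentinel-1', 26379), ('sentinel-2', 26379)],  # not a str
        sentinel=True,
        sentinel_master='mymaster',
    )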

@@ -234,7 +245,8 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
                 ssl=settings.ssl,
                 **kwargs,
             )
-            return client.master_for(settings.sentinel_master, redis_class=ArqRedis)
+            redis = client.master_for(settings.sentinel_master, redis_class=ArqRedis)
+            return cast(ArqRedis, redis)

     else:
         pool_factory = functools.partial(
@@ -250,6 +262,10 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
             ssl_ca_certs=settings.ssl_ca_certs,
             ssl_ca_data=settings.ssl_ca_data,
             ssl_check_hostname=settings.ssl_check_hostname,
+            retry=settings.retry,
+            retry_on_timeout=settings.retry_on_timeout,
+            retry_on_error=settings.retry_on_error,
+            max_connections=settings.max_connections,
         )

     while True:
@@ -266,7 +282,7 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
         except (ConnectionError, OSError, RedisError, asyncio.TimeoutError) as e:
             if retry < settings.conn_retries:
                 logger.warning(
-                    'redis connection error %s:%s %s %s, %d retries remaining...',
+                    'redis connection error %s:%s %s %s, %s retries remaining...',
                     settings.host,
                     settings.port,
                     e.__class__.__name__,

async def log_redis_info(redis: 'Redis[bytes]', log_func: Callable[[str], Any]) -> None:
async with redis.pipeline(transaction=False) as pipe:
pipe.info(section='Server') # type: ignore[unused-coroutine]
pipe.info(section='Memory') # type: ignore[unused-coroutine]
pipe.info(section='Clients') # type: ignore[unused-coroutine]
pipe.dbsize() # type: ignore[unused-coroutine]
pipe.info(section='Server')
pipe.info(section='Memory')
pipe.info(section='Clients')
pipe.dbsize()
info_server, info_memory, info_clients, key_count = await pipe.execute()

redis_version = info_server.get('redis_version', '?')
19 of the 24 changed files are not shown here.
