Skip to content

Commit

Permalink
add type hint
Browse files Browse the repository at this point in the history
  • Loading branch information
keakon committed Mar 27, 2024
1 parent cf18186 commit a8468a6
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 36 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
strategy:
fail-fast: true
matrix:
python-version: [ '2.7', '3.7', '3.12' ]
python-version: [ '3.7', '3.12' ]

services:
redis:
Expand Down Expand Up @@ -54,7 +54,7 @@ jobs:
strategy:
fail-fast: true
matrix:
python-version: [ '2.7', '3.5', '3.12' ]
python-version: [ '3.5', '3.12' ]

steps:
- name: Checkout code
Expand Down
23 changes: 12 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Delayed is a simple but robust task queue inspired by [rq](https://python-rq.org

## Requirements

1. Python 2.7 or later, tested on CPython 2.7, 3.5 - 3.11. Versions before 1.0 also have been tested on PyPy and PyPy3.
1. Python 3.5 or later, tested on CPython 3.5 - 3.12. Versions before 1.0 have also been tested on CPython 2.7, PyPy and PyPy3.
2. To gracefully stop the workers, Unix-like systems (with Unix signal) are required, tested on Ubuntu 22.04 and macOS Monterey 12.
3. Redis 2.6.0 or later (with Lua scripts).

Expand Down Expand Up @@ -178,16 +178,17 @@ A: Adds a `logging.DEBUG` level handler to `delayed.logger.logger`. The simplest
## Release notes
* 1.0:
1. Supports Go, adds `GoTask`.
2. Use MessagePack instead of pickle to serialize / deserialize tasks. (BREAKING CHANGE)
3. Removes `ForkedWorker` and `PreforkedWorker`. You can use `Worker` instead. (BREAKING CHANGE)
4. Changes params of `Queue()`, removes `default_timeout`, `requeue_timeout` and `busy_len`, adds `dequeue_timeout` and `keep_alive_timeout`. (BREAKING CHANGE)
5. Rename `Task` to `PyTask`. (BREAKING CHANGE)
6. Removes those properties of `PyTask`: `id`, `func_path`, `args` and `kwargs`. (BREAKING CHANGE)
7. Removes those params of `PyTask()`: `id`, `timeout`, `prior` and `error_handler_path`. (BREAKING CHANGE)
8. Removes `PyTask.create()`. You can use `PyTask()` instead. (BREAKING CHANGE)
9. Rename `func_path` param of `PyTask()` to `func`, it accepts both `callable` and `str`. (BREAKING CHANGE)
10. Removes `delayed.delay()`. Removes params of `delayed.delayed()`. (BREAKING CHANGE)
1. Python 2.7 is not supported anymore. (BREAKING CHANGE)
2. Supports Go, adds `GoTask`.
3. Use MessagePack instead of pickle to serialize / deserialize tasks. (BREAKING CHANGE)
4. Removes `ForkedWorker` and `PreforkedWorker`. You can use `Worker` instead. (BREAKING CHANGE)
5. Changes params of `Queue()`, removes `default_timeout`, `requeue_timeout` and `busy_len`, adds `dequeue_timeout` and `keep_alive_timeout`. (BREAKING CHANGE)
6. Rename `Task` to `PyTask`. (BREAKING CHANGE)
7. Removes those properties of `PyTask`: `id`, `func_path`, `args` and `kwargs`. (BREAKING CHANGE)
8. Removes those params of `PyTask()`: `id`, `timeout`, `prior` and `error_handler_path`. (BREAKING CHANGE)
9. Removes `PyTask.create()`. You can use `PyTask()` instead. (BREAKING CHANGE)
10. Rename `func_path` param of `PyTask()` to `func`, it accepts both `callable` and `str`. (BREAKING CHANGE)
11. Removes `delayed.delay()`. Removes params of `delayed.delayed()`. (BREAKING CHANGE)
* 0.11:
1. Sleeps random time when a `Worker` fails to pop a `task` before retrying.
Expand Down
4 changes: 2 additions & 2 deletions delayed/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def set_handler(handler):
logger.handlers = [handler]


def setup_logger(date_format=DEFAULT_DATE_FORMAT, log_format=DEFAULT_LOG_FORMAT):
def setup_logger(date_format: str = DEFAULT_DATE_FORMAT, log_format: str = DEFAULT_LOG_FORMAT):
"""Setup a console logger.
Args:
Expand All @@ -36,7 +36,7 @@ def setup_logger(date_format=DEFAULT_DATE_FORMAT, log_format=DEFAULT_LOG_FORMAT)
logger.addHandler(handler)


def _setup_handler(handler, date_format, log_format):
def _setup_handler(handler, date_format: str, log_format: str):
"""Setup a handler for the logger.
Args:
Expand Down
19 changes: 12 additions & 7 deletions delayed/queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# -*- coding: utf-8 -*-

from typing import Optional, Union

import redis

from .logger import logger
from .task import PyTask

Expand Down Expand Up @@ -40,7 +44,7 @@
_PROCESSING_KEY_SUFFIX = '_processing'


class Queue(object):
class Queue:
"""Queue is the class of a task queue.
Args:
Expand All @@ -51,8 +55,8 @@ class Queue(object):
keep_alive_timeout (int or float): The keep alive timeout in seconds of the worker.
"""

def __init__(self, name, conn, dequeue_timeout=1, keep_alive_timeout=60):
self._worker_id = None
def __init__(self, name: str, conn: redis.Redis, dequeue_timeout: Union[int, float] = 1, keep_alive_timeout: Union[int, float] = 60):
self._worker_id: Optional[bytes] = None
self._name = name
self._noti_key = name + _NOTI_KEY_SUFFIX
self._processing_key = name + _PROCESSING_KEY_SUFFIX
Expand All @@ -62,7 +66,7 @@ def __init__(self, name, conn, dequeue_timeout=1, keep_alive_timeout=60):
self._dequeue_script = conn.register_script(_DEQUEUE_SCRIPT)
self._requeue_lost_script = conn.register_script(_REQUEUE_LOST_SCRIPT)

def enqueue(self, task):
def enqueue(self, task: PyTask):
"""Enqueues a task to the queue.
Args:
Expand All @@ -76,7 +80,7 @@ def enqueue(self, task):
pipe.execute()
logger.debug('Enqueued task %s.', task._func_path)

def dequeue(self):
def dequeue(self) -> Optional[PyTask]:
"""Dequeues a task from the queue.
Returns:
Expand All @@ -100,11 +104,11 @@ def release(self):
self._conn.hdel(self._processing_key, self._worker_id)
logger.debug('Released the task of worker %s.', self._worker_id)

def len(self):
def len(self) -> int:
"""Returns the length of the queue."""
return self._conn.llen(self._name)

def requeue_lost(self):
def requeue_lost(self) -> int:
"""Requeues lost tasks.
It should be called periodically to prevent losing tasks.
The lost tasks were those popped from the queue, but its dead worker hadn't released it.
Expand All @@ -114,6 +118,7 @@ def requeue_lost(self):
"""
count = self._requeue_lost_script(
keys=(self._name, self._noti_key, self._processing_key))
logger.debug(type(count))
if count >= 1:
if count == 1:
logger.debug('Requeued 1 lost task.')
Expand Down
6 changes: 4 additions & 2 deletions delayed/sweeper.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# -*- coding: utf-8 -*-

import time
from typing import Union

from .constants import Status
from .logger import logger
from .queue import Queue


class Sweeper(object):
class Sweeper:
"""Sweeper keeps recovering lost tasks.
Args:
Expand All @@ -15,7 +17,7 @@ class Sweeper(object):
It tries to requeue lost tasks every `interval` seconds.
"""

def __init__(self, queues, interval=60):
def __init__(self, queues: list[Queue], interval: Union[int, float] = 60):
self._queues = queues
self._interval = interval
self._status = Status.STOPPED
Expand Down
17 changes: 9 additions & 8 deletions delayed/task.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
# -*- coding: utf-8 -*-

from importlib import import_module
from typing import Any, Callable, Optional, Union

from msgpack import packb, unpackb

from .constants import SEP
from .logger import logger


class PyTask(object):
class PyTask:
"""PyTask is the class of a Python task.
Args:
Expand All @@ -21,7 +22,7 @@ class PyTask(object):

__slots__ = ['_func_path', '_args', '_kwargs', '_data']

def __init__(self, func, args=None, kwargs=None):
def __init__(self, func: Union[Callable, str], args: Union[list, tuple, None] = None, kwargs: Optional[dict] = None):
if isinstance(func, str):
self._func_path = func
elif callable(func):
Expand All @@ -32,7 +33,7 @@ def __init__(self, func, args=None, kwargs=None):
self._kwargs = {} if kwargs is None else kwargs
self._data = None

def serialize(self):
def serialize(self) -> Optional[bytes]:
"""Serializes the task to bytes.
Returns:
Expand All @@ -52,7 +53,7 @@ def serialize(self):
return self._data

@classmethod
def deserialize(cls, data):
def deserialize(cls, data) -> 'PyTask':
"""Deserialize a task from the bytes.
Args:
Expand All @@ -65,7 +66,7 @@ def deserialize(cls, data):
task._data = data
return task

def execute(self):
def execute(self) -> Any:
"""Executes the task.
Returns:
Expand All @@ -78,7 +79,7 @@ def execute(self):
return func(*self._args, **self._kwargs)


class GoTask(object):
class GoTask:
"""GoTask is the class of a Go task.
Args:
Expand All @@ -90,13 +91,13 @@ class GoTask(object):

__slots__ = ['_func_path', '_args', '_payload', '_data']

def __init__(self, func_path, args=None):
def __init__(self, func_path: str, args: Optional[list] = None):
self._func_path = func_path
self._args = args
self._payload = None
self._data = None

def serialize(self):
def serialize(self) -> Optional[bytes]:
"""Serializes the task to bytes.
Returns:
Expand Down
10 changes: 6 additions & 4 deletions delayed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@
import signal
import threading
import time
from typing import Union

from .constants import DEFAULT_SLEEP_TIME, MAX_SLEEP_TIME, Status
from .keep_alive import KeepAliveThread
from .logger import logger
from .queue import Queue
from .task import PyTask


class Worker(object):
class Worker:
"""Worker is the class of Python task worker.
Args:
queue (delayed.queue.Queue): The task queue of the worker.
keep_alive_interval (int or float): The worker marks itself as alive for every `keep_alive_interval` seconds.
"""

def __init__(self, queue, keep_alive_interval=15):
def __init__(self, queue: Queue, keep_alive_interval: Union[int, float] = 15):
queue._worker_id = self._id = binascii.hexlify(os.urandom(16))
self._queue = queue
self._keep_alive_interval = keep_alive_interval
Expand Down Expand Up @@ -70,7 +72,7 @@ def stop(self):
logger.debug('Stopping worker %s.', self._id)
self._status = Status.STOPPING

def _requeue_task(self, task):
def _requeue_task(self, task: PyTask):
"""Requeues a dequeued task.
Args:
Expand Down

0 comments on commit a8468a6

Please sign in to comment.