Skip to content

Commit

Permalink
Fixed type inference for new versions of pyright and dependencies wer…
Browse files Browse the repository at this point in the history
…e updated. (#413)
  • Loading branch information
s3rius authored Feb 26, 2025
1 parent 528c2db commit a89cac1
Show file tree
Hide file tree
Showing 14 changed files with 884 additions and 780 deletions.
9 changes: 5 additions & 4 deletions .python-version
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
3.11.4
3.10.12
3.9.17
3.8.17
3.13.1
3.12.8
3.11.11
3.10.16
3.9.21
1,443 changes: 781 additions & 662 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ classifiers = [
keywords = ["taskiq", "tasks", "distributed", "async"]

[tool.poetry.dependencies]
python = "^3.8.1"
python = "^3.9"
typing-extensions = ">=3.10.0.0"
pydantic = ">=1.0,<=3.0"
importlib-metadata = "*"
Expand All @@ -51,7 +51,7 @@ msgpack = { version = "^1.0.7", optional = true }
cbor2 = { version = "^5", optional = true }
izulu = "0.5.4"

[tool.poetry.dev-dependencies]
[tool.poetry.group.dev.dependencies]
pytest = "^7.1.2"
ruff = "^0"
black = { version = "^22.6.0", allow-prereleases = true }
Expand Down Expand Up @@ -157,8 +157,6 @@ lint.ignore = [
"D401", # First line should be in imperative mood
"D104", # Missing docstring in public package
"D100", # Missing docstring in public module
"ANN102", # Missing type annotation for self in method
"ANN101", # Missing type annotation for argument
"ANN401", # typing.Any are disallowed in `**kwargs
"PLR0913", # Too many arguments for function call
"D106", # Missing docstring in public nested class
Expand Down
46 changes: 23 additions & 23 deletions taskiq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,35 +35,35 @@

__version__ = version("taskiq")
__all__ = [
"__version__",
"gather",
"Context",
"AckableMessage",
"AsyncBroker",
"TaskiqError",
"TaskiqState",
"TaskiqResult",
"ZeroMQBroker",
"TaskiqEvents",
"SecurityError",
"TaskiqMessage",
"AsyncResultBackend",
"AsyncTaskiqDecoratedTask",
"AsyncTaskiqTask",
"BrokerMessage",
"Context",
"InMemoryBroker",
"NoResultError",
"PrometheusMiddleware",
"ResultGetError",
"ResultIsReadyError",
"ScheduleSource",
"ScheduledTask",
"TaskiqDepends",
"NoResultError",
"SecurityError",
"SendTaskError",
"AckableMessage",
"InMemoryBroker",
"ScheduleSource",
"TaskiqScheduler",
"SimpleRetryMiddleware",
"TaskiqDepends",
"TaskiqError",
"TaskiqEvents",
"TaskiqFormatter",
"AsyncTaskiqTask",
"TaskiqMessage",
"TaskiqMiddleware",
"ResultIsReadyError",
"AsyncResultBackend",
"async_shared_broker",
"PrometheusMiddleware",
"SimpleRetryMiddleware",
"AsyncTaskiqDecoratedTask",
"TaskiqResult",
"TaskiqResultTimeoutError",
"TaskiqScheduler",
"TaskiqState",
"ZeroMQBroker",
"__version__",
"async_shared_broker",
"gather",
]
29 changes: 21 additions & 8 deletions taskiq/abc/middleware.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from types import CoroutineType
from typing import TYPE_CHECKING, Any, Coroutine, Union

if TYPE_CHECKING: # pragma: no cover # pragma: no cover
Expand All @@ -20,7 +21,9 @@ def set_broker(self, broker: "AsyncBroker") -> None:
"""
self.broker = broker

def startup(self) -> "Union[None, Coroutine[Any, Any, None]]":
def startup(
self,
) -> Union[None, Coroutine[Any, Any, None], "CoroutineType[Any, Any, None]"]:
"""
Startup method to perform various action during startup.
Expand All @@ -30,7 +33,9 @@ def startup(self) -> "Union[None, Coroutine[Any, Any, None]]":
:returns nothing.
"""

def shutdown(self) -> "Union[None, Coroutine[Any, Any, None]]":
def shutdown(
self,
) -> Union[None, Coroutine[Any, Any, None], "CoroutineType[Any, Any, None]"]:
"""
Shutdown method to perform various action during shutdown.
Expand All @@ -43,7 +48,11 @@ def shutdown(self) -> "Union[None, Coroutine[Any, Any, None]]":
def pre_send(
self,
message: "TaskiqMessage",
) -> "Union[TaskiqMessage, Coroutine[Any, Any, TaskiqMessage]]":
) -> Union[
"TaskiqMessage",
"Coroutine[Any, Any, TaskiqMessage]",
"CoroutineType[Any, Any, TaskiqMessage]",
]:
"""
Hook that executes before sending the task to worker.
Expand All @@ -58,7 +67,7 @@ def pre_send(
def post_send(
self,
message: "TaskiqMessage",
) -> "Union[None, Coroutine[Any, Any, None]]":
) -> Union[None, Coroutine[Any, Any, None], "CoroutineType[Any, Any, None]"]:
"""
This hook is executed right after the task is sent.
Expand All @@ -71,7 +80,11 @@ def post_send(
def pre_execute(
self,
message: "TaskiqMessage",
) -> "Union[TaskiqMessage, Coroutine[Any, Any, TaskiqMessage]]":
) -> Union[
"TaskiqMessage",
"Coroutine[Any, Any, TaskiqMessage]",
"CoroutineType[Any, Any, TaskiqMessage]",
]:
"""
This hook is called before executing task.
Expand All @@ -87,7 +100,7 @@ def post_execute(
self,
message: "TaskiqMessage",
result: "TaskiqResult[Any]",
) -> "Union[None, Coroutine[Any, Any, None]]":
) -> Union[None, Coroutine[Any, Any, None], "CoroutineType[Any, Any, None]"]:
"""
This hook executes after task is complete.
Expand All @@ -102,7 +115,7 @@ def post_save(
self,
message: "TaskiqMessage",
result: "TaskiqResult[Any]",
) -> "Union[None, Coroutine[Any, Any, None]]":
) -> Union[None, Coroutine[Any, Any, None], "CoroutineType[Any, Any, None]"]:
"""
Post save hook.
Expand All @@ -118,7 +131,7 @@ def on_error(
message: "TaskiqMessage",
result: "TaskiqResult[Any]",
exception: BaseException,
) -> "Union[None, Coroutine[Any, Any, None]]":
) -> Union[None, Coroutine[Any, Any, None], "CoroutineType[Any, Any, None]"]:
"""
This function is called when exception is found.
Expand Down
8 changes: 5 additions & 3 deletions taskiq/abc/schedule_source.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Coroutine, List, Union
from collections.abc import Coroutine
from types import CoroutineType
from typing import TYPE_CHECKING, Any, List, Union

if TYPE_CHECKING: # pragma: no cover
from taskiq.scheduler.scheduled_task import ScheduledTask
Expand Down Expand Up @@ -56,7 +58,7 @@ async def delete_schedule(self, schedule_id: str) -> None:
def pre_send( # noqa: B027
self,
task: "ScheduledTask",
) -> Union[None, Coroutine[Any, Any, None]]:
) -> Union[None, "CoroutineType[Any, Any, None]", Coroutine[Any, Any, None]]:
"""
Actions to execute before task will be sent to broker.
Expand All @@ -69,7 +71,7 @@ def pre_send( # noqa: B027
def post_send( # noqa: B027
self,
task: "ScheduledTask",
) -> Union[None, Coroutine[Any, Any, None]]:
) -> Union[None, "CoroutineType[Any, Any, None]", Coroutine[Any, Any, None]]:
"""
Actions to execute after task was sent to broker.
Expand Down
11 changes: 10 additions & 1 deletion taskiq/decor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from collections.abc import Coroutine
from datetime import datetime
from types import CoroutineType
from typing import (
TYPE_CHECKING,
Any,
Callable,
Coroutine,
Dict,
Generic,
TypeVar,
Expand Down Expand Up @@ -64,6 +65,14 @@ def __call__( # noqa: D102
) -> _ReturnType:
return self.original_func(*args, **kwargs)

@overload
async def kiq(
self: "AsyncTaskiqDecoratedTask[_FuncParams, CoroutineType[Any, Any, _T]]",
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> AsyncTaskiqTask[_T]:
...

@overload
async def kiq(
self: "AsyncTaskiqDecoratedTask[_FuncParams, Coroutine[Any, Any, _T]]",
Expand Down
11 changes: 10 additions & 1 deletion taskiq/kicker.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from collections.abc import Coroutine
from dataclasses import asdict, is_dataclass
from datetime import datetime
from logging import getLogger
from types import CoroutineType
from typing import (
TYPE_CHECKING,
Any,
Coroutine,
Dict,
Generic,
Optional,
Expand Down Expand Up @@ -107,6 +108,14 @@ def with_broker(
self.broker = broker
return self

@overload
async def kiq(
self: "AsyncKicker[_FuncParams, CoroutineType[Any, Any, _T]]",
*args: _FuncParams.args,
**kwargs: _FuncParams.kwargs,
) -> AsyncTaskiqTask[_T]: # pragma: no cover
...

@overload
async def kiq(
self: "AsyncKicker[_FuncParams, Coroutine[Any, Any, _T]]",
Expand Down
14 changes: 12 additions & 2 deletions taskiq/scheduler/created_schedule.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import TYPE_CHECKING, Any, Coroutine, Generic, TypeVar, overload
from collections.abc import Coroutine
from types import CoroutineType
from typing import TYPE_CHECKING, Any, Generic, TypeVar, overload

from taskiq.abc.schedule_source import ScheduleSource
from taskiq.scheduler.scheduled_task import ScheduledTask
Expand All @@ -25,14 +27,22 @@ def __init__(
self.task = task
self.schedule_id = task.schedule_id

@overload
async def kiq(
self: "CreatedSchedule[CoroutineType[Any,Any, _T]]",
) -> AsyncTaskiqTask[_T]:
...

@overload
async def kiq(
self: "CreatedSchedule[Coroutine[Any,Any, _T]]",
) -> AsyncTaskiqTask[_T]:
...

@overload
async def kiq(self: "CreatedSchedule[_ReturnType]") -> AsyncTaskiqTask[_ReturnType]:
async def kiq(
self: "CreatedSchedule[_ReturnType]",
) -> AsyncTaskiqTask[_ReturnType]:
...

async def kiq(self) -> Any:
Expand Down
4 changes: 2 additions & 2 deletions taskiq/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from .pickle import PickleSerializer

__all__ = [
"CBORSerializer",
"JSONSerializer",
"ORJSONSerializer",
"MSGPackSerializer",
"CBORSerializer",
"ORJSONSerializer",
"PickleSerializer",
]
68 changes: 2 additions & 66 deletions taskiq/task.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
from abc import ABC, abstractmethod
from time import time
from typing import TYPE_CHECKING, Any, Coroutine, Generic, Optional, Union
from typing import TYPE_CHECKING, Any, Generic, Optional

from typing_extensions import TypeVar

Expand All @@ -19,70 +18,7 @@
_ReturnType = TypeVar("_ReturnType")


class _Task(ABC, Generic[_ReturnType]):
"""TaskiqTask interface."""

@abstractmethod
def is_ready(self) -> Union[bool, Coroutine[Any, Any, bool]]:
"""
Method to check wether result is ready.
:return: True if result is ready.
"""

@abstractmethod
def get_result(
self,
with_logs: bool = False,
) -> Union[
"TaskiqResult[_ReturnType]",
Coroutine[Any, Any, "TaskiqResult[_ReturnType]"],
]:
"""
Get actual execution result.
:param with_logs: wether you want to fetch logs.
:return: TaskiqResult.
"""

@abstractmethod
def wait_result(
self,
check_interval: float = 0.2,
timeout: float = -1.0,
with_logs: bool = False,
) -> Union[
"TaskiqResult[_ReturnType]",
Coroutine[Any, Any, "TaskiqResult[_ReturnType]"],
]:
"""
Wait for result to become ready and get it.
This function constantly checks whether result is ready
and fetches it when it becomes available.
:param check_interval: how often availability is checked.
:param timeout: maximum amount of time it will wait
before raising TaskiqResultTimeoutError.
:param with_logs: whether you need to download logs.
:return: TaskiqResult.
"""

@abstractmethod
def get_progress(
self,
) -> Union[
"Optional[TaskProgress[Any]]",
Coroutine[Any, Any, "Optional[TaskProgress[Any]]"],
]:
"""
Get task progress.
:return: task's progress.
"""


class AsyncTaskiqTask(_Task[_ReturnType]):
class AsyncTaskiqTask(Generic[_ReturnType]):
"""AsyncTask for AsyncResultBackend."""

def __init__(
Expand Down
Loading

0 comments on commit a89cac1

Please sign in to comment.