Skip to content

Commit

Permalink
Merge branch 'release/0.3.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius committed May 27, 2023
2 parents 82caeb1 + f9079a3 commit 8c3786f
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 250 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ repos:
hooks:
- id: check-ast
- id: trailing-whitespace
exclude: 'README.md'
- id: check-toml
- id: end-of-file-fixer

Expand Down
464 changes: 234 additions & 230 deletions poetry.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "taskiq-aio-pika"
version = "0.2.2"
version = "0.3.0"
description = "RabbitMQ broker for taskiq"
authors = ["Pavel Kirilin <[email protected]>"]
readme = "README.md"
Expand All @@ -20,8 +20,9 @@ keywords = ["taskiq", "tasks", "distributed", "async", "aio-pika"]

[tool.poetry.dependencies]
python = "^3.7"
taskiq = "^0"
taskiq = ">=0.6.0,<1"
aio-pika = "^9.0"
importlib-metadata = {version = "^4.0.0", python = "<3.8"}

[tool.poetry.dev-dependencies]
pytest = "^7.0"
Expand Down
9 changes: 9 additions & 0 deletions taskiq_aio_pika/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
"""Taskiq integration with aio-pika."""
from taskiq_aio_pika.broker import AioPikaBroker

try:
# Python 3.8+
from importlib.metadata import version # noqa: WPS433
except ImportError:
# Python 3.7
from importlib_metadata import version # noqa: WPS433, WPS440

__version__ = version("taskiq-aio-pika")

__all__ = ["AioPikaBroker"]
24 changes: 12 additions & 12 deletions taskiq_aio_pika/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

from aio_pika import DeliveryMode, ExchangeType, Message, connect_robust
from aio_pika.abc import AbstractChannel, AbstractQueue, AbstractRobustConnection
from taskiq.abc.broker import AsyncBroker
from taskiq.abc.result_backend import AsyncResultBackend
from taskiq.message import BrokerMessage
from taskiq import AckableMessage, AsyncBroker, AsyncResultBackend, BrokerMessage

_T = TypeVar("_T") # noqa: WPS111

Expand Down Expand Up @@ -228,21 +226,20 @@ async def kick(self, message: BrokerMessage) -> None:
if self.write_channel is None:
raise ValueError("Please run startup before kicking.")

message_base_params: dict[str, Any] = {
message_base_params: Dict[str, Any] = {
"body": message.message,
"headers": {
"task_id": message.task_id,
"task_name": message.task_name,
**message.labels,
},
"delivery_mode": DeliveryMode.PERSISTENT,
"priority": parse_val(
int,
message.labels.get("priority"),
),
}

message_base_params["priority"] = parse_val(
int,
message.labels.get("priority"),
)

delay: Optional[int] = parse_val(int, message.labels.get("delay"))
rmq_message: Message = Message(**message_base_params)

Expand All @@ -268,7 +265,7 @@ async def kick(self, message: BrokerMessage) -> None:
routing_key=self._delay_queue_name,
)

async def listen(self) -> AsyncGenerator[bytes, None]:
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
"""
Listen to queue.
Expand All @@ -284,5 +281,8 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
queue = await self.declare_queues(self.read_channel)
async with queue.iterator() as iterator:
async for message in iterator:
async with message.process():
yield message.body
yield AckableMessage(
data=message.body,
ack=message.ack,
reject=message.reject,
)
16 changes: 10 additions & 6 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import pytest
from aio_pika import Channel, Message
from aio_pika.exceptions import QueueEmpty
from taskiq import BrokerMessage
from taskiq import AckableMessage, BrokerMessage
from taskiq.utils import maybe_awaitable

from taskiq_aio_pika.broker import AioPikaBroker


async def get_first_task(broker: AioPikaBroker) -> bytes: # type: ignore
async def get_first_task(broker: AioPikaBroker) -> AckableMessage: # type: ignore
"""
Get first message from the queue.
Expand Down Expand Up @@ -46,7 +47,8 @@ async def test_kick_success(broker: AioPikaBroker) -> None:

message = await asyncio.wait_for(get_first_task(broker), timeout=0.4)

assert message == sent.message
assert message.data == sent.message
await maybe_awaitable(message.ack())


@pytest.mark.anyio
Expand Down Expand Up @@ -111,7 +113,8 @@ async def test_listen(

message = await asyncio.wait_for(get_first_task(broker), timeout=0.4)

assert message == b"test_message"
assert message.data == b"test_message"
await maybe_awaitable(message.ack())


@pytest.mark.anyio
Expand All @@ -133,9 +136,10 @@ async def test_wrong_format(
routing_key=queue_name,
)

msg_bytes = await asyncio.wait_for(get_first_task(broker), 0.4)
message = await asyncio.wait_for(get_first_task(broker), 0.4)

assert msg_bytes == b"wrong"
assert message.data == b"wrong"
await maybe_awaitable(message.ack())

with pytest.raises(QueueEmpty):
await queue.get()
Expand Down

0 comments on commit 8c3786f

Please sign in to comment.