Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multiple queues #17

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ keywords = ["taskiq", "tasks", "distributed", "async", "aio-pika"]
python = "^3.7"
taskiq = "^0"
aio-pika = "^8.1.0"
aiostream = "^0.4.5"

[tool.poetry.dev-dependencies]
pytest = "^7.0"
Expand Down
62 changes: 45 additions & 17 deletions taskiq_aio_pika/broker.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import asyncio
from datetime import timedelta
from logging import getLogger
from typing import Any, AsyncGenerator, Callable, Dict, Optional, TypeVar
from typing import Any, AsyncGenerator, Callable, Dict, List, Optional, TypeVar

from aio_pika import DeliveryMode, ExchangeType, Message, connect_robust
from aio_pika.abc import AbstractChannel, AbstractQueue, AbstractRobustConnection
from aiostream import stream
from taskiq.abc.broker import AsyncBroker
from taskiq.abc.result_backend import AsyncResultBackend
from taskiq.message import BrokerMessage
Expand Down Expand Up @@ -50,7 +51,6 @@ def __init__( # noqa: WPS211
delay_queue_name: Optional[str] = None,
declare_exchange: bool = True,
declare_queues: bool = True,
routing_key: str = "#",
exchange_type: ExchangeType = ExchangeType.TOPIC,
max_priority: Optional[int] = None,
**connection_kwargs: Any,
Expand All @@ -75,7 +75,6 @@ def __init__( # noqa: WPS211
if it doesn't exist.
:param declare_queues: whether you want to declare queues even on
client side. May be useful for message persistance.
:param routing_key: that used to bind that queue to the exchange.
:param exchange_type: type of the exchange.
Used only if `declare_exchange` is True.
:param max_priority: maximum priority value for messages.
Expand All @@ -93,8 +92,8 @@ def __init__( # noqa: WPS211
self._declare_exchange = declare_exchange
self._declare_queues = declare_queues
self._queue_name = queue_name
self._routing_key = routing_key
self._max_priority = max_priority
self._queue_name_list: List[str] = []

self._dead_letter_queue_name = f"{queue_name}.dead_letter"
if dead_letter_queue_name:
Expand Down Expand Up @@ -135,10 +134,20 @@ async def startup(self) -> None: # noqa: WPS217
if self._declare_queues:
await self.declare_queues(self.write_channel)

def add_queue(self, queue_name: str) -> "AioPikaBroker":
"""
This function is add queue name.

:param queue_name: queue_name.
:return: AioPikaBroker
"""
self._queue_name_list.append(queue_name)
return self

async def declare_queues(
self,
channel: AbstractChannel,
) -> AbstractQueue:
) -> List[AbstractQueue]:
"""
This function is used to declare queues.

Expand All @@ -150,6 +159,16 @@ async def declare_queues(
:param channel: channel to used for declaration.
:return: main queue instance.
"""
queue_list = []

async def bind_queue(queue_name: str) -> AbstractQueue:
queue = await channel.declare_queue(
queue_name,
arguments=args,
)
await queue.bind(exchange=self._exchange_name, routing_key=queue_name)
return queue

await channel.declare_queue(
self._dead_letter_queue_name,
)
Expand All @@ -159,19 +178,19 @@ async def declare_queues(
}
if self._max_priority is not None:
args["x-max-priority"] = self._max_priority
queue = await channel.declare_queue(
self._queue_name,
arguments=args,
)
queue_list.append(await bind_queue(self._queue_name))
await channel.declare_queue(
self._delay_queue_name,
arguments={
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self._queue_name,
},
)
await queue.bind(exchange=self._exchange_name, routing_key=self._routing_key)
return queue
if self._queue_name_list:
for _queue_name in self._queue_name_list:
queue_list.append(await bind_queue(_queue_name))

return queue_list

async def kick(self, message: BrokerMessage) -> None:
"""
Expand Down Expand Up @@ -201,12 +220,13 @@ async def kick(self, message: BrokerMessage) -> None:
priority=priority,
)
delay = parse_val(int, message.labels.get("delay"))
routing_key_name = message.queue_name or self._queue_name # type: ignore # todo
if delay is None:
exchange = await self.write_channel.get_exchange(
self._exchange_name,
ensure=False,
)
await exchange.publish(rmq_msg, routing_key=message.task_name)
await exchange.publish(rmq_msg, routing_key=routing_key_name)
else:
rmq_msg.expiration = timedelta(seconds=delay)
await self.write_channel.default_exchange.publish(
Expand All @@ -227,11 +247,19 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
if self.read_channel is None:
raise ValueError("Call startup before starting listening.")
await self.read_channel.set_qos(prefetch_count=self._qos)
queue = await self.declare_queues(self.read_channel)
async with queue.iterator() as iterator:
async for message in iterator:
async with message.process():
yield message.body
queue_list = await self.declare_queues(self.read_channel)

async def body(queue: AbstractQueue) -> AsyncGenerator[bytes, None]:
async with queue.iterator() as iterator:
async for message in iterator:
async with message.process():
yield message.body

combine = stream.merge(*[body(queue) for queue in queue_list])

async with combine.stream() as streamer:
async for message_body in streamer:
yield message_body

async def shutdown(self) -> None:
"""Close all connections on shutdown."""
Expand Down