Skip to content

Commit

Permalink
Finish 0.2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
chandr-andr committed May 21, 2023
2 parents 410dd34 + c3e5e69 commit 82caeb1
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
pytest:
services:
rabbit:
image: rabbitmq:3.9.16-alpine
image: heidiks/rabbitmq-delayed-message-exchange:latest
env:
RABBITMQ_DEFAULT_USER: "guest"
RABBITMQ_DEFAULT_PASS: "guest"
Expand Down
36 changes: 35 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ You can send delayed messages and set priorities to messages using labels.

## Delays

### **Default retries**

To send delayed message, you have to specify
delay label. You can do it with `task` decorator,
or by using kicker. For example:
or by using kicker.
In this type of delay we are using additional queue with `expiration` parameter and after with time message will be deleted from `delay` queue and sent to the main taskiq queue.
For example:

```python
broker = AioPikaBroker()
Expand All @@ -48,6 +52,36 @@ async def main():
# have to wait delay period before message is going to be sent.
```

### **Retries with `rabbitmq-delayed-message-exchange` plugin**

To send delayed message you can install `rabbitmq-delayed-message-exchange`
plugin https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.

And you need to configure you broker.
There is `delayed_message_exchange_plugin` `AioPikaBroker` parameter and it must be `True` to turn on delayed message functionality.

The delay plugin can handle tasks with different delay times well, and the delay based on dead letter queue is suitable for tasks with the same delay time.
For example:

```python
broker = AioPikaBroker(
delayed_message_exchange_plugin=True,
)

@broker.task(delay=3)
async def delayed_task() -> int:
return 1

async def main():
await broker.startup()
# This message will be received by workers
# After 3 seconds delay.
await delayed_task.kiq()

# This message is going to be received after the delay in 4 seconds.
# Since we overriden the `delay` label using kicker.
await delayed_task.kicker().with_labels(delay=4).kiq()
```

## Priorities

Expand Down
28 changes: 14 additions & 14 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "taskiq-aio-pika"
version = "0.2.1"
version = "0.2.2"
description = "RabbitMQ broker for taskiq"
authors = ["Pavel Kirilin <[email protected]>"]
readme = "README.md"
Expand Down
100 changes: 71 additions & 29 deletions taskiq_aio_pika/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__( # noqa: WPS211
routing_key: str = "#",
exchange_type: ExchangeType = ExchangeType.TOPIC,
max_priority: Optional[int] = None,
delayed_message_exchange_plugin: bool = False,
**connection_kwargs: Any,
) -> None:
"""
Expand All @@ -79,6 +80,8 @@ def __init__( # noqa: WPS211
:param exchange_type: type of the exchange.
Used only if `declare_exchange` is True.
:param max_priority: maximum priority value for messages.
:param delayed_message_exchange_plugin: turn on or disable
delayed-message-exchange rabbitmq plugin.
:param connection_kwargs: additional keyword arguments,
for connect_robust method of aio-pika.
"""
Expand All @@ -95,6 +98,7 @@ def __init__( # noqa: WPS211
self._queue_name = queue_name
self._routing_key = routing_key
self._max_priority = max_priority
self._delayed_message_exchange_plugin = delayed_message_exchange_plugin

self._dead_letter_queue_name = f"{queue_name}.dead_letter"
if dead_letter_queue_name:
Expand All @@ -104,6 +108,8 @@ def __init__( # noqa: WPS211
if delay_queue_name:
self._delay_queue_name = delay_queue_name

self._delay_plugin_exchange_name = f"{exchange_name}.plugin_delay"

self.read_conn: Optional[AbstractRobustConnection] = None
self.write_conn: Optional[AbstractRobustConnection] = None
self.write_channel: Optional[AbstractChannel] = None
Expand Down Expand Up @@ -132,9 +138,31 @@ async def startup(self) -> None: # noqa: WPS217
self._exchange_name,
type=self._exchange_type,
)

if self._delayed_message_exchange_plugin:
await self.write_channel.declare_exchange(
self._delay_plugin_exchange_name,
type=ExchangeType.X_DELAYED_MESSAGE,
arguments={
"x-delayed-type": "direct",
},
)

if self._declare_queues:
await self.declare_queues(self.write_channel)

async def shutdown(self) -> None:
"""Close all connections on shutdown."""
await super().shutdown()
if self.write_channel:
await self.write_channel.close()
if self.read_channel:
await self.read_channel.close()
if self.write_conn:
await self.write_conn.close()
if self.read_conn:
await self.read_conn.close()

async def declare_queues(
self,
channel: AbstractChannel,
Expand Down Expand Up @@ -163,14 +191,24 @@ async def declare_queues(
self._queue_name,
arguments=args,
)
await channel.declare_queue(
self._delay_queue_name,
arguments={
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self._queue_name,
},
if self._delayed_message_exchange_plugin:
await queue.bind(
exchange=self._delay_plugin_exchange_name,
routing_key=self._routing_key,
)
else:
await channel.declare_queue(
self._delay_queue_name,
arguments={
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self._queue_name,
},
)

await queue.bind(
exchange=self._exchange_name,
routing_key=self._routing_key,
)
await queue.bind(exchange=self._exchange_name, routing_key=self._routing_key)
return queue

async def kick(self, message: BrokerMessage) -> None:
Expand All @@ -189,28 +227,44 @@ async def kick(self, message: BrokerMessage) -> None:
"""
if self.write_channel is None:
raise ValueError("Please run startup before kicking.")
priority = parse_val(int, message.labels.get("priority"))
rmq_msg = Message(
body=message.message,
headers={

message_base_params: dict[str, Any] = {
"body": message.message,
"headers": {
"task_id": message.task_id,
"task_name": message.task_name,
**message.labels,
},
delivery_mode=DeliveryMode.PERSISTENT,
priority=priority,
"delivery_mode": DeliveryMode.PERSISTENT,
}

message_base_params["priority"] = parse_val(
int,
message.labels.get("priority"),
)
delay = parse_val(int, message.labels.get("delay"))

delay: Optional[int] = parse_val(int, message.labels.get("delay"))
rmq_message: Message = Message(**message_base_params)

if delay is None:
exchange = await self.write_channel.get_exchange(
self._exchange_name,
ensure=False,
)
await exchange.publish(rmq_msg, routing_key=message.task_name)
await exchange.publish(rmq_message, routing_key=message.task_name)
elif self._delayed_message_exchange_plugin:
rmq_message.headers["x-delay"] = delay * 1000
exchange = await self.write_channel.get_exchange(
self._delay_plugin_exchange_name,
)
await exchange.publish(
rmq_message,
routing_key=self._routing_key,
)
else:
rmq_msg.expiration = timedelta(seconds=delay)
rmq_message.expiration = timedelta(seconds=delay)
await self.write_channel.default_exchange.publish(
rmq_msg,
rmq_message,
routing_key=self._delay_queue_name,
)

Expand All @@ -232,15 +286,3 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
async for message in iterator:
async with message.process():
yield message.body

async def shutdown(self) -> None:
"""Close all connections on shutdown."""
await super().shutdown()
if self.write_channel:
await self.write_channel.close()
if self.read_channel:
await self.read_channel.close()
if self.write_conn:
await self.write_conn.close()
if self.read_conn:
await self.read_conn.close()
Loading

0 comments on commit 82caeb1

Please sign in to comment.