Skip to content

Commit

Permalink
Allow passing extra params to declare exchange and declare queues (du…
Browse files Browse the repository at this point in the history
…plicated with linter fixes)
  • Loading branch information
Steve-Bupyc committed Aug 13, 2024
1 parent 8e00c47 commit b1c71ba
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions taskiq_aio_pika/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def __init__( # noqa: WPS211
exchange_type: ExchangeType = ExchangeType.TOPIC,
max_priority: Optional[int] = None,
delayed_message_exchange_plugin: bool = False,
declare_exchange_kwargs: Optional[Dict[Any, Any]] = None,
declare_queues_kwargs: Optional[Dict[Any, Any]] = None,
**connection_kwargs: Any,
) -> None:
"""
Expand Down Expand Up @@ -80,6 +82,8 @@ def __init__( # noqa: WPS211
:param max_priority: maximum priority value for messages.
:param delayed_message_exchange_plugin: turn on or disable
delayed-message-exchange rabbitmq plugin.
:param declare_exchange_kwargs: additional from AbstractChannel.declare_exchange
:param declare_queues_kwargs: additional from AbstractChannel.declare_queue
:param connection_kwargs: additional keyword arguments,
for connect_robust method of aio-pika.
"""
Expand All @@ -92,7 +96,9 @@ def __init__( # noqa: WPS211
self._exchange_type = exchange_type
self._qos = qos
self._declare_exchange = declare_exchange
self._declare_exchange_kwargs = declare_exchange_kwargs or {}
self._declare_queues = declare_queues
self._declare_queues_kwargs = declare_queues_kwargs or {}
self._queue_name = queue_name
self._routing_key = routing_key
self._max_priority = max_priority
Expand Down Expand Up @@ -135,6 +141,7 @@ async def startup(self) -> None: # noqa: WPS217
await self.write_channel.declare_exchange(
self._exchange_name,
type=self._exchange_type,
**self._declare_exchange_kwargs,
)

if self._delayed_message_exchange_plugin:
Expand Down Expand Up @@ -178,6 +185,7 @@ async def declare_queues(
"""
await channel.declare_queue(
self._dead_letter_queue_name,
**self._declare_queues_kwargs,
)
args: "Dict[str, Any]" = {
"x-dead-letter-exchange": "",
Expand All @@ -188,6 +196,7 @@ async def declare_queues(
queue = await channel.declare_queue(
self._queue_name,
arguments=args,
**self._declare_queues_kwargs,
)
if self._delayed_message_exchange_plugin:
await queue.bind(
Expand All @@ -201,6 +210,7 @@ async def declare_queues(
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self._queue_name,
},
**self._declare_queues_kwargs,
)

await queue.bind(
Expand Down

0 comments on commit b1c71ba

Please sign in to comment.