From b1c71ba60d6bc64b2056c918a147361b03d602f4 Mon Sep 17 00:00:00 2001 From: Stepan Date: Wed, 14 Aug 2024 00:20:09 +0300 Subject: [PATCH] Allow passing extra params to declare exchange and declare queues (duplicated with linter fixes) --- taskiq_aio_pika/broker.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/taskiq_aio_pika/broker.py b/taskiq_aio_pika/broker.py index a2e0d5e..326e54a 100644 --- a/taskiq_aio_pika/broker.py +++ b/taskiq_aio_pika/broker.py @@ -52,6 +52,8 @@ def __init__( # noqa: WPS211 exchange_type: ExchangeType = ExchangeType.TOPIC, max_priority: Optional[int] = None, delayed_message_exchange_plugin: bool = False, + declare_exchange_kwargs: Optional[Dict[Any, Any]] = None, + declare_queues_kwargs: Optional[Dict[Any, Any]] = None, **connection_kwargs: Any, ) -> None: """ @@ -80,6 +82,8 @@ def __init__( # noqa: WPS211 :param max_priority: maximum priority value for messages. :param delayed_message_exchange_plugin: turn on or disable delayed-message-exchange rabbitmq plugin. + :param declare_exchange_kwargs: additional from AbstractChannel.declare_exchange + :param declare_queues_kwargs: additional from AbstractChannel.declare_queue :param connection_kwargs: additional keyword arguments, for connect_robust method of aio-pika. """ @@ -92,7 +96,9 @@ def __init__( # noqa: WPS211 self._exchange_type = exchange_type self._qos = qos self._declare_exchange = declare_exchange + self._declare_exchange_kwargs = declare_exchange_kwargs or {} self._declare_queues = declare_queues + self._declare_queues_kwargs = declare_queues_kwargs or {} self._queue_name = queue_name self._routing_key = routing_key self._max_priority = max_priority @@ -135,6 +141,7 @@ async def startup(self) -> None: # noqa: WPS217 await self.write_channel.declare_exchange( self._exchange_name, type=self._exchange_type, + **self._declare_exchange_kwargs, ) if self._delayed_message_exchange_plugin: @@ -178,6 +185,7 @@ async def declare_queues( """ await channel.declare_queue( self._dead_letter_queue_name, + **self._declare_queues_kwargs, ) args: "Dict[str, Any]" = { "x-dead-letter-exchange": "", @@ -188,6 +196,7 @@ async def declare_queues( queue = await channel.declare_queue( self._queue_name, arguments=args, + **self._declare_queues_kwargs, ) if self._delayed_message_exchange_plugin: await queue.bind( @@ -201,6 +210,7 @@ async def declare_queues( "x-dead-letter-exchange": "", "x-dead-letter-routing-key": self._queue_name, }, + **self._declare_queues_kwargs, ) await queue.bind(