From 9e7bec5da7ea07823797570adcad7597f3419b51 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Tue, 10 Dec 2024 14:13:08 +0100 Subject: [PATCH] adding basic tests --- examples/getting_started/main.py | 25 ++---- rabbitmq_amqp_python_client/connection.py | 12 +-- rabbitmq_amqp_python_client/entities.py | 5 +- rabbitmq_amqp_python_client/management.py | 83 ++++++------------- .../{configuration_options.py => options.py} | 4 +- tests/test_connection.py | 2 - tests/test_management.py | 44 ++++++++++ 7 files changed, 86 insertions(+), 89 deletions(-) rename rabbitmq_amqp_python_client/{configuration_options.py => options.py} (91%) create mode 100644 tests/test_management.py diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index 2437169..4c504d7 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -1,5 +1,3 @@ -# from proton import Message - from rabbitmq_amqp_python_client import ( Connection, ExchangeSpecification, @@ -8,7 +6,7 @@ ) -def main(): +def main() -> None: exchange_name = "getting-started-exchange" queue_name = "example-queue" connection = Connection("amqp://guest:guest@localhost:5672/") @@ -17,20 +15,15 @@ def main(): management = connection.management() - exchange_info = management.declare_exchange( - ExchangeSpecification(name=exchange_name, arguments={}) - ) + management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) - #queue_info = management.declare_queue( - # QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={}) - #) + management.declare_queue( + QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={}) + ) """ - #management.bind(BindingSpecification{ - source_exchange: exchange_name, - destination_queue: queue_name, - binding_key: routing_key, - }) + #management.bind(BindingSpecification(source_exchange=exchange_name, destination_queue=queue_name, \ + binding_key=routing_key)) """ """ @@ -59,9 +52,9 @@ def main(): management.purge_queue(queue_info.name) """ - #management.delete_queue(queue_name) + # management.delete_queue(queue_name) - #management.delete_exchange(exchange_name) + # management.delete_exchange(exchange_name) management.close() diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 32a9c2e..9ce6001 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -1,13 +1,5 @@ -from proton.utils import ( - BlockingConnection, - BlockingReceiver, - BlockingSender, -) - -from .configuration_options import ( - ReceiverOption, - SenderOption, -) +from proton.utils import BlockingConnection + from .management import Management diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index c9a2008..3da5d31 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -7,16 +7,17 @@ @dataclass class ExchangeSpecification: name: str - arguments: dict + arguments: dict[str, str] exchange_type: ExchangeType = ExchangeType.direct is_auto_delete: bool = False + is_internal: bool = False is_durable: bool = True @dataclass class QueueSpecification: name: str - arguments: dict + arguments: dict[str, str] queue_type: QueueType = QueueType.quorum dead_letter_routing_key: str = "" is_exclusive: Optional[bool] = None diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index a1d2ad0..5d6ffef 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -1,27 +1,22 @@ import uuid from typing import Any, Optional -import json -from proton import Message, Receiver, Sender + +from proton import Message +from proton._data import Data from proton.utils import ( BlockingConnection, BlockingReceiver, BlockingSender, ) -from proton._data import Data - from .address_helper import exchange_address, queue_address from .common import CommonValues -from .configuration_options import ( - ReceiverOption, - SenderOption, -) from .entities import ( ExchangeSpecification, QueueSpecification, ) +from .options import ReceiverOption, SenderOption -import pickle class Management: def __init__(self, conn: BlockingConnection): @@ -59,7 +54,6 @@ def request( method: str, expected_response_codes: list[int], ) -> None: - print("im in request") self._request(str(uuid.uuid4()), body, path, method, expected_response_codes) def _request( @@ -70,63 +64,37 @@ def _request( method: str, expected_response_codes: list[int], ) -> None: - print("path is: " + path) - - ## test exchange message amq_message = Message( id=id, body=body, reply_to="$me", address=path, subject=method, - #properties={"id": id, "to": path, "subject": method, "reply_to": "$me"}, - ) - - kvBody = { - "auto_delete": False, - "durable": True, - "type": "direct", - "arguments": {}, - } - - amq_message = Message( - body=kvBody, - reply_to="$me", - address=path, - subject=method, - id = id, ) - message_bytes= amq_message.encode() - list_bytes = list(message_bytes) - if self._sender is not None: self._sender.send(amq_message) - msg = self._receiver.receive() - - - print("response received: " + str(msg.subject)) - - #self._validate_reponse_code(int(msg.properties["http:response"]), expected_response_codes) + if self._receiver is not None: + msg = self._receiver.receive() - # TO_COMPLETE HERE + self._validate_reponse_code(int(msg.subject), expected_response_codes) # TODO # def delete_queue(self, name:str): - def declare_exchange(self, exchange_specification: ExchangeSpecification): + def declare_exchange( + self, exchange_specification: ExchangeSpecification + ) -> ExchangeSpecification: body = {} body["auto_delete"] = exchange_specification.is_auto_delete body["durable"] = exchange_specification.is_durable - body["type"] = exchange_specification.exchange_type.value - #body["internal"] = False - body["arguments"] = {} + body["type"] = exchange_specification.exchange_type.value # type: ignore + body["internal"] = exchange_specification.is_internal + body["arguments"] = {} # type: ignore path = exchange_address(exchange_specification.name) - print(path) - self.request( body, path, @@ -138,11 +106,15 @@ def declare_exchange(self, exchange_specification: ExchangeSpecification): ], ) - def declare_queue(self, queue_specification: QueueSpecification): + return exchange_specification + + def declare_queue( + self, queue_specification: QueueSpecification + ) -> QueueSpecification: body = {} body["auto_delete"] = queue_specification.is_auto_delete body["durable"] = queue_specification.is_durable - body["arguments"] = { + body["arguments"] = { # type: ignore "x-queue-type": queue_specification.queue_type.value, "x-dead-letter-exchange": queue_specification.dead_letter_exchange, "x-dead-letter-routing-key": queue_specification.dead_letter_routing_key, @@ -164,8 +136,9 @@ def declare_queue(self, queue_specification: QueueSpecification): ], ) - def delete_exchange(self, exchange_name:str): + return queue_specification + def delete_exchange(self, exchange_name: str) -> None: path = exchange_address(exchange_name) print(path) @@ -179,9 +152,7 @@ def delete_exchange(self, exchange_name:str): ], ) - - def delete_queue(self, queue_name:str): - + def delete_queue(self, queue_name: str) -> None: path = queue_address(queue_name) print(path) @@ -195,11 +166,10 @@ def delete_queue(self, queue_name:str): ], ) - def _validate_reponse_code(self, response_code: int, expected_response_codes: list[int]) -> None: - - print("response code: " + str(response_code)) - - if response_code == CommonValues.response_code_409: + def _validate_reponse_code( + self, response_code: int, expected_response_codes: list[int] + ) -> None: + if response_code == CommonValues.response_code_409.value: # TODO replace with a new defined Exception raise Exception("ErrPreconditionFailed") @@ -209,7 +179,6 @@ def _validate_reponse_code(self, response_code: int, expected_response_codes: li raise Exception("wrong response code received") - # TODO # def bind(self, bind_specification:BindSpecification): diff --git a/rabbitmq_amqp_python_client/configuration_options.py b/rabbitmq_amqp_python_client/options.py similarity index 91% rename from rabbitmq_amqp_python_client/configuration_options.py rename to rabbitmq_amqp_python_client/options.py index 1f2e2ac..8dc596e 100644 --- a/rabbitmq_amqp_python_client/configuration_options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -3,7 +3,7 @@ from proton.reactor import LinkOption # noqa: E402 -class SenderOption(LinkOption): +class SenderOption(LinkOption): # type: ignore def __init__(self, addr: str): self._addr = addr @@ -18,7 +18,7 @@ def test(self, link: Link) -> bool: return bool(link.is_sender) -class ReceiverOption(LinkOption): +class ReceiverOption(LinkOption): # type: ignore def __init__(self, addr: str): self._addr = addr diff --git a/tests/test_connection.py b/tests/test_connection.py index e574901..1d2aa4d 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,8 +1,6 @@ from rabbitmq_amqp_python_client import Connection -# Temporary this will be replaced by our connection Deal when we start the implementation -# For the moment we just need a test to run poetry run pytest without failing def test_connection() -> None: connection = Connection("amqp://guest:guest@localhost:5672/") connection.dial() diff --git a/tests/test_management.py b/tests/test_management.py new file mode 100644 index 0000000..ad3b275 --- /dev/null +++ b/tests/test_management.py @@ -0,0 +1,44 @@ +from rabbitmq_amqp_python_client import ( + Connection, + ExchangeSpecification, + QueueSpecification, + QueueType, +) + + +def test_declare_delete_exchange() -> None: + connection = Connection("amqp://guest:guest@localhost:5672/") + connection.dial() + + exchange_name = "test-exchange" + management = connection.management() + + exchange_info = management.declare_exchange( + ExchangeSpecification(name=exchange_name, arguments={}) + ) + + assert exchange_info.name == exchange_name + + # Still not working + # management.delete_exchange(exchange_name) + + connection.close() + + +def test_declare_delete_queue() -> None: + connection = Connection("amqp://guest:guest@localhost:5672/") + connection.dial() + + queue_name = "test-queue" + management = connection.management() + + exchange_info = management.declare_queue( + QueueSpecification(name=queue_name, queue_type=QueueType.quorum, arguments={}) + ) + + assert exchange_info.name == queue_name + + # Still not working + # management.delete_queue(queue_name) + + connection.close()