From f8bccb8eea164234df0943ae8b1fa6e1b883e238 Mon Sep 17 00:00:00 2001 From: Adam Burdett Date: Mon, 16 Nov 2020 02:33:24 -0700 Subject: [PATCH] send undeliverable msg's to kafka service Signed-off-by: Adam Burdett --- aries_cloudagent/config/argparse.py | 37 ++++++++++++++++++- aries_cloudagent/transport/inbound/manager.py | 10 ++++- .../transport/outbound/manager.py | 8 +++- requirements.txt | 1 + 4 files changed, 53 insertions(+), 3 deletions(-) diff --git a/aries_cloudagent/config/argparse.py b/aries_cloudagent/config/argparse.py index 09db031842..b0bc68e05d 100644 --- a/aries_cloudagent/config/argparse.py +++ b/aries_cloudagent/config/argparse.py @@ -826,6 +826,31 @@ def add_arguments(self, parser: ArgumentParser): to hold messages for delivery to agents without an endpoint. This\ option will require additional memory to store messages in the queue.", ) + parser.add_argument( + "--enable-kafka-queue", + action="store_true", + dest="enable_kafka_queue", + env_var="ACAPY_ENABLE_KAFKA_QUEUE", + help="Enable the inbound/outbound undelivered kafka queue. This" + "agent to send messages where the endpoint is not available to a kafka" + " service." + ) + parser.add_argument( + "--kafka-producer-endpoint", + type=str, + nargs="+", + metavar="", + env_var="ACAPY_KAFKA_PRODUCER_ENDPOINT", + help="" + ) + parser.add_argument( + "--kafka-consumer-endpoint", + type=str, + nargs="+", + metavar="", + env_var="ACAPY_KAFKA_CONSUMER_ENDPOINT", + help="" + ) parser.add_argument( "--max-outbound-retry", default=4, @@ -848,7 +873,17 @@ def get_settings(self, args: Namespace): else: raise ArgsParseError("-ot/--outbound-transport is required") settings["transport.enable_undelivered_queue"] = args.enable_undelivered_queue - + settings["transport.enable_kafka_queue"] = args.enable_kafka_queue + + if args.enable_kafka_queue: + if not args.kafka_producer_endpoint or not args.kafka_consumer_endpoint: + raise ArgsParseError( + "Parameters --kafka-producer-endpoint and --kafka-consumer-endpoint must be provided" + + " for kafka queue." + ) + settings["transport.kafka_producer_endpoint"] = args.kafka_producer_endpoint + settings["transport.kafka_consumer_endpoint"] = args.kafka_consumer_endpoint + if args.label: settings["default_label"] = args.label if args.max_message_size: diff --git a/aries_cloudagent/transport/inbound/manager.py b/aries_cloudagent/transport/inbound/manager.py index 3075d6624b..bad803f774 100644 --- a/aries_cloudagent/transport/inbound/manager.py +++ b/aries_cloudagent/transport/inbound/manager.py @@ -21,6 +21,7 @@ from .delivery_queue import DeliveryQueue from .message import InboundMessage from .session import InboundSession +from kafka import KafkaProducer LOGGER = logging.getLogger(__name__) MODULE_BASE_PATH = "aries_cloudagent.transport.inbound" @@ -46,6 +47,7 @@ def __init__( self.session_limit: asyncio.Semaphore = None self.task_queue = TaskQueue() self.undelivered_queue: DeliveryQueue = None + self.kafka_producer = None async def setup(self): """Perform setup operations.""" @@ -65,7 +67,10 @@ async def setup(self): # Setup queue for undelivered messages if self.context.settings.get("transport.enable_undelivered_queue"): self.undelivered_queue = DeliveryQueue() - + if self.context.settings.get("transport.enable_kafka_queue"): + bootstrap_server = self.context.settings.get("transport.kafka_producer_endpoint") + self.kafka_producer = KafkaProducer(bootstrap_servers=bootstrap_server) + # self.session_limit = asyncio.Semaphore(50) def register(self, config: InboundTransportConfiguration) -> str: @@ -230,6 +235,9 @@ def return_undelivered(self, outbound: OutboundMessage) -> bool: if self.undelivered_queue: self.undelivered_queue.add_message(outbound) return True + if self.kafka_producer: + self.kafka_producer.send("undeliverable_inbound",outbound) + return False def process_undelivered(self, session: InboundSession): diff --git a/aries_cloudagent/transport/outbound/manager.py b/aries_cloudagent/transport/outbound/manager.py index 977817a7a6..c45681e47b 100644 --- a/aries_cloudagent/transport/outbound/manager.py +++ b/aries_cloudagent/transport/outbound/manager.py @@ -24,6 +24,7 @@ OutboundTransportRegistrationError, ) from .message import OutboundMessage +from kafka import KafkaProducer LOGGER = logging.getLogger(__name__) MODULE_BASE_PATH = "aries_cloudagent.transport.outbound" @@ -89,6 +90,7 @@ def __init__( self._process_task: asyncio.Task = None if self.context.settings.get("transport.max_outbound_retry"): self.MAX_RETRY_COUNT = self.context.settings["transport.max_outbound_retry"] + self.kafka_producer = None async def setup(self): """Perform setup operations.""" @@ -97,7 +99,9 @@ async def setup(self): ) for outbound_transport in outbound_transports: self.register(outbound_transport) - + if self.context.settings.get("transport.enable_kafka_queue"): + bootstrap_server = self.context.settings.get("transport.kafka_producer_endpoint") + self.kafka_producer = KafkaProducer(bootstrap_servers=bootstrap_server) def register(self, module: str) -> str: """ Register a new outbound transport by module path. @@ -473,6 +477,8 @@ def finished_deliver(self, queued: QueuedOutboundMessage, completed: CompletedTa ">>> Outbound message failed to deliver, NOT Re-queued.", exc_info=queued.error, ) + if self.kafka_producer: + self.kafka_producer.send("undeliverable_outbound",queued) queued.state = QueuedOutboundMessage.STATE_DONE else: queued.error = None diff --git a/requirements.txt b/requirements.txt index 724905b2ba..55da59656c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,3 +14,4 @@ pyld==2.0.1 py_multicodec==0.2.1 pyyaml~=5.3.1 ConfigArgParse~=1.2.3 +kafka-python~=2.0.2 \ No newline at end of file