From 5b61db317fda1bfca03241824138523c78494dc2 Mon Sep 17 00:00:00 2001 From: Martin Korbel Date: Mon, 25 Apr 2022 14:36:59 +0200 Subject: [PATCH] Support of threading (#7) * Fix creating of tube from schema * Multi topic * Fix of unknow topic Co-authored-by: Martin Korbel --- pyproject.toml | 2 +- zmq_tubes/manager.py | 25 +++++++++++-------------- zmq_tubes/threads.py | 2 ++ 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a8aa483..d5f444f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "zmq_tubes" -version = "0.2.0" +version = "1.4.0" license = "MIT" readme = "README.md" description = "Wrapper for ZMQ comunication." diff --git a/zmq_tubes/manager.py b/zmq_tubes/manager.py index f01091a..91dae7c 100644 --- a/zmq_tubes/manager.py +++ b/zmq_tubes/manager.py @@ -387,6 +387,8 @@ async def receive_data(self, raw_socket=None): class TubeNode: + __TUBE_CLASS = Tube + def __init__(self, *, schema=None, warning_not_mach_topic=True): self.logger = logging.getLogger(self.__class__.__name__) self._tubes = TopicMatcher() @@ -423,7 +425,7 @@ def parse_schema(self, schema): """ if 'tubes' in schema: for tube_info in schema['tubes']: - tube = Tube(**tube_info) + tube = self.__TUBE_CLASS(**tube_info) self.register_tube(tube, tube_info.get('topics', [])) def get_tube_by_topic(self, topic: str, types=None) -> Tube: @@ -457,15 +459,10 @@ def register_tube(self, tube: Tube, topics: [str]): if isinstance(topics, str): topics = [topics] for topic in topics: - tubes = self._tubes.get_topic(topic) - if tubes: - self.logger.info(f"The tube '{tube.name}' overrides " - f"the exist topic: {topic}") - tubes.append(tube) - else: - self.logger.debug(f"The tube '{tube.name}' was registered to " - f"the topic: {topic}") - tubes = [tube, ] + tubes = self._tubes.get_topic(topic) or [] + tubes.append(tube) + self.logger.debug(f"The tube '{tube.name}' was registered to " + f"the topic: {topic}") self._tubes.set_topic(topic, tubes) def get_callback_by_topic(self, topic: str, tube=None) -> Callable: @@ -476,10 +473,10 @@ def get_callback_by_topic(self, topic: str, tube=None) -> Callable: """ callbacks = [] callbacks_for_tube = [] - for clb in self._callbacks.match(topic): - if not hasattr(clb, 'tube'): + for clb in self._callbacks.match(topic) or []: + if 'tube' not in clb.__dict__: callbacks.append(clb) - elif clb.tube == tube: + elif clb.__dict__['tube'] == tube: callbacks_for_tube.append(clb) return callbacks_for_tube if callbacks_for_tube else callbacks @@ -531,7 +528,7 @@ def register_handler(self, topic: str, fce: Callable, tube: Tube = None): :param tube: Tube - only for the case DEALER x DEALER on the same node. """ if tube: - fce.tube = tube + fce.__dict__['tube'] = tube self._callbacks.get_topic(topic, set_default=[]).append(fce) def stop(self): diff --git a/zmq_tubes/threads.py b/zmq_tubes/threads.py index 7804a5f..7ee818a 100644 --- a/zmq_tubes/threads.py +++ b/zmq_tubes/threads.py @@ -79,6 +79,8 @@ def receive_data(self, raw_socket=None): class TubeNode(AsyncTubeNode): + __TUBE_CLASS = Tube + def request(self, topic: str, payload=None, timeout=30) \ -> TubeMessage: tube = self.get_tube_by_topic(topic, [zmq.REQ])