Skip to content

Commit

Permalink
Support of threading (#7)
Browse files Browse the repository at this point in the history
* Fix creating of tube from schema

* Multi topic

* Fix of unknow topic

Co-authored-by: Martin Korbel <[email protected]>
  • Loading branch information
BlackSmith and Martin Korbel authored Apr 25, 2022
1 parent f970644 commit 5b61db3
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "zmq_tubes"
version = "0.2.0"
version = "1.4.0"
license = "MIT"
readme = "README.md"
description = "Wrapper for ZMQ comunication."
Expand Down
25 changes: 11 additions & 14 deletions zmq_tubes/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,8 @@ async def receive_data(self, raw_socket=None):

class TubeNode:

__TUBE_CLASS = Tube

def __init__(self, *, schema=None, warning_not_mach_topic=True):
self.logger = logging.getLogger(self.__class__.__name__)
self._tubes = TopicMatcher()
Expand Down Expand Up @@ -423,7 +425,7 @@ def parse_schema(self, schema):
"""
if 'tubes' in schema:
for tube_info in schema['tubes']:
tube = Tube(**tube_info)
tube = self.__TUBE_CLASS(**tube_info)
self.register_tube(tube, tube_info.get('topics', []))

def get_tube_by_topic(self, topic: str, types=None) -> Tube:
Expand Down Expand Up @@ -457,15 +459,10 @@ def register_tube(self, tube: Tube, topics: [str]):
if isinstance(topics, str):
topics = [topics]
for topic in topics:
tubes = self._tubes.get_topic(topic)
if tubes:
self.logger.info(f"The tube '{tube.name}' overrides "
f"the exist topic: {topic}")
tubes.append(tube)
else:
self.logger.debug(f"The tube '{tube.name}' was registered to "
f"the topic: {topic}")
tubes = [tube, ]
tubes = self._tubes.get_topic(topic) or []
tubes.append(tube)
self.logger.debug(f"The tube '{tube.name}' was registered to "
f"the topic: {topic}")
self._tubes.set_topic(topic, tubes)

def get_callback_by_topic(self, topic: str, tube=None) -> Callable:
Expand All @@ -476,10 +473,10 @@ def get_callback_by_topic(self, topic: str, tube=None) -> Callable:
"""
callbacks = []
callbacks_for_tube = []
for clb in self._callbacks.match(topic):
if not hasattr(clb, 'tube'):
for clb in self._callbacks.match(topic) or []:
if 'tube' not in clb.__dict__:
callbacks.append(clb)
elif clb.tube == tube:
elif clb.__dict__['tube'] == tube:
callbacks_for_tube.append(clb)
return callbacks_for_tube if callbacks_for_tube else callbacks

Expand Down Expand Up @@ -531,7 +528,7 @@ def register_handler(self, topic: str, fce: Callable, tube: Tube = None):
:param tube: Tube - only for the case DEALER x DEALER on the same node.
"""
if tube:
fce.tube = tube
fce.__dict__['tube'] = tube
self._callbacks.get_topic(topic, set_default=[]).append(fce)

def stop(self):
Expand Down
2 changes: 2 additions & 0 deletions zmq_tubes/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def receive_data(self, raw_socket=None):


class TubeNode(AsyncTubeNode):
__TUBE_CLASS = Tube

def request(self, topic: str, payload=None, timeout=30) \
-> TubeMessage:
tube = self.get_tube_by_topic(topic, [zmq.REQ])
Expand Down

0 comments on commit 5b61db3

Please sign in to comment.