Skip to content

Commit

Permalink
First working version
Browse files Browse the repository at this point in the history
  • Loading branch information
matpompili committed Jul 29, 2022
1 parent a7fae59 commit 7c85175
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 3 deletions.
21 changes: 21 additions & 0 deletions easypubsub/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import logging
import sys


def getLogger(name: str, log_level=logging.INFO) -> logging.Logger:
"""Returns a logger with a default format."""

handler = logging.StreamHandler(stream=sys.stdout)
handler.setLevel(log_level)
handler.setFormatter(
logging.Formatter(
fmt="%(asctime)s.%(msecs)03d | %(levelname)s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)

logger = logging.getLogger(name)
logger.addHandler(handler)
logger.setLevel(log_level)

return logger
60 changes: 60 additions & 0 deletions easypubsub/proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import zmq

from easypubsub.logging import getLogger

_logger = getLogger("EasyPubSub.Proxy")


class Proxy:
"""The EasyPubSub Proxy acts as an intermediary between Publishers and Subscribers.
Args:
subscribers_address: The address that subscribers will use to connect to the Proxy.
publishers_address: The address that publisher will use to connect to the Proxy.
Example:
>>> from easypubsub.proxy import Proxy
>>> proxy = Proxy("tcp://localhost:5555", "tcp://localhost:5556")
"""

def __init__(self, subscribers_address: str, publishers_address: str) -> None:
self.ctx = zmq.Context.instance()
self.subcriber_address = subscribers_address
self.publishers_address = publishers_address

_logger.info("Creating socket for subscribers.")
self.xpub_subscriber_socket = self.ctx.socket(zmq.XPUB)
_logger.info(
"Binding socket for subscribers to {}.".format(self.subcriber_address)
)
self.xpub_subscriber_socket.bind(self.subcriber_address)

_logger.info("Creating socket for publishers.")
self.xsub_publisher_socket = self.ctx.socket(zmq.XSUB)
_logger.info(
"Binding socket for publishers to {}.".format(self.publishers_address)
)
self.xsub_publisher_socket.bind(self.publishers_address)

def launch(self) -> None:
"""Launch the Proxy.
This method will block until the Proxy is closed.
"""
_logger.info("Launching proxy.")
try:
zmq.proxy(self.xpub_subscriber_socket, self.xsub_publisher_socket)
except KeyboardInterrupt:
_logger.info("Detected KeyboardInterrupt. Closing proxy and sockets.")
self.xpub_subscriber_socket.close()
self.xsub_publisher_socket.close()
except zmq.error.ContextTerminated:
_logger.info("Detected ContextTerminated. Closing proxy and sockets.")
self.xpub_subscriber_socket.close()
self.xsub_publisher_socket.close()
except:
_logger.exception("Unhandled exception. Closing proxy and sockets.")
self.xpub_subscriber_socket.close()
self.xsub_publisher_socket.close()

_logger.info("Done.")
41 changes: 41 additions & 0 deletions easypubsub/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import pickle
import time
from typing import Any, Optional

import zmq

from easypubsub.logging import getLogger


class Publisher:
def __init__(
self, name: str, proxy_publishers_address: str, default_topic: str = ""
) -> None:
self.publishers_address = proxy_publishers_address
self.default_topic = default_topic
self.name = name

self._logger = getLogger(f"EasyPubSub.Publisher({name})")
self.connect()

def connect(self) -> None:
self.ctx = zmq.Context.instance()
self.socket = self.ctx.socket(zmq.PUB)
self._logger.info(f"Connecting to {self.publishers_address}.")
self.socket.connect(self.publishers_address)

time.sleep(1)

def publish(self, message: Any, topic: Optional[str] = None) -> None:
if topic is None:
topic = self.default_topic
if topic.endswith("."):
self._logger.warning(
f'Topic "{topic}" ends with a dot, I will remove the final dot before publishing.'
)
topic = f"{self.name}.{topic}".strip(".")
try:
pickled_message = pickle.dumps(message)
self.socket.send_multipart([topic.encode("utf-8"), pickled_message])
except:
self._logger.exception("Could not publish message. See traceback.")
67 changes: 67 additions & 0 deletions easypubsub/subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import pickle
import time
from typing import Any, List, Optional, Tuple, Union

import zmq

from easypubsub.logging import getLogger


class Subscriber:
def __init__(
self,
name: str,
proxy_subscribers_address: str,
topics: Union[str, List[str]] = "",
receive_timeout: float = 0.1,
) -> None:

self.name = name
self.subscribers_address = proxy_subscribers_address
self.receive_timeout_ms = round(receive_timeout * 1000)

if topics == "":
self.topics = []
elif isinstance(topics, str):
self.topics = [topics.encode("utf-8")]
else:
self.topics = [topic.encode("utf-8") for topic in topics]

self._logger = getLogger(f"EasyPubSub.Subscriber({name})")

self.connect()

def __del__(self) -> None:
self.poller.unregister(self.socket)

def connect(self) -> None:
self.ctx = zmq.Context.instance()
self.socket = self.ctx.socket(zmq.SUB)
self._logger.info(f"Connecting to {self.subscribers_address}.")
self.socket.connect(self.subscribers_address)

if len(self.topics) > 0:
for topic in self.topics:
self._logger.info(f"Subscribing to {topic.decode('utf-8')}.")
self.socket.setsockopt(zmq.SUBSCRIBE, topic)
else:
self._logger.info("Subscribing to all topics.")
self.socket.setsockopt(zmq.SUBSCRIBE, b"")

self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)

time.sleep(1)

def receive(self) -> List[Tuple[str, Any]]:
messages: List[Any] = []
messages_available = True
while messages_available:
sockets = dict(self.poller.poll(self.receive_timeout_ms))
if self.socket in sockets:
topic, message = self.socket.recv_multipart()
messages.append((topic.decode("utf-8"), pickle.loads(message)))
else:
messages_available = False

return messages
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ warn_unused_configs = true

[[tool.mypy.overrides]]
module = [
"pyzmq",
"pyzmq.*",
"setuptools"
]
ignore_missing_imports = true
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name="easypubsub",
version="0.1.0",
version="0.2.0",
packages=["easypubsub"],
url="https://github.com/matpompili/easypubsub",
author="Matteo Pompili",
Expand All @@ -16,7 +16,7 @@
"Intended Audience :: Developers",
"Topic :: Software Development :: Libraries :: Python Modules",
],
description="TODO",
description="A simple wrapper around PyZMQ that provides an easy interface to Publish Subscribe.",
install_requires=requirements,
test_suite="tests",
package_data={"": ["LICENSE"]},
Expand Down
7 changes: 7 additions & 0 deletions tests/test_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import logging

from easypubsub.logging import getLogger

_logger = getLogger(__name__, log_level=logging.DEBUG)
_logger.info("This is a test message.")
_logger.debug("This is a debug test message.")
6 changes: 6 additions & 0 deletions tests/test_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from easypubsub.proxy import Proxy

SUBSCRIBERS_ADDRESS = "tcp://127.0.0.1:5555"
PUBLISHERS_ADDRESS = "tcp://127.0.0.1:5556"
# Create a Proxy.
Proxy(SUBSCRIBERS_ADDRESS, PUBLISHERS_ADDRESS).launch()
16 changes: 16 additions & 0 deletions tests/test_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import time

from easypubsub.publisher import Publisher

PUBLISHERS_ADDRESS = "tcp://127.0.0.1:5556"

publisher = Publisher("test_publisher", PUBLISHERS_ADDRESS, default_topic="test_topic")
publisher.publish("This is a test message.")
publisher.publish("This is a debug test message.", topic="debug_test_topic")
publisher.publish("This is a test list".split(" "))
publisher.publish("This will throw a warning", topic="my_wrong_topic.")
time.sleep(2.0)
publisher.publish("This is a test message.")
publisher.publish("This is a debug test message.", topic="debug_test_topic")
publisher.publish("This is a test list".split(" "))
publisher.publish("This will throw a warning", topic="my_wrong_topic.")
22 changes: 22 additions & 0 deletions tests/test_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import time

from easypubsub.subscriber import Subscriber

SUBSCRIBERS_ADDRESS = "tcp://127.0.0.1:5555"

subscriber = Subscriber(
"test_subscriber", SUBSCRIBERS_ADDRESS, topics="test_publisher.test_topic"
)

try:
while True:
result = subscriber.receive()
if len(result) > 0:
print("Received:")
for topic, message in result:
print(f"{topic}: {message}")
else:
time.sleep(1.0)
# print("No message received.")
except KeyboardInterrupt:
pass

0 comments on commit 7c85175

Please sign in to comment.