From 259571f1cf937af3fb972e997e625d77cd5bf4f3 Mon Sep 17 00:00:00 2001 From: Amirhossein Barati Date: Tue, 13 Feb 2024 00:29:15 +0330 Subject: [PATCH 1/5] feat(BrokerServer): Add logging and monitoring to class --- broker/broker.py | 29 +++++++++++++++++++++++++---- gateway/gateway.py | 7 +++++-- zookeeper/zookeeper.py | 3 +++ 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/broker/broker.py b/broker/broker.py index 5d87ad6..9628d80 100644 --- a/broker/broker.py +++ b/broker/broker.py @@ -1,12 +1,20 @@ from proto.zookeeper_pb2 import DiscoveryRequest import proto.zookeeper_pb2_grpc -from proto.broker_pb2 import BrokerEmpty, BrokerPushResponse, BrokerStatus, BrokerMessage, MessageList, MessageCount, BrokerPullResponse +from proto.broker_pb2 import BrokerEmpty, BrokerPushResponse, BrokerStatus, BrokerMessage, MessageList, MessageCount,\ + BrokerPullResponse import proto.broker_pb2_grpc as broker_pb2_grpc import grpc + +from prometheus_client import start_http_server from concurrent.futures import ThreadPoolExecutor import os import time +import logging + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + class BrokerServer(broker_pb2_grpc.BrokerServicer): def __init__(self) -> None: @@ -21,6 +29,7 @@ def Ack(self, request, context): def Push(self, request, context): self.messages.append((request.key, request.value)) + logger.info(f"Pushed message {request.key} {request.value}") if self.replica: self.replica_stub.PushReplica(MessageList(messages=[BrokerMessage(key=request.key, value=request.value)])) return BrokerPushResponse(status=BrokerStatus.BROKER_SUCCESS, message="") @@ -32,8 +41,10 @@ def Pull(self, request, context): if self.messages: break else: + logger.error(f"Pulling message {self.uuid} failed!\nBroker is not available") return BrokerPullResponse(status=BrokerStatus.BROKER_FAILURE, message="No messages") key, value = self.messages.pop(0) + logger.warning(f"Pulled message from Broker {self.uuid}: {key} {value}") if self.replica: self.replica_stub.DropReplicaMessages(MessageCount(count=1)) return BrokerPullResponse(status=BrokerStatus.BROKER_SUCCESS, message=BrokerMessage(key=key, value=value)) @@ -41,19 +52,25 @@ def Pull(self, request, context): def SetReplica(self, request, context): self.replica = request.uuid self.replica_stub = proto.broker_pb2_grpc.BrokerStub(grpc.insecure_channel(request.url)) + logger.info(f"Set replica for broker {self.uuid}: {request.uuid}") batch = 10 for i in range(0, len(self.messages), batch): - self.replica_stub.PushReplica(MessageList(messages=[BrokerMessage(key=k, value=v) for k, v in self.messages[i:i+batch]])) + self.replica_stub.PushReplica(MessageList( + messages=[BrokerMessage(key=k, value=v) for k, v in self.messages[i:i+batch]] + )) return BrokerEmpty() def LeadReplica(self, request, context): if not self.replica: + logger.error("No replica available") raise Exception("No replica") + logger.info(f"Merging replica messages to queue messages of broker {self.uuid}") self.messages.extend(self.replica_messages) self.replica_messages.clear() return BrokerEmpty() def DropReplica(self, request, context): + logger.warning(f"Dropping broker {self.uuid} replica") self.replica = None self.replica_messages.clear() return BrokerEmpty() @@ -61,13 +78,17 @@ def DropReplica(self, request, context): def PushReplica(self, request, context): for message in request.messages: self.replica_messages.append((message.key, message.value)) + logger.info(f"Pushed message {message.key} {message.value} to replica messages") return BrokerEmpty() def DropReplicaMessages(self, request, context): + logger.warning(f"Dropping {request.count} message from replica messages") self.replica_messages = self.replica_messages[request.count:] return BrokerEmpty() - + + if __name__ == "__main__": + start_http_server(8000) server = grpc.server(ThreadPoolExecutor(max_workers=10)) broker = BrokerServer() broker_pb2_grpc.add_BrokerServicer_to_server(broker, server) @@ -76,4 +97,4 @@ def DropReplicaMessages(self, request, context): channel = grpc.insecure_channel(f'{os.environ["ZOOKEEPER_HOST"]}:{os.environ["ZOOKEEPER_PORT"]}') stub = proto.zookeeper_pb2_grpc.ZookeeperStub(channel) stub.Register(DiscoveryRequest(url=f'{os.environ["BROKER_HOST"]}:{os.environ["BROKER_PORT"]}', uuid=broker.uuid)) - server.wait_for_termination() \ No newline at end of file + server.wait_for_termination() diff --git a/gateway/gateway.py b/gateway/gateway.py index baa79ee..ebdd183 100644 --- a/gateway/gateway.py +++ b/gateway/gateway.py @@ -9,10 +9,11 @@ import os import logging +from prometheus_client import generate_latest, Counter, Summary + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -from prometheus_client import generate_latest, Counter, Summary PUSH_COUNTER = Counter("gateway_push_counter", "Number of push requests") PULL_COUNTER = Counter("gateway_pull_counter", "Number of pull requests") PUSH_LATENCY = Summary("gateway_push_latency", "Latency of push requests") @@ -23,6 +24,7 @@ app = Flask(__name__) + @app.route("/push", methods=["POST"]) def push(): key = request.form["key"] @@ -67,6 +69,7 @@ def pull(): value=value, ) + @app.route("/metrics", methods=["GET"]) def metrics(): - return generate_latest() \ No newline at end of file + return generate_latest() diff --git a/zookeeper/zookeeper.py b/zookeeper/zookeeper.py index a0462ac..9e78453 100644 --- a/zookeeper/zookeeper.py +++ b/zookeeper/zookeeper.py @@ -13,9 +13,11 @@ from prometheus_client import start_http_server import logging + logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) + class MessageQueue(message_pb2_grpc.MessageQueueServicer): def __init__(self, broker_manager: BrokerManager): self.broker_manager = broker_manager @@ -45,6 +47,7 @@ def Pull(self, request, context): logger.debug(f"Pulled message: {message.key} {message.value}") return MQPullResponse(status=MQStatus.MQ_SUCCESS, message=MQMessage(key=message.key, value=message.value)) + class Zookeeper(zookeeper_pb2_grpc.ZookeeperServicer): def __init__(self, broker_manager: BrokerManager): self.broker_manager = broker_manager From 7650ac273071415049fb447de8388e897c796194 Mon Sep 17 00:00:00 2001 From: Amirhossein Barati Date: Tue, 13 Feb 2024 00:38:23 +0330 Subject: [PATCH 2/5] feat(BrokerServer): Reorder imports --- broker/broker.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/broker/broker.py b/broker/broker.py index 9628d80..5e463d9 100644 --- a/broker/broker.py +++ b/broker/broker.py @@ -1,16 +1,15 @@ +import grpc +import os +import time +import logging +from prometheus_client import start_http_server, Counter, Gauge +from concurrent.futures import ThreadPoolExecutor + from proto.zookeeper_pb2 import DiscoveryRequest import proto.zookeeper_pb2_grpc from proto.broker_pb2 import BrokerEmpty, BrokerPushResponse, BrokerStatus, BrokerMessage, MessageList, MessageCount,\ BrokerPullResponse import proto.broker_pb2_grpc as broker_pb2_grpc -import grpc - - -from prometheus_client import start_http_server -from concurrent.futures import ThreadPoolExecutor -import os -import time -import logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) From a87240ec5250da6085e705c6782a79fc687e5b6e Mon Sep 17 00:00:00 2001 From: Amirhossein Barati Date: Tue, 13 Feb 2024 01:28:40 +0330 Subject: [PATCH 3/5] feat(BrokerServer): Add monitoring fields --- broker/broker.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/broker/broker.py b/broker/broker.py index 5e463d9..ae569af 100644 --- a/broker/broker.py +++ b/broker/broker.py @@ -14,6 +14,11 @@ logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) +PUSH_COUNTER = Counter("broker_push_counter", "Number of push to broker") +PULL_COUNTER = Counter("broker_pull_counter", "Number of pull from broker") +MESSAGES_LENGTH = Gauge("broker_message_len", "Number of messages in broker") +REPLICA_MESSAGES_LENGTH = Gauge("broker_replica_messages_len", "Number of replica messages in broker") + class BrokerServer(broker_pb2_grpc.BrokerServicer): def __init__(self) -> None: @@ -29,6 +34,8 @@ def Ack(self, request, context): def Push(self, request, context): self.messages.append((request.key, request.value)) logger.info(f"Pushed message {request.key} {request.value}") + PUSH_COUNTER.inc() + MESSAGES_LENGTH.inc() if self.replica: self.replica_stub.PushReplica(MessageList(messages=[BrokerMessage(key=request.key, value=request.value)])) return BrokerPushResponse(status=BrokerStatus.BROKER_SUCCESS, message="") @@ -44,6 +51,8 @@ def Pull(self, request, context): return BrokerPullResponse(status=BrokerStatus.BROKER_FAILURE, message="No messages") key, value = self.messages.pop(0) logger.warning(f"Pulled message from Broker {self.uuid}: {key} {value}") + PULL_COUNTER.inc() + MESSAGES_LENGTH.dec() if self.replica: self.replica_stub.DropReplicaMessages(MessageCount(count=1)) return BrokerPullResponse(status=BrokerStatus.BROKER_SUCCESS, message=BrokerMessage(key=key, value=value)) @@ -65,24 +74,29 @@ def LeadReplica(self, request, context): raise Exception("No replica") logger.info(f"Merging replica messages to queue messages of broker {self.uuid}") self.messages.extend(self.replica_messages) + MESSAGES_LENGTH.inc(len(self.replica_messages)) self.replica_messages.clear() + REPLICA_MESSAGES_LENGTH.set(0) return BrokerEmpty() def DropReplica(self, request, context): logger.warning(f"Dropping broker {self.uuid} replica") self.replica = None self.replica_messages.clear() + REPLICA_MESSAGES_LENGTH.set(0) return BrokerEmpty() def PushReplica(self, request, context): for message in request.messages: self.replica_messages.append((message.key, message.value)) + REPLICA_MESSAGES_LENGTH.inc() logger.info(f"Pushed message {message.key} {message.value} to replica messages") return BrokerEmpty() def DropReplicaMessages(self, request, context): logger.warning(f"Dropping {request.count} message from replica messages") self.replica_messages = self.replica_messages[request.count:] + REPLICA_MESSAGES_LENGTH.dec(request.count) return BrokerEmpty() From c5bc544af8f0c55875b431268ff999593712e892 Mon Sep 17 00:00:00 2001 From: Amirhossein Barati Date: Tue, 13 Feb 2024 01:32:16 +0330 Subject: [PATCH 4/5] feat(Dockerfile): Add docker file --- broker/Dockerfile | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 broker/Dockerfile diff --git a/broker/Dockerfile b/broker/Dockerfile new file mode 100644 index 0000000..386856c --- /dev/null +++ b/broker/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.10-slim + +WORKDIR /app + +COPY requirements.txt requirements.txt + +RUN pip install -r requirements.txt + +COPY . . + +CMD ["python", "zookeeper.py"] \ No newline at end of file From 556c5225bb220910cbb2af738285eed2fee8c499 Mon Sep 17 00:00:00 2001 From: Amirhossein Barati Date: Tue, 13 Feb 2024 01:32:49 +0330 Subject: [PATCH 5/5] feat(Dockerfile): Edit docker file --- broker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/Dockerfile b/broker/Dockerfile index 386856c..8bea85d 100644 --- a/broker/Dockerfile +++ b/broker/Dockerfile @@ -8,4 +8,4 @@ RUN pip install -r requirements.txt COPY . . -CMD ["python", "zookeeper.py"] \ No newline at end of file +CMD ["python", "broker.py"] \ No newline at end of file