diff --git a/broker/Dockerfile b/broker/Dockerfile new file mode 100644 index 0000000..8bea85d --- /dev/null +++ b/broker/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.10-slim + +WORKDIR /app + +COPY requirements.txt requirements.txt + +RUN pip install -r requirements.txt + +COPY . . + +CMD ["python", "broker.py"] \ No newline at end of file diff --git a/broker/broker.py b/broker/broker.py index 5d87ad6..ae569af 100644 --- a/broker/broker.py +++ b/broker/broker.py @@ -1,12 +1,24 @@ +import grpc +import os +import time +import logging +from prometheus_client import start_http_server, Counter, Gauge +from concurrent.futures import ThreadPoolExecutor + from proto.zookeeper_pb2 import DiscoveryRequest import proto.zookeeper_pb2_grpc -from proto.broker_pb2 import BrokerEmpty, BrokerPushResponse, BrokerStatus, BrokerMessage, MessageList, MessageCount, BrokerPullResponse +from proto.broker_pb2 import BrokerEmpty, BrokerPushResponse, BrokerStatus, BrokerMessage, MessageList, MessageCount,\ + BrokerPullResponse import proto.broker_pb2_grpc as broker_pb2_grpc -import grpc -from concurrent.futures import ThreadPoolExecutor -import os -import time +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +PUSH_COUNTER = Counter("broker_push_counter", "Number of push to broker") +PULL_COUNTER = Counter("broker_pull_counter", "Number of pull from broker") +MESSAGES_LENGTH = Gauge("broker_message_len", "Number of messages in broker") +REPLICA_MESSAGES_LENGTH = Gauge("broker_replica_messages_len", "Number of replica messages in broker") + class BrokerServer(broker_pb2_grpc.BrokerServicer): def __init__(self) -> None: @@ -21,6 +33,9 @@ def Ack(self, request, context): def Push(self, request, context): self.messages.append((request.key, request.value)) + logger.info(f"Pushed message {request.key} {request.value}") + PUSH_COUNTER.inc() + MESSAGES_LENGTH.inc() if self.replica: self.replica_stub.PushReplica(MessageList(messages=[BrokerMessage(key=request.key, value=request.value)])) return BrokerPushResponse(status=BrokerStatus.BROKER_SUCCESS, message="") @@ -32,8 +47,12 @@ def Pull(self, request, context): if self.messages: break else: + logger.error(f"Pulling message {self.uuid} failed!\nBroker is not available") return BrokerPullResponse(status=BrokerStatus.BROKER_FAILURE, message="No messages") key, value = self.messages.pop(0) + logger.warning(f"Pulled message from Broker {self.uuid}: {key} {value}") + PULL_COUNTER.inc() + MESSAGES_LENGTH.dec() if self.replica: self.replica_stub.DropReplicaMessages(MessageCount(count=1)) return BrokerPullResponse(status=BrokerStatus.BROKER_SUCCESS, message=BrokerMessage(key=key, value=value)) @@ -41,33 +60,48 @@ def Pull(self, request, context): def SetReplica(self, request, context): self.replica = request.uuid self.replica_stub = proto.broker_pb2_grpc.BrokerStub(grpc.insecure_channel(request.url)) + logger.info(f"Set replica for broker {self.uuid}: {request.uuid}") batch = 10 for i in range(0, len(self.messages), batch): - self.replica_stub.PushReplica(MessageList(messages=[BrokerMessage(key=k, value=v) for k, v in self.messages[i:i+batch]])) + self.replica_stub.PushReplica(MessageList( + messages=[BrokerMessage(key=k, value=v) for k, v in self.messages[i:i+batch]] + )) return BrokerEmpty() def LeadReplica(self, request, context): if not self.replica: + logger.error("No replica available") raise Exception("No replica") + logger.info(f"Merging replica messages to queue messages of broker {self.uuid}") self.messages.extend(self.replica_messages) + MESSAGES_LENGTH.inc(len(self.replica_messages)) self.replica_messages.clear() + REPLICA_MESSAGES_LENGTH.set(0) return BrokerEmpty() def DropReplica(self, request, context): + logger.warning(f"Dropping broker {self.uuid} replica") self.replica = None self.replica_messages.clear() + REPLICA_MESSAGES_LENGTH.set(0) return BrokerEmpty() def PushReplica(self, request, context): for message in request.messages: self.replica_messages.append((message.key, message.value)) + REPLICA_MESSAGES_LENGTH.inc() + logger.info(f"Pushed message {message.key} {message.value} to replica messages") return BrokerEmpty() def DropReplicaMessages(self, request, context): + logger.warning(f"Dropping {request.count} message from replica messages") self.replica_messages = self.replica_messages[request.count:] + REPLICA_MESSAGES_LENGTH.dec(request.count) return BrokerEmpty() - + + if __name__ == "__main__": + start_http_server(8000) server = grpc.server(ThreadPoolExecutor(max_workers=10)) broker = BrokerServer() broker_pb2_grpc.add_BrokerServicer_to_server(broker, server) @@ -76,4 +110,4 @@ def DropReplicaMessages(self, request, context): channel = grpc.insecure_channel(f'{os.environ["ZOOKEEPER_HOST"]}:{os.environ["ZOOKEEPER_PORT"]}') stub = proto.zookeeper_pb2_grpc.ZookeeperStub(channel) stub.Register(DiscoveryRequest(url=f'{os.environ["BROKER_HOST"]}:{os.environ["BROKER_PORT"]}', uuid=broker.uuid)) - server.wait_for_termination() \ No newline at end of file + server.wait_for_termination() diff --git a/gateway/gateway.py b/gateway/gateway.py index baa79ee..ebdd183 100644 --- a/gateway/gateway.py +++ b/gateway/gateway.py @@ -9,10 +9,11 @@ import os import logging +from prometheus_client import generate_latest, Counter, Summary + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -from prometheus_client import generate_latest, Counter, Summary PUSH_COUNTER = Counter("gateway_push_counter", "Number of push requests") PULL_COUNTER = Counter("gateway_pull_counter", "Number of pull requests") PUSH_LATENCY = Summary("gateway_push_latency", "Latency of push requests") @@ -23,6 +24,7 @@ app = Flask(__name__) + @app.route("/push", methods=["POST"]) def push(): key = request.form["key"] @@ -67,6 +69,7 @@ def pull(): value=value, ) + @app.route("/metrics", methods=["GET"]) def metrics(): - return generate_latest() \ No newline at end of file + return generate_latest() diff --git a/zookeeper/zookeeper.py b/zookeeper/zookeeper.py index a0462ac..9e78453 100644 --- a/zookeeper/zookeeper.py +++ b/zookeeper/zookeeper.py @@ -13,9 +13,11 @@ from prometheus_client import start_http_server import logging + logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) + class MessageQueue(message_pb2_grpc.MessageQueueServicer): def __init__(self, broker_manager: BrokerManager): self.broker_manager = broker_manager @@ -45,6 +47,7 @@ def Pull(self, request, context): logger.debug(f"Pulled message: {message.key} {message.value}") return MQPullResponse(status=MQStatus.MQ_SUCCESS, message=MQMessage(key=message.key, value=message.value)) + class Zookeeper(zookeeper_pb2_grpc.ZookeeperServicer): def __init__(self, broker_manager: BrokerManager): self.broker_manager = broker_manager