diff --git a/broker/broker.py b/broker/broker.py index 5e463d9..ae569af 100644 --- a/broker/broker.py +++ b/broker/broker.py @@ -14,6 +14,11 @@ logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) +PUSH_COUNTER = Counter("broker_push_counter", "Number of push to broker") +PULL_COUNTER = Counter("broker_pull_counter", "Number of pull from broker") +MESSAGES_LENGTH = Gauge("broker_message_len", "Number of messages in broker") +REPLICA_MESSAGES_LENGTH = Gauge("broker_replica_messages_len", "Number of replica messages in broker") + class BrokerServer(broker_pb2_grpc.BrokerServicer): def __init__(self) -> None: @@ -29,6 +34,8 @@ def Ack(self, request, context): def Push(self, request, context): self.messages.append((request.key, request.value)) logger.info(f"Pushed message {request.key} {request.value}") + PUSH_COUNTER.inc() + MESSAGES_LENGTH.inc() if self.replica: self.replica_stub.PushReplica(MessageList(messages=[BrokerMessage(key=request.key, value=request.value)])) return BrokerPushResponse(status=BrokerStatus.BROKER_SUCCESS, message="") @@ -44,6 +51,8 @@ def Pull(self, request, context): return BrokerPullResponse(status=BrokerStatus.BROKER_FAILURE, message="No messages") key, value = self.messages.pop(0) logger.warning(f"Pulled message from Broker {self.uuid}: {key} {value}") + PULL_COUNTER.inc() + MESSAGES_LENGTH.dec() if self.replica: self.replica_stub.DropReplicaMessages(MessageCount(count=1)) return BrokerPullResponse(status=BrokerStatus.BROKER_SUCCESS, message=BrokerMessage(key=key, value=value)) @@ -65,24 +74,29 @@ def LeadReplica(self, request, context): raise Exception("No replica") logger.info(f"Merging replica messages to queue messages of broker {self.uuid}") self.messages.extend(self.replica_messages) + MESSAGES_LENGTH.inc(len(self.replica_messages)) self.replica_messages.clear() + REPLICA_MESSAGES_LENGTH.set(0) return BrokerEmpty() def DropReplica(self, request, context): logger.warning(f"Dropping broker {self.uuid} replica") self.replica = None self.replica_messages.clear() + REPLICA_MESSAGES_LENGTH.set(0) return BrokerEmpty() def PushReplica(self, request, context): for message in request.messages: self.replica_messages.append((message.key, message.value)) + REPLICA_MESSAGES_LENGTH.inc() logger.info(f"Pushed message {message.key} {message.value} to replica messages") return BrokerEmpty() def DropReplicaMessages(self, request, context): logger.warning(f"Dropping {request.count} message from replica messages") self.replica_messages = self.replica_messages[request.count:] + REPLICA_MESSAGES_LENGTH.dec(request.count) return BrokerEmpty()