Skip to content

Commit

Permalink
feat(BrokerServer): Add monitoring fields
Browse files Browse the repository at this point in the history
  • Loading branch information
AmirhBrt committed Feb 12, 2024
1 parent 7650ac2 commit a87240e
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

PUSH_COUNTER = Counter("broker_push_counter", "Number of push to broker")
PULL_COUNTER = Counter("broker_pull_counter", "Number of pull from broker")
MESSAGES_LENGTH = Gauge("broker_message_len", "Number of messages in broker")
REPLICA_MESSAGES_LENGTH = Gauge("broker_replica_messages_len", "Number of replica messages in broker")


class BrokerServer(broker_pb2_grpc.BrokerServicer):
def __init__(self) -> None:
Expand All @@ -29,6 +34,8 @@ def Ack(self, request, context):
def Push(self, request, context):
self.messages.append((request.key, request.value))
logger.info(f"Pushed message {request.key} {request.value}")
PUSH_COUNTER.inc()
MESSAGES_LENGTH.inc()
if self.replica:
self.replica_stub.PushReplica(MessageList(messages=[BrokerMessage(key=request.key, value=request.value)]))
return BrokerPushResponse(status=BrokerStatus.BROKER_SUCCESS, message="")
Expand All @@ -44,6 +51,8 @@ def Pull(self, request, context):
return BrokerPullResponse(status=BrokerStatus.BROKER_FAILURE, message="No messages")
key, value = self.messages.pop(0)
logger.warning(f"Pulled message from Broker {self.uuid}: {key} {value}")
PULL_COUNTER.inc()
MESSAGES_LENGTH.dec()
if self.replica:
self.replica_stub.DropReplicaMessages(MessageCount(count=1))
return BrokerPullResponse(status=BrokerStatus.BROKER_SUCCESS, message=BrokerMessage(key=key, value=value))
Expand All @@ -65,24 +74,29 @@ def LeadReplica(self, request, context):
raise Exception("No replica")
logger.info(f"Merging replica messages to queue messages of broker {self.uuid}")
self.messages.extend(self.replica_messages)
MESSAGES_LENGTH.inc(len(self.replica_messages))
self.replica_messages.clear()
REPLICA_MESSAGES_LENGTH.set(0)
return BrokerEmpty()

def DropReplica(self, request, context):
logger.warning(f"Dropping broker {self.uuid} replica")
self.replica = None
self.replica_messages.clear()
REPLICA_MESSAGES_LENGTH.set(0)
return BrokerEmpty()

def PushReplica(self, request, context):
for message in request.messages:
self.replica_messages.append((message.key, message.value))
REPLICA_MESSAGES_LENGTH.inc()
logger.info(f"Pushed message {message.key} {message.value} to replica messages")
return BrokerEmpty()

def DropReplicaMessages(self, request, context):
logger.warning(f"Dropping {request.count} message from replica messages")
self.replica_messages = self.replica_messages[request.count:]
REPLICA_MESSAGES_LENGTH.dec(request.count)
return BrokerEmpty()


Expand Down

0 comments on commit a87240e

Please sign in to comment.