diff --git a/broker/broker.py b/broker/broker.py index 77b0bee..73b5357 100644 --- a/broker/broker.py +++ b/broker/broker.py @@ -28,6 +28,25 @@ def __init__(self) -> None: self.replica = None self.replica_stub = None + def __is_replica_alive(self): + if not self.replica: + return False + try: + self.replica_stub.Ack(BrokerEmpty()) + except: + logger.error(f"Replica {self.replica} is not alive") + self.replica = None + self.replica_stub = None + if self.replica_messages: + logger.warning(f"Replica {self.replica} has {len(self.replica_messages)} messages") + self.messages.extend(self.replica_messages) + MESSAGES_LENGTH.inc(len(self.replica_messages)) + self.replica_messages.clear() + REPLICA_MESSAGES_LENGTH.set(0) + return False + logger.debug(f"Replica {self.replica} is alive") + return True + def Ack(self, request, context): return BrokerEmpty() @@ -36,30 +55,28 @@ def Push(self, request, context): logger.info(f"Pushed message {request.key} {request.value}") PUSH_COUNTER.inc() MESSAGES_LENGTH.inc() - if self.replica: + if self.__is_replica_alive(): self.replica_stub.PushReplica(MessageList(messages=[BrokerMessage(key=request.key, value=request.value)])) return BrokerPushResponse(status=BrokerStatus.BROKER_SUCCESS, message="") def Pull(self, request, context): if not self.messages: - for i in range(10): - time.sleep(1) - if self.messages: - break - else: - logger.error(f"Pulling message {self.uuid} failed!\nBroker is not available") - return BrokerPullResponse(status=BrokerStatus.BROKER_FAILURE, message="No messages") + logger.error(f"Pulling message {self.uuid} failed!\nBroker is not available") + return BrokerPullResponse(status=BrokerStatus.BROKER_FAILURE, message="No messages") key, value = self.messages.pop(0) logger.warning(f"Pulled message from Broker {self.uuid}: {key} {value}") PULL_COUNTER.inc() MESSAGES_LENGTH.dec() - if self.replica: + if self.__is_replica_alive(): self.replica_stub.DropReplicaMessages(MessageCount(count=1)) return BrokerPullResponse(status=BrokerStatus.BROKER_SUCCESS, message=BrokerMessage(key=key, value=value)) def SetReplica(self, request, context): self.replica = request.uuid self.replica_stub = proto.broker_pb2_grpc.BrokerStub(grpc.insecure_channel(request.url)) + if not self.__is_replica_alive(): + logger.error(f"Replica {request.uuid} is not available") + return BrokerEmpty() logger.info(f"Set replica for broker {self.uuid}: {request.uuid}") batch = 10 for i in range(0, len(self.messages), batch): @@ -69,13 +86,12 @@ def SetReplica(self, request, context): return BrokerEmpty() def LeadReplica(self, request, context): - if not self.replica: - logger.error("No replica available") - raise Exception("No replica") + if len(self.replica_messages) == 0: + return BrokerEmpty() logger.info(f"Merging replica messages to queue messages of broker {self.uuid}") self.messages.extend(self.replica_messages) MESSAGES_LENGTH.inc(len(self.replica_messages)) - if self.replica: + if self.__is_replica_alive(): batch = 10 for i in range(0, len(self.messages), batch): self.replica_stub.PushReplica(MessageList( @@ -87,7 +103,6 @@ def LeadReplica(self, request, context): def DropReplica(self, request, context): logger.warning(f"Dropping broker {self.uuid} replica") - self.replica = None self.replica_messages.clear() REPLICA_MESSAGES_LENGTH.set(0) return BrokerEmpty() diff --git a/zookeeper/broker.py b/zookeeper/broker.py index bb15f1e..c701651 100644 --- a/zookeeper/broker.py +++ b/zookeeper/broker.py @@ -115,7 +115,7 @@ def remove_node(self, node: str): del self.brokers[node.uuid] self.broker_ring.remove(node.uuid) if len(self.broker_ring) == 1: - self.broker_ring[0].lead_replica() + self.brokers[self.broker_ring[0]].lead_replica() BROKER_COUNTER.labels(name=node.uuid).dec() logger.info(f"Removed broker {node.uuid}")