Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/AlisaLC/MiniKafka
Browse files Browse the repository at this point in the history
  • Loading branch information
aradmaleki02 committed Feb 13, 2024
2 parents b40dc95 + 8cdbdb6 commit 0c46e85
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
43 changes: 29 additions & 14 deletions broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,25 @@ def __init__(self) -> None:
self.replica = None
self.replica_stub = None

def __is_replica_alive(self):
if not self.replica:
return False
try:
self.replica_stub.Ack(BrokerEmpty())
except:
logger.error(f"Replica {self.replica} is not alive")
self.replica = None
self.replica_stub = None
if self.replica_messages:
logger.warning(f"Replica {self.replica} has {len(self.replica_messages)} messages")
self.messages.extend(self.replica_messages)
MESSAGES_LENGTH.inc(len(self.replica_messages))
self.replica_messages.clear()
REPLICA_MESSAGES_LENGTH.set(0)
return False
logger.debug(f"Replica {self.replica} is alive")
return True

def Ack(self, request, context):
return BrokerEmpty()

Expand All @@ -36,30 +55,28 @@ def Push(self, request, context):
logger.info(f"Pushed message {request.key} {request.value}")
PUSH_COUNTER.inc()
MESSAGES_LENGTH.inc()
if self.replica:
if self.__is_replica_alive():
self.replica_stub.PushReplica(MessageList(messages=[BrokerMessage(key=request.key, value=request.value)]))
return BrokerPushResponse(status=BrokerStatus.BROKER_SUCCESS, message="")

def Pull(self, request, context):
if not self.messages:
for i in range(10):
time.sleep(1)
if self.messages:
break
else:
logger.error(f"Pulling message {self.uuid} failed!\nBroker is not available")
return BrokerPullResponse(status=BrokerStatus.BROKER_FAILURE, message="No messages")
logger.error(f"Pulling message {self.uuid} failed!\nBroker is not available")
return BrokerPullResponse(status=BrokerStatus.BROKER_FAILURE, message="No messages")
key, value = self.messages.pop(0)
logger.warning(f"Pulled message from Broker {self.uuid}: {key} {value}")
PULL_COUNTER.inc()
MESSAGES_LENGTH.dec()
if self.replica:
if self.__is_replica_alive():
self.replica_stub.DropReplicaMessages(MessageCount(count=1))
return BrokerPullResponse(status=BrokerStatus.BROKER_SUCCESS, message=BrokerMessage(key=key, value=value))

def SetReplica(self, request, context):
self.replica = request.uuid
self.replica_stub = proto.broker_pb2_grpc.BrokerStub(grpc.insecure_channel(request.url))
if not self.__is_replica_alive():
logger.error(f"Replica {request.uuid} is not available")
return BrokerEmpty()
logger.info(f"Set replica for broker {self.uuid}: {request.uuid}")
batch = 10
for i in range(0, len(self.messages), batch):
Expand All @@ -69,13 +86,12 @@ def SetReplica(self, request, context):
return BrokerEmpty()

def LeadReplica(self, request, context):
if not self.replica:
logger.error("No replica available")
raise Exception("No replica")
if len(self.replica_messages) == 0:
return BrokerEmpty()
logger.info(f"Merging replica messages to queue messages of broker {self.uuid}")
self.messages.extend(self.replica_messages)
MESSAGES_LENGTH.inc(len(self.replica_messages))
if self.replica:
if self.__is_replica_alive():
batch = 10
for i in range(0, len(self.messages), batch):
self.replica_stub.PushReplica(MessageList(
Expand All @@ -87,7 +103,6 @@ def LeadReplica(self, request, context):

def DropReplica(self, request, context):
logger.warning(f"Dropping broker {self.uuid} replica")
self.replica = None
self.replica_messages.clear()
REPLICA_MESSAGES_LENGTH.set(0)
return BrokerEmpty()
Expand Down
2 changes: 1 addition & 1 deletion zookeeper/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def remove_node(self, node: str):
del self.brokers[node.uuid]
self.broker_ring.remove(node.uuid)
if len(self.broker_ring) == 1:
self.broker_ring[0].lead_replica()
self.brokers[self.broker_ring[0]].lead_replica()
BROKER_COUNTER.labels(name=node.uuid).dec()
logger.info(f"Removed broker {node.uuid}")

Expand Down

0 comments on commit 0c46e85

Please sign in to comment.