From 321e4f3f225c8e7bba961ad8ff8119ec2c0f5407 Mon Sep 17 00:00:00 2001 From: AlisaLC Date: Mon, 12 Feb 2024 00:47:35 +0330 Subject: [PATCH] added logger for zookeeper --- zookeeper/broker.py | 34 +++++++++++++++++++++++++++++----- zookeeper/zookeeper.py | 27 +++++++++++++-------------- 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/zookeeper/broker.py b/zookeeper/broker.py index 7cb0fb8..f80e402 100644 --- a/zookeeper/broker.py +++ b/zookeeper/broker.py @@ -1,4 +1,4 @@ -from proto.broker_pb2 import Empty, Message, MessageList, ReplicaRequest, ReplicaID, MessageCount, PushResponse, PushStatus +from proto.broker_pb2 import Empty, Message, ReplicaRequest import proto.broker_pb2_grpc import grpc @@ -8,12 +8,15 @@ from hashring import ConsistentHashRing from prometheus_client import Gauge, Counter - BROKER_COUNTER = Gauge("zookeeper_broker_counter", "Number of brokers", ["name"]) PUSH_COUNTER = Counter("zookeeper_push_counter", "Number of messages pushed", ["queue", "key"]) PULL_COUNTER = Counter("zookeeper_pull_counter", "Number of messages pulled", ["queue", "key"]) BROKER_MESSAGE_COUNTER = Gauge("zookeeper_broker_message_counter", "Number of messages in broker", ["queue"]) +import logging +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + class Broker: def __init__(self, uuid, url) -> None: self.uuid = uuid @@ -30,7 +33,9 @@ def is_alive(self): try: self.stub.Ack(Empty()) except: + logger.error(f"Broker {self.uuid} is not alive") return False + logger.debug(f"Broker {self.uuid} is alive") return True def push(self, key, value): @@ -46,13 +51,16 @@ def pull(self): return response def set_replica(self, replica): + logger.info(f"Setting replica for {self.uuid} to {replica.uuid}") self.replica = replica self.stub.SetReplica(ReplicaRequest(uuid=replica.uuid, url=replica.url)) def lead_replica(self): + logger.info(f"Leading replica for {self.uuid}") return self.stub.LeadReplica(Empty()) def drop_replica(self): + logger.info(f"Dropping replica for {self.uuid}") return self.stub.DropReplica(Empty()) def __str__(self) -> str: @@ -66,6 +74,7 @@ def __init__(self) -> None: def add_node(self, node: str): if node.uuid in self.brokers: + logger.debug(f"Broker {node.uuid} already exists") return node.connect() if not node.is_alive(): @@ -73,7 +82,8 @@ def add_node(self, node: str): self.hash_ring.add_node(node) self.brokers[node.uuid] = node BROKER_COUNTER.labels(name=node.uuid).inc() - return self.__add_node_to_chain(node) + self.__add_node_to_chain(node) + logger.info(f"Added broker {node.uuid}") def __add_node_to_chain(self, node): bisect.insort(self.broker_ring, node.uuid) @@ -90,6 +100,7 @@ def __add_node_to_chain(self, node): def remove_node(self, node: str): if node.uuid not in self.brokers or len(self.broker_ring) == 0: + logger.debug(f"Broker {node.uuid} does not exist") return if len(self.broker_ring) > 2: self.__remove_node_from_chain(node) @@ -99,6 +110,7 @@ def remove_node(self, node: str): if len(self.broker_ring) == 1: self.broker_ring[0].lead_replica() BROKER_COUNTER.labels(name=node.uuid).dec() + logger.info(f"Removed broker {node.uuid}") def __remove_node_from_chain(self, node): index = self.broker_ring.index(node.uuid) @@ -114,7 +126,19 @@ def __remove_node_from_chain(self, node): prev_replica.set_replica(replica) def get_node(self, key: str): - return self.hash_ring.get_node(key) + if len(self.brokers) == 0: + return None + node = self.hash_ring.get_node(key) + if node.is_alive(): + return node + self.remove_node(node) + return self.get_node(key) def get_random_node(self): - return random.choice(list(self.brokers.values())) \ No newline at end of file + if len(self.brokers) == 0: + return None + node = random.choice(list(self.brokers.values())) + if node.is_alive(): + return node + self.remove_node(node) + return self.get_random_node() \ No newline at end of file diff --git a/zookeeper/zookeeper.py b/zookeeper/zookeeper.py index dba07b0..275b1df 100644 --- a/zookeeper/zookeeper.py +++ b/zookeeper/zookeeper.py @@ -1,5 +1,5 @@ import grpc -from proto.message_pb2 import Message, PushResponse, PushStatus +from proto.message_pb2 import PushResponse, PushStatus import proto.message_pb2_grpc as message_pb2_grpc from proto.zookeeper_pb2 import Empty import proto.zookeeper_pb2_grpc as zookeeper_pb2_grpc @@ -7,14 +7,11 @@ from broker import BrokerManager, Broker import os -import time - from concurrent.futures import ThreadPoolExecutor -import logging - -from prometheus_client import Counter, Gauge, start_http_server +from prometheus_client import start_http_server +import logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) @@ -24,19 +21,21 @@ def __init__(self, broker_manager: BrokerManager): def Push(self, request, context): queue = self.broker_manager.get_node(request.key) - if queue.is_alive(): - response = queue.push(request.key, request.value) - logger.info(f"Pushed message: {request.key} {request.value}") - else: - self.broker_manager.remove_node(queue) - response = self.Push(request, context) + if not queue: + logger.error(f"No queue for key: {request.key}") + return PushResponse(status=PushStatus.FAILURE, message="No queue for key") + response = queue.push(request.key, request.value) + logger.info(f"Pushed message: {request.key} {request.value}") return response def Pull(self, request, context): - logger.debug("Pulling message") queue = self.broker_manager.get_random_node() + if not queue: + logger.error("No brokers available") + return + logger.debug(f"Pulling from queue: {queue.uuid}") message = queue.pull() - logger.info(f"Pulled message: {message.key} {message.value}") + logger.debug(f"Pulled message: {message.key} {message.value}") return message class Zookeeper(zookeeper_pb2_grpc.ZookeeperServicer):