Skip to content

Commit

Permalink
added logger for zookeeper
Browse files Browse the repository at this point in the history
  • Loading branch information
AlisaLC committed Feb 11, 2024
1 parent 84fe20e commit 321e4f3
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 19 deletions.
34 changes: 29 additions & 5 deletions zookeeper/broker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from proto.broker_pb2 import Empty, Message, MessageList, ReplicaRequest, ReplicaID, MessageCount, PushResponse, PushStatus
from proto.broker_pb2 import Empty, Message, ReplicaRequest
import proto.broker_pb2_grpc
import grpc

Expand All @@ -8,12 +8,15 @@
from hashring import ConsistentHashRing

from prometheus_client import Gauge, Counter

BROKER_COUNTER = Gauge("zookeeper_broker_counter", "Number of brokers", ["name"])
PUSH_COUNTER = Counter("zookeeper_push_counter", "Number of messages pushed", ["queue", "key"])
PULL_COUNTER = Counter("zookeeper_pull_counter", "Number of messages pulled", ["queue", "key"])
BROKER_MESSAGE_COUNTER = Gauge("zookeeper_broker_message_counter", "Number of messages in broker", ["queue"])

import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

class Broker:
def __init__(self, uuid, url) -> None:
self.uuid = uuid
Expand All @@ -30,7 +33,9 @@ def is_alive(self):
try:
self.stub.Ack(Empty())
except:
logger.error(f"Broker {self.uuid} is not alive")
return False
logger.debug(f"Broker {self.uuid} is alive")
return True

def push(self, key, value):
Expand All @@ -46,13 +51,16 @@ def pull(self):
return response

def set_replica(self, replica):
logger.info(f"Setting replica for {self.uuid} to {replica.uuid}")
self.replica = replica
self.stub.SetReplica(ReplicaRequest(uuid=replica.uuid, url=replica.url))

def lead_replica(self):
logger.info(f"Leading replica for {self.uuid}")
return self.stub.LeadReplica(Empty())

def drop_replica(self):
logger.info(f"Dropping replica for {self.uuid}")
return self.stub.DropReplica(Empty())

def __str__(self) -> str:
Expand All @@ -66,14 +74,16 @@ def __init__(self) -> None:

def add_node(self, node: str):
if node.uuid in self.brokers:
logger.debug(f"Broker {node.uuid} already exists")
return
node.connect()
if not node.is_alive():
return
self.hash_ring.add_node(node)
self.brokers[node.uuid] = node
BROKER_COUNTER.labels(name=node.uuid).inc()
return self.__add_node_to_chain(node)
self.__add_node_to_chain(node)
logger.info(f"Added broker {node.uuid}")

def __add_node_to_chain(self, node):
bisect.insort(self.broker_ring, node.uuid)
Expand All @@ -90,6 +100,7 @@ def __add_node_to_chain(self, node):

def remove_node(self, node: str):
if node.uuid not in self.brokers or len(self.broker_ring) == 0:
logger.debug(f"Broker {node.uuid} does not exist")
return
if len(self.broker_ring) > 2:
self.__remove_node_from_chain(node)
Expand All @@ -99,6 +110,7 @@ def remove_node(self, node: str):
if len(self.broker_ring) == 1:
self.broker_ring[0].lead_replica()
BROKER_COUNTER.labels(name=node.uuid).dec()
logger.info(f"Removed broker {node.uuid}")

def __remove_node_from_chain(self, node):
index = self.broker_ring.index(node.uuid)
Expand All @@ -114,7 +126,19 @@ def __remove_node_from_chain(self, node):
prev_replica.set_replica(replica)

def get_node(self, key: str):
return self.hash_ring.get_node(key)
if len(self.brokers) == 0:
return None
node = self.hash_ring.get_node(key)
if node.is_alive():
return node
self.remove_node(node)
return self.get_node(key)

def get_random_node(self):
return random.choice(list(self.brokers.values()))
if len(self.brokers) == 0:
return None
node = random.choice(list(self.brokers.values()))
if node.is_alive():
return node
self.remove_node(node)
return self.get_random_node()
27 changes: 13 additions & 14 deletions zookeeper/zookeeper.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
import grpc
from proto.message_pb2 import Message, PushResponse, PushStatus
from proto.message_pb2 import PushResponse, PushStatus
import proto.message_pb2_grpc as message_pb2_grpc
from proto.zookeeper_pb2 import Empty
import proto.zookeeper_pb2_grpc as zookeeper_pb2_grpc

from broker import BrokerManager, Broker

import os
import time

from concurrent.futures import ThreadPoolExecutor

import logging

from prometheus_client import Counter, Gauge, start_http_server
from prometheus_client import start_http_server

import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

Expand All @@ -24,19 +21,21 @@ def __init__(self, broker_manager: BrokerManager):

def Push(self, request, context):
queue = self.broker_manager.get_node(request.key)
if queue.is_alive():
response = queue.push(request.key, request.value)
logger.info(f"Pushed message: {request.key} {request.value}")
else:
self.broker_manager.remove_node(queue)
response = self.Push(request, context)
if not queue:
logger.error(f"No queue for key: {request.key}")
return PushResponse(status=PushStatus.FAILURE, message="No queue for key")
response = queue.push(request.key, request.value)
logger.info(f"Pushed message: {request.key} {request.value}")
return response

def Pull(self, request, context):
logger.debug("Pulling message")
queue = self.broker_manager.get_random_node()
if not queue:
logger.error("No brokers available")
return
logger.debug(f"Pulling from queue: {queue.uuid}")
message = queue.pull()
logger.info(f"Pulled message: {message.key} {message.value}")
logger.debug(f"Pulled message: {message.key} {message.value}")
return message

class Zookeeper(zookeeper_pb2_grpc.ZookeeperServicer):
Expand Down

0 comments on commit 321e4f3

Please sign in to comment.