Skip to content

Commit

Permalink
integrated broker to zookeeper
Browse files Browse the repository at this point in the history
  • Loading branch information
AlisaLC committed Feb 11, 2024
1 parent d9f7d25 commit 84fe20e
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 59 deletions.
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@ GATEWAY_HOST=gateway
GATEWAY_PORT=8080
ZOOKEEPER_HOST=zookeeper
ZOOKEEPER_PORT=2181
BROKERS=5
GF_SECURITY_ADMIN_USER=admin
GF_SECURITY_ADMIN_PASSWORD=grafana
4 changes: 2 additions & 2 deletions broker/proto/broker_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions broker/proto/broker_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ def __init__(self, channel):
)
self.LeadReplica = channel.unary_unary(
'/Broker/LeadReplica',
request_serializer=broker__pb2.ReplicaID.SerializeToString,
request_serializer=broker__pb2.Empty.SerializeToString,
response_deserializer=broker__pb2.Empty.FromString,
)
self.DropReplica = channel.unary_unary(
'/Broker/DropReplica',
request_serializer=broker__pb2.ReplicaID.SerializeToString,
request_serializer=broker__pb2.Empty.SerializeToString,
response_deserializer=broker__pb2.Empty.FromString,
)
self.PushReplica = channel.unary_unary(
Expand Down Expand Up @@ -132,12 +132,12 @@ def add_BrokerServicer_to_server(servicer, server):
),
'LeadReplica': grpc.unary_unary_rpc_method_handler(
servicer.LeadReplica,
request_deserializer=broker__pb2.ReplicaID.FromString,
request_deserializer=broker__pb2.Empty.FromString,
response_serializer=broker__pb2.Empty.SerializeToString,
),
'DropReplica': grpc.unary_unary_rpc_method_handler(
servicer.DropReplica,
request_deserializer=broker__pb2.ReplicaID.FromString,
request_deserializer=broker__pb2.Empty.FromString,
response_serializer=broker__pb2.Empty.SerializeToString,
),
'PushReplica': grpc.unary_unary_rpc_method_handler(
Expand Down Expand Up @@ -240,7 +240,7 @@ def LeadReplica(request,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/Broker/LeadReplica',
broker__pb2.ReplicaID.SerializeToString,
broker__pb2.Empty.SerializeToString,
broker__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
Expand All @@ -257,7 +257,7 @@ def DropReplica(request,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/Broker/DropReplica',
broker__pb2.ReplicaID.SerializeToString,
broker__pb2.Empty.SerializeToString,
broker__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
Expand Down
4 changes: 2 additions & 2 deletions proto/broker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ service Broker {
rpc Pull(Empty) returns (Message) {}

rpc SetReplica(ReplicaRequest) returns (Empty) {}
rpc LeadReplica(ReplicaID) returns (Empty) {}
rpc DropReplica(ReplicaID) returns (Empty) {}
rpc LeadReplica(Empty) returns (Empty) {}
rpc DropReplica(Empty) returns (Empty) {}

rpc PushReplica(MessageList) returns (PushResponse) {}
rpc DropReplicaMessages(MessageCount) returns (Empty) {}
Expand Down
120 changes: 120 additions & 0 deletions zookeeper/broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from proto.broker_pb2 import Empty, Message, MessageList, ReplicaRequest, ReplicaID, MessageCount, PushResponse, PushStatus
import proto.broker_pb2_grpc
import grpc

import random
import bisect

from hashring import ConsistentHashRing

from prometheus_client import Gauge, Counter

BROKER_COUNTER = Gauge("zookeeper_broker_counter", "Number of brokers", ["name"])
PUSH_COUNTER = Counter("zookeeper_push_counter", "Number of messages pushed", ["queue", "key"])
PULL_COUNTER = Counter("zookeeper_pull_counter", "Number of messages pulled", ["queue", "key"])
BROKER_MESSAGE_COUNTER = Gauge("zookeeper_broker_message_counter", "Number of messages in broker", ["queue"])

class Broker:
def __init__(self, uuid, url) -> None:
self.uuid = uuid
self.url = url
self.stub = None
self.replica = None

def connect(self):
if self.stub:
return
self.stub = proto.broker_pb2_grpc.BrokerStub(grpc.insecure_channel(self.url))

def is_alive(self):
try:
self.stub.Ack(Empty())
except:
return False
return True

def push(self, key, value):
response = self.stub.Push(Message(key=key, value=value))
PUSH_COUNTER.labels(queue=self.uuid, key=key).inc()
BROKER_MESSAGE_COUNTER.labels(queue=self.uuid).inc()
return response

def pull(self):
response = self.stub.Pull(Empty())
PULL_COUNTER.labels(queue=self.uuid, key=response.key).inc()
BROKER_MESSAGE_COUNTER.labels(queue=self.uuid).dec()
return response

def set_replica(self, replica):
self.replica = replica
self.stub.SetReplica(ReplicaRequest(uuid=replica.uuid, url=replica.url))

def lead_replica(self):
return self.stub.LeadReplica(Empty())

def drop_replica(self):
return self.stub.DropReplica(Empty())

def __str__(self) -> str:
return self.uuid

class BrokerManager:
def __init__(self) -> None:
self.hash_ring = ConsistentHashRing()
self.brokers = {}
self.broker_ring = []

def add_node(self, node: str):
if node.uuid in self.brokers:
return
node.connect()
if not node.is_alive():
return
self.hash_ring.add_node(node)
self.brokers[node.uuid] = node
BROKER_COUNTER.labels(name=node.uuid).inc()
return self.__add_node_to_chain(node)

def __add_node_to_chain(self, node):
bisect.insort(self.broker_ring, node.uuid)
if len(self.broker_ring) < 2:
return
index = self.broker_ring.index(node.uuid)
replica_index = (index + 1) % len(self.broker_ring)
prev_replica_index = (index - 1) % len(self.broker_ring)
replica = self.brokers[self.broker_ring[replica_index]]
prev_replica = self.brokers[self.broker_ring[prev_replica_index]]
replica.drop_replica()
node.set_replica(replica)
prev_replica.set_replica(node)

def remove_node(self, node: str):
if node.uuid not in self.brokers or len(self.broker_ring) == 0:
return
if len(self.broker_ring) > 2:
self.__remove_node_from_chain(node)
self.hash_ring.remove_node(node)
del self.brokers[node.uuid]
self.broker_ring.remove(node.uuid)
if len(self.broker_ring) == 1:
self.broker_ring[0].lead_replica()
BROKER_COUNTER.labels(name=node.uuid).dec()

def __remove_node_from_chain(self, node):
index = self.broker_ring.index(node.uuid)
replica_index = (index + 1) % len(self.broker_ring)
prev_replica_index = (index - 1) % len(self.broker_ring)
replica = self.brokers[self.broker_ring[replica_index]]
prev_replica = self.brokers[self.broker_ring[prev_replica_index]]
node_message_count = BROKER_MESSAGE_COUNTER.labels(queue=node.uuid)._value.get()
replica_message_count = BROKER_MESSAGE_COUNTER.labels(queue=replica.uuid)._value.get()
BROKER_MESSAGE_COUNTER.labels(queue=replica.uuid).set(replica_message_count + node_message_count)
BROKER_MESSAGE_COUNTER.remove(queue=node.uuid)
replica.lead_replica()
prev_replica.set_replica(replica)

def get_node(self, key: str):
return self.hash_ring.get_node(key)

def get_random_node(self):
return random.choice(list(self.brokers.values()))
4 changes: 2 additions & 2 deletions zookeeper/proto/broker_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions zookeeper/proto/broker_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ def __init__(self, channel):
)
self.LeadReplica = channel.unary_unary(
'/Broker/LeadReplica',
request_serializer=broker__pb2.ReplicaID.SerializeToString,
request_serializer=broker__pb2.Empty.SerializeToString,
response_deserializer=broker__pb2.Empty.FromString,
)
self.DropReplica = channel.unary_unary(
'/Broker/DropReplica',
request_serializer=broker__pb2.ReplicaID.SerializeToString,
request_serializer=broker__pb2.Empty.SerializeToString,
response_deserializer=broker__pb2.Empty.FromString,
)
self.PushReplica = channel.unary_unary(
Expand Down Expand Up @@ -132,12 +132,12 @@ def add_BrokerServicer_to_server(servicer, server):
),
'LeadReplica': grpc.unary_unary_rpc_method_handler(
servicer.LeadReplica,
request_deserializer=broker__pb2.ReplicaID.FromString,
request_deserializer=broker__pb2.Empty.FromString,
response_serializer=broker__pb2.Empty.SerializeToString,
),
'DropReplica': grpc.unary_unary_rpc_method_handler(
servicer.DropReplica,
request_deserializer=broker__pb2.ReplicaID.FromString,
request_deserializer=broker__pb2.Empty.FromString,
response_serializer=broker__pb2.Empty.SerializeToString,
),
'PushReplica': grpc.unary_unary_rpc_method_handler(
Expand Down Expand Up @@ -240,7 +240,7 @@ def LeadReplica(request,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/Broker/LeadReplica',
broker__pb2.ReplicaID.SerializeToString,
broker__pb2.Empty.SerializeToString,
broker__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
Expand All @@ -257,7 +257,7 @@ def DropReplica(request,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/Broker/DropReplica',
broker__pb2.ReplicaID.SerializeToString,
broker__pb2.Empty.SerializeToString,
broker__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
Expand Down
78 changes: 38 additions & 40 deletions zookeeper/zookeeper.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,63 @@
import grpc
from proto.message_pb2 import Message, PushResponse, PushStatus
import proto.message_pb2_grpc as message_pb2_grpc
from proto.zookeeper_pb2 import Empty
import proto.zookeeper_pb2_grpc as zookeeper_pb2_grpc

from broker import BrokerManager, Broker

import os
import time

from concurrent.futures import ThreadPoolExecutor

import logging
from hashring import ConsistentHashRing

from prometheus_client import Counter, Gauge, start_http_server

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

hash_ring = ConsistentHashRing()

PUSH_COUNTER = Counter("zookeeper_push_counter", "Number of messages pushed", ["queue", "key"])
PULL_COUNTER = Counter("zookeeper_pull_counter", "Number of messages pulled", ["queue", "key"])
BROKER_COUNTER = Gauge("zookeeper_broker_counter", "Number of brokers", ["name"])
BROKER_MESSAGE_COUNTER = Gauge("zookeeper_broker_message_counter", "Number of messages in broker", ["queue"])


class MessageQueue(message_pb2_grpc.MessageQueueServicer):
def __init__(self):
self.broker_count = int(os.environ.get("BROKERS", 5))
self.queues = []
for i in range(self.broker_count):
self.queues.append([])
hash_ring.add_node(f"broker-{i}")
BROKER_COUNTER.labels(name=f"broker-{i}").inc()
self.mapping = {f"broker-{i}": i for i in range(self.broker_count)}
def __init__(self, broker_manager: BrokerManager):
self.broker_manager = broker_manager

def Push(self, request, context):
queue_id = self.mapping[hash_ring.get_node(request.key)]
logger.debug(f"Pushing message to queue {queue_id}")
queue = self.queues[queue_id]
queue.append((request.key, request.value))
PUSH_COUNTER.labels(queue=f"{queue_id}", key=request.key).inc()
BROKER_MESSAGE_COUNTER.labels(queue=f"{queue_id}").inc()
logger.info(f"Pushed message: {request.key} {request.value}")
return PushResponse(status=PushStatus.SUCCESS, message="")
queue = self.broker_manager.get_node(request.key)
if queue.is_alive():
response = queue.push(request.key, request.value)
logger.info(f"Pushed message: {request.key} {request.value}")
else:
self.broker_manager.remove_node(queue)
response = self.Push(request, context)
return response

def Pull(self, request, context):
logger.debug("Pulling message")
while True:
for i, queue in enumerate(self.queues):
if queue:
message = queue.pop(0)
PULL_COUNTER.labels(queue=f"{i}", key=message[0]).inc()
BROKER_MESSAGE_COUNTER.labels(queue=f"{i}").dec()
logger.debug(f"Pulled from queue {i}")
break
else:
time.sleep(1)
continue
break
logger.info(f"Pulled message: {message[0]} {message[1]}")
return Message(key=message[0], value=message[1])
queue = self.broker_manager.get_random_node()
message = queue.pull()
logger.info(f"Pulled message: {message.key} {message.value}")
return message

class Zookeeper(zookeeper_pb2_grpc.ZookeeperServicer):
def __init__(self, broker_manager: BrokerManager):
self.broker_manager = broker_manager

def Ack(self, request, context):
return Empty()

def Register(self, request, context):
logger.info(f"Registering broker: {request.uuid} {request.url}")
self.broker_manager.add_node(Broker(request.uuid, request.url))
return Empty()


if __name__ == "__main__":
start_http_server(8000)
server = grpc.server(ThreadPoolExecutor(max_workers=10))
message_pb2_grpc.add_MessageQueueServicer_to_server(MessageQueue(), server)
manager = BrokerManager()
message_pb2_grpc.add_MessageQueueServicer_to_server(MessageQueue(manager), server)
zookeeper_pb2_grpc.add_ZookeeperServicer_to_server(Zookeeper(manager), server)
server.add_insecure_port(f"[::]:{os.getenv('ZOOKEEPER_PORT')}")
server.start()
server.wait_for_termination()

0 comments on commit 84fe20e

Please sign in to comment.