diff --git a/.env b/.env index c4427e3..a21adfe 100644 --- a/.env +++ b/.env @@ -2,6 +2,5 @@ GATEWAY_HOST=gateway GATEWAY_PORT=8080 ZOOKEEPER_HOST=zookeeper ZOOKEEPER_PORT=2181 -BROKERS=5 GF_SECURITY_ADMIN_USER=admin GF_SECURITY_ADMIN_PASSWORD=grafana \ No newline at end of file diff --git a/broker/proto/broker_pb2.py b/broker/proto/broker_pb2.py index 2d40199..7a0777d 100644 --- a/broker/proto/broker_pb2.py +++ b/broker/proto/broker_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x62roker.proto\"%\n\x07Message\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\"?\n\x0bMessageList\x12\x14\n\x0creplica_uuid\x18\x01 \x01(\t\x12\x1a\n\x08messages\x18\x02 \x03(\x0b\x32\x08.Message\"+\n\x0eReplicaRequest\x12\x0c\n\x04uuid\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\"\x19\n\tReplicaID\x12\x0c\n\x04uuid\x18\x01 \x01(\t\"3\n\x0cMessageCount\x12\x14\n\x0creplica_uuid\x18\x01 \x01(\t\x12\r\n\x05\x63ount\x18\x02 \x01(\x05\"+\n\x0cPushResponse\x12\x1b\n\x06status\x18\x01 \x01(\x0e\x32\x0b.PushStatus\"\x07\n\x05\x45mpty*&\n\nPushStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\x32\xb1\x02\n\x06\x42roker\x12\x17\n\x03\x41\x63k\x12\x06.Empty\x1a\x06.Empty\"\x00\x12!\n\x04Push\x12\x08.Message\x1a\r.PushResponse\"\x00\x12\x1a\n\x04Pull\x12\x06.Empty\x1a\x08.Message\"\x00\x12\'\n\nSetReplica\x12\x0f.ReplicaRequest\x1a\x06.Empty\"\x00\x12#\n\x0bLeadReplica\x12\n.ReplicaID\x1a\x06.Empty\"\x00\x12#\n\x0b\x44ropReplica\x12\n.ReplicaID\x1a\x06.Empty\"\x00\x12,\n\x0bPushReplica\x12\x0c.MessageList\x1a\r.PushResponse\"\x00\x12.\n\x13\x44ropReplicaMessages\x12\r.MessageCount\x1a\x06.Empty\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x62roker.proto\"%\n\x07Message\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\"?\n\x0bMessageList\x12\x14\n\x0creplica_uuid\x18\x01 \x01(\t\x12\x1a\n\x08messages\x18\x02 \x03(\x0b\x32\x08.Message\"+\n\x0eReplicaRequest\x12\x0c\n\x04uuid\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\"\x19\n\tReplicaID\x12\x0c\n\x04uuid\x18\x01 \x01(\t\"3\n\x0cMessageCount\x12\x14\n\x0creplica_uuid\x18\x01 \x01(\t\x12\r\n\x05\x63ount\x18\x02 \x01(\x05\"+\n\x0cPushResponse\x12\x1b\n\x06status\x18\x01 \x01(\x0e\x32\x0b.PushStatus\"\x07\n\x05\x45mpty*&\n\nPushStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\x32\xa9\x02\n\x06\x42roker\x12\x17\n\x03\x41\x63k\x12\x06.Empty\x1a\x06.Empty\"\x00\x12!\n\x04Push\x12\x08.Message\x1a\r.PushResponse\"\x00\x12\x1a\n\x04Pull\x12\x06.Empty\x1a\x08.Message\"\x00\x12\'\n\nSetReplica\x12\x0f.ReplicaRequest\x1a\x06.Empty\"\x00\x12\x1f\n\x0bLeadReplica\x12\x06.Empty\x1a\x06.Empty\"\x00\x12\x1f\n\x0b\x44ropReplica\x12\x06.Empty\x1a\x06.Empty\"\x00\x12,\n\x0bPushReplica\x12\x0c.MessageList\x1a\r.PushResponse\"\x00\x12.\n\x13\x44ropReplicaMessages\x12\r.MessageCount\x1a\x06.Empty\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -38,5 +38,5 @@ _globals['_EMPTY']._serialized_start=290 _globals['_EMPTY']._serialized_end=297 _globals['_BROKER']._serialized_start=340 - _globals['_BROKER']._serialized_end=645 + _globals['_BROKER']._serialized_end=637 # @@protoc_insertion_point(module_scope) diff --git a/broker/proto/broker_pb2_grpc.py b/broker/proto/broker_pb2_grpc.py index 6cdefa5..d2ba147 100644 --- a/broker/proto/broker_pb2_grpc.py +++ b/broker/proto/broker_pb2_grpc.py @@ -36,12 +36,12 @@ def __init__(self, channel): ) self.LeadReplica = channel.unary_unary( '/Broker/LeadReplica', - request_serializer=broker__pb2.ReplicaID.SerializeToString, + request_serializer=broker__pb2.Empty.SerializeToString, response_deserializer=broker__pb2.Empty.FromString, ) self.DropReplica = channel.unary_unary( '/Broker/DropReplica', - request_serializer=broker__pb2.ReplicaID.SerializeToString, + request_serializer=broker__pb2.Empty.SerializeToString, response_deserializer=broker__pb2.Empty.FromString, ) self.PushReplica = channel.unary_unary( @@ -132,12 +132,12 @@ def add_BrokerServicer_to_server(servicer, server): ), 'LeadReplica': grpc.unary_unary_rpc_method_handler( servicer.LeadReplica, - request_deserializer=broker__pb2.ReplicaID.FromString, + request_deserializer=broker__pb2.Empty.FromString, response_serializer=broker__pb2.Empty.SerializeToString, ), 'DropReplica': grpc.unary_unary_rpc_method_handler( servicer.DropReplica, - request_deserializer=broker__pb2.ReplicaID.FromString, + request_deserializer=broker__pb2.Empty.FromString, response_serializer=broker__pb2.Empty.SerializeToString, ), 'PushReplica': grpc.unary_unary_rpc_method_handler( @@ -240,7 +240,7 @@ def LeadReplica(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/Broker/LeadReplica', - broker__pb2.ReplicaID.SerializeToString, + broker__pb2.Empty.SerializeToString, broker__pb2.Empty.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -257,7 +257,7 @@ def DropReplica(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/Broker/DropReplica', - broker__pb2.ReplicaID.SerializeToString, + broker__pb2.Empty.SerializeToString, broker__pb2.Empty.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/proto/broker.proto b/proto/broker.proto index c750239..939768b 100644 --- a/proto/broker.proto +++ b/proto/broker.proto @@ -42,8 +42,8 @@ service Broker { rpc Pull(Empty) returns (Message) {} rpc SetReplica(ReplicaRequest) returns (Empty) {} - rpc LeadReplica(ReplicaID) returns (Empty) {} - rpc DropReplica(ReplicaID) returns (Empty) {} + rpc LeadReplica(Empty) returns (Empty) {} + rpc DropReplica(Empty) returns (Empty) {} rpc PushReplica(MessageList) returns (PushResponse) {} rpc DropReplicaMessages(MessageCount) returns (Empty) {} diff --git a/zookeeper/broker.py b/zookeeper/broker.py new file mode 100644 index 0000000..7cb0fb8 --- /dev/null +++ b/zookeeper/broker.py @@ -0,0 +1,120 @@ +from proto.broker_pb2 import Empty, Message, MessageList, ReplicaRequest, ReplicaID, MessageCount, PushResponse, PushStatus +import proto.broker_pb2_grpc +import grpc + +import random +import bisect + +from hashring import ConsistentHashRing + +from prometheus_client import Gauge, Counter + +BROKER_COUNTER = Gauge("zookeeper_broker_counter", "Number of brokers", ["name"]) +PUSH_COUNTER = Counter("zookeeper_push_counter", "Number of messages pushed", ["queue", "key"]) +PULL_COUNTER = Counter("zookeeper_pull_counter", "Number of messages pulled", ["queue", "key"]) +BROKER_MESSAGE_COUNTER = Gauge("zookeeper_broker_message_counter", "Number of messages in broker", ["queue"]) + +class Broker: + def __init__(self, uuid, url) -> None: + self.uuid = uuid + self.url = url + self.stub = None + self.replica = None + + def connect(self): + if self.stub: + return + self.stub = proto.broker_pb2_grpc.BrokerStub(grpc.insecure_channel(self.url)) + + def is_alive(self): + try: + self.stub.Ack(Empty()) + except: + return False + return True + + def push(self, key, value): + response = self.stub.Push(Message(key=key, value=value)) + PUSH_COUNTER.labels(queue=self.uuid, key=key).inc() + BROKER_MESSAGE_COUNTER.labels(queue=self.uuid).inc() + return response + + def pull(self): + response = self.stub.Pull(Empty()) + PULL_COUNTER.labels(queue=self.uuid, key=response.key).inc() + BROKER_MESSAGE_COUNTER.labels(queue=self.uuid).dec() + return response + + def set_replica(self, replica): + self.replica = replica + self.stub.SetReplica(ReplicaRequest(uuid=replica.uuid, url=replica.url)) + + def lead_replica(self): + return self.stub.LeadReplica(Empty()) + + def drop_replica(self): + return self.stub.DropReplica(Empty()) + + def __str__(self) -> str: + return self.uuid + +class BrokerManager: + def __init__(self) -> None: + self.hash_ring = ConsistentHashRing() + self.brokers = {} + self.broker_ring = [] + + def add_node(self, node: str): + if node.uuid in self.brokers: + return + node.connect() + if not node.is_alive(): + return + self.hash_ring.add_node(node) + self.brokers[node.uuid] = node + BROKER_COUNTER.labels(name=node.uuid).inc() + return self.__add_node_to_chain(node) + + def __add_node_to_chain(self, node): + bisect.insort(self.broker_ring, node.uuid) + if len(self.broker_ring) < 2: + return + index = self.broker_ring.index(node.uuid) + replica_index = (index + 1) % len(self.broker_ring) + prev_replica_index = (index - 1) % len(self.broker_ring) + replica = self.brokers[self.broker_ring[replica_index]] + prev_replica = self.brokers[self.broker_ring[prev_replica_index]] + replica.drop_replica() + node.set_replica(replica) + prev_replica.set_replica(node) + + def remove_node(self, node: str): + if node.uuid not in self.brokers or len(self.broker_ring) == 0: + return + if len(self.broker_ring) > 2: + self.__remove_node_from_chain(node) + self.hash_ring.remove_node(node) + del self.brokers[node.uuid] + self.broker_ring.remove(node.uuid) + if len(self.broker_ring) == 1: + self.broker_ring[0].lead_replica() + BROKER_COUNTER.labels(name=node.uuid).dec() + + def __remove_node_from_chain(self, node): + index = self.broker_ring.index(node.uuid) + replica_index = (index + 1) % len(self.broker_ring) + prev_replica_index = (index - 1) % len(self.broker_ring) + replica = self.brokers[self.broker_ring[replica_index]] + prev_replica = self.brokers[self.broker_ring[prev_replica_index]] + node_message_count = BROKER_MESSAGE_COUNTER.labels(queue=node.uuid)._value.get() + replica_message_count = BROKER_MESSAGE_COUNTER.labels(queue=replica.uuid)._value.get() + BROKER_MESSAGE_COUNTER.labels(queue=replica.uuid).set(replica_message_count + node_message_count) + BROKER_MESSAGE_COUNTER.remove(queue=node.uuid) + replica.lead_replica() + prev_replica.set_replica(replica) + + def get_node(self, key: str): + return self.hash_ring.get_node(key) + + def get_random_node(self): + return random.choice(list(self.brokers.values())) \ No newline at end of file diff --git a/zookeeper/proto/broker_pb2.py b/zookeeper/proto/broker_pb2.py index 2d40199..7a0777d 100644 --- a/zookeeper/proto/broker_pb2.py +++ b/zookeeper/proto/broker_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x62roker.proto\"%\n\x07Message\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\"?\n\x0bMessageList\x12\x14\n\x0creplica_uuid\x18\x01 \x01(\t\x12\x1a\n\x08messages\x18\x02 \x03(\x0b\x32\x08.Message\"+\n\x0eReplicaRequest\x12\x0c\n\x04uuid\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\"\x19\n\tReplicaID\x12\x0c\n\x04uuid\x18\x01 \x01(\t\"3\n\x0cMessageCount\x12\x14\n\x0creplica_uuid\x18\x01 \x01(\t\x12\r\n\x05\x63ount\x18\x02 \x01(\x05\"+\n\x0cPushResponse\x12\x1b\n\x06status\x18\x01 \x01(\x0e\x32\x0b.PushStatus\"\x07\n\x05\x45mpty*&\n\nPushStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\x32\xb1\x02\n\x06\x42roker\x12\x17\n\x03\x41\x63k\x12\x06.Empty\x1a\x06.Empty\"\x00\x12!\n\x04Push\x12\x08.Message\x1a\r.PushResponse\"\x00\x12\x1a\n\x04Pull\x12\x06.Empty\x1a\x08.Message\"\x00\x12\'\n\nSetReplica\x12\x0f.ReplicaRequest\x1a\x06.Empty\"\x00\x12#\n\x0bLeadReplica\x12\n.ReplicaID\x1a\x06.Empty\"\x00\x12#\n\x0b\x44ropReplica\x12\n.ReplicaID\x1a\x06.Empty\"\x00\x12,\n\x0bPushReplica\x12\x0c.MessageList\x1a\r.PushResponse\"\x00\x12.\n\x13\x44ropReplicaMessages\x12\r.MessageCount\x1a\x06.Empty\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0c\x62roker.proto\"%\n\x07Message\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c\"?\n\x0bMessageList\x12\x14\n\x0creplica_uuid\x18\x01 \x01(\t\x12\x1a\n\x08messages\x18\x02 \x03(\x0b\x32\x08.Message\"+\n\x0eReplicaRequest\x12\x0c\n\x04uuid\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\"\x19\n\tReplicaID\x12\x0c\n\x04uuid\x18\x01 \x01(\t\"3\n\x0cMessageCount\x12\x14\n\x0creplica_uuid\x18\x01 \x01(\t\x12\r\n\x05\x63ount\x18\x02 \x01(\x05\"+\n\x0cPushResponse\x12\x1b\n\x06status\x18\x01 \x01(\x0e\x32\x0b.PushStatus\"\x07\n\x05\x45mpty*&\n\nPushStatus\x12\x0b\n\x07SUCCESS\x10\x00\x12\x0b\n\x07\x46\x41ILURE\x10\x01\x32\xa9\x02\n\x06\x42roker\x12\x17\n\x03\x41\x63k\x12\x06.Empty\x1a\x06.Empty\"\x00\x12!\n\x04Push\x12\x08.Message\x1a\r.PushResponse\"\x00\x12\x1a\n\x04Pull\x12\x06.Empty\x1a\x08.Message\"\x00\x12\'\n\nSetReplica\x12\x0f.ReplicaRequest\x1a\x06.Empty\"\x00\x12\x1f\n\x0bLeadReplica\x12\x06.Empty\x1a\x06.Empty\"\x00\x12\x1f\n\x0b\x44ropReplica\x12\x06.Empty\x1a\x06.Empty\"\x00\x12,\n\x0bPushReplica\x12\x0c.MessageList\x1a\r.PushResponse\"\x00\x12.\n\x13\x44ropReplicaMessages\x12\r.MessageCount\x1a\x06.Empty\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -38,5 +38,5 @@ _globals['_EMPTY']._serialized_start=290 _globals['_EMPTY']._serialized_end=297 _globals['_BROKER']._serialized_start=340 - _globals['_BROKER']._serialized_end=645 + _globals['_BROKER']._serialized_end=637 # @@protoc_insertion_point(module_scope) diff --git a/zookeeper/proto/broker_pb2_grpc.py b/zookeeper/proto/broker_pb2_grpc.py index 6cdefa5..d2ba147 100644 --- a/zookeeper/proto/broker_pb2_grpc.py +++ b/zookeeper/proto/broker_pb2_grpc.py @@ -36,12 +36,12 @@ def __init__(self, channel): ) self.LeadReplica = channel.unary_unary( '/Broker/LeadReplica', - request_serializer=broker__pb2.ReplicaID.SerializeToString, + request_serializer=broker__pb2.Empty.SerializeToString, response_deserializer=broker__pb2.Empty.FromString, ) self.DropReplica = channel.unary_unary( '/Broker/DropReplica', - request_serializer=broker__pb2.ReplicaID.SerializeToString, + request_serializer=broker__pb2.Empty.SerializeToString, response_deserializer=broker__pb2.Empty.FromString, ) self.PushReplica = channel.unary_unary( @@ -132,12 +132,12 @@ def add_BrokerServicer_to_server(servicer, server): ), 'LeadReplica': grpc.unary_unary_rpc_method_handler( servicer.LeadReplica, - request_deserializer=broker__pb2.ReplicaID.FromString, + request_deserializer=broker__pb2.Empty.FromString, response_serializer=broker__pb2.Empty.SerializeToString, ), 'DropReplica': grpc.unary_unary_rpc_method_handler( servicer.DropReplica, - request_deserializer=broker__pb2.ReplicaID.FromString, + request_deserializer=broker__pb2.Empty.FromString, response_serializer=broker__pb2.Empty.SerializeToString, ), 'PushReplica': grpc.unary_unary_rpc_method_handler( @@ -240,7 +240,7 @@ def LeadReplica(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/Broker/LeadReplica', - broker__pb2.ReplicaID.SerializeToString, + broker__pb2.Empty.SerializeToString, broker__pb2.Empty.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -257,7 +257,7 @@ def DropReplica(request, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/Broker/DropReplica', - broker__pb2.ReplicaID.SerializeToString, + broker__pb2.Empty.SerializeToString, broker__pb2.Empty.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/zookeeper/zookeeper.py b/zookeeper/zookeeper.py index f89bd1c..dba07b0 100644 --- a/zookeeper/zookeeper.py +++ b/zookeeper/zookeeper.py @@ -1,65 +1,63 @@ import grpc from proto.message_pb2 import Message, PushResponse, PushStatus import proto.message_pb2_grpc as message_pb2_grpc +from proto.zookeeper_pb2 import Empty +import proto.zookeeper_pb2_grpc as zookeeper_pb2_grpc + +from broker import BrokerManager, Broker + import os import time + from concurrent.futures import ThreadPoolExecutor + import logging -from hashring import ConsistentHashRing + from prometheus_client import Counter, Gauge, start_http_server logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) -hash_ring = ConsistentHashRing() - -PUSH_COUNTER = Counter("zookeeper_push_counter", "Number of messages pushed", ["queue", "key"]) -PULL_COUNTER = Counter("zookeeper_pull_counter", "Number of messages pulled", ["queue", "key"]) -BROKER_COUNTER = Gauge("zookeeper_broker_counter", "Number of brokers", ["name"]) -BROKER_MESSAGE_COUNTER = Gauge("zookeeper_broker_message_counter", "Number of messages in broker", ["queue"]) - - class MessageQueue(message_pb2_grpc.MessageQueueServicer): - def __init__(self): - self.broker_count = int(os.environ.get("BROKERS", 5)) - self.queues = [] - for i in range(self.broker_count): - self.queues.append([]) - hash_ring.add_node(f"broker-{i}") - BROKER_COUNTER.labels(name=f"broker-{i}").inc() - self.mapping = {f"broker-{i}": i for i in range(self.broker_count)} + def __init__(self, broker_manager: BrokerManager): + self.broker_manager = broker_manager def Push(self, request, context): - queue_id = self.mapping[hash_ring.get_node(request.key)] - logger.debug(f"Pushing message to queue {queue_id}") - queue = self.queues[queue_id] - queue.append((request.key, request.value)) - PUSH_COUNTER.labels(queue=f"{queue_id}", key=request.key).inc() - BROKER_MESSAGE_COUNTER.labels(queue=f"{queue_id}").inc() - logger.info(f"Pushed message: {request.key} {request.value}") - return PushResponse(status=PushStatus.SUCCESS, message="") + queue = self.broker_manager.get_node(request.key) + if queue.is_alive(): + response = queue.push(request.key, request.value) + logger.info(f"Pushed message: {request.key} {request.value}") + else: + self.broker_manager.remove_node(queue) + response = self.Push(request, context) + return response def Pull(self, request, context): logger.debug("Pulling message") - while True: - for i, queue in enumerate(self.queues): - if queue: - message = queue.pop(0) - PULL_COUNTER.labels(queue=f"{i}", key=message[0]).inc() - BROKER_MESSAGE_COUNTER.labels(queue=f"{i}").dec() - logger.debug(f"Pulled from queue {i}") - break - else: - time.sleep(1) - continue - break - logger.info(f"Pulled message: {message[0]} {message[1]}") - return Message(key=message[0], value=message[1]) + queue = self.broker_manager.get_random_node() + message = queue.pull() + logger.info(f"Pulled message: {message.key} {message.value}") + return message + +class Zookeeper(zookeeper_pb2_grpc.ZookeeperServicer): + def __init__(self, broker_manager: BrokerManager): + self.broker_manager = broker_manager + + def Ack(self, request, context): + return Empty() + + def Register(self, request, context): + logger.info(f"Registering broker: {request.uuid} {request.url}") + self.broker_manager.add_node(Broker(request.uuid, request.url)) + return Empty() + if __name__ == "__main__": start_http_server(8000) server = grpc.server(ThreadPoolExecutor(max_workers=10)) - message_pb2_grpc.add_MessageQueueServicer_to_server(MessageQueue(), server) + manager = BrokerManager() + message_pb2_grpc.add_MessageQueueServicer_to_server(MessageQueue(manager), server) + zookeeper_pb2_grpc.add_ZookeeperServicer_to_server(Zookeeper(manager), server) server.add_insecure_port(f"[::]:{os.getenv('ZOOKEEPER_PORT')}") server.start() server.wait_for_termination()