Skip to content

Commit

Permalink
added broker to compose
Browse files Browse the repository at this point in the history
  • Loading branch information
AlisaLC committed Feb 13, 2024
1 parent 7e590d9 commit b8ef37d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 2 deletions.
6 changes: 6 additions & 0 deletions broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ def LeadReplica(self, request, context):
logger.info(f"Merging replica messages to queue messages of broker {self.uuid}")
self.messages.extend(self.replica_messages)
MESSAGES_LENGTH.inc(len(self.replica_messages))
if self.replica:
batch = 10
for i in range(0, len(self.messages), batch):
self.replica_stub.PushReplica(MessageList(
messages=[BrokerMessage(key=k, value=v) for k, v in self.messages[i:i+batch]]
))
self.replica_messages.clear()
REPLICA_MESSAGES_LENGTH.set(0)
return BrokerEmpty()
Expand Down
28 changes: 27 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,30 @@ services:
env_file:
- .env
volumes:
- ./grafana:/etc/grafana/provisioning/datasources
- ./grafana:/etc/grafana/provisioning/datasources

broker-1:
container_name: broker-1
image: minikafka:broker
ports:
- 8001:8001
env_file:
- .env
environment:
- BROKER_HOST=broker-1
- BROKER_PORT=8001
depends_on:
- zookeeper

broker-2:
container_name: broker-2
image: minikafka:broker
ports:
- 8002:8002
env_file:
- .env
environment:
- BROKER_HOST=broker-2
- BROKER_PORT=8002
depends_on:
- zookeeper
2 changes: 1 addition & 1 deletion zookeeper/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def push(self, key, value):
def pull(self):
response = self.stub.Pull(BrokerEmpty())
if response.status == BrokerStatus.BROKER_SUCCESS:
PULL_COUNTER.labels(queue=self.uuid, key=response.key).inc()
PULL_COUNTER.labels(queue=self.uuid, key=response.message.key).inc()
BROKER_MESSAGE_COUNTER.labels(queue=self.uuid).dec()
return response

Expand Down

0 comments on commit b8ef37d

Please sign in to comment.