Skip to content

Commit

Permalink
adding test for kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
tavallaie committed Aug 4, 2024
1 parent c16a964 commit 4f478d4
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 19 deletions.
72 changes: 72 additions & 0 deletions .github/workflows/test-kafka.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
name: Kafka Tests

on:
push:
branches:
- main
paths:
- 'connectiva/protocols/kafka_protocol.py'
- 'tests/test_kafka_protocol.py'
- 'pyproject.toml'
pull_request:
branches:
- main
paths:
- 'connectiva/protocols/kafka_protocol.py'
- 'tests/test_kafka_protocol.py'
- 'pyproject.toml'

jobs:
test-kafka:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Install Poetry
run: |
python -m pip install --upgrade pip
python -m pip install poetry
- name: Add Poetry to Path
run: echo "export PATH=\"$HOME/.local/bin:\$PATH\"" >> $GITHUB_ENV

- name: Install dependencies
run: poetry install

- name: Start Kafka and Zookeeper
run: |
echo "Starting Kafka and Zookeeper services on Python ${{ matrix.python-version }}..."
docker-compose -f docker-compose.kafka.yml up -d
- name: Wait for Kafka to be healthy
run: |
echo "Waiting for Kafka service to be healthy..."
for i in {1..10}; do
if [ "$(docker inspect --format='{{json .State.Health.Status}}' $(docker ps -q -f name=kafka))" == "\"healthy\"" ]; then
echo "Kafka is healthy!"
break
else
echo "Kafka is not healthy yet. Waiting..."
sleep 10
fi
done
- name: Run Kafka Tests
run: |
echo "Running Kafka tests on Python ${{ matrix.python-version }}..."
poetry run python -m unittest discover -s tests -p 'test_kafka_protocol.py'
- name: Stop Kafka and Zookeeper
run: |
echo "Stopping Kafka and Zookeeper services..."
docker-compose -f docker-compose.kafka.yml down
47 changes: 28 additions & 19 deletions connectiva/protocols/kafka_protocol.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,82 @@
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from typing import Dict, Any
from ..interfaces import CommunicationMethod
from ..message import Message
from connectiva import CommunicationMethod, Message
import json
import logging

class KafkaProtocol(CommunicationMethod):
"""
Kafka communication class for producing and consuming messages from Kafka topics.
"""

def __init__(self, **kwargs):
self.broker_list = kwargs.get("broker_list").split(',')
self.endpoint = kwargs.get("endpoint")
self.topic = kwargs.get("topic")
self.group_id = kwargs.get("group_id")
self.producer = None
self.consumer = None

# Parse the endpoint to get broker list
self.endpoint = kwargs.get("endpoint")

# Set up logger
self.logger = logging.getLogger(self.__class__.__name__)



def connect(self):
print(f"Connecting to Kafka brokers at {self.broker_list}...")
self.logger.info(f"Connecting to Kafka brokers at {self.endpoint}...")
try:
# Initialize Kafka producer
self.producer = KafkaProducer(
bootstrap_servers=self.broker_list,
bootstrap_servers=self.endpoint,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
print("Kafka producer connected.")
self.logger.info("Kafka producer connected.")

# Initialize Kafka consumer
if self.group_id:
self.consumer = KafkaConsumer(
self.topic,
bootstrap_servers=self.broker_list,
bootstrap_servers=self.endpoint,
group_id=self.group_id,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
print("Kafka consumer connected.")
self.logger.info("Kafka consumer connected.")
else:
print("No consumer group ID provided; skipping consumer initialization.")
self.logger.info("No consumer group ID provided; skipping consumer initialization.")

except KafkaError as e:
print(f"Failed to connect to Kafka: {e}")
self.logger.error(f"Failed to connect to Kafka: {e}")
raise

def send(self, message: Message) -> Dict[str, Any]:
print(f"Sending message to Kafka topic '{self.topic}'...")
self.logger.info(f"Sending message to Kafka topic '{self.topic}'...")
try:
future = self.producer.send(self.topic, value=message.__dict__)
result = future.get(timeout=10) # Block until a single message is sent
print("Message sent successfully!")
self.logger.info("Message sent successfully!")
return {"status": "sent", "offset": result.offset}
except KafkaError as e:
print(f"Failed to send message: {e}")
self.logger.error(f"Failed to send message: {e}")
return {"error": str(e)}

def receive(self) -> Message:
print(f"Receiving message from Kafka topic '{self.topic}'...")
self.logger.info(f"Receiving message from Kafka topic '{self.topic}'...")
try:
for message in self.consumer:
print("Message received successfully!")
self.logger.info("Message received successfully!")
return Message(action="receive", data=message.value) # Return the first message received
except KafkaError as e:
print(f"Failed to receive message: {e}")
self.logger.error(f"Failed to receive message: {e}")
return Message(action="error", data={}, metadata={"error": str(e)})

def disconnect(self):
print("Disconnecting from Kafka...")
self.logger.info("Disconnecting from Kafka...")
if self.producer:
self.producer.close()
print("Kafka producer disconnected.")
self.logger.info("Kafka producer disconnected.")
if self.consumer:
self.consumer.close()
print("Kafka consumer disconnected.")
self.logger.info("Kafka consumer disconnected.")
60 changes: 60 additions & 0 deletions docker/docker-compose.kafka.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
version: '3.8'

services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
networks:
- kafka_network
healthcheck:
test:
[
"CMD",
"echo",
"ruok",
"|",
"nc",
"localhost",
"2181",
"|",
"grep",
"imok"
]
interval: 10s
timeout: 5s
retries: 5
start_period: 10s

kafka:
image: wurstmeister/kafka:latest
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: /kafka/logs
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
networks:
- kafka_network
healthcheck:
test:
[
"CMD",
"bash",
"-c",
"unset JMX_PORT; echo 'dump' | nc -w 1 localhost 9092 | grep -q brokers"
]
interval: 10s
timeout: 5s
retries: 5
start_period: 20s

networks:
kafka_network:
driver: bridge
53 changes: 53 additions & 0 deletions tests/test_kafka_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import unittest
import time
from connectiva.protocols import KafkaProtocol
from connectiva import Message

class TestKafkaProtocol(unittest.TestCase):

@classmethod
def setUpClass(cls):
# Use a Kafka broker for testing
cls.protocol = KafkaProtocol(
endpoint='localhost:9092', # Assuming Kafka is running on localhost
topic='test_topic',
group_id='test_group'
)
cls.protocol.connect()

@classmethod
def tearDownClass(cls):
# Disconnect after all tests
cls.protocol.disconnect()

def test_send_message(self):
# Send a message and check the response
message = Message(action="send", data={"key": "value"})
result = self.protocol.send(message)
self.assertEqual(result["status"], "sent", "Message send status should be 'sent'")

def test_receive_message(self):
# Send a message first to ensure there's something to receive
sent_message = Message(action="send", data={"key": "value"})
self.protocol.send(sent_message)

# Allow some time for the message to be available
time.sleep(1)

# Receive the message and verify contents
received_message = self.protocol.receive()
self.assertEqual(received_message.action, "receive", "Received action should be 'receive'")
self.assertEqual(received_message.data, sent_message.__dict__, "Received data should match sent message")

def test_receive_no_message(self):
# Attempt to receive when no message is expected
# Assume we clean the topic or use a new consumer group for this test
# No prior send means no messages to receive
time.sleep(1) # Allow time for the consumer to poll
received_message = self.protocol.receive()
self.assertEqual(received_message.action, "error", "Action should be 'error' when no message is found")
self.assertIn("No message found", received_message.metadata.get("error", ""), "Error metadata should indicate no message found")


if __name__ == '__main__':
unittest.main()

0 comments on commit 4f478d4

Please sign in to comment.