Commit 1e72f25

Merge pull request #3 from tavallaie/kafka

adding support for kafka

tavallaie authored Aug 4, 2024
2 parents c16a964 + e7bef8e commit 1e72f25

Showing 8 changed files with 367 additions and 65 deletions.
72 changes: 72 additions & 0 deletions .github/workflows/test-kafka.yml
@@ -0,0 +1,72 @@
name: Kafka Tests

on:
  push:
    branches:
      - main
    paths:
      - 'connectiva/protocols/kafka_protocol.py'
      - 'tests/test_kafka_protocol.py'
      - 'pyproject.toml'
  pull_request:
    branches:
      - main
    paths:
      - 'connectiva/protocols/kafka_protocol.py'
      - 'tests/test_kafka_protocol.py'
      - 'pyproject.toml'

jobs:
  test-kafka:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install Poetry
        run: |
          python -m pip install --upgrade pip
          python -m pip install poetry

      - name: Add Poetry to PATH
        run: echo "$HOME/.local/bin" >> $GITHUB_PATH

      - name: Install dependencies
        run: poetry install

      - name: Start Kafka and Zookeeper
        run: |
          echo "Starting Kafka and Zookeeper services on Python ${{ matrix.python-version }}..."
          docker compose -f docker/docker-compose.kafka.yml up -d

      - name: Wait for Kafka to be healthy
        run: |
          echo "Waiting for Kafka service to be healthy..."
          for i in {1..10}; do
            if [ "$(docker inspect --format='{{json .State.Health.Status}}' $(docker ps -q -f name=kafka))" == "\"healthy\"" ]; then
              echo "Kafka is healthy!"
              break
            else
              echo "Kafka is not healthy yet. Waiting..."
              sleep 10
            fi
          done

      - name: Run Kafka Tests
        run: |
          echo "Running Kafka tests on Python ${{ matrix.python-version }}..."
          poetry run python -m unittest discover -s tests -p 'test_kafka_protocol.py'

      - name: Stop Kafka and Zookeeper
        run: |
          echo "Stopping Kafka and Zookeeper services..."
          docker compose -f docker/docker-compose.kafka.yml down
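
The wait step polls docker inspect from the shell; the same readiness check can be written in Python with the Docker SDK. A minimal sketch, assuming the docker package from PyPI is installed and using the container name kafka from the compose file later in this commit:

import time
import docker

# Poll the container's health status, mirroring the shell loop above.
client = docker.from_env()
for _ in range(10):
    kafka = client.containers.get("kafka")  # container_name from the compose file
    status = kafka.attrs["State"]["Health"]["Status"]
    if status == "healthy":
        print("Kafka is healthy!")
        break
    print(f"Kafka is not healthy yet (status: {status}). Waiting...")
    time.sleep(10)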
2 changes: 1 addition & 1 deletion .github/workflows/test-rabbitmq.yml
@@ -21,7 +21,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12","3.13"]
+        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]

     steps:
       - name: Checkout code
4 changes: 2 additions & 2 deletions connectiva/__init__.py
@@ -2,5 +2,5 @@
 from .interfaces import CommunicationMethod
 from .communication_factory import CommunicationFactory
 from .logging_config import setup_logging
-
-__all__= ["Message","CommunicationMethod","CommunicationFactory","setup_logging"]
+from .connectiva import Connectiva
+__all__= ["Message","CommunicationMethod","CommunicationFactory","setup_logging","Connectiva"]
11 changes: 10 additions & 1 deletion connectiva/connectiva.py
@@ -2,7 +2,7 @@

 import logging
 from typing import Dict, Any, List, Optional
-from connectiva import CommunicationFactory, Message , setup_logging
+from connectiva import CommunicationFactory, Message, setup_logging


 class Connectiva:

@@ -62,3 +62,12 @@ def receive(self) -> Message:
     def disconnect(self):
         self.logger.info("Disconnecting from communication endpoint...")
         self.strategy.disconnect()
+
+    def seek_to_end(self):
+        """
+        Seek the consumer to the end of the topic.
+        This method will check if the strategy supports seeking.
+        """
+        if hasattr(self.strategy, 'seek_to_end'):
+            self.strategy.seek_to_end()
+            self.logger.info("Consumer moved to the end of the log.")
138 changes: 113 additions & 25 deletions connectiva/protocols/kafka_protocol.py
@@ -1,73 +1,161 @@
-from kafka import KafkaProducer, KafkaConsumer
-from kafka.errors import KafkaError
+from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
+from kafka.admin import NewTopic
+from kafka.errors import KafkaError, TopicAlreadyExistsError
+from kafka.structs import TopicPartition
 from typing import Dict, Any
-from ..interfaces import CommunicationMethod
-from ..message import Message
+from connectiva import CommunicationMethod, Message
 import json
+import logging
+import re

 class KafkaProtocol(CommunicationMethod):
     """
     Kafka communication class for producing and consuming messages from Kafka topics.
     """

     def __init__(self, **kwargs):
-        self.broker_list = kwargs.get("broker_list").split(',')
+        # Set up logger first
+        self.logger = logging.getLogger(self.__class__.__name__)
+
+        # Extract configuration from kwargs
+        self.endpoint = kwargs.get("endpoint")
         self.topic = kwargs.get("topic")
         self.group_id = kwargs.get("group_id")
+        self.partitions = kwargs.get("partitions", 1)
+        self.replication_factor = kwargs.get("replication_factor", 1)
+        self.consumer_timeout = kwargs.get("consumer_timeout", 5000) # Timeout for consumer in milliseconds
         self.producer = None
         self.consumer = None
+        self.admin_client = None
+
+        # Parse the endpoint to get broker list
+        self.broker_list = self._parse_endpoint(self.endpoint)

+    def _parse_endpoint(self, endpoint: str) -> list:
+        """
+        Parse the Kafka endpoint into a list of brokers.
+        :param endpoint: The endpoint URL in the form 'kafka://host1:port1,host2:port2'
+        :return: List of broker addresses ['host1:port1', 'host2:port2']
+        """
+        if not endpoint.startswith("kafka://"):
+            raise ValueError("Invalid Kafka endpoint. Must start with 'kafka://'")
+
+        # Strip the protocol prefix and split by comma to get individual brokers
+        broker_string = endpoint[len("kafka://"):]
+        brokers = re.split(r',\s*', broker_string)
+        self.logger.debug(f"Parsed brokers: {brokers}")
+        return brokers
+
+    def create_topic(self):
+        """
+        Create Kafka topic if it does not exist.
+        """
+        try:
+            self.admin_client = KafkaAdminClient(bootstrap_servers=self.broker_list)
+            topic_list = self.admin_client.list_topics()
+            if self.topic not in topic_list:
+                self.logger.info(f"Creating topic {self.topic}...")
+                new_topic = NewTopic(
+                    name=self.topic,
+                    num_partitions=self.partitions,
+                    replication_factor=self.replication_factor
+                )
+                self.admin_client.create_topics([new_topic])
+                self.logger.info(f"Topic {self.topic} created successfully!")
+            else:
+                self.logger.info(f"Topic {self.topic} already exists.")
+        except TopicAlreadyExistsError:
+            self.logger.info(f"Topic {self.topic} already exists.")
+        except KafkaError as e:
+            self.logger.error(f"Failed to create topic: {e}")
+            raise
+        finally:
+            if self.admin_client:
+                self.admin_client.close()
+
     def connect(self):
-        print(f"Connecting to Kafka brokers at {self.broker_list}...")
+        self.logger.info(f"Connecting to Kafka brokers at {self.broker_list}...")
         try:
+            # Create the topic if it doesn't exist
+            self.create_topic()
+
+            # Initialize Kafka producer
             self.producer = KafkaProducer(
                 bootstrap_servers=self.broker_list,
                 value_serializer=lambda v: json.dumps(v).encode('utf-8')
             )
-            print("Kafka producer connected.")
+            self.logger.info("Kafka producer connected.")

+            # Initialize Kafka consumer
             if self.group_id:
                 self.consumer = KafkaConsumer(
                     self.topic,
                     bootstrap_servers=self.broker_list,
                     group_id=self.group_id,
                     auto_offset_reset='earliest', # Start from the earliest message
+                    enable_auto_commit=True, # Automatically commit offsets
+                    consumer_timeout_ms=self.consumer_timeout, # Set consumer timeout
                     value_deserializer=lambda x: json.loads(x.decode('utf-8'))
                 )
-                print("Kafka consumer connected.")
+                self.logger.info("Kafka consumer connected.")
+                self.consumer.subscribe([self.topic]) # Subscribe to the topic
             else:
-                print("No consumer group ID provided; skipping consumer initialization.")
+                self.logger.info("No consumer group ID provided; skipping consumer initialization.")

         except KafkaError as e:
-            print(f"Failed to connect to Kafka: {e}")
+            self.logger.error(f"Failed to connect to Kafka: {e}")
             raise

     def send(self, message: Message) -> Dict[str, Any]:
-        print(f"Sending message to Kafka topic '{self.topic}'...")
+        self.logger.info(f"Sending message to Kafka topic '{self.topic}'...")
         try:
-            future = self.producer.send(self.topic, value=message.__dict__)
+            future = self.producer.send(self.topic, value=message.__dict__) # Send the entire message
             result = future.get(timeout=10) # Block until a single message is sent
-            print("Message sent successfully!")
+            self.logger.info(f"Message sent successfully! Offset: {result.offset}")
             return {"status": "sent", "offset": result.offset}
         except KafkaError as e:
-            print(f"Failed to send message: {e}")
+            self.logger.error(f"Failed to send message: {e}")
             return {"error": str(e)}

     def receive(self) -> Message:
-        print(f"Receiving message from Kafka topic '{self.topic}'...")
+        self.logger.info(f"Receiving message from Kafka topic '{self.topic}'...")
         try:
             for message in self.consumer:
-                print("Message received successfully!")
-                return Message(action="receive", data=message.value) # Return the first message received
+                self.logger.info(f"Message received successfully! Message: {message.value}")
+                return Message(action="receive", data=message.value) # Return the entire message
+            self.logger.info("No message received within the timeout period.")
+            return Message(action="error", data={}, metadata={"error": "No message found"})
+        except StopIteration:
+            self.logger.info("No message received.")
+            return Message(action="error", data={}, metadata={"error": "No message found"})
         except KafkaError as e:
-            print(f"Failed to receive message: {e}")
+            self.logger.error(f"Failed to receive message: {e}")
             return Message(action="error", data={}, metadata={"error": str(e)})

-    def disconnect(self):
-        print("Disconnecting from Kafka...")
-        if self.producer:
-            self.producer.close()
-            print("Kafka producer disconnected.")
+    def seek_to_end(self):
+        """Move the consumer to the end of the log for the current topic."""
         if self.consumer:
-            self.consumer.close()
-            print("Kafka consumer disconnected.")
+            try:
+                partitions = self.consumer.partitions_for_topic(self.topic)
+                if not partitions:
+                    self.logger.error(f"No partitions found for topic {self.topic}.")
+                    return
+
+                topic_partitions = [TopicPartition(self.topic, p) for p in partitions]
+                self.consumer.assign(topic_partitions) # Ensure partitions are assigned
+                self.consumer.seek_to_end()
+                self.logger.info("Moved consumer to the end of the log.")
+            except Exception as e:
+                self.logger.error(f"Failed to seek to end: {e}")
+
+    def disconnect(self):
+        self.logger.info("Disconnecting from Kafka...")
+        try:
+            if self.producer:
+                self.producer.close()
+                self.logger.info("Kafka producer disconnected.")
+            if self.consumer:
+                self.consumer.close()
+                self.logger.info("Kafka consumer disconnected.")
+        except Exception as e:
+            self.logger.error(f"Failed to disconnect Kafka: {e}")
60 changes: 60 additions & 0 deletions docker/docker-compose.kafka.yml
@@ -0,0 +1,60 @@
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
    healthcheck:
      # Run the ruok probe through a shell so the pipeline is interpreted.
      test: ["CMD-SHELL", "echo ruok | nc localhost 2181 | grep imok"]
      interval: 5s
      timeout: 5s
      retries: 5
      start_period: 10s

  kafka:
    image: bitnami/kafka:latest
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
    depends_on:
      zookeeper:
        condition: service_healthy
    healthcheck:
      # Probe broker readiness with the topics CLI bundled in the bitnami image.
      test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"]
      interval: 5s
      timeout: 5s
      retries: 5
      start_period: 20s

networks:
  default:
    driver: bridge
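
The advertised listener makes the broker reachable from the host at localhost:9092, so the compose stack can be smoke-tested directly with kafka-python. A sketch, assuming the services above are running and the broker's default topic auto-creation is enabled:

import json
from kafka import KafkaProducer, KafkaConsumer

# Produce one message; the bootstrap address matches KAFKA_CFG_ADVERTISED_LISTENERS.
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("smoke-test", {"ok": True}).get(timeout=10)  # block until acked
producer.close()

# Consume it back from the beginning of the topic.
consumer = KafkaConsumer(
    "smoke-test",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # stop iterating after 5 idle seconds
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)
for record in consumer:
    print(record.value)  # {'ok': True}
    break
consumer.close()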