Skip to content

Commit

Permalink
topic creation within kafka protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
tavallaie committed Aug 4, 2024
1 parent 495cf75 commit 9cb2de6
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
54 changes: 50 additions & 4 deletions connectiva/protocols/kafka_protocol.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import KafkaError, TopicAlreadyExistsError
from kafka.structs import TopicPartition
from typing import Dict, Any
from connectiva import CommunicationMethod, Message
import json
Expand All @@ -19,8 +21,11 @@ def __init__(self, **kwargs):
self.endpoint = kwargs.get("endpoint")
self.topic = kwargs.get("topic")
self.group_id = kwargs.get("group_id")
self.partitions = kwargs.get("partitions", 1)
self.replication_factor = kwargs.get("replication_factor", 1)
self.producer = None
self.consumer = None
self.admin_client = None

# Parse the endpoint to get broker list
self.broker_list = self._parse_endpoint(self.endpoint)
Expand All @@ -40,9 +45,39 @@ def _parse_endpoint(self, endpoint: str) -> list:
self.logger.debug(f"Parsed brokers: {brokers}")
return brokers

def create_topic(self):
"""
Create Kafka topic if it does not exist.
"""
try:
self.admin_client = KafkaAdminClient(bootstrap_servers=self.broker_list)
topic_list = self.admin_client.list_topics()
if self.topic not in topic_list:
self.logger.info(f"Creating topic {self.topic}...")
new_topic = NewTopic(
name=self.topic,
num_partitions=self.partitions,
replication_factor=self.replication_factor
)
self.admin_client.create_topics([new_topic])
self.logger.info(f"Topic {self.topic} created successfully!")
else:
self.logger.info(f"Topic {self.topic} already exists.")
except TopicAlreadyExistsError:
self.logger.info(f"Topic {self.topic} already exists.")
except KafkaError as e:
self.logger.error(f"Failed to create topic: {e}")
raise
finally:
if self.admin_client:
self.admin_client.close()

def connect(self):
self.logger.info(f"Connecting to Kafka brokers at {self.broker_list}...")
try:
# Create the topic if it doesn't exist
self.create_topic()

# Initialize Kafka producer
self.producer = KafkaProducer(
bootstrap_servers=self.broker_list,
Expand All @@ -61,6 +96,7 @@ def connect(self):
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
self.logger.info("Kafka consumer connected.")
self.consumer.subscribe([self.topic]) # Subscribe to the topic
else:
self.logger.info("No consumer group ID provided; skipping consumer initialization.")

Expand Down Expand Up @@ -95,8 +131,18 @@ def receive(self) -> Message:
def seek_to_end(self):
"""Move the consumer to the end of the log for the current topic."""
if self.consumer:
self.consumer.seek_to_end()
self.logger.info("Moved consumer to the end of the log.")
try:
partitions = self.consumer.partitions_for_topic(self.topic)
if not partitions:
self.logger.error(f"No partitions found for topic {self.topic}.")
return

topic_partitions = [TopicPartition(self.topic, p) for p in partitions]
self.consumer.assign(topic_partitions) # Ensure partitions are assigned
self.consumer.seek_to_end()
self.logger.info("Moved consumer to the end of the log.")
except Exception as e:
self.logger.error(f"Failed to seek to end: {e}")

def disconnect(self):
self.logger.info("Disconnecting from Kafka...")
Expand Down
2 changes: 2 additions & 0 deletions tests/test_kafka_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def setUpClass(cls):
endpoint='kafka://localhost:9092', # Correct Kafka endpoint format
topic='test_topic',
group_id='test_group',
partitions=1, # Define the number of partitions
replication_factor=1, # Define the replication factor
log=True, # Enable logging to stdout
log_file=cls.log_file, # Enable logging to file
log_level="DEBUG"
Expand Down

0 comments on commit 9cb2de6

Please sign in to comment.