diff --git a/src/modules/kafka.py b/src/modules/kafka.py index e084bab..8495922 100644 --- a/src/modules/kafka.py +++ b/src/modules/kafka.py @@ -12,28 +12,60 @@ logger = logging.getLogger(__name__) +import asyncio +import json + +from aiokafka import AIOKafkaConsumer, AIOKafkaProducer + + async def kafka_consumer(topic: str, group: str): logger.info(f"starting consumer, topic: {topic}, group: {group}") - consumer = AIOKafkaConsumer( - topic, - bootstrap_servers=[app_config.KAFKA_HOST], - group_id=group, - value_deserializer=lambda x: json.loads(x.decode("utf-8")), - auto_offset_reset="earliest", - ) - await consumer.start() - return consumer + + max_retries = 3 + retry_count = 0 + + while retry_count < max_retries: + try: + consumer = AIOKafkaConsumer( + topic, + bootstrap_servers=[app_config.KAFKA_HOST], + group_id=group, + value_deserializer=lambda x: json.loads(x.decode("utf-8")), + auto_offset_reset="earliest", + ) + await consumer.start() + return consumer + except Exception as e: + logger.error(f"Error connecting to Kafka: {e}") + retry_count += 1 + logger.info(f"Retrying Kafka connection ({retry_count}/{max_retries})...") + await asyncio.sleep(5) # Add a delay before retrying + + raise RuntimeError("Failed to connect to Kafka after multiple retries") async def kafka_producer(): logger.info(f"starting producer") - producer = AIOKafkaProducer( - bootstrap_servers=[app_config.KAFKA_HOST], - value_serializer=lambda v: json.dumps(v).encode(), - acks="all", - ) - await producer.start() - return producer + + max_retries = 3 + retry_count = 0 + + while retry_count < max_retries: + try: + producer = AIOKafkaProducer( + bootstrap_servers=[app_config.KAFKA_HOST], + value_serializer=lambda v: json.dumps(v).encode(), + acks="all", + ) + await producer.start() + return producer + except Exception as e: + logger.error(f"Error connecting to Kafka: {e}") + retry_count += 1 + logger.info(f"Retrying Kafka connection ({retry_count}/{max_retries})...") + await asyncio.sleep(5) # Add a delay before retrying + + raise RuntimeError("Failed to connect to Kafka after multiple retries") def log_speed(