Skip to content

Commit

Permalink
extra logging on kafka_consumer & kafka_producer
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all committed Jan 11, 2024
1 parent 94a277f commit f61a4a0
Showing 1 changed file with 48 additions and 16 deletions.
64 changes: 48 additions & 16 deletions src/modules/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,60 @@
logger = logging.getLogger(__name__)


import asyncio
import json

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


async def kafka_consumer(topic: str, group: str):
logger.info(f"starting consumer, topic: {topic}, group: {group}")
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=[app_config.KAFKA_HOST],
group_id=group,
value_deserializer=lambda x: json.loads(x.decode("utf-8")),
auto_offset_reset="earliest",
)
await consumer.start()
return consumer

max_retries = 3
retry_count = 0

while retry_count < max_retries:
try:
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=[app_config.KAFKA_HOST],
group_id=group,
value_deserializer=lambda x: json.loads(x.decode("utf-8")),
auto_offset_reset="earliest",
)
await consumer.start()
return consumer
except Exception as e:
logger.error(f"Error connecting to Kafka: {e}")
retry_count += 1
logger.info(f"Retrying Kafka connection ({retry_count}/{max_retries})...")
await asyncio.sleep(5) # Add a delay before retrying

raise RuntimeError("Failed to connect to Kafka after multiple retries")


async def kafka_producer():
logger.info(f"starting producer")
producer = AIOKafkaProducer(
bootstrap_servers=[app_config.KAFKA_HOST],
value_serializer=lambda v: json.dumps(v).encode(),
acks="all",
)
await producer.start()
return producer

max_retries = 3
retry_count = 0

while retry_count < max_retries:
try:
producer = AIOKafkaProducer(
bootstrap_servers=[app_config.KAFKA_HOST],
value_serializer=lambda v: json.dumps(v).encode(),
acks="all",
)
await producer.start()
return producer
except Exception as e:
logger.error(f"Error connecting to Kafka: {e}")
retry_count += 1
logger.info(f"Retrying Kafka connection ({retry_count}/{max_retries})...")
await asyncio.sleep(5) # Add a delay before retrying

raise RuntimeError("Failed to connect to Kafka after multiple retries")


def log_speed(
Expand Down

0 comments on commit f61a4a0

Please sign in to comment.