
abstract error handling
extreme4all committed Jan 12, 2024
1 parent 43657c2 commit 5b19cb1
Showing 3 changed files with 72 additions and 29 deletions.
4 changes: 2 additions & 2 deletions src/main_highscore.py
@@ -83,8 +83,8 @@ async def get_proxies() -> list:
 
 async def main():
     shutdown_event = Event()
-    consumer = await _kafka.kafka_consumer_safe(topic="player", group="scraper")
-    producer = await _kafka.kafka_producer_safe()
+    consumer = await _kafka.kafka_consumer(topic="player", group="scraper")
+    producer = await _kafka.kafka_producer()
 
     receive_queue = Queue(maxsize=500)
     send_queue = Queue(maxsize=100)
6 changes: 2 additions & 4 deletions src/main_runemetrics.py
@@ -77,10 +77,8 @@ async def get_proxies() -> list:
 
 async def main():
     shutdown_event = Event()
-    consumer = await _kafka.kafka_consumer_safe(
-        topic="scraper-runemetrics", group="scraper"
-    )
-    producer = await _kafka.kafka_producer_safe()
+    consumer = await _kafka.kafka_consumer(topic="scraper-runemetrics", group="scraper")
+    producer = await _kafka.kafka_producer()
 
     receive_queue = Queue(maxsize=500)
     send_queue = Queue(maxsize=100)
91 changes: 68 additions & 23 deletions src/modules/_kafka.py
@@ -1,4 +1,5 @@
 import asyncio
+import functools
 import json
 import logging
 import time
@@ -12,8 +13,48 @@
 logger = logging.getLogger(__name__)
 
 
+def print_traceback(_, error):
+    error_type = type(error)
+    logger.error(
+        {
+            "error_type": error_type.__name__,
+            "error": error,
+        }
+    )
+    tb_str = "".join(traceback.format_exception(error_type, error, error.__traceback__))
+    logger.error(f"{error}, \n{tb_str}")
+
+
+def retry(max_retries=3, retry_delay=5, on_retry=None, on_failure=None):
+    def wrapper(func):
+        @functools.wraps(func)
+        async def wrapped(*args, **kwargs):
+            retry_count = 0
+            last_error = None
+
+            while retry_count < max_retries:
+                try:
+                    return await func(*args, **kwargs)
+                except Exception as e:
+                    last_error = e  # keep the exception for the on_failure callback
+                    if on_retry:
+                        on_retry(retry_count, e)
+
+                    retry_count += 1
+                    logger.error(f"Error: {e}")
+                    logger.info(f"Retrying ({retry_count}/{max_retries})...")
+                    await asyncio.sleep(retry_delay)  # Add a delay before retrying
+
+            if on_failure:
+                on_failure(retry_count, last_error)
+
+            raise RuntimeError(f"Failed after {max_retries} retries")
+
+        return wrapped
+
+    return wrapper
+
+
 def log_speed(
-    counter: int, start_time: float, _queue: Queue, topic: str, interval: int = 60
+    counter: int, start_time: float, _queue: Queue, topic: str, interval: int = 15
 ) -> tuple[float, int]:
     # Calculate the time elapsed since the function started
     delta_time = time.time() - start_time
@@ -37,21 +78,22 @@ def log_speed(
     return time.time(), 0
 
 
-async def kafka_consumer_safe(topic: str, group: str, max_retries=3, retry_delay=30):
-    retry_count = 0
+# async def kafka_consumer_safe(topic: str, group: str, max_retries=3, retry_delay=30):
+#     retry_count = 0
 
-    while retry_count < max_retries:
-        try:
-            return await kafka_consumer(topic, group)
-        except Exception as e:
-            retry_count += 1
-            logger.error(f"Error connecting to Kafka: {e}")
-            logger.info(f"Retrying Kafka connection ({retry_count}/{max_retries})...")
-            await asyncio.sleep(retry_delay)  # Add a delay before retrying
+#     while retry_count < max_retries:
+#         try:
+#             return await kafka_consumer(topic, group)
+#         except Exception as e:
+#             retry_count += 1
+#             logger.error(f"Error connecting to Kafka: {e}")
+#             logger.info(f"Retrying Kafka connection ({retry_count}/{max_retries})...")
+#             await asyncio.sleep(retry_delay)  # Add a delay before retrying
 
-    raise RuntimeError("Failed to connect to Kafka after multiple retries")
+#     raise RuntimeError("Failed to connect to Kafka after multiple retries")
 
 
+@retry(max_retries=3, retry_delay=5, on_failure=print_traceback)
 async def kafka_consumer(topic: str, group: str):
     logger.info(f"Starting consumer, {topic=}, {group=}, {app_config.KAFKA_HOST=}")

@@ -67,21 +109,22 @@ async def kafka_consumer(topic: str, group: str):
     return consumer
 
 
-async def kafka_producer_safe(max_retries=3, retry_delay=30):
-    retry_count = 0
+# async def kafka_producer_safe(max_retries=3, retry_delay=30):
+#     retry_count = 0
 
-    while retry_count < max_retries:
-        try:
-            return await kafka_producer()
-        except Exception as e:
-            retry_count += 1
-            logger.error(f"Error connecting to Kafka: {e}")
-            logger.info(f"Retrying Kafka connection ({retry_count}/{max_retries})...")
-            await asyncio.sleep(retry_delay)  # Add a delay before retrying
+#     while retry_count < max_retries:
+#         try:
+#             return await kafka_producer()
+#         except Exception as e:
+#             retry_count += 1
+#             logger.error(f"Error connecting to Kafka: {e}")
+#             logger.info(f"Retrying Kafka connection ({retry_count}/{max_retries})...")
+#             await asyncio.sleep(retry_delay)  # Add a delay before retrying
 
-    raise RuntimeError("Failed to connect to Kafka after multiple retries")
+#     raise RuntimeError("Failed to connect to Kafka after multiple retries")
 
 
+@retry(max_retries=3, retry_delay=5, on_failure=print_traceback)
 async def kafka_producer():
     logger.info(f"Starting producer")
@@ -95,6 +138,7 @@ async def kafka_producer():
     return producer
 
 
+@retry(max_retries=3, retry_delay=5, on_failure=print_traceback)
 async def receive_messages(
     consumer: AIOKafkaConsumer,
     receive_queue: Queue,
@@ -112,6 +156,7 @@ async def receive_messages(
logger.info("shutdown")


@retry(max_retries=3, retry_delay=5, on_failure=print_traceback)
async def send_messages(
topic: str,
producer: AIOKafkaProducer,
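For illustration, a minimal sketch of how the new retry decorator behaves end to end. flaky_connect and its failure pattern are hypothetical, and the import assumes the script runs from src/ so that modules._kafka (and its config) resolves:

import asyncio
import logging

from modules._kafka import print_traceback, retry

logging.basicConfig(level=logging.INFO)

attempts = 0


@retry(max_retries=3, retry_delay=1, on_failure=print_traceback)
async def flaky_connect() -> str:
    # Hypothetical coroutine: fails twice, then succeeds on the third call.
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError(f"broker unavailable (attempt {attempts})")
    return "connected"


# The first two attempts raise; the decorator logs each error, sleeps
# retry_delay seconds, and tries again. The third attempt returns normally.
print(asyncio.run(flaky_connect()))  # -> connected

Had all three attempts failed, the decorator would call on_failure (here print_traceback, which logs the error type and traceback) and then raise RuntimeError. That built-in behavior is what lets main_highscore.py and main_runemetrics.py call kafka_consumer and kafka_producer directly instead of the removed *_safe wrappers with their hand-rolled retry loops.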
