From 43657c26a70f7bec9ad19ca06ba6915493e1e2ae Mon Sep 17 00:00:00 2001 From: extreme4all <> Date: Fri, 12 Jan 2024 20:09:45 +0100 Subject: [PATCH] fixed interval vs iterations of log_speed --- src/modules/_kafka.py | 78 +++++++++++++++++++------------------------ 1 file changed, 34 insertions(+), 44 deletions(-) diff --git a/src/modules/_kafka.py b/src/modules/_kafka.py index 2c58327..7612484 100644 --- a/src/modules/_kafka.py +++ b/src/modules/_kafka.py @@ -12,6 +12,31 @@ logger = logging.getLogger(__name__) +def log_speed( + counter: int, start_time: float, _queue: Queue, topic: str, interval: int = 60 +) -> tuple[float, int]: + # Calculate the time elapsed since the function started + delta_time = time.time() - start_time + + # Check if the specified interval has not elapsed yet + if delta_time < interval: + # Return the original start time and the current counter value + return start_time, counter + + # Calculate the processing speed (messages per second) + speed = counter / delta_time + + # Log the processing speed and relevant information + log_message = ( + f"{topic=}, qsize={_queue.qsize()}, " + f"processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec" + ) + logger.info(log_message) + + # Return the current time and reset the counter to zero + return time.time(), 0 + + async def kafka_consumer_safe(topic: str, group: str, max_retries=3, retry_delay=30): retry_count = 0 @@ -70,48 +95,21 @@ async def kafka_producer(): return producer -def log_speed( - counter: int, start_time: float, _queue: Queue, topic: str -) -> tuple[float, int]: - end_time = time.time() - delta_time = end_time - start_time - speed = counter / delta_time - logger.info( - f"{topic=}, qsize={_queue.qsize()}, processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec" - ) - return time.time(), 0 - - async def receive_messages( consumer: AIOKafkaConsumer, receive_queue: Queue, shutdown_event: Event, batch_size: int = 200, ): - logger.info(f"start receiving messages, topics={await consumer.topics()}") while not shutdown_event.is_set(): - try: - batch = await consumer.getmany(timeout_ms=1000, max_records=batch_size) - except Exception as error: - error_type = type(error) - logger.error( - { - "error_type": error_type.__name__, - "error": error, - } - ) - tb_str = traceback.format_exc() - logger.error(f"{error}, \n{tb_str}") - await asyncio.sleep(5) # Add a delay before retrying - continue # Restart the loop - + batch = await consumer.getmany(timeout_ms=1000, max_records=batch_size) for tp, messages in batch.items(): logger.info(f"Partition {tp}: {len(messages)} messages") await asyncio.gather(*[receive_queue.put(m.value) for m in messages]) logger.info("done") await consumer.commit() - logger.info("stop receiving messages") + logger.info("shutdown") async def send_messages( @@ -122,16 +120,15 @@ async def send_messages( ): start_time = time.time() messages_sent = 0 - logger.info(f"start sending messages, topic={topic}") + while not shutdown_event.is_set(): + start_time, messages_sent = log_speed( + counter=messages_sent, + start_time=start_time, + _queue=send_queue, + topic=topic, + ) if send_queue.empty(): - if messages_sent > 0: - start_time, messages_sent = log_speed( - counter=messages_sent, - start_time=start_time, - _queue=send_queue, - topic=topic, - ) await asyncio.sleep(1) continue @@ -141,11 +138,4 @@ async def send_messages( messages_sent += 1 - if messages_sent >= 100: - start_time, messages_sent = log_speed( - counter=messages_sent, - start_time=start_time, - _queue=send_queue, - topic=topic, - ) logger.info("shutdown")