fixed interval vs iterations of log_speed
extreme4all committed Jan 12, 2024
1 parent 3844c17 commit 43657c2
Showing 1 changed file with 34 additions and 44 deletions.
78 changes: 34 additions & 44 deletions src/modules/_kafka.py
@@ -12,6 +12,31 @@
 logger = logging.getLogger(__name__)
 
 
+def log_speed(
+    counter: int, start_time: float, _queue: Queue, topic: str, interval: int = 60
+) -> tuple[float, int]:
+    # Calculate the time elapsed since the function started
+    delta_time = time.time() - start_time
+
+    # Check if the specified interval has not elapsed yet
+    if delta_time < interval:
+        # Return the original start time and the current counter value
+        return start_time, counter
+
+    # Calculate the processing speed (messages per second)
+    speed = counter / delta_time
+
+    # Log the processing speed and relevant information
+    log_message = (
+        f"{topic=}, qsize={_queue.qsize()}, "
+        f"processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec"
+    )
+    logger.info(log_message)
+
+    # Return the current time and reset the counter to zero
+    return time.time(), 0
+
+
 async def kafka_consumer_safe(topic: str, group: str, max_retries=3, retry_delay=30):
     retry_count = 0
 
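The relocated log_speed is now cheap to call on every loop iteration: it returns immediately until interval seconds have elapsed, then logs throughput and resets the counter. A minimal sketch of that call pattern (not part of the commit; it assumes Queue here is asyncio.Queue, since the module's imports sit above this hunk, and that log_speed is in scope):

import asyncio
import logging
import time
from asyncio import Queue

logging.basicConfig(level=logging.INFO)

async def demo() -> None:
    q: Queue = Queue()
    start_time = time.time()
    counter = 0
    for _ in range(500):
        counter += 1
        # Safe to call unconditionally: it only logs (and resets the
        # counter) once `interval` seconds have passed since start_time.
        start_time, counter = log_speed(
            counter=counter, start_time=start_time, _queue=q, topic="demo", interval=5
        )
        await asyncio.sleep(0.01)

asyncio.run(demo())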
@@ -70,48 +95,21 @@ async def kafka_producer():
     return producer
 
 
-def log_speed(
-    counter: int, start_time: float, _queue: Queue, topic: str
-) -> tuple[float, int]:
-    end_time = time.time()
-    delta_time = end_time - start_time
-    speed = counter / delta_time
-    logger.info(
-        f"{topic=}, qsize={_queue.qsize()}, processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec"
-    )
-    return time.time(), 0
-
-
 async def receive_messages(
     consumer: AIOKafkaConsumer,
     receive_queue: Queue,
     shutdown_event: Event,
     batch_size: int = 200,
 ):
     logger.info(f"start receiving messages, topics={await consumer.topics()}")
     while not shutdown_event.is_set():
-        try:
-            batch = await consumer.getmany(timeout_ms=1000, max_records=batch_size)
-        except Exception as error:
-            error_type = type(error)
-            logger.error(
-                {
-                    "error_type": error_type.__name__,
-                    "error": error,
-                }
-            )
-            tb_str = traceback.format_exc()
-            logger.error(f"{error}, \n{tb_str}")
-            await asyncio.sleep(5)  # Add a delay before retrying
-            continue  # Restart the loop
-
+        batch = await consumer.getmany(timeout_ms=1000, max_records=batch_size)
         for tp, messages in batch.items():
             logger.info(f"Partition {tp}: {len(messages)} messages")
             await asyncio.gather(*[receive_queue.put(m.value) for m in messages])
             logger.info("done")
             await consumer.commit()
 
-    logger.info("stop receiving messages")
+    logger.info("shutdown")
 
 
 async def send_messages(
@@ -122,16 +120,15 @@ async def send_messages(
 ):
     start_time = time.time()
     messages_sent = 0
-    logger.info(f"start sending messages, topic={topic}")
+
     while not shutdown_event.is_set():
+        start_time, messages_sent = log_speed(
+            counter=messages_sent,
+            start_time=start_time,
+            _queue=send_queue,
+            topic=topic,
+        )
         if send_queue.empty():
-            if messages_sent > 0:
-                start_time, messages_sent = log_speed(
-                    counter=messages_sent,
-                    start_time=start_time,
-                    _queue=send_queue,
-                    topic=topic,
-                )
             await asyncio.sleep(1)
             continue
 
@@ -141,11 +138,4 @@
 
         messages_sent += 1
 
-        if messages_sent >= 100:
-            start_time, messages_sent = log_speed(
-                counter=messages_sent,
-                start_time=start_time,
-                _queue=send_queue,
-                topic=topic,
-            )
     logger.info("shutdown")
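Taken together, the send_messages hunks swap two iteration-based triggers (log when the queue drains, and again every 100 messages) for a single unconditional call that rate-limits itself. Read contiguously, the new loop is roughly the following sketch; the queue-drain and producer-send lines are collapsed in this view, so those two calls are assumptions:

while not shutdown_event.is_set():
    # Unconditional: returns immediately until `interval` seconds elapse.
    start_time, messages_sent = log_speed(
        counter=messages_sent, start_time=start_time, _queue=send_queue, topic=topic
    )
    if send_queue.empty():
        await asyncio.sleep(1)
        continue

    message = send_queue.get_nowait()           # assumed: exact drain call is collapsed
    await producer.send(topic, value=message)   # assumed: aiokafka AIOKafkaProducer.send
    messages_sent += 1

logger.info("shutdown")

The practical difference matches the commit title: at 1 msg/sec the old trigger fired roughly every 100 seconds, while the new call logs every interval (default 60) seconds regardless of throughput, so quiet topics still report at a predictable cadence.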

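For context, a hedged sketch of how the consumer side of this module might be wired up. Nothing below is from the diff: kafka_consumer_safe's body is collapsed, so the assumption that it returns a started AIOKafkaConsumer is exactly that, and the topic/group names are placeholders:

import asyncio
from asyncio import Event, Queue

async def main() -> None:
    # Assumption: kafka_consumer_safe returns a started AIOKafkaConsumer.
    consumer = await kafka_consumer_safe(topic="players", group="worker")
    receive_queue: Queue = Queue(maxsize=500)  # placeholder bound
    shutdown_event = Event()

    consumer_task = asyncio.create_task(
        receive_messages(consumer, receive_queue, shutdown_event, batch_size=200)
    )
    try:
        while True:
            message = await receive_queue.get()
            ...  # process the message
    finally:
        shutdown_event.set()
        await consumer_task
        await consumer.stop()

asyncio.run(main())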