Skip to content

Commit

Permalink
better logging
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all committed Nov 26, 2023
1 parent b609333 commit 07a7ea2
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 40 deletions.
60 changes: 30 additions & 30 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
aiohttp==3.8.6
aiokafka==0.8.1
aiosignal==1.3.1
async-timeout==4.0.2
attrs==22.1.0
black==22.12.0
certifi==2023.7.22
cffi==1.15.1
chardet==5.1.0
charset-normalizer==2.1.1
click==8.1.3
colorama==0.4.6
frozenlist==1.3.3
idna==3.4
kafka-python==2.0.2
multidict==6.0.3
mypy-extensions==0.4.3
packaging==23.1
pathspec==0.10.3
platformdirs==2.6.0
pycares==4.3.0
pycparser==2.21
pydantic==1.10.8
python-dotenv==0.21.0
requests==2.31.0
rfc3339==6.2
tomli==2.0.1
typing_extensions==4.4.0
urllib3==1.26.13
yarl==1.8.2
aiohttp==3.8.6
aiokafka==0.8.1
aiosignal==1.3.1
async-timeout==4.0.2
attrs==22.1.0
black==22.12.0
certifi==2023.7.22
cffi==1.15.1
chardet==5.1.0
charset-normalizer==2.1.1
click==8.1.3
colorama==0.4.6
frozenlist==1.3.3
idna==3.4
kafka-python==2.0.2
multidict==6.0.3
mypy-extensions==0.4.3
packaging==23.1
pathspec==0.10.3
platformdirs==2.6.0
pycares==4.3.0
pycparser==2.21
pydantic==1.10.8
python-dotenv==0.21.0
requests==2.31.0
rfc3339==6.2
tomli==2.0.1
typing_extensions==4.4.0
urllib3==1.26.13
yarl==1.8.2
28 changes: 18 additions & 10 deletions src/main_v2.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import asyncio
import json
import logging
import time
import traceback
import uuid
from asyncio import Queue
from time import time

import requests
from aiohttp import (
Expand Down Expand Up @@ -166,13 +166,27 @@ async def receive_messages(consumer: AIOKafkaConsumer, receive_queue: Queue):
await receive_queue.put(value)


def log_speed(counter: int, start_time: float, _queue: Queue) -> tuple[float, int]:
end_time = time.time()
delta_time = end_time - start_time
speed = counter / delta_time
logger.info(
f"qsize={_queue.qsize()}, processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec"
)
return time.time(), 0


async def send_messages(topic: str, producer: AIOKafkaProducer, send_queue: Queue):
last_interval = time()
start_time = time.time()
messages_sent = 0

while True:
if send_queue.empty():
start_time, messages_sent = log_speed(
counter=messages_sent, start_time=start_time, _queue=send_queue
)
await asyncio.sleep(1)
continue
message = await send_queue.get()
await producer.send(topic, value=message)
send_queue.task_done()
Expand All @@ -181,16 +195,10 @@ async def send_messages(topic: str, producer: AIOKafkaProducer, send_queue: Queu
messages_sent += 1

if topic == "scraper" and messages_sent >= 100:
current_time = time()
elapsed_time = current_time - last_interval
speed = messages_sent / elapsed_time
logger.info(
f"processed {messages_sent} in {elapsed_time:.2f} seconds, {speed:.2f} msg/sec"
start_time, messages_sent = log_speed(
counter=messages_sent, start_time=start_time, _queue=send_queue
)

last_interval = time()
messages_sent = 0


async def main():
# get kafka engine
Expand Down

0 comments on commit 07a7ea2

Please sign in to comment.