Skip to content

Commit

Permalink
Aio kafka engine (#46)
Browse files Browse the repository at this point in the history
* migrate to AioKafkaEngine

* update requirements
  • Loading branch information
extreme4all authored May 26, 2024
1 parent 030f6de commit a511796
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 231 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ test-setup: ## installs pytest singular package for local testing

requirements: ## installs all requirements
python3 -m pip install -r requirements.txt
python3 -m pip install ruff

create-env: ## create .env file
echo "ENV=DEV" > .env
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,14 @@ call pip install -r requirements.txt --upgrade
call pip freeze > requirements.txt
powershell "(Get-Content requirements.txt) | ForEach-Object { $_ -replace '>=', '==' } | Set-Content requirements.txt"
```
upgrading with linux
```sh
sed -i 's/==/>=/g' requirements.txt
pip install -r requirements.txt --upgrade
pip freeze > requirements.txt
```
if you are running the cluster
```sh
kubectl port-forward -n kafka svc/bd-prd-kafka-service 9094:9094
kubectl port-forward -n database svc/mysql 3306:3306
```
13 changes: 10 additions & 3 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ services:
- 9092:9092
healthcheck:
test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "localhost:9092"]
# during this period fails are not considered
start_period: 30s
# time between cmd
interval: 30s
timeout: 10s
# time given to the cmd
timeout: 5s
retries: 5
networks:
- botdetector-network
Expand Down Expand Up @@ -68,9 +72,12 @@ services:
- botdetector-network
healthcheck:
test: "mysqladmin ping -h localhost -u root -proot_bot_buster"
# during this period fails are not considered
start_period: 30s
# time between cmd
interval: 30s
timeout: 10s
retries: 5
# time given to the cmd
timeout: 5s

public_api:
container_name: public_api
Expand Down
17 changes: 0 additions & 17 deletions notes.md

This file was deleted.

72 changes: 45 additions & 27 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,57 +1,75 @@
aiokafka==0.10.0
AioKafkaEngine==0.0.4
aiomysql==0.2.0
annotated-types==0.6.0
annotated-types==0.7.0
anyio==4.3.0
async-timeout==4.0.3
asyncmy==0.2.9
attrs==23.1.0
black==24.3.0
certifi==2023.7.22
attrs==23.2.0
black==24.4.2
certifi==2024.2.2
cffi==1.16.0
cfgv==3.4.0
charset-normalizer==3.3.2
click==8.1.7
colorama==0.4.6
cryptography==42.0.5
cryptography==42.0.7
databases==0.9.0
distlib==0.3.8
exceptiongroup==1.2.0
fastapi==0.110.0
filelock==3.13.1
dnspython==2.6.1
email_validator==2.1.1
exceptiongroup==1.2.1
fastapi==0.111.0
fastapi-cli==0.0.4
filelock==3.14.0
greenlet==3.0.3
h11==0.14.0
httpcore==1.0.4
httpcore==1.0.5
httptools==0.6.1
httpx==0.27.0
hypothesis==6.88.3
identify==2.5.35
idna==3.6
hypothesis==6.102.6
identify==2.5.36
idna==3.7
iniconfig==2.0.0
Jinja2==3.1.4
kafka-python==2.0.2
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
mypy-extensions==1.0.0
nodeenv==1.8.0
orjson==3.10.3
packaging==24.0
pathspec==0.12.1
platformdirs==4.2.0
pluggy==1.3.0
pre-commit==3.6.2
pycparser==2.21
pydantic==2.6.4
platformdirs==4.2.2
pluggy==1.5.0
pre-commit==3.7.1
pycparser==2.22
pydantic==2.7.1
pydantic-settings==2.2.1
pydantic_core==2.16.3
PyMySQL==1.1.0
pytest==7.4.3
pydantic_core==2.18.2
Pygments==2.18.0
PyMySQL==1.1.1
pytest==8.2.1
pytest-asyncio==0.23.7
python-dotenv==1.0.1
python-multipart==0.0.9
PyYAML==6.0.1
requests==2.31.0
requests==2.32.2
rich==13.7.1
ruff==0.4.5
shellingham==1.5.4
sniffio==1.3.1
sortedcontainers==2.4.0
SQLAlchemy==2.0.28
starlette==0.36.3
SQLAlchemy==2.0.30
starlette==0.37.2
tomli==2.0.1
typing_extensions==4.10.0
urllib3==2.0.7
uvicorn==0.28.0
virtualenv==20.25.1
typer==0.12.3
typing_extensions==4.12.0
ujson==5.10.0
urllib3==2.2.1
uvicorn==0.29.0
uvloop==0.19.0
virtualenv==20.26.2
watchfiles==0.21.0
websockets==12.0
7 changes: 3 additions & 4 deletions src/app/repositories/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time

from src.app.views.input.report import Detection
from src.core import config
from src.core.fastapi.dependencies import kafka_engine

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -62,7 +62,6 @@ async def parse_data(self, data: list[Detection]) -> list[Detection] | None:

async def send_to_kafka(self, data: list[Detection]) -> None:
detections = [d.model_dump(mode="json") for d in data]
await asyncio.gather(
*[config.send_queue.put(detection) for detection in detections]
)
send_queue = kafka_engine.producer.get_queue()
await asyncio.gather(*[send_queue.put(d) for d in detections])
return
3 changes: 2 additions & 1 deletion src/app/views/input/report.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from typing import Optional

from pydantic import BaseModel
Expand All @@ -23,7 +24,7 @@ class Detection(BaseModel):
x_coord: int = Field(0, ge=0)
y_coord: int = Field(0, ge=0)
z_coord: int = Field(0, ge=0)
ts: int = Field(0, ge=0)
ts: int = Field(int(time.time()), ge=0)
manual_detect: int = Field(0, ge=0, le=1)
on_members_world: int = Field(0, ge=0, le=1)
on_pvp_world: int = Field(0, ge=0, le=1)
Expand Down
8 changes: 3 additions & 5 deletions src/core/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import asyncio

from dotenv import find_dotenv, load_dotenv
from pydantic_settings import BaseSettings

load_dotenv(find_dotenv())


class Settings(BaseSettings):
ENV: str
Expand All @@ -12,6 +13,3 @@ class Settings(BaseSettings):


settings = Settings()
producer = None
send_queue = None
sd_event = asyncio.Event()
156 changes: 0 additions & 156 deletions src/core/fastapi/dependencies/_kafka.py

This file was deleted.

9 changes: 9 additions & 0 deletions src/core/fastapi/dependencies/kafka_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from AioKafkaEngine import ProducerEngine

from src.core.config import settings

producer = ProducerEngine(
bootstrap_servers=[settings.KAFKA_HOST],
report_interval=60,
queue_size=500,
)
Loading

0 comments on commit a511796

Please sign in to comment.