
Statuses sending and workers statuses update #42

Merged
merged 35 commits into from Jun 26, 2024

Changes from all commits
35 commits
66ef5c2
first docker formula
Nekxis Jun 2, 2024
3608f4f
output fix
Nekxis Jun 2, 2024
da14d51
root orientation to fix
Nekxis Jun 2, 2024
a5f3f05
docker-compose
Nekxis Jun 2, 2024
22b8710
add running guide to README.md
latekvo Jun 2, 2024
07a33b9
add frontend running entry to README.md
latekvo Jun 2, 2024
f83f09f
update install steps in README
latekvo Jun 2, 2024
e8e3051
docker-compose fast api connection
Nekxis Jun 2, 2024
172d60d
Merge remote-tracking branch 'origin/@nekxis/app_dockerization' into …
Nekxis Jun 2, 2024
0bf7a9b
mac os fix
Nekxis Jun 6, 2024
40330ec
docker ignore and environment cleanup.
Nekxis Jun 6, 2024
196c739
Merge branch 'main' into @nekxis/app_dockerization
latekvo Jun 8, 2024
c44e316
Added connection handling and status sending using websocket.
SOSNE Jun 12, 2024
a625fe7
Added RabbitMQ Consumer that after callback sends status to a user.
SOSNE Jun 13, 2024
eae57cc
Merge branch 'refs/heads/main' into @SOSNE/statuses-sending-and-worke…
SOSNE Jun 13, 2024
3535c7e
Changes in logic, added function to send status via RabbitMq (this fu…
SOSNE Jun 14, 2024
53c938d
Merge branch 'refs/heads/@nekxis/app_dockerization' into @SOSNE/statu…
SOSNE Jun 15, 2024
2d854a3
Fixed async callback using sync decorator, small fixes to postgres.
SOSNE Jun 15, 2024
daa5f33
Python black reformat.
SOSNE Jun 15, 2024
29249b4
Added support for sending lists of uuids to track.
SOSNE Jun 16, 2024
869c1e2
Added handling websocket disconnection.
SOSNE Jun 16, 2024
5682a86
Merge remote-tracking branch 'refs/remotes/origin/main' into @SOSNE/s…
SOSNE Jun 16, 2024
9616dfc
Added RabbitMq to a docker-compose.yml. Changed FAISS path in dbops.p…
SOSNE Jun 19, 2024
7211c13
Rabbit connection address fix.
SOSNE Jun 19, 2024
60d6453
Fix in summarizer.
SOSNE Jun 19, 2024
d43c773
Status sending placement fix.
SOSNE Jun 19, 2024
692c3b3
Python black refactor.
SOSNE Jun 19, 2024
fe52590
Added payload to send summary. Added rabbit producer connection closing.
SOSNE Jun 21, 2024
69bd53f
Bring back good code.
SOSNE Jun 21, 2024
b0d3aea
Applied small changes.
SOSNE Jun 23, 2024
1333039
Applied changes. Function no longer close connection after sending st…
SOSNE Jun 23, 2024
8c6f0c2
Fixed error that started connection in every worker container. Fixed …
SOSNE Jun 23, 2024
a71a754
Merge branch 'main' into @SOSNE/statuses-sending-and-workers-statuses…
latekvo Jun 23, 2024
e391779
Restore path to faiss.
SOSNE Jun 25, 2024
cdb8609
Merge remote-tracking branch 'origin/@SOSNE/statuses-sending-and-work…
SOSNE Jun 25, 2024
6 changes: 4 additions & 2 deletions core/tools/dbops.py
@@ -34,7 +34,9 @@ def get_vec_db_by_name(db_name: str, embeddings: Embeddings) -> FAISS:
except Exception:
# legacy version
db_connection = FAISS.load_local(
folder_path=folder_path, embeddings=embeddings, index_name=db_name
folder_path=folder_path,
embeddings=embeddings,
index_name=db_name,
allow_dangerous_deserialization=True,
)

return db_connection
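The `allow_dangerous_deserialization=True` argument is required by newer langchain versions because `FAISS.load_local` unpickles index metadata, and unpickling untrusted data can execute arbitrary code. A stdlib-only sketch of why the flag exists (the `Payload` class and `record` helper are purely illustrative, not part of this project):

```python
import pickle

executed = []

def record(msg):
    executed.append(msg)

class Payload:
    # __reduce__ tells pickle how to rebuild an object; a crafted
    # pickle can name ANY callable here, and it runs at load time.
    def __reduce__(self):
        return (record, ("ran during unpickle",))

blob = pickle.dumps(Payload())
pickle.loads(blob)  # invokes record(...) as a side effect of loading
print(executed)  # → ['ran during unpickle']
```

The flag is therefore only safe when the index files were written by the application itself, as is the case for the local FAISS store here.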
27 changes: 24 additions & 3 deletions docker/docker-compose.yml
@@ -1,4 +1,18 @@
services:
rabbitmq:
image: rabbitmq:3.13-management
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
networks:
- app-network
healthcheck:
test: [ "CMD", "rabbitmqctl", "status" ]
interval: 5s
timeout: 20s
retries: 5

postgres:
image: postgres
restart: always
@@ -68,8 +82,12 @@ services:
summarizer:
image: summarizer
depends_on:
- postgres
- ollama
postgres:
condition: service_started
rabbitmq:
condition: service_healthy
ollama:
condition: service_started
build:
context: ../.
dockerfile: ./docker/summarizer/Dockerfile
@@ -82,7 +100,10 @@
webui:
image: webui
depends_on:
- postgres
postgres:
condition: service_started
rabbitmq:
condition: service_healthy
build:
context: ../.
dockerfile: ./docker/webui/Dockerfile
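With the long `depends_on` syntax, `condition: service_healthy` makes Compose hold back `summarizer` and `webui` until the `rabbitmqctl status` healthcheck passes (polled every 5s, up to 5 retries). The same guard can be expressed in application code as retry-with-delay; the sketch below substitutes a stub for `pika.BlockingConnection` so it is self-contained:

```python
import time

def wait_until_ready(connect, attempts=5, delay=0.01):
    # Retry `connect` until it stops raising, mirroring the
    # interval/retries loop of a Compose healthcheck.
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except ConnectionError:
            if attempt == attempts:
                raise
            time.sleep(delay)

# Stub standing in for the broker: refuses twice, then accepts.
state = {"calls": 0}

def flaky_connect():
    state["calls"] += 1
    if state["calls"] < 3:
        raise ConnectionError("rabbitmq not ready")
    return "channel"

print(wait_until_ready(flaky_connect))  # → channel
```

Relying on the Compose healthcheck keeps this logic out of the services themselves, which is why the diff adds no retry loop to the Python code.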
113 changes: 69 additions & 44 deletions environment.yml
@@ -3,93 +3,118 @@ channels:
- conda-forge
- defaults
dependencies:
- _libgcc_mutex=0.1
- _openmp_mutex=5.1
- annotated-types=0.6.0
- anyio=4.2.0
- black=24.4.2
- beautifulsoup4=4.12.2
- brotli-python=1.0.9
- bzip2=1.0.8
- ca-certificates=2024.3.11
- click=8.1.7
- exceptiongroup=1.2.0
- fastapi=0.103.0
- idna=3.7
- certifi=2024.6.2
- colorama=0.4.6
- filelock=3.13.1
- fsspec=2023.10.0
- googlesearch=3.0.0
- huggingface_hub=0.20.3
- krb5=1.20.1
- ld_impl_linux-64=2.36.1
- libblas=3.9.0
- libcxx=8.0.0
- libedit=3.1.20230828
- libffi=3.4.4
- libgcc-ng=11.2.0
- libgomp=11.2.0
- libfaiss=1.7.4
- libffi=3.4.2
- libgfortran5=11.3.0
- liblapack=3.9.0
- libopenblas=0.3.23
- libpq=12.17
- libstdcxx-ng=11.2.0
- mypy_extensions=1.0.0
- ncurses=6.4
- openssl=3.0.14
- libsqlite=3.45.3
- libzlib=1.2.13
- llvm-openmp=18.1.3
- ncurses=6.4.20240210
- openssl=3.2.1
- packaging=23.2
- pathspec=0.10.3
- pip=24.0
- platformdirs=3.10.0
- psycopg2=2.9.9
- python=3.9.7
- pysocks=1.7.1
- python=3.9.19
- python_abi=3.9
- pyyaml=6.0.1
- readline=8.2
- requests=2.31.0
- setuptools=69.5.1
- sniffio=1.3.0
- soupsieve=2.5
- sqlite=3.45.3
- starlette=0.27.0
- tk=8.6.14
- tomli=2.0.1
- typing-extensions=4.11.0
- typing_extensions=4.11.0
- tk=8.6.13
- tqdm=4.65.0
- typing_extensions=4.9.0
- tzdata=2024a
- wheel=0.43.0
- xz=5.4.6
- xz=5.2.6
- yaml=0.2.5
- zlib=1.2.13
- pip:
- aiohttp==3.9.5
- aiosignal==1.3.1
- annotated-types==0.6.0
- anyio==4.3.0
- async-timeout==4.0.3
- attrs==23.2.0
- beautifulsoup4==4.12.3
- certifi==2024.2.2
- black==24.4.2
- chardet==5.2.0
- charset-normalizer==3.3.2
- colorama==0.4.6
- click==8.1.7
- dataclasses-json==0.6.4
- diskcache==5.6.3
- eval-type-backport==0.2.0
- faiss-cpu==1.7.2
- filelock==3.13.4
- dnspython==2.6.1
- email-validator==2.1.1
- exceptiongroup==1.2.1
- faiss-cpu==1.8.0
- fastapi==0.111.0
- fastapi-cli==0.0.4
- frozenlist==1.4.1
- fsspec==2024.3.1
- google==3.0.0
- greenlet==3.0.3
- h11==0.14.0
- huggingface-hub==0.22.2
- httpcore==1.0.5
- httptools==0.6.1
- httpx==0.27.0
- idna==3.7
- jinja2==3.1.3
- jsonpatch==1.33
- jsonpointer==2.4
- langchain==0.1.16
- langchain-community==0.0.33
- langchain-core==0.1.43
- langchain==0.1.17
- langchain-community==0.0.36
- langchain-core==0.1.50
- langchain-text-splitters==0.0.1
- langsmith==0.1.48
- llama-cpp-python==0.2.61
- markdown-it-py==3.0.0
- markupsafe==2.1.5
- marshmallow==3.21.1
- mdurl==0.1.2
- multidict==6.0.5
- mypy-extensions==1.0.0
- numpy==1.26.4
- orjson==3.10.1
- pathspec==0.12.1
- pika==1.3.2
- platformdirs==4.2.1
- pydantic==2.7.0
- pydantic-core==2.18.1
- pyyaml==6.0.1
- pygments==2.18.0
- python-dotenv==1.0.1
- python-multipart==0.0.9
- regex==2024.4.16
- requests==2.31.0
- soupsieve==2.5
- rich==13.7.1
- shellingham==1.5.4
- sniffio==1.3.1
- sqlalchemy==2.0.29
- starlette==0.37.2
- tenacity==8.2.3
- tiktoken==0.6.0
- tinydb==4.8.0
- tqdm==4.66.2
- tomli==2.0.1
- typer==0.12.3
- typing-extensions==4.11.0
- typing-inspect==0.9.0
- ujson==5.10.0
- urllib3==2.2.1
- uvicorn==0.29.0
- uvloop==0.19.0
- watchfiles==0.21.0
- websockets==12.0
- yarl==1.9.4
82 changes: 82 additions & 0 deletions webui/main.py
@@ -1,4 +1,5 @@
from __future__ import annotations
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from core.databases.db_crawl_tasks import db_add_crawl_task, db_get_crawl_tasks_by_page
@@ -7,6 +8,12 @@
db_get_completion_tasks_by_page,
)
from pydantic import BaseModel
import pika
import threading
import json
import asyncio
import functools

from typing import Literal


@@ -32,11 +39,13 @@ def add_crawl_task(req_body: RequestBody):
raise HTTPException(status_code=500, detail="Something went wrong")
return {"uuid": new_uuid}


@app.get("/crawl")
def get_crawl_tasks(page: int = 0):
crawl_tasks = db_get_crawl_tasks_by_page(page)
return {"tasks": crawl_tasks}


@app.post("/completion")
def add_completion_task(req_body: RequestBody):
new_uuid = db_add_completion_task(req_body.prompt, req_body.mode)
@@ -49,3 +58,76 @@ def add_completion_task(req_body: RequestBody):
def get_completion_tasks(page: int = 0):
completion_tasks = db_get_completion_tasks_by_page(page)
return {"tasks": completion_tasks}


active_connections = {}


@app.websocket("/ws")
async def on_connection(websocket: WebSocket):
await websocket.accept()
active_connections[websocket] = []
try:
while True:
try:
data = await websocket.receive_text()
data_dict = json.loads(data)
if isinstance(data_dict["uuid"], list):
active_connections[websocket].extend(data_dict["uuid"])
else:
active_connections[websocket].append(data_dict["uuid"])
except WebSocketDisconnect:
break
finally:
if websocket in active_connections:
del active_connections[websocket]


# This decorator allows calling an async callback function inside a synchronous context
def sync(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(f(*args, **kwargs))

return wrapper


@sync
async def callback(ch, method, properties, body):
data = json.loads(body)
uuid = data["task_uuid"]
status = data["status"]
payload = data["payload"]
message = json.dumps({"uuid": uuid, "status": status, "payload": payload})
for websocket, uuid_list in active_connections.items():
if uuid in uuid_list:
try:
await websocket.send_text(message)
except WebSocketDisconnect:
del active_connections[websocket]


def consumer():
connection_params = pika.ConnectionParameters(host="rabbitmq", port=5672)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

channel.exchange_declare(exchange="status", exchange_type="direct")

channel.queue_declare(queue="update_status", durable=True)

channel.queue_bind(
exchange="status", queue="update_status", routing_key="update_status"
)

channel.basic_consume(
queue="update_status", on_message_callback=callback, auto_ack=True
)

channel.start_consuming()


consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()
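pika's `BlockingConnection` delivers messages on a plain synchronous callback, while `websocket.send_text` is a coroutine; the `sync` decorator in this diff bridges the two by running a fresh event loop per delivery. A standalone sketch of the same pattern (the `greet` coroutine is only for demonstration):

```python
import asyncio
import functools

def sync(f):
    # Run an async function to completion from synchronous code,
    # e.g. from inside a blocking pika consumer callback.
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            return loop.run_until_complete(f(*args, **kwargs))
        finally:
            loop.close()
    return wrapper

@sync
async def greet(name):
    await asyncio.sleep(0)  # any awaitable work goes here
    return f"hello {name}"

print(greet("worker"))  # → hello worker
```

Creating a loop per message is simple but carries per-call overhead; when the consumer runs in a side thread next to a live server loop, `asyncio.run_coroutine_threadsafe` against that loop is a common alternative.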
23 changes: 19 additions & 4 deletions workers/summarizer.py
@@ -24,6 +24,9 @@
from core.tools.utils import sleep_noisy
from colorama import Fore, Style

import pika
import json

output_parser = StrOutputParser()

llm = None
@@ -34,11 +37,17 @@
task_queue_limit = 10


def send_status_to_api_via_rabbitmq(task_uuid: str, status: str, routing_key: str, payload: str, channel):
channel.exchange_declare(exchange="status", exchange_type="direct")
message = json.dumps({"task_uuid": task_uuid, "status": status, "payload": payload})
channel.basic_publish(exchange="status", routing_key=routing_key, body=message)


def extract_uuid(task):
return task.uuid


def summarize():
def summarize(channel):
global task_queue, llm

if llm is None:
@@ -61,6 +70,9 @@ def summarize():
task_queue.remove(task)
task_uuid_list = list(map(extract_uuid, task_queue))
db_release_executing_tasks(task_uuid_list)
send_status_to_api_via_rabbitmq(
current_task.uuid, "embedding completed", "update_status", "", channel
)

if current_task is None:
return
@@ -105,6 +117,7 @@ def get_context(_: dict):
db_update_completion_task_after_summarizing(summary, current_task.uuid)

print(f"{Fore.CYAN}Completed task with uuid: {Fore.RESET}", current_task.uuid)
send_status_to_api_via_rabbitmq(current_task.uuid, "summary completed", "update_status", summary, channel)


previous_queued_tasks = 0
@@ -119,7 +132,9 @@

def start_summarizer():
global previous_queued_tasks

connection_params = pika.ConnectionParameters(host="rabbitmq", port=5672, heartbeat=600)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
while True:
queue_length = len(task_queue)
if queue_length > previous_queued_tasks:
@@ -129,7 +144,7 @@

if queue_length != previous_queued_tasks:
print(f"{Fore.CYAN}tasks left:", queue_length, f"{Style.RESET_ALL}")
previous_tasks_queued = queue_length
previous_queued_tasks = queue_length

summarize()
summarize(channel)
sleep_noisy(5)
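End to end, the worker publishes a JSON body keyed by `task_uuid`, and `callback` in `webui/main.py` re-keys it to `uuid` before fanning it out to subscribed websocket clients. A stdlib round-trip of that envelope (the literal values are made up for illustration):

```python
import json

# Worker side: body as built by send_status_to_api_via_rabbitmq().
body = json.dumps(
    {
        "task_uuid": "example-task-uuid",
        "status": "summary completed",
        "payload": "Two-line summary.",
    }
)

# webui side: callback() reshapes it for websocket clients.
data = json.loads(body)
message = json.dumps(
    {"uuid": data["task_uuid"], "status": data["status"], "payload": data["payload"]}
)

print(json.loads(message)["status"])  # → summary completed
```

Keeping both hops as plain JSON strings means the broker and the websocket layer stay agnostic of the task model; only the two endpoints agree on the key names.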