Skip to content

Commit

Permalink
feat(frondend): add rebuild index button
Browse files Browse the repository at this point in the history
  • Loading branch information
Mini256 committed Feb 11, 2025
1 parent 859b1fb commit 017d8aa
Show file tree
Hide file tree
Showing 19 changed files with 392 additions and 241 deletions.
2 changes: 1 addition & 1 deletion backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ migrate:

run_dev_server:
@echo "Running server..."
@rye run python main.py runserver
@rye run python main.py runserver --host 0.0.0.0 --port 5001

run_dev_celery_worker:
@echo "Running celery..."
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -44,3 +45,10 @@ class DocumentItem(BaseModel):
last_modified_at: datetime
created_at: datetime
updated_at: datetime


class RebuildIndexResult(BaseModel):
reindex_document_ids: list[int] = Field(default_factory=list)
ignore_document_ids: list[int] = Field(default_factory=list)
reindex_chunk_ids: list[UUID] = Field(default_factory=list)
ignore_chunk_ids: list[UUID] = Field(default_factory=list)
159 changes: 92 additions & 67 deletions backend/app/api/admin_routes/knowledge_base/document/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,27 @@

from fastapi import APIRouter, Depends, Query, HTTPException
from fastapi_pagination import Params, Page
from sqlmodel import Session

from app.api.admin_routes.knowledge_base.models import ChunkItem
from app.api.deps import SessionDep, CurrentSuperuserDep
from app.models import Document, DocIndexTaskStatus
from app.models.chunk import KgIndexStatus, get_kb_chunk_model
from app.models import Document
from app.models.chunk import Chunk, KgIndexStatus, get_kb_chunk_model
from app.models.document import DocIndexTaskStatus
from app.models.entity import get_kb_entity_model
from app.models.relationship import get_kb_relationship_model
from app.repositories import knowledge_base_repo, document_repo
from app.repositories.chunk import ChunkRepo
from app.api.admin_routes.knowledge_base.document.models import (
DocumentFilters,
DocumentItem,
RebuildIndexResult,
)
from app.exceptions import (
InternalServerError,
KBNotFound,
DocumentNotFound,
)
from app.exceptions import InternalServerError
from app.repositories.graph import GraphRepo
from app.tasks.build_index import build_index_for_document, build_kg_index_for_chunk
from app.tasks.knowledge_base import stats_for_knowledge_base
from app.tasks import (
build_kg_index_for_chunk,
build_index_for_document,
)


router = APIRouter()
logger = logging.getLogger(__name__)
Expand All @@ -48,8 +45,8 @@ def list_kb_documents(
filters=filters,
params=params,
)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -66,8 +63,8 @@ def get_kb_document_by_id(
document = document_repo.must_get(session, doc_id)
assert document.knowledge_base_id == kb_id
return document
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -84,8 +81,8 @@ def list_kb_document_chunks(
kb = knowledge_base_repo.must_get(session, kb_id)
chunk_repo = ChunkRepo(get_kb_chunk_model(kb))
return chunk_repo.get_document_chunks(session, doc_id)
except KBNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(e)
raise InternalServerError()
Expand All @@ -97,7 +94,7 @@ def remove_kb_document(
user: CurrentSuperuserDep,
kb_id: int,
document_id: int,
):
) -> RebuildIndexResult:
try:
kb = knowledge_base_repo.must_get(session, kb_id)
doc = document_repo.must_get(session, document_id)
Expand Down Expand Up @@ -127,71 +124,99 @@ def remove_kb_document(
stats_for_knowledge_base.delay(kb_id)

return {"detail": "success"}
except KBNotFound as e:
raise e
except DocumentNotFound as e:
raise e
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to remove document #{document_id}: {e}")
raise InternalServerError()


@router.post("/admin/knowledge_bases/{kb_id}/documents/{doc_id}/reindex")
def retry_kb_document_index(
@router.post("/admin/knowledge_bases/{kb_id}/documents/reindex")
def rebuild_kb_documents_index(
session: SessionDep,
user: CurrentSuperuserDep,
kb_id: int,
doc_id: int,
reindex_completed: bool = True,
document_ids: list[int],
reindex_completed_task: bool = False,
):
try:
kb = knowledge_base_repo.must_get(session, kb_id)
return rebuild_kb_document_index_by_ids(
session, kb_id, document_ids, reindex_completed_task
)
except HTTPException:
raise
except Exception as e:
logger.exception(e, exc_info=True)
raise InternalServerError()


@router.post("/admin/knowledge_bases/{kb_id}/documents/{doc_id}/reindex")
def rebuild_kb_document_index(
db_session: SessionDep,
user: CurrentSuperuserDep,
kb_id: int,
doc_id: int,
reindex_completed_task: bool = False,
) -> RebuildIndexResult:
try:
document_ids = [doc_id]
return rebuild_kb_document_index_by_ids(
db_session, kb_id, document_ids, reindex_completed_task
)
except HTTPException:
raise
except Exception as e:
logger.exception(e, exc_info=True)
raise InternalServerError()

# Retry failed vector index tasks.
doc = document_repo.must_get(session, doc_id)
reindex_document_ids = []
ignore_document_ids = []

if doc.index_status == DocIndexTaskStatus.COMPLETED and not reindex_completed:
def rebuild_kb_document_index_by_ids(
db_session: Session,
kb_id: int,
document_ids: list[int],
reindex_completed_task: bool = False,
) -> RebuildIndexResult:
kb = knowledge_base_repo.must_get(db_session, kb_id)
kb_chunk_repo = ChunkRepo(get_kb_chunk_model(kb))

# Retry failed vector index tasks.
documents = document_repo.fetch_by_ids(db_session, document_ids)
reindex_document_ids = []
ignore_document_ids = []

for doc in documents:
# TODO: check NOT_STARTED, PENDING, RUNNING
if doc.index_status != DocIndexTaskStatus.FAILED and not reindex_completed_task:
ignore_document_ids.append(doc.id)
else:
reindex_document_ids.append(doc.id)

doc.index_status = DocIndexTaskStatus.PENDING
session.add(doc)
session.commit()
build_index_for_document.delay(kb.id, doc.id)
logger.info(f"Triggered document #{len(doc.id)} to rebuilt vector index.")
doc.index_status = DocIndexTaskStatus.PENDING
db_session.add(doc)
db_session.commit()

# Retry failed kg index tasks.
chunk_repo = ChunkRepo(get_kb_chunk_model(kb))
chunks = chunk_repo.get_document_chunks(session, doc_id)
reindex_chunk_ids = []
ignore_chunk_ids = []
for chunk in chunks:
if chunk.index_status == KgIndexStatus.COMPLETED and not reindex_completed:
ignore_chunk_ids.append(chunk.id)
continue
else:
reindex_chunk_ids.append(chunk.id)

chunk.index_status = KgIndexStatus.PENDING
session.add(chunk)
session.commit()
build_kg_index_for_chunk.delay(kb_id, chunk.id)
build_index_for_document.delay(kb.id, doc.id)

logger.info(
f"Triggered {len(reindex_chunk_ids)} chunks to rebuilt knowledge graph index."
)
# Retry failed kg index tasks.
chunks: list[Chunk] = kb_chunk_repo.fetch_by_document_ids(db_session, document_ids)
reindex_chunk_ids = []
ignore_chunk_ids = []
for chunk in chunks:
if chunk.index_status == KgIndexStatus.COMPLETED and not reindex_completed_task:
ignore_chunk_ids.append(chunk.id)
continue
else:
reindex_chunk_ids.append(chunk.id)

return {
"reindex_document_ids": reindex_document_ids,
"ignore_document_ids": ignore_document_ids,
"reindex_chunk_ids": reindex_chunk_ids,
"ignore_chunk_ids": ignore_chunk_ids,
}
except HTTPException as e:
raise e
except Exception as e:
logger.exception(e, exc_info=True)
raise InternalServerError()
chunk.index_status = KgIndexStatus.PENDING
db_session.add(chunk)
db_session.commit()

build_kg_index_for_chunk.delay(kb.id, chunk.id)

return RebuildIndexResult(
reindex_document_ids=reindex_document_ids,
ignore_document_ids=ignore_document_ids,
reindex_chunk_ids=reindex_chunk_ids,
ignore_chunk_ids=ignore_chunk_ids,
)
Loading

0 comments on commit 017d8aa

Please sign in to comment.