fix: Replace sync did not overwrite documents already in the local knowledge base
shaohuzhang1 committed Oct 25, 2024
1 parent f19a4d9 commit bf0e5c2
Showing 3 changed files with 51 additions and 6 deletions.
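In short: replace sync previously dispatched sync_web_dataset, whose save handler only inserts newly crawled pages, so pages already present in the local knowledge base were never overwritten. This commit adds a dedicated sync_replace_web_dataset task whose handler (get_sync_handler, below) matches each crawled page against existing documents by normalized source URL and re-syncs matches instead of inserting duplicates. A minimal standalone sketch of that upsert decision (hypothetical helper, not from the repo):

def decide_action(existing_source_urls: set, crawled_url: str) -> str:
    # Normalize the same way the commit does (str.strip()), so a
    # whitespace variant of a known URL no longer produces a duplicate.
    key = crawled_url.strip()
    return 'sync' if key in existing_source_urls else 'insert'

assert decide_action({'https://example.com/a'}, 'https://example.com/a ') == 'sync'
assert decide_action({'https://example.com/a'}, 'https://example.com/b') == 'insert'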
11 changes: 7 additions & 4 deletions apps/dataset/serializers/dataset_serializers.py
@@ -38,7 +38,7 @@
from dataset.serializers.common_serializers import list_paragraph, MetaSerializer, ProblemParagraphManage, \
    get_embedding_model_by_dataset_id, get_embedding_model_id_by_dataset_id
from dataset.serializers.document_serializers import DocumentSerializers, DocumentInstanceSerializer
from dataset.task import sync_web_dataset
from dataset.task import sync_web_dataset, sync_replace_web_dataset
from embedding.models import SearchMode
from embedding.task import embedding_by_dataset, delete_embedding_by_dataset
from setting.models import AuthOperate
@@ -602,15 +602,18 @@ def handler(child_link: ChildLink, response: Fork.Response):
            document_name = child_link.tag.text if child_link.tag is not None and len(
                child_link.tag.text.strip()) > 0 else child_link.url
            paragraphs = get_split_model('web.md').parse(response.content)
            first = QuerySet(Document).filter(meta__source_url=child_link.url, dataset=dataset).first()
            print(child_link.url.strip())
            first = QuerySet(Document).filter(meta__source_url=child_link.url.strip(),
                                              dataset=dataset).first()
            if first is not None:
                # If the document already exists, update it via document sync
                DocumentSerializers.Sync(data={'document_id': first.id}).sync()
            else:
                # Otherwise insert it as a new document
                DocumentSerializers.Create(data={'dataset_id': dataset.id}).save(
                    {'name': document_name, 'paragraphs': paragraphs,
                     'meta': {'source_url': child_link.url.strip(),
                              'selector': dataset.meta.get('selector')},
                     'type': Type.web}, with_valid=True)
        except Exception as e:
            logging.getLogger("max_kb_error").error(f'{str(e)}:{traceback.format_exc()}')
@@ -624,7 +627,7 @@ def replace_sync(self, dataset):
"""
url = dataset.meta.get('source_url')
selector = dataset.meta.get('selector') if 'selector' in dataset.meta else None
sync_web_dataset.delay(str(dataset.id), url, selector)
sync_replace_web_dataset.delay(str(dataset.id), url, selector)

    def complete_sync(self, dataset):
        """
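For context, a sketch of how the updated replace_sync hands off to Celery (names mirror the diff; the dataset id and URL are illustrative):

from dataset.task import sync_replace_web_dataset

# Replace sync now enqueues the dedicated task with the dataset's stored
# crawl origin and CSS selector; the selector may be None.
sync_replace_web_dataset.delay(
    'e3b0c442-0000-0000-0000-000000000000',  # illustrative dataset id
    'https://docs.example.com',              # dataset.meta['source_url']
    None,                                    # dataset.meta.get('selector')
)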
14 changes: 13 additions & 1 deletion apps/dataset/task/sync.py
@@ -14,7 +14,7 @@
from celery_once import QueueOnce

from common.util.fork import ForkManage, Fork
from dataset.task.tools import get_save_handler, get_sync_web_document_handler
from dataset.task.tools import get_save_handler, get_sync_web_document_handler, get_sync_handler

from ops import celery_app

@@ -34,6 +34,18 @@ def sync_web_dataset(dataset_id: str, url: str, selector: str):
        max_kb_error.error(f'Error while syncing web knowledge base {dataset_id}: {str(e)}{traceback.format_exc()}')


@celery_app.task(base=QueueOnce, once={'keys': ['dataset_id']}, name='celery:sync_replace_web_dataset')
def sync_replace_web_dataset(dataset_id: str, url: str, selector: str):
    try:
        max_kb.info(f"Start ---> syncing web knowledge base: {dataset_id}")
        ForkManage(url, selector.split(" ") if selector is not None else []).fork(
            2, set(), get_sync_handler(dataset_id))
        max_kb.info(f"End ---> finished syncing web knowledge base: {dataset_id}")
    except Exception as e:
        max_kb_error.error(f'Error while syncing web knowledge base {dataset_id}: {str(e)}{traceback.format_exc()}')


@celery_app.task(name='celery:sync_web_document')
def sync_web_document(dataset_id, source_url_list: List[str], selector: str):
    handler = get_sync_web_document_handler(dataset_id)
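One detail worth noting: the new task reuses base=QueueOnce keyed on dataset_id, so only one sync per dataset can be queued at a time. A sketch of that behavior, assuming celery-once's default semantics where a duplicate enqueue raises AlreadyQueued:

from celery_once import AlreadyQueued

from dataset.task.sync import sync_replace_web_dataset

try:
    sync_replace_web_dataset.delay('ds-1', 'https://docs.example.com', None)
    # Second enqueue for the same dataset_id while the first is still pending:
    sync_replace_web_dataset.delay('ds-1', 'https://docs.example.com', None)
except AlreadyQueued:
    pass  # the duplicate replace sync is rejected until the first finishes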
32 changes: 31 additions & 1 deletion apps/dataset/task/tools.py
@@ -11,9 +11,11 @@
import re
import traceback

from django.db.models import QuerySet

from common.util.fork import ChildLink, Fork
from common.util.split_model import get_split_model
from dataset.models import Type, Document, Status
from dataset.models import Type, Document, DataSet, Status

max_kb_error = logging.getLogger("max_kb_error")
max_kb = logging.getLogger("max_kb")
@@ -38,6 +40,34 @@ def handler(child_link: ChildLink, response: Fork.Response):
    return handler


def get_sync_handler(dataset_id):
    from dataset.serializers.document_serializers import DocumentSerializers
    dataset = QuerySet(DataSet).filter(id=dataset_id).first()

    def handler(child_link: ChildLink, response: Fork.Response):
        if response.status == 200:
            try:
                document_name = child_link.tag.text if child_link.tag is not None and len(
                    child_link.tag.text.strip()) > 0 else child_link.url
                paragraphs = get_split_model('web.md').parse(response.content)
                first = QuerySet(Document).filter(meta__source_url=child_link.url.strip(),
                                                  dataset=dataset).first()
                if first is not None:
                    # If the document already exists, update it via document sync
                    DocumentSerializers.Sync(data={'document_id': first.id}).sync()
                else:
                    # Otherwise insert it as a new document
                    DocumentSerializers.Create(data={'dataset_id': dataset.id}).save(
                        {'name': document_name, 'paragraphs': paragraphs,
                         'meta': {'source_url': child_link.url.strip(), 'selector': dataset.meta.get('selector')},
                         'type': Type.web}, with_valid=True)
            except Exception as e:
                logging.getLogger("max_kb_error").error(f'{str(e)}:{traceback.format_exc()}')

    return handler


def get_sync_web_document_handler(dataset_id):
    from dataset.serializers.document_serializers import DocumentSerializers

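How get_sync_handler is consumed: per sync_replace_web_dataset above, ForkManage.fork crawls child links to the given depth and invokes the handler once per fetched page. A sketch of the wiring, with an illustrative URL and dataset id:

from common.util.fork import ForkManage
from dataset.task.tools import get_sync_handler

# Depth-2 crawl from the source URL with no CSS selectors; each fetched
# page is upserted into the dataset by the handler defined above.
ForkManage('https://docs.example.com', []).fork(2, set(), get_sync_handler('ds-1'))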
