From bf0e5c29494486b47b1e0466bdf216430568b4d6 Mon Sep 17 00:00:00 2001
From: shaohuzhang1
Date: Fri, 25 Oct 2024 19:11:49 +0800
Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E7=9F=A5=E8=AF=86?=
 =?UTF-8?q?=E5=BA=93=E6=9B=BF=E6=8D=A2=E5=90=8C=E6=AD=A5=E6=9C=AA=E5=AF=B9?=
 =?UTF-8?q?=E6=9C=AC=E5=9C=B0=E7=9F=A5=E8=AF=86=E5=BA=93=E8=BF=9B=E8=A1=8C?=
 =?UTF-8?q?=E8=A6=86=E7=9B=96?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
 .../serializers/dataset_serializers.py | 10 +++---
 apps/dataset/task/sync.py              | 14 +++++++-
 apps/dataset/task/tools.py             | 32 +++++++++++++++++++-
 3 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/apps/dataset/serializers/dataset_serializers.py b/apps/dataset/serializers/dataset_serializers.py
index d48b9d8f6b..7250bea774 100644
--- a/apps/dataset/serializers/dataset_serializers.py
+++ b/apps/dataset/serializers/dataset_serializers.py
@@ -38,7 +38,7 @@
 from dataset.serializers.common_serializers import list_paragraph, MetaSerializer, ProblemParagraphManage, \
     get_embedding_model_by_dataset_id, get_embedding_model_id_by_dataset_id
 from dataset.serializers.document_serializers import DocumentSerializers, DocumentInstanceSerializer
-from dataset.task import sync_web_dataset
+from dataset.task import sync_web_dataset, sync_replace_web_dataset
 from embedding.models import SearchMode
 from embedding.task import embedding_by_dataset, delete_embedding_by_dataset
 from setting.models import AuthOperate
@@ -602,7 +602,8 @@ def handler(child_link: ChildLink, response: Fork.Response):
                 document_name = child_link.tag.text if child_link.tag is not None and len(
                     child_link.tag.text.strip()) > 0 else child_link.url
                 paragraphs = get_split_model('web.md').parse(response.content)
-                first = QuerySet(Document).filter(meta__source_url=child_link.url, dataset=dataset).first()
+                first = QuerySet(Document).filter(meta__source_url=child_link.url.strip(),
+                                                  dataset=dataset).first()
                 if first is not None:
                     # 如果存在,使用文档同步
                     DocumentSerializers.Sync(data={'document_id': first.id}).sync()
@@ -610,7 +611,8 @@ def handler(child_link: ChildLink, response: Fork.Response):
                     # 插入
                     DocumentSerializers.Create(data={'dataset_id': dataset.id}).save(
                         {'name': document_name, 'paragraphs': paragraphs,
-                         'meta': {'source_url': child_link.url, 'selector': dataset.meta.get('selector')},
+                         'meta': {'source_url': child_link.url.strip(),
+                                  'selector': dataset.meta.get('selector')},
                          'type': Type.web}, with_valid=True)
             except Exception as e:
                 logging.getLogger("max_kb_error").error(f'{str(e)}:{traceback.format_exc()}')
@@ -624,7 +626,7 @@ def replace_sync(self, dataset):
         """
         url = dataset.meta.get('source_url')
         selector = dataset.meta.get('selector') if 'selector' in dataset.meta else None
-        sync_web_dataset.delay(str(dataset.id), url, selector)
+        sync_replace_web_dataset.delay(str(dataset.id), url, selector)
 
     def complete_sync(self, dataset):
         """
diff --git a/apps/dataset/task/sync.py b/apps/dataset/task/sync.py
index ee4c034309..47c72d1bc7 100644
--- a/apps/dataset/task/sync.py
+++ b/apps/dataset/task/sync.py
@@ -14,7 +14,7 @@
 from celery_once import QueueOnce
 
 from common.util.fork import ForkManage, Fork
-from dataset.task.tools import get_save_handler, get_sync_web_document_handler
+from dataset.task.tools import get_save_handler, get_sync_web_document_handler, get_sync_handler
 from ops import celery_app
 
 
@@ -34,6 +34,18 @@ def sync_web_dataset(dataset_id: str, url: str, selector: str):
         max_kb_error.error(f'同步web知识库:{dataset_id}出现错误{str(e)}{traceback.format_exc()}')
 
 
+@celery_app.task(base=QueueOnce, once={'keys': ['dataset_id']}, name='celery:sync_replace_web_dataset')
+def sync_replace_web_dataset(dataset_id: str, url: str, selector: str):
+    try:
+        max_kb.info(f"开始--->开始同步web知识库:{dataset_id}")
+        ForkManage(url, selector.split(" ") if selector is not None else []).fork(2, set(),
+                                                                                 get_sync_handler(dataset_id
+                                                                                                  ))
+        max_kb.info(f"结束--->结束同步web知识库:{dataset_id}")
+    except Exception as e:
+        max_kb_error.error(f'同步web知识库:{dataset_id}出现错误{str(e)}{traceback.format_exc()}')
+
+
 @celery_app.task(name='celery:sync_web_document')
 def sync_web_document(dataset_id, source_url_list: List[str], selector: str):
     handler = get_sync_web_document_handler(dataset_id)
diff --git a/apps/dataset/task/tools.py b/apps/dataset/task/tools.py
index 427a691a0e..9838a755c7 100644
--- a/apps/dataset/task/tools.py
+++ b/apps/dataset/task/tools.py
@@ -11,9 +11,11 @@
 import re
 import traceback
 
+from django.db.models import QuerySet
+
 from common.util.fork import ChildLink, Fork
 from common.util.split_model import get_split_model
-from dataset.models import Type, Document, Status
+from dataset.models import Type, Document, DataSet, Status
 
 max_kb_error = logging.getLogger("max_kb_error")
 max_kb = logging.getLogger("max_kb")
@@ -38,6 +40,34 @@ def handler(child_link: ChildLink, response: Fork.Response):
 
     return handler
 
 
+def get_sync_handler(dataset_id):
+    from dataset.serializers.document_serializers import DocumentSerializers
+    dataset = QuerySet(DataSet).filter(id=dataset_id).first()
+
+    def handler(child_link: ChildLink, response: Fork.Response):
+        if response.status == 200:
+            try:
+
+                document_name = child_link.tag.text if child_link.tag is not None and len(
+                    child_link.tag.text.strip()) > 0 else child_link.url
+                paragraphs = get_split_model('web.md').parse(response.content)
+                first = QuerySet(Document).filter(meta__source_url=child_link.url.strip(),
+                                                  dataset=dataset).first()
+                if first is not None:
+                    # 如果存在,使用文档同步
+                    DocumentSerializers.Sync(data={'document_id': first.id}).sync()
+                else:
+                    # 插入
+                    DocumentSerializers.Create(data={'dataset_id': dataset.id}).save(
+                        {'name': document_name, 'paragraphs': paragraphs,
+                         'meta': {'source_url': child_link.url.strip(), 'selector': dataset.meta.get('selector')},
+                         'type': Type.web}, with_valid=True)
+            except Exception as e:
+                logging.getLogger("max_kb_error").error(f'{str(e)}:{traceback.format_exc()}')
+
+    return handler
+
+
 def get_sync_web_document_handler(dataset_id):
     from dataset.serializers.document_serializers import DocumentSerializers