Skip to content

Commit

Permalink
Merge pull request #953 from RockChinQ/perf/dify-sv-api
Browse files Browse the repository at this point in the history
perf: 完善 dify api runner
  • Loading branch information
RockChinQ authored Dec 16, 2024
2 parents 0dcd2d8 + 3314a7a commit 209e897
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 94 deletions.
6 changes: 3 additions & 3 deletions libs/dify_service_api/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ class TestDifyClient:
async def test_chat_messages(self):
cln = client.AsyncDifyServiceClient(api_key=os.getenv("DIFY_API_KEY"), base_url=os.getenv("DIFY_BASE_URL"))

resp = await cln.chat_messages(inputs={}, query="Who are you?", user="test")
print(json.dumps(resp, ensure_ascii=False, indent=4))
async for chunk in cln.chat_messages(inputs={}, query="调用工具查看现在几点?", user="test"):
print(json.dumps(chunk, ensure_ascii=False, indent=4))

async def test_upload_file(self):
cln = client.AsyncDifyServiceClient(api_key=os.getenv("DIFY_API_KEY"), base_url=os.getenv("DIFY_BASE_URL"))
Expand Down Expand Up @@ -41,4 +41,4 @@ async def test_workflow_run(self):
print(json.dumps(chunks, ensure_ascii=False, indent=4))

if __name__ == "__main__":
asyncio.run(TestDifyClient().test_workflow_run())
asyncio.run(TestDifyClient().test_chat_messages())
31 changes: 16 additions & 15 deletions libs/dify_service_api/v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ async def chat_messages(
inputs: dict[str, typing.Any],
query: str,
user: str,
response_mode: str = "blocking", # 当前不支持 streaming
response_mode: str = "streaming", # 当前不支持 blocking
conversation_id: str = "",
files: list[dict[str, typing.Any]] = [],
timeout: float = 30.0,
) -> dict[str, typing.Any]:
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
"""发送消息"""
if response_mode != "blocking":
raise DifyAPIError("当前仅支持 blocking 模式")
if response_mode != "streaming":
raise DifyAPIError("当前仅支持 streaming 模式")

async with httpx.AsyncClient(
base_url=self.base_url,
trust_env=True,
timeout=timeout,
) as client:
response = await client.post(
async with client.stream(
"POST",
"/chat-messages",
headers={"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"},
json={
Expand All @@ -51,12 +52,14 @@ async def chat_messages(
"conversation_id": conversation_id,
"files": files,
},
)

if response.status_code != 200:
raise DifyAPIError(f"{response.status_code} {response.text}")

return response.json()
) as r:
async for chunk in r.aiter_lines():
if r.status_code != 200:
raise DifyAPIError(f"{r.status_code} {chunk}")
if chunk.strip() == "":
continue
if chunk.startswith("data:"):
yield json.loads(chunk[5:])

async def workflow_run(
self,
Expand Down Expand Up @@ -88,6 +91,8 @@ async def workflow_run(
},
) as r:
async for chunk in r.aiter_lines():
if r.status_code != 200:
raise DifyAPIError(f"{r.status_code} {chunk}")
if chunk.strip() == "":
continue
if chunk.startswith("data:"):
Expand All @@ -100,10 +105,6 @@ async def upload_file(
timeout: float = 30.0,
) -> str:
"""上传文件"""
# curl -X POST 'http://dify.rockchin.top/v1/files/upload' \
# --header 'Authorization: Bearer {api_key}' \
# --form 'file=@localfile;type=image/[png|jpeg|jpg|webp|gif] \
# --form 'user=abc-123'
async with httpx.AsyncClient(
base_url=self.base_url,
trust_env=True,
Expand Down
24 changes: 24 additions & 0 deletions pkg/core/migrations/m017_dify_api_timeout_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from .. import migration


@migration.migration_class("dify-api-timeout-params", 17)
class DifyAPITimeoutParamsMigration(migration.Migration):
"""迁移"""

async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['chat'] or 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['workflow'] \
or 'agent' not in self.ap.provider_cfg.data['dify-service-api']

async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['dify-service-api']['chat']['timeout'] = 120
self.ap.provider_cfg.data['dify-service-api']['workflow']['timeout'] = 120
self.ap.provider_cfg.data['dify-service-api']['agent'] = {
"api-key": "app-1234567890",
"timeout": 120
}

await self.ap.provider_cfg.dump_config()
2 changes: 1 addition & 1 deletion pkg/core/stages/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ..migrations import m001_sensitive_word_migration, m002_openai_config_migration, m003_anthropic_requester_cfg_completion, m004_moonshot_cfg_completion
from ..migrations import m005_deepseek_cfg_completion, m006_vision_config, m007_qcg_center_url, m008_ad_fixwin_config_migrate, m009_msg_truncator_cfg
from ..migrations import m010_ollama_requester_config, m011_command_prefix_config, m012_runner_config, m013_http_api_config, m014_force_delay_config
from ..migrations import m015_gitee_ai_config, m016_dify_service_api
from ..migrations import m015_gitee_ai_config, m016_dify_service_api, m017_dify_api_timeout_params


@stage.stage_class("MigrationStage")
Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/preproc/preproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async def process(


# 检查vision是否启用,没启用就删除所有图片
if not self.ap.provider_cfg.data['enable-vision'] or not query.use_model.vision_supported:
if not self.ap.provider_cfg.data['enable-vision'] or (self.ap.provider_cfg.data['runner'] == 'local-agent' and not query.use_model.vision_supported):
for msg in query.messages:
if isinstance(msg.content, list):
for me in msg.content:
Expand All @@ -60,13 +60,13 @@ async def process(
llm_entities.ContentElement.from_text(me.text)
)
elif isinstance(me, platform_message.Image):
if self.ap.provider_cfg.data['enable-vision'] and query.use_model.vision_supported:
if self.ap.provider_cfg.data['enable-vision'] and (self.ap.provider_cfg.data['runner'] != 'local-agent' or query.use_model.vision_supported):
if me.url is not None:
content_list.append(
llm_entities.ContentElement.from_image_url(str(me.url))
)

query.user_message = llm_entities.Message( # TODO 适配多模态输入
query.user_message = llm_entities.Message(
role='user',
content=content_list
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/process/handlers/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async def handle(
query.session.using_conversation.messages.extend(query.resp_messages)
except Exception as e:

self.ap.logger.error(f'对话({query.query_id})请求失败: {str(e)}')
self.ap.logger.error(f'对话({query.query_id})请求失败: {type(e).__name__} {str(e)}')

yield entities.StageProcessResult(
result_type=entities.ResultType.INTERRUPT,
Expand Down
Loading

0 comments on commit 209e897

Please sign in to comment.