Node & task status optimization #7066
350768c to 787475e
project_id=taskflow_instance.project_id,
)
get_task_status_result: typing.Dict[str, typing.Any] = dispatcher.get_task_status(with_ex_data=False)
if get_task_status_result.get("result"): |
The check for whether the status is pending_process can be handled right here; pending_process_taskflow_ids can already be filtered out at this step.
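That won't work. Compared with a DB query, get_task_status is an expensive operation. We first need to filter the RUNNING flows in the DB, and then determine which of those RUNNING flows have a final status of "pending processing". That way the number of tasks that go through get_task_status within one project stays well under control.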
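A minimal sketch of the two-stage filtering described in the reply, under stated assumptions: `db_states` stands in for the cheap DB filter, `get_task_status` is passed in as a callable, and the `PENDING_PROCESSING` state name and the `{"result": ..., "data": {"state": ...}}` result shape are hypothetical, not taken from the PR.

```python
import typing

RUNNING = "RUNNING"
PENDING_PROCESSING = "PENDING_PROCESSING"  # hypothetical state name


def find_pending_processing_ids(
    db_states: typing.Dict[int, str],
    get_task_status: typing.Callable[[int], typing.Dict[str, typing.Any]],
) -> typing.List[int]:
    """Return IDs of RUNNING tasks whose detailed status is pending processing."""
    # Stage 1: cheap filter (stands in for the DB query) keeps only RUNNING tasks.
    running_ids = [task_id for task_id, state in db_states.items() if state == RUNNING]

    # Stage 2: the expensive status lookup runs only for the RUNNING subset.
    pending_ids: typing.List[int] = []
    for task_id in running_ids:
        result = get_task_status(task_id)
        # The result shape below is an assumption made for this sketch.
        if result.get("result") and result.get("data", {}).get("state") == PENDING_PROCESSING:
            pending_ids.append(task_id)
    return pending_ids
```

The point of the ordering is that the number of expensive calls is bounded by the number of RUNNING tasks rather than by the total number of tasks in the project.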
.values("root_id")
.distinct()
return self.queryset.exclude(
pipeline_instance_id__in=self._fetch_pipeline_instance_ids(statuses=[states.FAILED, states.SUSPENDED]) |
See my comment: it should be unfinished tasks minus (suspended + failed tasks). The change here looks wrong.
Confirmed, there is no problem.
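The rule in the reviewer's comment, unfinished tasks minus (suspended + failed tasks), can be sketched with plain sets. This is only an illustration of the set arithmetic; the function name and the use of in-memory sets instead of a queryset `exclude` are assumptions of this sketch.

```python
import typing


def select_running_ids(
    unfinished: typing.Set[int],
    suspended: typing.Set[int],
    failed: typing.Set[int],
) -> typing.Set[int]:
    """Unfinished tasks minus the union of suspended and failed tasks."""
    return unfinished - (suspended | failed)
```

For example, `select_running_ids({1, 2, 3, 4}, {2}, {3})` leaves tasks 1 and 4.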
# select_related can only be added at the very front of objects, so fetch the ID list first
taskflow_instance_ids: typing.List[int] = list(
self.queryset.exclude(
pipeline_instance_id__in=self._fetch_pipeline_instance_ids(statuses=[states.FAILED]) |
"Pending processing" should only exist for tasks in the running state, so here we can fetch only the taskflow_instance records that are in the running state.
That is exactly how the logic works, using exclude
# Raise StopIteration if not found
task_id: typing.Optional[int] = next(
(node_output["value"] for node_output in node_outputs if node_output["key"] == "task_id") |
Does this take the first task?
No. The outputs are key-value pairs, so this iterates over them and takes the first entry whose key is task_id.
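A small sketch of the lookup described in the reply, assuming each node output is a dict with `"key"` and `"value"` entries as in the diff. The helper names are hypothetical. It also shows the difference the code comment hints at: `next()` without a default raises `StopIteration` when nothing matches, while passing a default returns it instead.

```python
import typing

NodeOutput = typing.Dict[str, typing.Any]


def find_output_value(node_outputs: typing.List[NodeOutput], key: str) -> typing.Any:
    """Return the value of the first output named `key`; raise StopIteration if absent."""
    return next(output["value"] for output in node_outputs if output["key"] == key)


def find_output_value_or_none(
    node_outputs: typing.List[NodeOutput], key: str
) -> typing.Optional[typing.Any]:
    """Same lookup, but return None instead of raising when `key` is absent."""
    return next((output["value"] for output in node_outputs if output["key"] == key), None)
```

Only the second form matches an `Optional` return annotation, since the first never returns `None` for a missing key.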
return (
notify_type
if isinstance(notify_type, dict)
else {"success": notify_type, "fail": notify_type, "pending_processing": []} |
{"success": notify_type, "fail": notify_type, "pending_processing": notify_type}
Shouldn't this be consistent with the above?
Fixed.
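A sketch of the normalization under discussion, assuming the fix adopted the reviewer's suggestion so that a legacy list-style `notify_type` is applied to all three keys. The function name is hypothetical, and copying the list per key is a choice made in this sketch, not something the PR states.

```python
import typing

NotifyType = typing.Union[typing.List[str], typing.Dict[str, typing.List[str]]]


def normalize_notify_type(notify_type: NotifyType) -> typing.Dict[str, typing.List[str]]:
    """Return notify_type as a dict keyed by notification scenario."""
    if isinstance(notify_type, dict):
        # Already in the per-scenario format; pass it through unchanged.
        return notify_type
    # Legacy list format: use the same channels for every scenario.
    # Each key gets its own copy so later edits to one do not affect the others.
    return {
        "success": list(notify_type),
        "fail": list(notify_type),
        "pending_processing": list(notify_type),
    }
```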
gcloud/taskflow3/signals/handlers.py
Outdated
@@ -189,6 +191,13 @@ def bamboo_engine_eri_post_set_state_handler(sender, node_id, to_state, version,

_finish_taskflow_and_send_signal(root_id, taskflow_finished, True)

elif to_state == bamboo_engine_states.SUSPENDED and node_id == root_id:
# TODO: send a notification; walk up to the root flow and send the notification
Should this part be confirmed before merging?
Removed. As discussed earlier, an independent subprocess inherits the notification configuration of its parent process.
@@ -83,6 +85,15 @@ def execute(self, data, parent_data):
return False

data.outputs.sn = result["data"]["sn"]

task_id: int = parent_data.get_one_of_inputs("task_id")
send_taskflow_message.delay( |
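The process that sends notifications is currently provisioned with few resources. We need to check whether its resources should be adjusted once this is added.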
Will keep an eye on this going forward.
gcloud/taskflow3/celery/tasks.py
Outdated
if skip_if_not_status:
# Only send the notification when a specific status is met
dispatcher = TaskCommandDispatcher( |
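Personally, I don't think this function should query the task status. Its abstraction is sending task messages; which message to send and with what configuration should, in principle, be decided by the caller and passed in.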
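Conclusion of the discussion: do not evaluate the overall "pending processing" status; send the notification as soon as the node is reached.
Background: in a parallel gateway scenario where one branch has a failed node and another branch has an approval or manual confirmation node, the task status is failed. The question was whether sending a notification in that scenario is reasonable.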
else:
return {"id": taskflow_instance.id, "state": None}

task_status_infos: typing.List[typing.Dict[str, typing.Any]] = concurrent.batch_call( |
This logic may put a heavy request load on the DB; the risk needs to be considered.
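We first filter the RUNNING flows in the DB, then determine which of those RUNNING flows have a final status of "pending processing". That way the number of tasks that go through get_task_status within one project stays well under control.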
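Once the set has been narrowed to RUNNING flows, the remaining status lookups can be issued with bounded concurrency. The sketch below uses the standard library's `ThreadPoolExecutor` as a stand-in; it is not the project's `concurrent.batch_call` helper, and `fetch_states`, `get_state`, and `max_workers` are hypothetical names. It returns the same `{"id": ..., "state": ...}` shape seen in the diff, with `None` when a lookup fails.

```python
import typing
from concurrent.futures import ThreadPoolExecutor


def fetch_states(
    task_ids: typing.List[int],
    get_state: typing.Callable[[int], str],
    max_workers: int = 4,
) -> typing.List[typing.Dict[str, typing.Any]]:
    """Look up task states concurrently, capping the number of parallel calls."""

    def fetch_one(task_id: int) -> typing.Dict[str, typing.Any]:
        try:
            state: typing.Optional[str] = get_state(task_id)
        except Exception:
            # A failed lookup is reported as an unknown state rather than aborting the batch.
            state = None
        return {"id": task_id, "state": state}

    # max_workers bounds how many lookups hit the backend at the same time.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the same order as task_ids.
        return list(pool.map(fetch_one, task_ids))
```

Capping `max_workers` is one way to address the reviewer's concern about DB load, since it limits parallel requests regardless of how many tasks are in the batch.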
6937a9e to 2e0a59e
2e0a59e to b0cdb91
…ture/issue_6653_bird
# Conflicts:
# frontend/desktop/src/assets/fonts/bksops-icon.eot
# frontend/desktop/src/assets/fonts/bksops-icon.svg
# frontend/desktop/src/assets/fonts/bksops-icon.ttf
# frontend/desktop/src/assets/fonts/bksops-icon.woff
# frontend/desktop/src/config/i18n/cn.js
# frontend/desktop/src/config/i18n/en.js
# frontend/desktop/src/pages/task/TaskExecute/TaskOperation.vue
# frontend/desktop/src/scss/iconfont.scss
# gcloud/core/apis/drf/viewsets/taskflow.py
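* fix: optimize the logic that decides whether operations are allowed in node details
# Reviewed, transaction id: 591
* fix: show the error message for nodes skipped after failure
# Reviewed, transaction id: 592
* fix: optimize the parameters passed for "pending processing" in the task list
# Reviewed, transaction id: 594
* fix: fix the display error for nodes skipped after failure
# Reviewed, transaction id: 596
* fix: align the status icon in node details with the icon at the node's upper-right corner
# Reviewed, transaction id: 602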
# Reviewed, transaction id: 621
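* optimization: replace the icon and remove the hover effect on canvas connection lines in task details
# Reviewed, transaction id: 624
* optimization: for nodes with automatic retry configured, do not style the lines after the node once "Pause execution" is clicked
# Reviewed, transaction id: 628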
# Reviewed, transaction id: 650
# Reviewed, transaction id: 654
# Reviewed, transaction id: 667
feat: task status optimization
No description provided.