diff --git a/dbm-ui/backend/db_services/taskflow/handlers.py b/dbm-ui/backend/db_services/taskflow/handlers.py index fc8eed2462..5ccc310b45 100644 --- a/dbm-ui/backend/db_services/taskflow/handlers.py +++ b/dbm-ui/backend/db_services/taskflow/handlers.py @@ -75,16 +75,25 @@ def revoke_pipeline(self): return result - def retry_node(self, node_id: str): + def retry_node(self, node: str): """重试节点""" - return task.retry_node(root_id=self.root_id, node_id=node_id, retry_times=1) + if isinstance(node, str): + flow_node = FlowNode.objects.get(root_id=self.root_id, node_id=node) + else: + flow_node = node + return task.retry_node(root_id=self.root_id, flow_node=flow_node, retry_times=1) def batch_retry_nodes(self): """批量重试节点""" node_ids = self.get_failed_node_ids() + + flow_nodes = FlowNode.objects.filter(node_id__in=node_ids, root_id=self.root_id).all() + flow_node_dict = {node.node_id: node for node in flow_nodes} + for node_id in node_ids: + flow_node = flow_node_dict.get(node_id) try: - self.retry_node(node_id) + self.retry_node(flow_node) except Exception as err: logger.error(f"{node_id} retry failed, {err}") diff --git a/dbm-ui/backend/db_services/taskflow/task.py b/dbm-ui/backend/db_services/taskflow/task.py index cdf0b99e22..871e5432df 100644 --- a/dbm-ui/backend/db_services/taskflow/task.py +++ b/dbm-ui/backend/db_services/taskflow/task.py @@ -49,13 +49,13 @@ @shared_task -def retry_node(root_id: str, node_id: str, retry_times: int) -> Union[EngineAPIResult, Any]: +def retry_node(root_id: str, flow_node: FlowNode, retry_times: int) -> Union[EngineAPIResult, Any]: """重试flow任务节点""" def send_flow_state(state, _root_id, _node_id, _version_id): post_set_state.send( sender=None, - node_id=node_id, + node_id=flow_node.node_id, to_state=state, version=flow_node.version_id, root_id=flow_node.root_id, @@ -63,11 +63,9 @@ def send_flow_state(state, _root_id, _node_id, _version_id): loop=None, ) - flow_node = FlowNode.objects.get(root_id=root_id, node_id=node_id) - # 实例化一个Service实例用于捕获日志到日志平台 service = BaseService() - service.setup_runtime_attrs(root_pipeline_id=root_id, id=node_id, version=flow_node.version_id) + service.setup_runtime_attrs(root_pipeline_id=root_id, id=flow_node.node_id, version=flow_node.version_id) # 限制最大重试次数 if retry_times > MAX_AUTO_RETRY_TIMES: @@ -99,14 +97,14 @@ def send_flow_state(state, _root_id, _node_id, _version_id): if retry_times == 1: send_flow_state(StateType.RUNNING, root_id, flow_node.node_id, flow_node.version_id) - retry_node.apply_async((root_id, node_id, retry_times + 1), countdown=RETRY_INTERVAL) + retry_node.apply_async((root_id, flow_node, retry_times + 1), countdown=RETRY_INTERVAL) return EngineAPIResult(result=False, message=_("存在执行互斥将自动进行重试...")) except (Ticket.DoesNotExist, ValueError): # 如果单据不存在,则忽略校验 pass # 进行重试操作 - result = BambooEngine(root_id=root_id).retry_node(node_id=node_id) + result = BambooEngine(root_id=root_id).retry_node(node_id=flow_node.node_id) if not result.result: raise RetryNodeException(str(result.exc.args)) diff --git a/dbm-ui/backend/db_services/taskflow/views/flow.py b/dbm-ui/backend/db_services/taskflow/views/flow.py index 1a85643c0d..166fb9c1ae 100644 --- a/dbm-ui/backend/db_services/taskflow/views/flow.py +++ b/dbm-ui/backend/db_services/taskflow/views/flow.py @@ -143,7 +143,7 @@ def retry_node(self, requests, *args, **kwargs): root_id = kwargs["root_id"] validated_data = self.params_validate(self.get_serializer_class()) - return Response(TaskFlowHandler(root_id=root_id).retry_node(node_id=validated_data["node_id"]).result) + return Response(TaskFlowHandler(root_id=root_id).retry_node(node=validated_data["node_id"]).result) @common_swagger_auto_schema( operation_summary=_("批量重试"),