Skip to content

Commit

Permalink
fix(backend): batch_retry_nodes批量重试节点性能优化 #7709
Browse files Browse the repository at this point in the history
# Reviewed, transaction id: 22559
  • Loading branch information
WytheLi authored and zhangzhw8 committed Nov 1, 2024
1 parent dd6def8 commit bd4e938
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 11 deletions.
15 changes: 12 additions & 3 deletions dbm-ui/backend/db_services/taskflow/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,25 @@ def revoke_pipeline(self):

return result

def retry_node(self, node_id: str):
def retry_node(self, node: str):
"""重试节点"""
return task.retry_node(root_id=self.root_id, node_id=node_id, retry_times=1)
if isinstance(node, str):
flow_node = FlowNode.objects.get(root_id=self.root_id, node_id=node)
else:
flow_node = node
return task.retry_node(root_id=self.root_id, flow_node=flow_node, retry_times=1)

def batch_retry_nodes(self):
"""批量重试节点"""
node_ids = self.get_failed_node_ids()

flow_nodes = FlowNode.objects.filter(node_id__in=node_ids, root_id=self.root_id).all()
flow_node_dict = {node.node_id: node for node in flow_nodes}

for node_id in node_ids:
flow_node = flow_node_dict.get(node_id)
try:
self.retry_node(node_id)
self.retry_node(flow_node)
except Exception as err:
logger.error(f"{node_id} retry failed, {err}")

Expand Down
12 changes: 5 additions & 7 deletions dbm-ui/backend/db_services/taskflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,23 @@


@shared_task
def retry_node(root_id: str, node_id: str, retry_times: int) -> Union[EngineAPIResult, Any]:
def retry_node(root_id: str, flow_node: FlowNode, retry_times: int) -> Union[EngineAPIResult, Any]:
"""重试flow任务节点"""

def send_flow_state(state, _root_id, _node_id, _version_id):
post_set_state.send(
sender=None,
node_id=node_id,
node_id=flow_node.node_id,
to_state=state,
version=flow_node.version_id,
root_id=flow_node.root_id,
parent_id=None,
loop=None,
)

flow_node = FlowNode.objects.get(root_id=root_id, node_id=node_id)

# 实例化一个Service实例用于捕获日志到日志平台
service = BaseService()
service.setup_runtime_attrs(root_pipeline_id=root_id, id=node_id, version=flow_node.version_id)
service.setup_runtime_attrs(root_pipeline_id=root_id, id=flow_node.node_id, version=flow_node.version_id)

# 限制最大重试次数
if retry_times > MAX_AUTO_RETRY_TIMES:
Expand Down Expand Up @@ -99,14 +97,14 @@ def send_flow_state(state, _root_id, _node_id, _version_id):
if retry_times == 1:
send_flow_state(StateType.RUNNING, root_id, flow_node.node_id, flow_node.version_id)

retry_node.apply_async((root_id, node_id, retry_times + 1), countdown=RETRY_INTERVAL)
retry_node.apply_async((root_id, flow_node, retry_times + 1), countdown=RETRY_INTERVAL)
return EngineAPIResult(result=False, message=_("存在执行互斥将自动进行重试..."))
except (Ticket.DoesNotExist, ValueError):
# 如果单据不存在,则忽略校验
pass

# 进行重试操作
result = BambooEngine(root_id=root_id).retry_node(node_id=node_id)
result = BambooEngine(root_id=root_id).retry_node(node_id=flow_node.node_id)
if not result.result:
raise RetryNodeException(str(result.exc.args))

Expand Down
2 changes: 1 addition & 1 deletion dbm-ui/backend/db_services/taskflow/views/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def retry_node(self, requests, *args, **kwargs):

root_id = kwargs["root_id"]
validated_data = self.params_validate(self.get_serializer_class())
return Response(TaskFlowHandler(root_id=root_id).retry_node(node_id=validated_data["node_id"]).result)
return Response(TaskFlowHandler(root_id=root_id).retry_node(node=validated_data["node_id"]).result)

@common_swagger_auto_schema(
operation_summary=_("批量重试"),
Expand Down

0 comments on commit bd4e938

Please sign in to comment.