Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get_task_status apigw 接口支持返回节点待自动重试信息 #7107

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ repos:
hooks:
- id: check-merge-conflict
- repo: https://github.com/psf/black
rev: 22.3.0
rev: stable
hooks:
- id: black
name: black
language: python
language_version: python3.6
- repo: https://github.com/pycqa/isort
rev: 5.11.5
rev: 5.6.4
hooks:
- id: isort
args: ["--profile", "black", "--filter-files"]
Expand Down
Binary file modified gcloud/apigw/docs/apigw-docs.tgz
Binary file not shown.
46 changes: 37 additions & 9 deletions gcloud/apigw/views/get_task_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,20 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from apigw_manager.apigw.decorators import apigw_require
from blueapps.account.decorators import login_exempt
from cachetools import TTLCache
from django.views.decorators.http import require_GET

from blueapps.account.decorators import login_exempt
from gcloud.apigw.utils import bucket_cached, BucketTTLCache, api_bucket_and_key

from gcloud import err_code
from gcloud.apigw.decorators import mark_request_whether_is_trust, return_json_response
from gcloud.apigw.decorators import project_inject
from gcloud.taskflow3.models import TaskFlowInstance
from gcloud.taskflow3.domains.dispatchers import TaskCommandDispatcher
from gcloud.taskflow3.utils import add_node_name_to_status_tree
from gcloud.apigw.decorators import mark_request_whether_is_trust, project_inject, return_json_response
from gcloud.apigw.utils import BucketTTLCache, api_bucket_and_key, bucket_cached
from gcloud.apigw.views.utils import logger
from gcloud.iam_auth.intercept import iam_intercept
from gcloud.iam_auth.view_interceptors.apigw import TaskViewInterceptor
from apigw_manager.apigw.decorators import apigw_require
from gcloud.taskflow3.domains.dispatchers import TaskCommandDispatcher
from gcloud.taskflow3.models import TaskFlowInstance
from gcloud.taskflow3.utils import add_node_name_to_status_tree, extract_failed_nodes, get_failed_nodes_info


def cache_decisioner(key, value):
Expand Down Expand Up @@ -53,6 +51,8 @@ def get_task_status(request, task_id, project_id):
project = request.project
subprocess_id = request.GET.get("subprocess_id")
with_ex_data = request.GET.get("with_ex_data")
with_failed_node_info = request.GET.get("with_failed_node_info")
with_auto_retry_status = request.GET.get("with_auto_retry_status")

try:
task = TaskFlowInstance.objects.get(pk=task_id, project_id=project.id, is_deleted=False)
Expand Down Expand Up @@ -87,6 +87,34 @@ def get_task_status(request, task_id, project_id):
"message": message,
"code": err_code.UNKNOWN_ERROR.code,
}

if with_failed_node_info or with_auto_retry_status:
try:
status_tree, root_pipeline_id = result["data"], result["data"]["id"]
failed_node_ids = extract_failed_nodes(status_tree)
failed_node_info = get_failed_nodes_info(root_pipeline_id, failed_node_ids)
if with_failed_node_info:
result["data"]["failed_node_info"] = failed_node_info
if with_auto_retry_status:
auto_retry_waiting_nodes = [
node_id
for node_id, failed_info in failed_node_info.items()
if "max_auto_retry_times" in failed_info
and failed_info["auto_retry_times"] < failed_info["max_auto_retry_times"]
]
result["data"]["auto_retry_status"] = {
"exist_auto_retry_nodes": True if len(auto_retry_waiting_nodes) else False,
"auto_retry_nodes": auto_retry_waiting_nodes,
}
except Exception as e:
message = "task[id={task_id}] extract failed node info error: {error}".format(task_id=task_id, error=e)
logger.exception(message)
return {
"result": False,
"message": message,
"code": err_code.UNKNOWN_ERROR.code,
}

result["data"]["name"] = task.name

return result
27 changes: 27 additions & 0 deletions gcloud/taskflow3/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import logging

from django.apps import apps
from pipeline.engine import states as pipeline_states
from pipeline.engine.utils import calculate_elapsed_time
from pipeline.core import constants as pipeline_constants
Expand Down Expand Up @@ -89,6 +90,32 @@ def add_node_name_to_status_tree(pipeline_tree, status_tree_children):
add_node_name_to_status_tree(pipeline_tree.get("activities", {}).get(node_id, {}).get("pipeline", {}), children)


def extract_failed_nodes(status_tree):
    """Collect the ids of FAILED nodes in a task status tree.

    Walks ``status_tree["children"]`` depth-first; only the subtree of a
    node that is itself FAILED is descended into.

    :param status_tree: status tree dict with a ``children`` mapping of
        node_id -> child status subtree (each child carries a ``state``).
    :return: list of failed node ids, each parent before its descendants.
    """
    failed = []
    for node_id, child_status in status_tree["children"].items():
        if child_status["state"] != "FAILED":
            continue
        failed.append(node_id)
        failed.extend(extract_failed_nodes(child_status))
    return failed


def get_failed_nodes_info(root_pipeline_id, failed_node_ids):
    """Build per-node auto-retry info for a set of failed nodes.

    :param root_pipeline_id: id of the root pipeline the nodes belong to.
    :param failed_node_ids: iterable of failed node ids to look up.
    :return: dict of node_id -> info dict. Nodes that have an
        ``AutoRetryNodeStrategy`` record gain ``auto_retry_times`` and
        ``max_auto_retry_times`` keys; nodes without one map to ``{}``.
    """
    nodes_info = {node_id: {} for node_id in failed_node_ids}

    # Fetch the auto-retry strategy rows recorded for these failed nodes.
    # Resolved lazily via the app registry to avoid a circular model import.
    strategy_model = apps.get_model("taskflow3", "AutoRetryNodeStrategy")
    strategy_rows = strategy_model.objects.filter(
        root_pipeline_id=root_pipeline_id, node_id__in=failed_node_ids
    ).values("node_id", "retry_times", "max_retry_times")

    for row in strategy_rows:
        node_info = nodes_info[row["node_id"]]
        node_info["auto_retry_times"] = row["retry_times"]
        node_info["max_auto_retry_times"] = row["max_retry_times"]

    return nodes_info


def parse_node_timeout_configs(pipeline_tree: dict) -> list:
configs = []
for act_id, act in pipeline_tree[pipeline_constants.PE.activities].items():
Expand Down
44 changes: 42 additions & 2 deletions gcloud/tests/taskflow3/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,22 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from django.test import TestCase

from gcloud.taskflow3.utils import parse_node_timeout_configs
from gcloud.taskflow3.models import AutoRetryNodeStrategy
from gcloud.taskflow3.utils import parse_node_timeout_configs, extract_failed_nodes, get_failed_nodes_info


class UtilsTestCase(TestCase):
def setUp(self):
    """Persist one AutoRetryNodeStrategy fixture used by the retry-info tests."""
    # objects.create() already inserts and saves the row; the original extra
    # self.arn_instance.save() issued a redundant second write and is dropped.
    self.arn_instance = AutoRetryNodeStrategy.objects.create(
        taskflow_id=1, root_pipeline_id="root_pipeline_id", node_id="act_1", max_retry_times=5
    )

def tearDown(self):
    # Remove the strategy fixture created in setUp so cases stay isolated.
    self.arn_instance.delete()

def test_parse_node_timeout_configs_success(self):
pipeline_tree = {
"activities": {
Expand Down Expand Up @@ -57,3 +66,34 @@ def test_parse_node_timeout_configs_fail_and_ignore(self):
parse_result = parse_node_timeout_configs(pipeline_tree)
self.assertEqual(parse_result["result"], True)
self.assertEqual(parse_result["data"], parse_configs)

def test_extract_failed_nodes(self):
    """FAILED node ids are collected depth-first, including children of failed nodes."""
    status_tree = {
        "id": "root_pipeline_id",
        "state": "FAILED",
        "children": {
            "act_1": {"id": "act_1", "state": "FINISHED", "children": {}},
            "act_2": {
                "id": "act_2",
                "state": "FAILED",
                "children": {
                    "act_2_1": {"id": "act_2_1", "state": "FINISHED", "children": {}},
                    "act_2_2": {"id": "act_2_2", "state": "FAILED", "children": {}},
                },
            },
            "act_3": {"id": "act_3", "state": "FINISHED", "children": {}},
        },
    }
    self.assertEqual(extract_failed_nodes(status_tree), ["act_2", "act_2_2"])

def test_get_failed_nodes_info(self):
    """Nodes with a retry strategy carry retry counters; others get an empty dict."""
    expected = {
        "act_1": {
            "auto_retry_times": self.arn_instance.retry_times,
            "max_auto_retry_times": self.arn_instance.max_retry_times,
        },
        "act_2": {},
    }
    result = get_failed_nodes_info("root_pipeline_id", ["act_1", "act_2"])
    self.assertEqual(result, expected)
Loading