Skip to content

Commit

Permalink
feat: sops 回调功能同步代码 --story=120883063
Browse files Browse the repository at this point in the history
  • Loading branch information
guohelu committed Nov 25, 2024
1 parent aabe47d commit f8e9f04
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 5 deletions.
9 changes: 8 additions & 1 deletion gcloud/apigw/views/create_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def create_task(request, template_id, project_id):
"code": err_code.REQUEST_PARAM_INVALID.code,
"message": f"callback_url format error, must match {CALLBACK_URL_PATTERN}",
}
callback_version = params.get("callback_version", None)

# 兼容老版本的接口调用
if template_source in NON_COMMON_TEMPLATE_TYPES:
Expand Down Expand Up @@ -214,7 +215,13 @@ def create_task(request, template_id, project_id):

# create callback url record
if callback_url:
TaskCallBackRecord.objects.create(task_id=task.id, url=callback_url)
record_kwargs = {
"task_id": task.id,
"url": callback_url,
}
if callback_version:
record_kwargs["extra_info"] = json.dumps({"callback_version": callback_version})
TaskCallBackRecord.objects.create(**record_kwargs)

# crete auto retry strategy
arn_creator = AutoRetryNodeStrategyCreator(taskflow_id=task.id, root_pipeline_id=task.pipeline_instance.instance_id)
Expand Down
9 changes: 7 additions & 2 deletions gcloud/taskflow3/domains/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class TaskCallBacker:
def __init__(self, task_id, *args, **kwargs):
self.task_id = task_id
self.record = TaskCallBackRecord.objects.filter(task_id=self.task_id).first()
self.extra_info = {"task_id": self.task_id, **json.loads(self.record.extra_info), **kwargs}
self.record_extra_info = json.loads(self.record.extra_info)
self.extra_info = {"task_id": self.task_id, **self.record_extra_info, **kwargs}

def check_record_existence(self):
return True if self.record else False
Expand Down Expand Up @@ -96,9 +97,13 @@ def _url_callback(self):
logger.error(f"[TaskCallBacker _url_callback] get lock error: {err}")
return None
url = self.record.url
callback_version = self.record_extra_info.get("callback_version")
response = None
try:
response = requests.post(url, data=self.extra_info)
if callback_version == TaskCallBackRecord.CALLBACK_VERSION_V2:
response = requests.post(url, json=self.extra_info)
else:
response = requests.post(url, data=self.extra_info)
response.raise_for_status()
except HTTPError as e:
message = (
Expand Down
2 changes: 2 additions & 0 deletions gcloud/taskflow3/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,8 @@ class Meta:


class TaskCallBackRecord(models.Model):
CALLBACK_VERSION_V2 = "v2"

id = models.BigAutoField(verbose_name="ID", primary_key=True)
task_id = models.BigIntegerField(verbose_name=_("任务ID"), db_index=True)
url = models.TextField(verbose_name=_("回调地址"))
Expand Down
3 changes: 1 addition & 2 deletions gcloud/taskflow3/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"""

import datetime
import json
import logging

from bamboo_engine import states as bamboo_engine_states
Expand Down Expand Up @@ -91,7 +90,7 @@ def _check_and_callback(taskflow_id, *args, **kwargs):
try:
if kwargs.get("task"):
task = kwargs.pop("task")
kwargs["task_outputs"] = json.dumps(task.get_task_detail()["outputs"])
kwargs["task_outputs"] = task.get_task_detail()["outputs"]
task_callback.apply_async(
kwargs=dict(task_id=taskflow_id, **kwargs),
queue="task_callback",
Expand Down

0 comments on commit f8e9f04

Please sign in to comment.