
Commit

fix: optimize job api calls to support multi-IP log capture scenarios #1271
yksitu authored and zhangzhw8 committed Oct 8, 2023
1 parent cc0b0d6 commit 9f88c68
Showing 1 changed file with 35 additions and 19 deletions.
@@ -251,19 +251,27 @@ def _schedule(self, data, parent_data, callback_data=None) -> bool:
self.log_info(_("[{}] 任务正在执行🤔").format(node_name))
return True

# 获取job的状态
job_status = resp["data"]["job_instance"]["status"]

# 默认dbm调用job是一个步骤,所以统一获取第一个步骤id
step_instance_id = resp["data"]["step_instance_list"][0]["step_instance_id"]
ip_dict = {"bk_cloud_id": kwargs["bk_cloud_id"], "ip": exec_ips[0]} if exec_ips else {}

# 获取本次执行的所有ip信息
# ip_dict = {"bk_cloud_id": kwargs["bk_cloud_id"], "ip": exec_ips[0]} if exec_ips else {}
ip_dicts = [{"bk_cloud_id": kwargs["bk_cloud_id"], "ip": ip} for ip in exec_ips] if exec_ips else []

# 判断本次job任务是否异常
if job_status not in SUCCESS_LIST:
self.log_info("{} job status: {}".format(node_name, resp))
self.log_info(_("[{}] 任务调度失败😱").format(node_name))

# 转载job脚本节点报错日志
if ip_dict:
resp = self.__log__(job_instance_id, step_instance_id, ip_dict)
if resp.get("result"):
self.log_error(resp["data"]["log_content"])
# 转载job脚本节点报错日志,兼容多IP执行场景的日志输出
if ip_dicts:
for ip_dict in ip_dicts:
resp = self.__log__(job_instance_id, step_instance_id, ip_dict)
if resp.get("result"):
self.log_error(f"{ip_dict}:{resp['data']['log_content']}")

self.finish_schedule()
return False
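
To make the multi-IP error-log collection in this hunk easier to follow, here is a minimal standalone sketch of the same pattern (illustrative only: fetch_log is a hypothetical stand-in for the component's self.__log__ call, and the data shapes are assumptions based on this diff):

    # Minimal sketch of the multi-IP error-log collection pattern (not project code;
    # fetch_log is a hypothetical stand-in for self.__log__).
    def collect_error_logs(exec_ips, bk_cloud_id, fetch_log):
        ip_dicts = [{"bk_cloud_id": bk_cloud_id, "ip": ip} for ip in exec_ips] if exec_ips else []
        logs = []
        for ip_dict in ip_dicts:
            resp = fetch_log(ip_dict)
            if resp.get("result"):
                logs.append(f"{ip_dict}:{resp['data']['log_content']}")
        return logs

    # Example: both target IPs now contribute a log entry, instead of only exec_ips[0].
    fake_fetch = lambda ip_dict: {"result": True, "data": {"log_content": f"error on {ip_dict['ip']}"}}
    print(collect_error_logs(["1.1.1.1", "2.2.2.2"], 0, fake_fetch))
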
@@ -273,19 +281,27 @@ def _schedule(self, data, parent_data, callback_data=None) -> bool:
            self.finish_schedule()
            return True

        # Write to the context. Currently only a single ip is supported if an ip_list is passed in; multiple ips are a problem, because only the first executed ip is used to attach the result to the corresponding context variable
        self.log_info(_("[{}] This node needs to fetch the post-execution log and assign it to trans_data").format(node_name))
        self.log_info(exec_ips)
        if not self.__get_target_ip_context(
            job_instance_id=job_instance_id,
            step_instance_id=step_instance_id,
            ip_dict=ip_dict,
            data=data,
            trans_data=trans_data,
            write_payload_var=write_payload_var,
            write_op=kwargs.get("write_op", WriteContextOpType.REWRITE.value),
        ):
            self.log_info(_("[{}] Failed to fetch the post-execution log for ip [{}]").format(node_name, exec_ips[0]))
        # Write to the context, supporting multi-IP context capture scenarios
        # The write target is the trans_data.{write_payload_var} attribute, written either as an overwrite or as an append
        # Overwrite assigns directly to the context attribute, regardless of any previous value; this is the default, WriteContextOpType.REWRITE
        # Append is the special behaviour: if every IP's log result should be written, choose append; the context becomes a list whose elements are {"ip": "log"}, WriteContextOpType.APPEND
        self.log_info(_("[{}] This node needs to fetch the post-execution log and assign it to the flow context").format(node_name))

        is_false = False
        for ip_dict in ip_dicts:
            if not self.__get_target_ip_context(
                job_instance_id=job_instance_id,
                step_instance_id=step_instance_id,
                ip_dict=ip_dict,
                data=data,
                trans_data=trans_data,
                write_payload_var=write_payload_var,
                write_op=kwargs.get("write_op", WriteContextOpType.REWRITE.value),
            ):
                self.log_error(_("[{}] Failed to write the post-execution log into the flow context, ip: [{}]").format(node_name, ip_dict["ip"]))
                is_false = True

        if is_false:
            self.finish_schedule()
            return False
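
The comments in this hunk describe the two write modes for the flow context. The sketch below illustrates overwrite vs. append semantics in isolation (the enum values, the TransData stand-in, and write_context are assumptions for illustration, not the project's definitions):

    from enum import Enum

    class WriteContextOpType(Enum):
        # Assumed values, for illustration only; the real enum is defined in the dbm code base.
        REWRITE = "rewrite"
        APPEND = "append"

    class TransData:
        """Hypothetical stand-in for the flow's trans_data object."""

    def write_context(trans_data, write_payload_var, ip, log, write_op):
        if write_op == WriteContextOpType.REWRITE.value:
            # Overwrite: assign directly, discarding whatever was stored before (the default mode).
            setattr(trans_data, write_payload_var, {ip: log})
        else:
            # Append: keep a list in which each element is {"ip": "log"}.
            current = getattr(trans_data, write_payload_var, None) or []
            current.append({ip: log})
            setattr(trans_data, write_payload_var, current)

    trans = TransData()
    write_context(trans, "job_result", "1.1.1.1", "log of 1.1.1.1", WriteContextOpType.APPEND.value)
    write_context(trans, "job_result", "2.2.2.2", "log of 2.2.2.2", WriteContextOpType.APPEND.value)
    print(trans.job_result)  # [{'1.1.1.1': 'log of 1.1.1.1'}, {'2.2.2.2': 'log of 2.2.2.2'}]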
