From efe6a88220421523a821c6b25f88e0b872c7e664 Mon Sep 17 00:00:00 2001 From: dcd <1151627903@qq.com> Date: Thu, 25 Apr 2024 14:53:26 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=20=E5=90=8E=E5=8F=B0=20Job=20=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=94=AF=E6=8C=81=E5=9C=A8=E9=9D=9E=E5=85=A8=E4=B8=9A?= =?UTF-8?q?=E5=8A=A1=E9=9B=86=E4=B8=8B=E6=89=A7=E8=A1=8C=20(closed=20#2158?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/backend/components/collections/base.py | 5 ++ apps/backend/components/collections/job.py | 47 ++++++++++++------ apps/backend/components/collections/plugin.py | 49 +++++++++++++------ apps/backend/subscription/tasks.py | 9 ++-- .../plugin/test_transfer_script.py | 27 ++++++++++ apps/core/gray/tools.py | 11 ++++- apps/node_man/models.py | 2 + 7 files changed, 115 insertions(+), 35 deletions(-) diff --git a/apps/backend/components/collections/base.py b/apps/backend/components/collections/base.py index 16c84da4a..808b2f128 100644 --- a/apps/backend/components/collections/base.py +++ b/apps/backend/components/collections/base.py @@ -363,6 +363,11 @@ def get_meta(data) -> Dict[str, Any]: meta["STEPS"] = [] return meta + @staticmethod + def get_job_meta(data) -> Dict[str, Any]: + meta: Dict[str, Any] = data.get_one_of_inputs("meta", {}) + return meta + @classmethod def get_common_data(cls, data): """ diff --git a/apps/backend/components/collections/job.py b/apps/backend/components/collections/job.py index eb1c787ae..9f2c740d3 100644 --- a/apps/backend/components/collections/job.py +++ b/apps/backend/components/collections/job.py @@ -101,11 +101,14 @@ def request_single_job_and_create_map( script_language = (constants.ScriptLanguageType.SHELL.value, constants.ScriptLanguageType.BAT.value)[ os_type == constants.OsType.WINDOWS ] + meta: Dict[str, Union[str, int]] = job_params.pop("meta") + scope_id = meta.get("SCOPE_ID", settings.BLUEKING_BIZ_ID) + scope_type = meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value) job_params.update( { - "bk_biz_id": settings.BLUEKING_BIZ_ID, - "bk_scope_type": constants.BkJobScopeType.BIZ_SET.value, - "bk_scope_id": settings.BLUEKING_BIZ_ID, + "bk_biz_id": scope_id, + "bk_scope_type": scope_type, + "bk_scope_id": scope_id, "script_language": script_language, "script_content": process_parms(job_params.get("script_content", "")), "script_param": process_parms(job_params.get("script_param", "")), @@ -188,17 +191,20 @@ def generate_api_params_log( job_params.pop("target_server", None) return json.dumps(job_params, indent=2) - def handler_job_result(self, job_sub_map: models.JobSubscriptionInstanceMap) -> List[int]: + def handler_job_result(self, job_sub_map: models.JobSubscriptionInstanceMap, meta: Dict[str, Any]) -> List[int]: """ 处理作业平台执行结果 :param job_sub_map: 作业平台ID映射 + :param meta: 注入实例的meta信息 :return: succeed_sub_inst_ids """ + scope_id = meta.get("SCOPE_ID", settings.BLUEKING_BIZ_ID) + scope_type = meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value) ip_results = JobApi.get_job_instance_status( { - "bk_biz_id": settings.BLUEKING_BIZ_ID, - "bk_scope_type": constants.BkJobScopeType.BIZ_SET.value, - "bk_scope_id": settings.BLUEKING_BIZ_ID, + "bk_biz_id": scope_id, + "bk_scope_type": scope_type, + "bk_scope_id": scope_id, "job_instance_id": job_sub_map.job_instance_id, "return_ip_result": True, } @@ -262,17 +268,20 @@ def handler_job_result(self, job_sub_map: models.JobSubscriptionInstanceMap) -> succeed_sub_inst_ids.append(sub_inst.id) return succeed_sub_inst_ids - def request_get_job_instance_status(self, job_sub_map: models.JobSubscriptionInstanceMap): + def request_get_job_instance_status(self, job_sub_map: models.JobSubscriptionInstanceMap, meta: Dict[str, Any]): """ 查询作业平台执行状态 :param job_sub_map: + :param meta: 注入实例的meta信息 :return: """ + scope_id = meta.get("SCOPE_ID", settings.BLUEKING_BIZ_ID) + scope_type = meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value) result = JobApi.get_job_instance_status( { - "bk_biz_id": settings.BLUEKING_BIZ_ID, - "bk_scope_type": constants.BkJobScopeType.BIZ_SET.value, - "bk_scope_id": settings.BLUEKING_BIZ_ID, + "bk_biz_id": scope_id, + "bk_scope_type": scope_type, + "bk_scope_id": scope_id, "job_instance_id": job_sub_map.job_instance_id, "return_ip_result": False, } @@ -290,7 +299,7 @@ def request_get_job_instance_status(self, job_sub_map: models.JobSubscriptionIns return # 其它都认为存在失败的情况,需要具体查作业平台的接口查IP详情 - self.handler_job_result(job_sub_map) + self.handler_job_result(job_sub_map, meta) job_sub_map.status = job_status job_sub_map.save() @@ -316,11 +325,12 @@ def skip_polling_result_by_os_types(self, os_types: Optional[List[str]] = None): ).update(status=constants.BkJobStatus.SUCCEEDED) def _schedule(self, data, parent_data, callback_data=None): + job_meta = self.get_job_meta(data) polling_time = data.get_one_of_outputs("polling_time") or 0 skip_polling_result = data.get_one_of_inputs("skip_polling_result", default=False) # 查询未完成的作业, 批量查询作业状态并更新DB multi_params = [ - {"job_sub_map": job_sub_map} + {"job_sub_map": job_sub_map, "meta": job_meta} for job_sub_map in models.JobSubscriptionInstanceMap.objects.filter( node_id=self.id, status=constants.BkJobStatus.PENDING ) @@ -346,7 +356,7 @@ def _schedule(self, data, parent_data, callback_data=None): node_id=self.id, status=constants.BkJobStatus.PENDING ) handler_job_result_params_list = [ - {"job_sub_map": pending_job_sub_map} for pending_job_sub_map in pending_job_sub_maps + {"job_sub_map": pending_job_sub_map, "meta": job_meta} for pending_job_sub_map in pending_job_sub_maps ] # 挽救策略,查询作业中已完成的节点,避免全部误判为超时失败 succeed_sub_inst_ids: Set[int] = set( @@ -427,7 +437,7 @@ def script_name(self): return "" def _execute(self, data, parent_data, common_data: CommonData): - + job_meta = self.get_job_meta(data) timeout = data.get_one_of_inputs("timeout") # 批量请求作业平台的参数 multi_job_params_map: Dict[str, Dict[str, Any]] = defaultdict(lambda: defaultdict(list)) @@ -450,6 +460,7 @@ def _execute(self, data, parent_data, common_data: CommonData): sub_inst=sub_inst, host_infos=target_servers, ) + multi_job_params_map[md5_key]["job_params"]["meta"] = job_meta else: multi_job_params_map[md5_key] = { "job_func": JobApi.fast_execute_script, @@ -461,6 +472,7 @@ def _execute(self, data, parent_data, common_data: CommonData): "script_param": script_param, "timeout": timeout, "os_type": self.get_job_param_os_type(host_obj), + "meta": job_meta, }, } @@ -495,6 +507,7 @@ def inputs_format(self): ] def _execute(self, data, parent_data, common_data: CommonData): + job_meta = self.get_job_meta(data) timeout = data.get_one_of_inputs("timeout") # 批量请求作业平台的参数 multi_job_params_map: Dict[str, Dict[str, Any]] = {} @@ -518,6 +531,7 @@ def _execute(self, data, parent_data, common_data: CommonData): host_infos=target_servers, sub_inst=sub_inst, ) + multi_job_params_map[md5_key]["job_params"]["meta"] = job_meta else: multi_job_params_map[md5_key] = { "job_func": JobApi.fast_transfer_file, @@ -529,6 +543,7 @@ def _execute(self, data, parent_data, common_data: CommonData): "file_source_list": [{"file_list": file_list}], "timeout": timeout, "os_type": self.get_job_param_os_type(host_obj), + "meta": job_meta, }, } @@ -591,6 +606,7 @@ def cal_job_unique_key(self, config_info_list: List[Dict[str, Any]], file_target return f"{'-'.join(sorted(config_unique_keys))}-{file_target_path}" def _execute(self, data, parent_data, common_data: CommonData): + job_meta = self.get_job_meta(data) timeout = data.get_one_of_inputs("timeout") # 批量请求作业平台的参数 multi_job_params_map: Dict[str, Dict[str, Any]] = {} @@ -609,6 +625,7 @@ def _execute(self, data, parent_data, common_data: CommonData): host_obj=host_obj, sub_inst=sub_inst, ) + multi_job_params_map[job_unique_key]["job_params"]["meta"] = job_meta else: file_source_list = [] for config_info in config_info_list: diff --git a/apps/backend/components/collections/plugin.py b/apps/backend/components/collections/plugin.py index fe7fc979c..77353a944 100644 --- a/apps/backend/components/collections/plugin.py +++ b/apps/backend/components/collections/plugin.py @@ -526,6 +526,7 @@ class TransferPackageService(JobV3BaseService, PluginBaseService): """调用作业平台传输插件包""" def _execute(self, data, parent_data, common_data: PluginCommonData): + job_meta = self.get_job_meta(data) process_statuses = common_data.process_statuses group_id_instance_map = common_data.group_id_instance_map host_id_obj_map = common_data.host_id_obj_map @@ -567,6 +568,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): "file_source_list": [{"file_list": file_list}], "os_type": job["os_type"], "target_server": {"ip_list": job["ip_list"], "host_id_list": job["host_id_list"]}, + "meta": job_meta, }, } ) @@ -621,6 +623,7 @@ def need_skipped(self, process_status: models.ProcessStatus, common_data: Plugin return False def _execute(self, data, parent_data, common_data: PluginCommonData): + job_meta = self.get_job_meta(data) process_statuses = common_data.process_statuses timeout = data.get_one_of_inputs("timeout") group_id_instance_map = common_data.group_id_instance_map @@ -652,6 +655,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): {"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip} ) multi_job_params_map[key]["job_params"]["target_server"]["host_id_list"].append(host.bk_host_id) + multi_job_params_map[key]["job_params"]["meta"] = job_meta else: multi_job_params_map[key] = { "job_func": JobApi.fast_execute_script, @@ -666,6 +670,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): "script_param": script_param, "timeout": timeout, "os_type": host.os_type, + "meta": job_meta, }, } self.run_job_or_finish_schedule(multi_job_params_map) @@ -827,15 +832,17 @@ def allocate_port_to_process_status( subscription_instance: models.SubscriptionInstanceRecord, job_instance_id: int, step_instance_id: int, + meta: Dict[str, Any], ): """根据job返回日志分配端口""" bk_host_id = process_status.bk_host_id - + scope_id = meta.get("SCOPE_ID", settings.BLUEKING_BIZ_ID) + scope_type = meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value) # 查询并解析该主机已被占用的端口号 instance_log_base_params: Dict[str, Union[str, int]] = { - "bk_biz_id": settings.BLUEKING_BIZ_ID, - "bk_scope_type": constants.BkJobScopeType.BIZ_SET.value, - "bk_scope_id": settings.BLUEKING_BIZ_ID, + "bk_biz_id": scope_id, + "bk_scope_type": scope_type, + "bk_scope_id": scope_id, "job_instance_id": job_instance_id, "step_instance_id": step_instance_id, } @@ -870,18 +877,22 @@ def allocate_port_to_process_status( [subscription_instance.id], _("主机[{}]在ip->[{}]上无可用端口").format(host.inner_ip, listen_ip) ) - def get_job_instance_status(self, job_sub_map: models.JobSubscriptionInstanceMap, common_data: PluginCommonData): + def get_job_instance_status( + self, job_sub_map: models.JobSubscriptionInstanceMap, common_data: PluginCommonData, meta: Dict[str, Any] + ): """查询作业平台执行状态""" bk_host_ids = common_data.bk_host_ids process_statuses = common_data.process_statuses group_id_instance_map = common_data.group_id_instance_map host_id_obj_map: Dict[int, models.Host] = models.Host.host_id_obj_map(bk_host_id__in=bk_host_ids) + scope_id = meta.get("SCOPE_ID", settings.BLUEKING_BIZ_ID) + scope_type = meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value) result = JobApi.get_job_instance_status( { - "bk_biz_id": settings.BLUEKING_BIZ_ID, - "bk_scope_type": constants.BkJobScopeType.BIZ_SET.value, - "bk_scope_id": settings.BLUEKING_BIZ_ID, + "bk_biz_id": scope_id, + "bk_scope_type": scope_type, + "bk_scope_id": scope_id, "job_instance_id": job_sub_map.job_instance_id, "return_ip_result": True, } @@ -910,6 +921,7 @@ def get_job_instance_status(self, job_sub_map: models.JobSubscriptionInstanceMap "subscription_instance": subscription_instance, "job_instance_id": job_sub_map.job_instance_id, "step_instance_id": step_instance_id, + "meta": meta, } ) request_multi_thread(self.allocate_port_to_process_status, multi_allocate_params) @@ -918,8 +930,9 @@ def get_job_instance_status(self, job_sub_map: models.JobSubscriptionInstanceMap def _schedule(self, data, parent_data, callback_data=None): # 查询未完成的作业, 批量查询作业状态并更新DB + job_meta = self.get_job_meta(data) multi_params = [ - {"job_sub_map": job_sub_map, "common_data": self.get_common_data(data)} + {"job_sub_map": job_sub_map, "common_data": self.get_common_data(data), "meta": job_meta} for job_sub_map in models.JobSubscriptionInstanceMap.objects.filter( node_id=self.id, status=constants.BkJobStatus.PENDING ) @@ -939,6 +952,7 @@ class RenderAndPushConfigService(PluginBaseService, JobV3BaseService): """ def _execute(self, data, parent_data, common_data: PluginCommonData): + job_meta = self.get_job_meta(data) subscription_step_id = data.get_one_of_inputs("subscription_step_id") process_statuses = common_data.process_statuses policy_step_adapter = common_data.policy_step_adapter @@ -993,6 +1007,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): host_obj=target_host, sub_inst=subscription_instance, ) + multi_job_params_map[key]["job_params"]["meta"] = job_meta else: multi_job_params_map[key] = { "job_func": JobApi.push_config_file, @@ -1011,6 +1026,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): }, "file_target_path": file_target_path, "file_list": [{"file_name": file_name, "content": process_parms(file_content)}], + "meta": job_meta, }, } @@ -1366,12 +1382,15 @@ def _schedule(self, data, parent_data, callback_data=None): job_sub_inst_map = models.JobSubscriptionInstanceMap.objects.filter(node_id=self.id).first() subscription_instance_id = job_sub_inst_map.subscription_instance_ids[0] job_instance_id = job_sub_inst_map.job_instance_id + job_meta = self.get_job_meta(data) + scope_id = job_meta.get("SCOPE_ID", settings.BLUEKING_BIZ_ID) + scope_type = job_meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value) result = JobApi.get_job_instance_status( { - "bk_biz_id": settings.BLUEKING_BIZ_ID, - "bk_scope_type": constants.BkJobScopeType.BIZ_SET.value, - "bk_scope_id": settings.BLUEKING_BIZ_ID, + "bk_biz_id": scope_id, + "bk_scope_type": scope_type, + "bk_scope_id": scope_id, "job_instance_id": job_instance_id, "return_ip_result": True, } @@ -1389,9 +1408,9 @@ def _schedule(self, data, parent_data, callback_data=None): instance_log_base_params: Dict[str, Union[str, int]] = { "job_instance_id": job_instance_id, - "bk_biz_id": settings.BLUEKING_BIZ_ID, - "bk_scope_type": constants.BkJobScopeType.BIZ_SET.value, - "bk_scope_id": settings.BLUEKING_BIZ_ID, + "bk_biz_id": scope_id, + "bk_scope_type": scope_type, + "bk_scope_id": scope_id, "bk_username": settings.BACKEND_JOB_OPERATOR, "step_instance_id": result["step_instance_list"][0]["step_instance_id"], } diff --git a/apps/backend/subscription/tasks.py b/apps/backend/subscription/tasks.py index fd4f8c905..590bbf652 100644 --- a/apps/backend/subscription/tasks.py +++ b/apps/backend/subscription/tasks.py @@ -31,7 +31,7 @@ from apps.node_man import tools as node_man_tools from apps.node_man.handlers.cmdb import CmdbHandler from apps.prometheus import metrics -from apps.utils import translation +from apps.utils import md5, translation from pipeline import builder from pipeline.builder import Data, NodeOutput, ServiceActivity, Var from pipeline.core.pipeline import Pipeline @@ -211,16 +211,19 @@ def create_pipeline( } sub_insts_gby_metadata: Dict[str, List[models.SubscriptionInstanceRecord]] = defaultdict(list) + md5_value__metadata = {} for instance_id, step_actions in instances_action.items(): if instance_id not in subscription_instance_map: continue sub_inst = subscription_instance_map[instance_id] # metadata 包含:meta-任务元数据、step_actions-操作步骤及类型 metadata = {"meta": sub_inst.instance_info["meta"], "step_actions": step_actions} + metadata_md5_value = md5.count_md5(metadata) + if metadata_md5_value not in md5_value__metadata: + md5_value__metadata[metadata_md5_value] = metadata # 聚合同 metadata 的任务 - sub_insts_gby_metadata[json.dumps(metadata)].append(sub_inst) + sub_insts_gby_metadata[json.dumps(md5_value__metadata[metadata_md5_value])].append(sub_inst) - # # # 把同类型操作进行聚合 # action_instances = defaultdict(list) # for instance_id, step_actions in instances_action.items(): diff --git a/apps/backend/tests/components/collections/plugin/test_transfer_script.py b/apps/backend/tests/components/collections/plugin/test_transfer_script.py index b4eea9191..6a9bea936 100644 --- a/apps/backend/tests/components/collections/plugin/test_transfer_script.py +++ b/apps/backend/tests/components/collections/plugin/test_transfer_script.py @@ -101,3 +101,30 @@ def test_component(self): "c:\\gse\\plugins\\bin", fast_transfer_file.call_args[0][0]["file_target_path"], ) + + +class TransferScriptBySingleBiz(TransferWindowsScriptTest): + def test_component(self): + models.GlobalSettings.set_config(key=models.GlobalSettings.KeyEnum.JOB_TASK_POLICY.value, value={"biz": [10]}) + with patch("apps.backend.components.collections.base.BaseService.get_job_meta") as get_job_meta: + get_job_meta.return_value = {"SCOPE_ID": 10, "SCOPE_TYPE": "biz"} + with patch( + "apps.backend.tests.components.collections.plugin.utils.JobMockClient.fast_transfer_file" + ) as fast_transfer_file: + fast_transfer_file.return_value = { + "job_instance_name": "API Quick Distribution File1521101427176", + "job_instance_id": JOB_INSTANCE_ID, + } + super().test_component() + self.assertEqual( + 10, + fast_transfer_file.call_args[0][0]["bk_biz_id"], + ) + self.assertEqual( + 10, + fast_transfer_file.call_args[0][0]["bk_scope_id"], + ) + self.assertEqual( + "biz", + fast_transfer_file.call_args[0][0]["bk_scope_type"], + ) diff --git a/apps/core/gray/tools.py b/apps/core/gray/tools.py index 729e1cc5e..7f084447a 100644 --- a/apps/core/gray/tools.py +++ b/apps/core/gray/tools.py @@ -83,6 +83,10 @@ def inject_meta_to_instances( bk_host_id__in=bk_host_ids ).values("bk_host_id", "ap_id") host_id__ap_id_map: typing.Dict[int, typing.Optional[int]] = {} + job_task_policy: typing.Dict[str, typing.List[int]] = node_man_models.GlobalSettings.get_config( + key=node_man_models.GlobalSettings.KeyEnum.JOB_TASK_POLICY.value, default={} + ) + biz_ids_list: typing.List[int] = job_task_policy.get("biz", []) for host_info in host_infos: host_id__ap_id_map[host_info["bk_host_id"]] = host_info["ap_id"] @@ -95,9 +99,12 @@ def inject_meta_to_instances( if host_info.get("is_need_inject_ap_id"): # 双如果为安装额外Agent 将ap_id 注入 meta meta["AP_ID"] = ap_id - + bk_biz_id = host_info.get("bk_biz_id") + if bk_biz_id in biz_ids_list: + meta["SCOPE_ID"] = bk_biz_id + meta["SCOPE_TYPE"] = node_man_constants.BkJobScopeType.BIZ.value gse_version: str = self.get_host_ap_gse_version( - bk_biz_id=host_info.get("bk_biz_id"), + bk_biz_id=bk_biz_id, ap_id=ap_id, is_install_other_agent=host_info.get("is_need_inject_ap_id"), ) diff --git a/apps/node_man/models.py b/apps/node_man/models.py index f579639a2..b15db4075 100644 --- a/apps/node_man/models.py +++ b/apps/node_man/models.py @@ -156,6 +156,8 @@ class KeyEnum(Enum): DISABLE_STOPPED_PLUGIN = "DISABLE_STOPPED_PLUGIN" # 根据订阅分配任务队列 SUBSCRIPTION_UPDATE_TASK_QUEUE = "SUBSCRIPTION_UPDATE_TASK_QUEUE" + # JOB任务策略 + JOB_TASK_POLICY = "JOB_TASK_POLICY" key = models.CharField(_("键"), max_length=255, db_index=True, primary_key=True) v_json = JSONField(_("值"))