Skip to content

Commit

Permalink
feat: 订阅接口支持指定用户操作进程 (closed TencentBlueKing#2297)
Browse files Browse the repository at this point in the history
  • Loading branch information
Huayeaaa committed Aug 12, 2024
1 parent d7247a3 commit 2182629
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 10 deletions.
4 changes: 3 additions & 1 deletion apps/backend/components/collections/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,8 @@ def request_gse_or_finish_schedule(self, proc_operate_req: List, data, common_da
def _execute(self, data, parent_data, common_data: PluginCommonData):
op_type = data.get_one_of_inputs("op_type")
gse_version = data.get_one_of_inputs("meta", {}).get("GSE_VERSION")
host_account = data.get_one_of_inputs("meta", {}).get("host_account")
system_account = data.get_one_of_inputs("meta", {}).get("system_account")
policy_step_adapter = common_data.policy_step_adapter
process_statuses = common_data.process_statuses
plugin = policy_step_adapter.plugin_desc
Expand Down Expand Up @@ -1204,7 +1206,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
"proc_name": package_control.process_name or plugin.name,
"setup_path": process_status.setup_path,
"pid_path": process_status.pid_path,
"user": constants.ACCOUNT_MAP.get(host.os_type, "root"),
"user": host_account or system_account or constants.ACCOUNT_MAP.get(host.os_type, "root"),
},
"control": gse_control,
"resource": host_id__resource_policy_map[bk_host_id]["resource"],
Expand Down
17 changes: 17 additions & 0 deletions apps/backend/subscription/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ def validate(self, attrs):
raise ValidationError("目前机器参数必须要有 bk_host_id 或者 (ip/bk_host_innerip + bk_cloud_id)")


class OperateInfoSerializer(serializers.Serializer):
bk_host_id = serializers.IntegerField(required=False, label="主机ID")
operate_user = serializers.CharField(required=False, label="操作用户")
system_account = serializers.DictField(required=False, label="系统对应账户")

def validate(self, attrs):
if attrs.get("bk_host_id") and attrs.get("system_account"):
raise ValidationError(_("仅支持一种方式:实例 or 操作系统 指定操作用户"))
if attrs.get("system_account"):
for key in attrs["system_account"]:
if key not in constants.OS_TUPLE:
raise ValidationError(_(f"操作系统类型只能为{constants.OS_TUPLE}"))
return attrs


class CreateSubscriptionSerializer(GatewaySerializer):
class CreateStepSerializer(serializers.Serializer):
id = serializers.CharField(label="步骤标识符", validators=[])
Expand All @@ -83,6 +98,7 @@ class CreateStepSerializer(serializers.Serializer):
target_hosts = TargetHostSerializer(many=True, label="下发的目标机器列表", required=False, allow_empty=False)
run_immediately = serializers.BooleanField(required=False, default=False, label="是否立即执行")
is_main = serializers.BooleanField(required=False, default=False, label="是否为主配置")
operate_info = serializers.ListField(required=False, child=OperateInfoSerializer(), label="操作信息")

# 策略新参数
plugin_name = serializers.CharField(required=False, label="插件名")
Expand Down Expand Up @@ -147,6 +163,7 @@ class UpdateStepSerializer(serializers.Serializer):
scope = UpdateScopeSerializer()
steps = serializers.ListField(child=UpdateStepSerializer())
run_immediately = serializers.BooleanField(required=False, default=False)
operate_info = serializers.ListField(required=False, child=OperateInfoSerializer(), label="操作信息")

# 策略新参数
plugin_name = serializers.CharField(required=False)
Expand Down
19 changes: 16 additions & 3 deletions apps/backend/subscription/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,12 @@ def create_task(
instances: Dict[str, Dict[str, Union[Dict, Any]]],
instance_actions: Dict[str, Dict[str, str]],
preview_only: bool = False,
operate_info: List[Dict[str, Union[Dict, Any]]] = None,
):
"""
创建执行任务
:param preview_only: 是否仅预览
:param operate_info: 操作信息
:param subscription: Subscription
:param subscription_task: SubscriptionTask
:param instances: dict
Expand All @@ -337,8 +339,12 @@ def create_task(
}
:return: SubscriptionTask
"""
if not operate_info:
subscription_id = subscription.id
operate_info = tools.get_operate_info(subscription_id)

# 兜底注入 Meta,此处注入是覆盖面最全的(包含历史被移除实例)
GrayTools().inject_meta_to_instances(instances)
GrayTools().inject_meta_to_instances(instances, operate_info=operate_info)
logger.info(
"[sub_lifecycle<sub(%s), task(%s)>][create_task] inject meta to instances[num=%s] successfully",
subscription.id,
Expand Down Expand Up @@ -571,10 +577,12 @@ def run_subscription_task_and_create_instance(
scope: Optional[Dict] = None,
actions: Optional[Dict] = None,
preview_only: bool = False,
operate_info: List[Dict[str, Union[Dict, Any]]] = None,
):
"""
自动检查实例及配置的变更,执行相应动作
:param preview_only: 是否仅预览,若为true则不做任何保存或执行动作
:param operate_info:操作信息
:param subscription: Subscription
:param subscription_task: SubscriptionTask
:param scope
Expand Down Expand Up @@ -629,7 +637,7 @@ def run_subscription_task_and_create_instance(
return

# 预注入 Meta,用于变更计算(仅覆盖当前订阅范围,移除场景通过 create_task 兜底注入)
GrayTools().inject_meta_to_instances(instances)
GrayTools().inject_meta_to_instances(instances, operate_info=operate_info)
logger.info(
"[sub_lifecycle<sub(%s), task(%s)>][run_subscription_task_and_create_instance] "
"pre-inject meta to instances[num=%s] successfully",
Expand Down Expand Up @@ -768,7 +776,12 @@ def run_subscription_task_and_create_instance(
instances.update(deleted_instance_info)

create_task_result = create_task(
subscription, subscription_task, instances, instance_actions, preview_only=preview_only
subscription,
subscription_task,
instances,
instance_actions,
preview_only=preview_only,
operate_info=operate_info,
)

return {
Expand Down
57 changes: 56 additions & 1 deletion apps/backend/subscription/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from typing import Any, Dict, List, Union

from django.conf import settings
from django.db.models import Q
from django.db.models import Q, QuerySet, Value
from django.utils import timezone

from apps.backend.components.collections import core
Expand Down Expand Up @@ -1641,3 +1641,58 @@ def by_biz_dispatch_task_queue(biz_ids_gby_queue: Dict[str, List[int]], bk_biz_i
return task_queue

return default_task_queue


def get_operate_info(subscription_id: int) -> List[Dict[str, Any]]:
"""
:param subscription_id: 订阅ID
return: 操作信息
"""
from apps.backend.subscription import handler

# 第一次注入且订阅实例记录没有订阅id对应的记录
if not models.SubscriptionInstanceRecord.objects.filter(subscription_id=subscription_id).exists():
operate_info = None
elif models.SubscriptionInstanceRecord.objects.filter(subscription_id=subscription_id, is_latest=Value(1)).exists():
latest_instance_records = models.SubscriptionInstanceRecord.objects.filter(
subscription_id=subscription_id, is_latest=Value(1)
)
operate_info = get_operate_info_from_instance_record(instance_records=latest_instance_records)
else:
instance_records = models.SubscriptionInstanceRecord.objects.filter(subscription_id=subscription_id)
max_instance_record_ids: List[int] = handler.SubscriptionTools.fetch_latest_record_ids_in_same_inst_id(
instance_record_qs=instance_records
)
max_inst_records: QuerySet = models.SubscriptionInstanceRecord.objects.filter(id__in=max_instance_record_ids)
operate_info = get_operate_info_from_instance_record(instance_records=max_inst_records)
return operate_info


def get_operate_info_from_instance_record(instance_records: QuerySet) -> List[Dict[str, Any]]:
"""
:param instance_records: 订阅实例记录
return: 操作信息
"""
instance_records: QuerySet = instance_records.values("instance_info")
instance_records_info: List[Dict[str, Any]] = [
instance_record["instance_info"] for instance_record in instance_records
]
host_instance_infos: Dict[int, str] = {
instance_info["host"]["bk_host_id"]: instance_info["meta"]["host_account"]
for instance_info in instance_records_info
if instance_info["host"].get("bk_host_id") and instance_info["meta"].get("host_account")
}
system_instance_infos: Dict[str, str] = {
instance_info["host"]["bk_os_type"]: instance_info["meta"]["system_account"]
for instance_info in instance_records_info
if instance_info["host"].get("bk_os_type") and instance_info["meta"].get("system_account")
}
if host_instance_infos:
operate_info = [{"bk_host_id": key, "operate_user": value} for key, value in host_instance_infos.items()]
elif system_instance_infos:
operate_info = [{"system_account": {}}]
for key, value in system_instance_infos.items():
operate_info[0]["system_account"][constants.OS_TYPE.get(key)] = value
else:
operate_info = None
return operate_info
6 changes: 4 additions & 2 deletions apps/backend/subscription/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def create_subscription(self, request):
params = self.validated_data
scope = params["scope"]
run_immediately = params["run_immediately"]
operate_info = params.get("operate_info")

category = params.get("category")
enable = params.get("enable") or False
Expand Down Expand Up @@ -121,7 +122,7 @@ def create_subscription(self, request):
subscription_id=subscription.id, scope=subscription.scope, actions={}
)
tasks.run_subscription_task_and_create_instance.delay(
subscription, subscription_task, language=get_language()
subscription, subscription_task, operate_info=operate_info, language=get_language()
)
result["task_id"] = subscription_task.id

Expand Down Expand Up @@ -189,6 +190,7 @@ def update_subscription(self, request):
params = self.validated_data
scope = params["scope"]
run_immediately = params["run_immediately"]
operate_info = params.get("operate_info")
with transaction.atomic():
try:
subscription = models.Subscription.objects.get(id=params["subscription_id"], is_deleted=False)
Expand Down Expand Up @@ -278,7 +280,7 @@ def update_subscription(self, request):
subscription_id=subscription.id, scope=subscription.scope, actions={}
)
tasks.run_subscription_task_and_create_instance.delay(
subscription, subscription_task, language=get_language()
subscription, subscription_task, operate_info=operate_info, language=get_language()
)
result["task_id"] = subscription_task.id

Expand Down
21 changes: 19 additions & 2 deletions apps/core/gray/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@ def get_host_ap_gse_version(self, bk_biz_id: typing.Any, ap_id: int, is_install_
return gse_version

def inject_meta_to_instances(
self, instances: typing.Dict[str, typing.Dict[str, typing.Union[typing.Dict, typing.Any]]]
self,
instances: typing.Dict[str, typing.Dict[str, typing.Union[typing.Dict, typing.Any]]],
operate_info: typing.List[typing.Dict[str, typing.Union[typing.Dict, typing.Any]]] = None,
):
"""
在 instances 中注入 Meta 信息
:param instances:
:param instances:实例信息
:param operate_info:操作信息
:return:
"""
bk_host_ids: typing.Set[int] = {
Expand All @@ -87,6 +90,13 @@ def inject_meta_to_instances(
key=node_man_models.GlobalSettings.KeyEnum.JOB_TASK_POLICY.value, default={}
)
biz_ids_list: typing.List[int] = job_task_policy.get(node_man_constants.BkJobScopeType.BIZ.value, [])
# 主机ID与操作用户映射
host_id__user = {}
# 系统与操作用户映射
system__account = {}
if operate_info:
host_id__user: typing.Dict[int, str] = {op.get("bk_host_id"): op.get("operate_user") for op in operate_info}
system__account: typing.Dict[str, str] = operate_info[0].get("system_account", {})

for host_info in host_infos:
host_id__ap_id_map[host_info["bk_host_id"]] = host_info["ap_id"]
Expand All @@ -103,6 +113,13 @@ def inject_meta_to_instances(
if bk_biz_id in biz_ids_list:
meta["SCOPE_ID"] = bk_biz_id
meta["SCOPE_TYPE"] = node_man_constants.BkJobScopeType.BIZ.value
bk_host_id = host_info.get("bk_host_id")
bk_os_type = host_info.get("bk_os_type")
os_type = node_man_constants.OS_TYPE.get(bk_os_type)
if bk_host_id and host_id__user.get(bk_host_id):
meta["host_account"] = host_id__user[bk_host_id]
if os_type and system__account.get(os_type):
meta["system_account"] = system__account[os_type]
gse_version: str = self.get_host_ap_gse_version(
bk_biz_id=bk_biz_id,
ap_id=ap_id,
Expand Down
4 changes: 3 additions & 1 deletion apps/node_man/handlers/plugin_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def history(query_params: Dict):
return packages

@staticmethod
def operate(job_type: str, plugin_name: str, scope: Dict, steps: List[Dict]):
def operate(job_type: str, plugin_name: str, scope: Dict, steps: List[Dict], operate_info: List[Dict] = None):
bk_biz_scope = list(set([node["bk_biz_id"] for node in scope["nodes"]]))

CmdbHandler().check_biz_permission(bk_biz_scope, IamActionType.plugin_operate)
Expand All @@ -166,6 +166,8 @@ def operate(job_type: str, plugin_name: str, scope: Dict, steps: List[Dict]):
"scope": scope,
"bk_biz_scope": bk_biz_scope,
}
if operate_info:
base_create_kwargs["operate_info"] = operate_info

if job_type == constants.JobType.MAIN_INSTALL_PLUGIN:
create_data = {**base_create_kwargs, "steps": steps}
Expand Down
6 changes: 6 additions & 0 deletions apps/node_man/serializers/plugin_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,19 @@ def validate(self, data):
return data


class OperateInfoSerializer(serializers.Serializer):
bk_host_id = serializers.IntegerField(label=_("主机ID"), required=False)
operate_user = serializers.CharField(label=_("操作用户"), required=False)


class PluginOperateSerializer(serializers.Serializer):
job_type = serializers.ChoiceField(label=_("任务类型"), choices=list(constants.PLUGIN_JOB_TUPLE))
plugin_name = serializers.CharField(label=_("插件名称"))
# 一次性任务范围
scope = base.ScopeSerializer()
# 参数配置
steps = serializers.ListField(child=base.StepSerializer(), required=False, default=[])
operate_info = serializers.ListField(label=_("操作信息"), child=OperateInfoSerializer(), required=False)

def validate(self, data):
if models.GsePluginDesc.objects.filter(name=data["plugin_name"]).first() is None:
Expand Down
1 change: 1 addition & 0 deletions apps/node_man/views/plugin_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ def operate(self, request):
plugin_name=params["plugin_name"],
scope=params["scope"],
steps=params.get("steps"),
operate_info=params.get("operate_info"),
)
)

Expand Down

0 comments on commit 2182629

Please sign in to comment.