Skip to content

Commit

Permalink
fix: 同步主机差量逻辑修复 (closed TencentBlueKing#2167)
Browse files Browse the repository at this point in the history
  • Loading branch information
Huayeaaa committed May 27, 2024
1 parent dd3f842 commit 3666791
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 6 deletions.
6 changes: 6 additions & 0 deletions apps/node_man/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class TimeUnit:

SYNC_CMDB_BIZ_TOPO_TASK_INTERVAL = 1 * TimeUnit.DAY
SYNC_CMDB_HOST_INTERVAL = 1 * TimeUnit.DAY
# Run interval of the periodic task that clears the pending "need delete" host IDs
CLEAR_NEED_DELETE_HOST_IDS_INTERVAL = 1 * TimeUnit.MINUTE

########################################################################################################
# 第三方系统相关配置
Expand Down Expand Up @@ -588,6 +589,11 @@ def _get_member__alias_map(cls) -> Dict[Enum, str]:
LIST_SERVICE_INSTANCE_DETAIL_LIMIT = 1000
LIST_SERVICE_INSTANCE_DETAIL_INTERVAL = 0.2

# Redis key name template
REDIS_NEED_DELETE_HOST_IDS_KEY_TPL = f"{settings.APP_CODE}:node_man:need_delete_host_ids:list"
# Maximum number of bk_host_ids read from redis in one pass
MAX_HOST_IDS_LENGTH = 5000


class ProxyFileFromType(Enum):
"""Proxy上传文件名称来源"""
Expand Down
104 changes: 100 additions & 4 deletions apps/node_man/periodic_tasks/sync_cmdb_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from django.db import transaction

from apps.backend.celery import app
from apps.backend.utils.redis import REDIS_INST
from apps.component.esbclient import client_v2
from apps.core.concurrent import controller
from apps.core.gray.tools import GrayTools
from apps.exceptions import ComponentCallError
from apps.node_man import constants, models, tools
Expand All @@ -28,7 +30,7 @@
query_bk_biz_ids,
)
from apps.utils.batch_request import batch_request
from apps.utils.concurrent import batch_call
from apps.utils.concurrent import batch_call, batch_call_serial
from common.log import logger


Expand Down Expand Up @@ -434,6 +436,8 @@ def sync_cmdb_host(bk_biz_id=None, task_id=None):
"""
同步cmdb主机
"""
from apps.backend.views import LPUSH_AND_EXPIRE_FUNC

logger.info(f"[sync_cmdb_host] start: task_id -> {task_id}, bk_biz_id -> {bk_biz_id}")

ap_map_config: SyncHostApMapConfig = get_sync_host_ap_map_config()
Expand Down Expand Up @@ -466,10 +470,89 @@ def sync_cmdb_host(bk_biz_id=None, task_id=None):
if need_delete_host_ids:
models.Host.objects.filter(bk_host_id__in=need_delete_host_ids).delete()
models.IdentityData.objects.filter(bk_host_id__in=need_delete_host_ids).delete()
models.ProcessStatus.objects.filter(bk_host_id__in=need_delete_host_ids).delete()
logger.info(f"[sync_cmdb_host] task_id -> {task_id}, need_delete_host_ids -> {need_delete_host_ids}")
if not LPUSH_AND_EXPIRE_FUNC:
models.ProcessStatus.objects.filter(bk_host_id__in=need_delete_host_ids).delete()
logger.info(
"Enterprise binary: [sync_cmdb_host] task_id -> %s, need_delete_host_ids -> %s, num -> %s"
% (
task_id,
need_delete_host_ids,
len(need_delete_host_ids),
)
)
else:
name = constants.REDIS_NEED_DELETE_HOST_IDS_KEY_TPL
LPUSH_AND_EXPIRE_FUNC(keys=[name], args=[constants.TimeUnit.DAY] + list(need_delete_host_ids))
logger.info(
"[sync_cmdb_host] task_id -> %s, store need_delete_host_ids -> %s into redis, num -> %s"
% (
task_id,
need_delete_host_ids,
len(need_delete_host_ids),
)
)

logger.info("[sync_cmdb_host] complete: task_id -> %s, bk_biz_ids -> %s" % (task_id, bk_biz_ids))

logger.info(f"[sync_cmdb_host] complete: task_id -> {task_id}, bk_biz_ids -> {bk_biz_ids}")

def clear_need_delete_host_ids(task_id=None):
    """
    Drain one batch of pending host IDs from redis and hand them over for cleanup.

    At most ``constants.MAX_HOST_IDS_LENGTH`` entries are taken from the tail of
    the redis list per call; the remaining head of the list is kept.

    :param task_id: ID of the calling task, used for logging and passed through
    """
    name: str = constants.REDIS_NEED_DELETE_HOST_IDS_KEY_TPL
    # Work out how many entries to take, capped by the configured maximum
    report_data_len: int = min(REDIS_INST.llen(name), constants.MAX_HOST_IDS_LENGTH)

    # Guard the empty case: with a length of 0 the start index "-0" equals 0, so
    # lrange(name, 0, -1) would read the WHOLE list (ignoring the cap if entries were
    # pushed in the meantime) while ltrim(name, 0, -1) would remove none of them.
    if report_data_len > 0:
        # Take that many entries from the tail of the list
        report_data: typing.List[str] = REDIS_INST.lrange(name, -report_data_len, -1)
        # Keep only the head with ltrim, so values pushed after the read are not lost
        REDIS_INST.ltrim(name, 0, -report_data_len - 1)
        need_delete_host_ids: typing.Set[int] = {int(data) for data in report_data}
        if need_delete_host_ids:
            query_cmdb_and_handle_need_delete_host_ids(host_ids=list(need_delete_host_ids), task_id=task_id)

    logger.info("[clear_need_delete_host_ids] complete: task_id -> %s" % (task_id,))


@controller.ConcurrentController(
    data_list_name="host_ids",
    batch_call_func=batch_call_serial,
    get_config_dict_func=lambda: {"limit": constants.QUERY_CMDB_LIMIT},
)
def query_cmdb_and_handle_need_delete_host_ids(host_ids: typing.List[int], task_id: str):
    """
    Look the given host IDs up in CMDB and delete the ProcessStatus records of
    every host ID that CMDB does not return.

    NOTE(review): the ConcurrentController decorator presumably splits ``host_ids``
    into batches of at most ``constants.QUERY_CMDB_LIMIT`` -- confirm against the
    controller implementation.

    :param host_ids: list of host IDs to check
    :param task_id: task ID, only used in the log lines
    :return: always an empty list
    """
    host_id_rule = {"field": "bk_host_id", "operator": "in", "value": host_ids}
    request_params: typing.Dict[str, typing.Any] = {
        "page": {"start": 0, "limit": constants.QUERY_CMDB_LIMIT},
        # Only the first configured host field is requested
        "fields": [constants.CC_HOST_FIELDS[0]],
        "host_property_filter": {"condition": "AND", "rules": [host_id_rule]},
    }
    response = client_v2.cc.list_hosts_without_biz(request_params)
    existing_host_ids: typing.List[int] = [host_info.get("bk_host_id") for host_info in response["info"]]
    logger.info(
        "[find_hosts_in_cmdb] task_id -> %s, bk_host_ids -> %s , num -> %s"
        % (task_id, existing_host_ids, len(existing_host_ids))
    )

    # Whatever was asked for but not returned by CMDB is treated as gone
    missing_host_ids = set(host_ids).difference(existing_host_ids)
    models.ProcessStatus.objects.filter(bk_host_id__in=missing_host_ids).delete()
    logger.info(
        "[clear_final_delete_host_ids] task_id -> %s, final_delete_host_ids -> %s, num -> %s"
        % (task_id, missing_host_ids, len(missing_host_ids))
    )
    return []


@periodic_task(
Expand All @@ -492,3 +575,16 @@ def sync_cmdb_host_task(bk_biz_id=None):
"""
task_id = sync_cmdb_host_task.request.id
sync_cmdb_host(bk_biz_id, task_id)


@periodic_task(
    queue="default",
    options={"queue": "default"},
    run_every=constants.CLEAR_NEED_DELETE_HOST_IDS_INTERVAL,
)
def clear_need_delete_host_ids_task():
    """
    Periodic task: passively clean up hosts that no longer exist in CMDB.

    Delegates to ``clear_need_delete_host_ids`` with this task's request ID.
    """
    clear_need_delete_host_ids(clear_need_delete_host_ids_task.request.id)
45 changes: 43 additions & 2 deletions apps/node_man/tests/test_pericdic_tasks/test_sync_cmdb_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@
import copy
from unittest.mock import patch

from apps.backend.views import LPUSH_AND_EXPIRE_FUNC
from apps.mock_data.common_unit.host import PROCESS_STATUS_MODEL_DATA
from apps.node_man import constants
from apps.node_man.models import Host
from apps.node_man.periodic_tasks.sync_cmdb_host import sync_cmdb_host_periodic_task
from apps.node_man.models import Host, ProcessStatus
from apps.node_man.periodic_tasks.sync_cmdb_host import (
clear_need_delete_host_ids_task,
sync_cmdb_host_periodic_task,
)
from apps.utils.unittest.testcase import CustomBaseTestCase

from .mock_data import MOCK_BK_BIZ_ID, MOCK_HOST, MOCK_HOST_NUM
Expand Down Expand Up @@ -56,3 +61,39 @@ def test_sync_cmdb_host(self):

# 验证主机信息是否删除成功
self.assertEqual(Host.objects.filter(bk_host_id=-1).count(), 0)


class TestClearNeedDeleteHostIds(CustomBaseTestCase):
    """Covers the periodic task that clears ProcessStatus records of hosts missing from CMDB."""

    @staticmethod
    def init_db():
        """Create ProcessStatus records for host IDs 1-5 and push the same IDs into redis."""
        host_ids = list(range(1, 6))
        base_fields = copy.deepcopy(PROCESS_STATUS_MODEL_DATA)
        ProcessStatus.objects.bulk_create(
            [ProcessStatus(**dict(base_fields, bk_host_id=host_id)) for host_id in host_ids]
        )

        LPUSH_AND_EXPIRE_FUNC(
            keys=[constants.REDIS_NEED_DELETE_HOST_IDS_KEY_TPL],
            args=[constants.TimeUnit.DAY, *host_ids],
        )

    @staticmethod
    def list_hosts_without_biz(*args, **kwargs):
        """Fake CMDB response in which only host 1 is returned."""
        hosts = [{"bk_host_id": 1}]
        return {"count": 1, "info": hosts}

    def start_patch(self):
        """Route the mocked CMDB client's host query to the fake response above."""
        MockClient.cc.list_hosts_without_biz = self.list_hosts_without_biz

    @patch("apps.node_man.periodic_tasks.sync_cmdb_host.client_v2", MockClient)
    def test_clear_need_delete_host_ids(self):
        self.init_db()
        self.start_patch()
        clear_need_delete_host_ids_task()
        # Verify the ProcessStatus records were deleted: only one record should remain
        self.assertEqual(ProcessStatus.objects.count(), 1)

0 comments on commit 3666791

Please sign in to comment.