Skip to content

Commit

Permalink
fix: 冲突处理
Browse files Browse the repository at this point in the history
  • Loading branch information
Myfatguy11 committed Oct 8, 2023
1 parent 5cced9d commit 26566b2
Showing 1 changed file with 54 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,66 @@

from celery.schedules import crontab

from backend import env
from backend.components import CCApi
from backend.db_meta.models import Cluster, Machine
from backend.db_meta.models.cluster_monitor import SyncFailedMachine
from backend.db_periodic_task.local_tasks.register import register_periodic_task
from backend.db_services.ipchooser.query.resource import ResourceQueryHelper
from backend.dbm_init.constants import CC_HOST_DBM_ATTR

logger = logging.getLogger("celery")


@register_periodic_task(run_every=crontab(minute="*/5"))
def reset_host_dbmeta(bk_biz_id=env.DBA_APP_BK_BIZ_ID, bk_host_ids=None):
"""
重置业务下空闲集群主机的dbm_meta属性
"""
now = datetime.datetime.now()
logger.info("[reset_host_dbmeta] start reset begin: %s", now)

if not bk_host_ids:
machines = ResourceQueryHelper.query_cc_hosts(
{
"bk_biz_id": bk_biz_id,
"bk_inst_id": CCApi.get_biz_internal_module({"bk_biz_id": bk_biz_id}, use_admin=True)["bk_set_id"],
"bk_obj_id": "set",
},
fields=["bk_host_id"],
)["info"]
bk_host_ids = list(map(lambda x: x["bk_host_id"], machines))

# 批量更新接口限制最多500条
step_size = 496
reset_hosts, failed_hosts = [], []
for step in range(len(bk_host_ids) // step_size + 1):
updates = []
for bk_host_id in bk_host_ids[step * step_size : (step + 1) * step_size]:
updates.append({"properties": {CC_HOST_DBM_ATTR: json.dumps([])}, "bk_host_id": bk_host_id})
reset_hosts.extend(updates)

try:
logger.info("[reset_host_dbmeta] batch_update_host: %s", updates)
CCApi.batch_update_host({"update": updates}, use_admin=True)
except Exception as e: # pylint: disable=wildcard-import
failed_hosts.extend(updates)
logger.error("[reset_host_dbmeta] batch reset exception: %s (%s)", updates, e)

# 容错处理:逐个更新,避免批量更新误伤有效ip
for fail_host in failed_hosts:
try:
CCApi.update_host({"bk_host_id": fail_host["bk_host_id"], "data": fail_host["properties"]}, use_admin=True)
except Exception as e: # pylint: disable=wildcard-import
logger.error("[reset_host_dbmeta] single reset error: %s (%s)", fail_host, e)

logger.info(
"[reset_host_dbmeta] finish reset end: %s, update_cnt: %s",
datetime.datetime.now() - now,
len(reset_hosts),
)


@register_periodic_task(run_every=crontab(minute="*/2"))
def update_host_dbmeta(bk_biz_id=None, cluster_id=None, cluster_ips=None, dbm_meta=None):
"""
Expand Down Expand Up @@ -57,12 +108,12 @@ def update_host_dbmeta(bk_biz_id=None, cluster_id=None, cluster_ips=None, dbm_me
machines = machines.filter(ip__in=cluster_ips)

# 批量更新接口限制最多500条,这里取456条
STEP = 456
step_size = 456
updated_hosts, failed_updates = [], []
machine_count = machines.count()
for step in range(machine_count // STEP + 1):
for step in range(machine_count // step_size + 1):
updates = []
for machine in machines[step * STEP : (step + 1) * STEP]:
for machine in machines[step * step_size : (step + 1) * step_size]:
cc_dbm_meta = machine.dbm_meta if dbm_meta is None else dbm_meta
updates.append(
{"properties": {CC_HOST_DBM_ATTR: json.dumps(cc_dbm_meta)}, "bk_host_id": machine.bk_host_id}
Expand Down

0 comments on commit 26566b2

Please sign in to comment.