Skip to content

Commit

Permalink
feat: 定点回档物理备份changeMaster TencentBlueKing#8338
Browse files Browse the repository at this point in the history
  • Loading branch information
zfrendo committed Dec 4, 2024
1 parent fe1e985 commit b66e54e
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from backend.db_meta.models import Cluster, StorageInstanceTuple
from backend.db_package.models import Package
from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler
from backend.flow.consts import InstanceStatus, MediumEnum, MysqlChangeMasterType, RollbackType
from backend.flow.consts import InstanceStatus, MediumEnum, MySQLBackupTypeEnum, MysqlChangeMasterType, RollbackType
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.exceptions import NormalTenDBFlowException
Expand Down Expand Up @@ -184,7 +184,6 @@ def rollback_data_flow(self):
)

backupinfo = get_local_backup(inst_list, cluster_class, mycluster["rollback_time"])
logger.debug(backupinfo)
if backupinfo is None:
logger.error("cluster {} backup info not exists".format(cluster_class.id))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_class.id)))
Expand Down Expand Up @@ -281,11 +280,9 @@ def rollback_to_cluster_flow(self):
)

sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))

rollback_class = Cluster.objects.get(id=self.data["rollback_cluster_id"])
storages = rollback_class.storageinstance_set.all()
rollback_pipeline_list = []
repl_pipeline_list = []
for rollback_storage in storages:
if not check_storage_database(
rollback_class.bk_cloud_id, rollback_storage.machine.ip, rollback_storage.port
Expand All @@ -294,6 +291,7 @@ def rollback_to_cluster_flow(self):
raise NormalSpiderFlowException(
message=_("回档集群 {} 空闲检查不通过,请确认回档集群是否存在非系统数据库".format(rollback_class.id))
)
backupinfo = copy.deepcopy(self.data["backupinfo"])
mycluster = {
"name": cluster_class.name,
"cluster_id": cluster_class.id,
Expand All @@ -311,7 +309,7 @@ def rollback_to_cluster_flow(self):
"skip_local_exists": True,
"name_regex": "^.+{}\\.\\d+(\\..*)*$".format(master.port),
"rollback_time": self.data["rollback_time"],
"backupinfo": self.data["backupinfo"],
"backupinfo": backupinfo,
"rollback_type": self.data["rollback_type"],
"rollback_ip": rollback_storage.machine.ip,
"rollback_port": rollback_storage.port,
Expand Down Expand Up @@ -359,7 +357,7 @@ def rollback_to_cluster_flow(self):
if rollback_storage.instance_role in (InstanceRole.BACKEND_SLAVE, InstanceRole.BACKEND_REPEATER):
rollback_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
rollback_pipeline.add_act(
act_name=_("从库reset slave {}").format(rollback_storage.ip_port),
act_name=_("从库stop slave {}").format(rollback_storage.ip_port),
act_component_code=MySQLExecuteRdsComponent.code,
kwargs=asdict(
ExecuteRdsKwargs(
Expand All @@ -370,7 +368,6 @@ def rollback_to_cluster_flow(self):
)
),
)
# todo 屏蔽告警
# 本地备份+时间
if self.data["rollback_type"] == RollbackType.LOCAL_AND_TIME:
inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)]
Expand All @@ -384,7 +381,6 @@ def rollback_to_cluster_flow(self):
"{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port)
)
backupinfo = get_local_backup(inst_list, cluster_class, mycluster["rollback_time"])
logger.debug(backupinfo)
if backupinfo is None:
logger.error("cluster {} backup info not exists".format(cluster_class.id))
raise TendbGetBackupInfoFailedException(message=_("获取集群 {} 的备份信息失败".format(cluster_class.id)))
Expand Down Expand Up @@ -431,54 +427,71 @@ def rollback_to_cluster_flow(self):
else:
raise NormalTenDBFlowException(message=_("rollback_type不存在"))

# 针对slave repeater角色的从库。建立复制链路。重置slave>添加复制账号和获取位点>建立主从关系
backup_type = backupinfo.get("backup_type", "")
if rollback_storage.instance_role in (InstanceRole.BACKEND_SLAVE, InstanceRole.BACKEND_REPEATER):
repl_master = StorageInstanceTuple.objects.get(receiver=rollback_storage)
if backup_type == MySQLBackupTypeEnum.PHYSICAL.value:
repl_cluster = {
"target_ip": repl_master.ejector.machine.ip,
"target_port": repl_master.ejector.port,
"repl_ip": rollback_storage.machine.ip,
"repl_port": rollback_storage.port,
"change_master_type": MysqlChangeMasterType.MASTERSTATUS.value,
"change_master_force": True,
}
# 获取位点信息
exec_act_kwargs.cluster = copy.deepcopy(repl_cluster)
exec_act_kwargs.exec_ip = repl_master.ejector.machine.ip
exec_act_kwargs.job_timeout = MYSQL_USUAL_JOB_TIME
exec_act_kwargs.get_mysql_payload_func = (
MysqlActPayload.tendb_grant_remotedb_repl_user.__name__
)
rollback_pipeline.add_act(
act_name=_("新增repl帐户{}".format(exec_act_kwargs.exec_ip)),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
write_payload_var="show_master_status_info",
)
# 启动,或者建立组从关系
exec_act_kwargs.exec_ip = rollback_storage.machine.ip
exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_remotedb_change_master.__name__
rollback_pipeline.add_act(
act_name=_("建立原主从关系{}".format(rollback_storage.ip_port)),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
)
elif backup_type == MySQLBackupTypeEnum.LOGICAL.value:
rollback_pipeline.add_act(
act_name=_("从库start slave {}").format(rollback_storage.ip_port),
act_component_code=MySQLExecuteRdsComponent.code,
kwargs=asdict(
ExecuteRdsKwargs(
bk_cloud_id=cluster_class.bk_cloud_id,
instance_ip=rollback_storage.machine.ip,
instance_port=rollback_storage.port,
sqls=["start slave"],
)
),
)
rollback_pipeline.add_act(
act_name=_("解除监控屏蔽 {}").format(rollback_storage.ip_port),
act_component_code=MysqlCrondMonitorControlComponent.code,
kwargs=asdict(
CrondMonitorKwargs(
bk_cloud_id=cluster_class.bk_cloud_id,
exec_ips=[rollback_storage.machine.ip],
port=rollback_storage.port,
enable=True,
)
),
)
rollback_pipeline_list.append(
rollback_pipeline.build_sub_process(
sub_name=_("定点回档到{}:{}".format(rollback_storage.machine.ip, rollback_storage.port))
)
)

# todo 针对slave repeater角色的从库。建立复制链路。重置slave>添加复制账号和获取位点>建立主从关系
if rollback_storage.instance_role in (InstanceRole.BACKEND_SLAVE, InstanceRole.BACKEND_REPEATER):
repl_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))

repl_master = StorageInstanceTuple.objects.get(receiver=rollback_storage)
repl_cluster = {
"target_ip": repl_master.ejector.machine.ip,
"target_port": repl_master.ejector.port,
"repl_ip": rollback_storage.machine.ip,
"repl_port": rollback_storage.port,
"change_master_type": MysqlChangeMasterType.MASTERSTATUS.value,
"change_master_force": True,
}
# 获取位点信息
exec_act_kwargs.cluster = copy.deepcopy(repl_cluster)
exec_act_kwargs.exec_ip = repl_master.ejector.machine.ip
exec_act_kwargs.job_timeout = MYSQL_USUAL_JOB_TIME
exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_grant_remotedb_repl_user.__name__
repl_pipeline.add_act(
act_name=_("新增repl帐户{}".format(exec_act_kwargs.exec_ip)),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
write_payload_var="show_master_status_info",
)
# 启动,或者建立组从关系
exec_act_kwargs.exec_ip = rollback_storage.machine.ip
exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.tendb_remotedb_change_master.__name__
repl_pipeline.add_act(
act_name=_("建立原主从关系{}".format(rollback_storage.ip_port)),
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
)

repl_pipeline_list.append(
repl_pipeline.build_sub_process(
sub_name=_("建立主从关系: {}->{}".format(repl_master.ejector.ip_port, rollback_storage.ip_port))
)
)

sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=rollback_pipeline_list)
if len(repl_pipeline_list) > 0:
sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=repl_pipeline_list)
sub_pipeline_list.append(
sub_pipeline.build_sub_process(sub_name=_("定点回档到{}".format(rollback_class.immute_domain)))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,17 @@ def tendb_rollback_data(self):
"change_master": False,
}
spd_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
spd_sub_pipeline.add_act(
act_name=_("屏蔽监控 {}").format(spider_node["instance"]),
act_component_code=MysqlCrondMonitorControlComponent.code,
kwargs=asdict(
CrondMonitorKwargs(
bk_cloud_id=target_cluster.bk_cloud_id,
exec_ips=[spider_node["ip"]],
port=spider_node["port"],
)
),
)
cluster = {"proxy_status": InstanceStatus.RESTORING.value, "proxy_ids": [target_spider.id]}
spd_sub_pipeline.add_act(
act_name=_("写入初始化实例的db_meta元信息"),
Expand Down Expand Up @@ -180,6 +191,18 @@ def tendb_rollback_data(self):
)
),
)
spd_sub_pipeline.add_act(
act_name=_("解除监控屏蔽 {}").format(spider_node["instance"]),
act_component_code=MysqlCrondMonitorControlComponent.code,
kwargs=asdict(
CrondMonitorKwargs(
bk_cloud_id=target_cluster.bk_cloud_id,
exec_ips=[spider_node["ip"]],
port=spider_node["port"],
enable=True,
)
),
)
ins_sub_pipeline_list.append(
spd_sub_pipeline.build_sub_process(sub_name=_("{} spider节点恢复".format(spider_node["instance"])))
)
Expand Down Expand Up @@ -260,7 +283,7 @@ def tendb_rollback_data(self):
)
),
)
# todo 屏蔽监控,停止从库备份
# 屏蔽监控
ins_sub_pipeline.add_act(
act_name=_("屏蔽监控 {}").format(shard_id),
act_component_code=MysqlCrondMonitorControlComponent.code,
Expand Down Expand Up @@ -293,6 +316,18 @@ def tendb_rollback_data(self):
)
),
)
ins_sub_pipeline.add_act(
act_name=_("解除监控屏蔽 {}").format(shard_id),
act_component_code=MysqlCrondMonitorControlComponent.code,
kwargs=asdict(
CrondMonitorKwargs(
bk_cloud_id=target_cluster.bk_cloud_id,
exec_ips=[remote_node["new_master"]["ip"], remote_node["new_slave"]["ip"]],
port=remote_node["new_master"]["port"],
enable=True,
)
),
)
ins_sub_pipeline_list.append(
ins_sub_pipeline.build_sub_process(sub_name=_("{} 分片主从恢复".format(shard_id)))
)
Expand Down
Loading

0 comments on commit b66e54e

Please sign in to comment.