From 3e6f4cb439cc001230b0e1270645781a204d03d7 Mon Sep 17 00:00:00 2001 From: iSecloud <869820505@qq.com> Date: Mon, 23 Oct 2023 15:53:28 +0800 Subject: [PATCH] =?UTF-8?q?fix(backend):=20=E8=B0=83=E6=95=B4mysql?= =?UTF-8?q?=E9=AB=98=E5=8F=AF=E7=94=A8=E5=8D=95=E6=8D=AE=E5=8F=82=E6=95=B0?= =?UTF-8?q?=20#1507?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mysql/fixpoint_rollback/handlers.py | 3 ++- .../scene/mysql/mysql_rollback_data_flow.py | 17 ++++++++++++++++- .../scene/mysql/mysql_rollback_data_sub_flow.py | 5 +++-- .../flow/utils/mysql/mysql_act_playload.py | 3 ++- .../builders/mysql/mysql_fixpoint_rollback.py | 4 ++-- .../builders/mysql/mysql_master_slave_switch.py | 7 +++++++ dbm-ui/backend/ticket/constants.py | 4 ++-- dbm-ui/backend/ticket/flow_manager/base.py | 10 ++++++++-- 8 files changed, 42 insertions(+), 11 deletions(-) diff --git a/dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py b/dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py index f1e4043e20..f5dbc23041 100644 --- a/dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py +++ b/dbm-ui/backend/db_services/mysql/fixpoint_rollback/handlers.py @@ -400,12 +400,13 @@ def query_binlog_from_bklog( "port": binlogs[0]["port"], "file_list_details": [], } - collector_fields = ["file_mtime", "start_time", "stop_time", "size", "task_id"] + collector_fields = ["file_mtime", "start_time", "stop_time", "size", "task_id", "filename"] for log in binlogs: if str2datetime(log["stop_time"]) > end_time or str2datetime(log["stop_time"]) < start_time: continue detail = {field: log[field] for field in collector_fields} + detail["file_name"] = detail.pop("filename") binlog_record["file_list_details"].append(detail) return binlog_record diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_flow.py index d0bb992ac7..bb2fdded28 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_flow.py @@ -21,7 +21,10 @@ from backend.db_package.models import Package from backend.flow.consts import MediumEnum, RollbackType from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder -from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import install_mysql_in_cluster_sub_flow +from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import ( + build_surrounding_apps_sub_flow, + install_mysql_in_cluster_sub_flow, +) from backend.flow.engine.bamboo.scene.mysql.common.exceptions import NormalTenDBFlowException from backend.flow.engine.bamboo.scene.mysql.mysql_rollback_data_sub_flow import ( rollback_local_and_backupid, @@ -128,6 +131,18 @@ def rollback_data_flow(self): ), ) + sub_pipeline.add_sub_pipeline( + sub_flow=build_surrounding_apps_sub_flow( + bk_cloud_id=cluster_class.bk_cloud_id, + master_ip_list=None, + slave_ip_list=[self.data["rollback_ip"]], + root_id=self.root_id, + parent_global_data=copy.deepcopy(self.data), + is_init=True, + cluster_type=ClusterType.TenDBHA.value, + ) + ) + exec_act_kwargs = ExecActuatorKwargs( cluster=cluster, bk_cloud_id=cluster_class.bk_cloud_id, diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_sub_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_sub_flow.py index f6dbdc4974..fa29ca8a39 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_sub_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_rollback_data_sub_flow.py @@ -44,6 +44,7 @@ from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta from backend.utils import time +from backend.utils.time import str2datetime logger = logging.getLogger("flow") @@ -237,8 +238,8 @@ def rollback_remote_and_time(root_id: str, ticket_data: dict, cluster_info: dict kwargs=asdict(exec_act_kwargs), write_payload_var="change_master_info", ) - backup_time = time.strptime(backupinfo["backup_time"], "%Y-%m-%d %H:%M:%S") - rollback_time = time.strptime(cluster_info["rollback_time"], "%Y-%m-%d %H:%M:%S") + backup_time = str2datetime(backupinfo["backup_time"], "%Y-%m-%d %H:%M:%S") + rollback_time = str2datetime(cluster_info["rollback_time"], "%Y-%m-%d %H:%M:%S") rollback_handler = FixPointRollbackHandler(cluster_info["cluster_id"]) backup_binlog = rollback_handler.query_binlog_from_bklog( start_time=backup_time, diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py index 12a4e48231..50a77492ac 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py @@ -1866,6 +1866,7 @@ def tendb_recover_binlog_payload(self, **kwargs): else: binlog_files = self.cluster["binlog_files"] backup_time = self.cluster["backup_time"] + binlog_files_list = binlog_files.split(",") payload = { "db_type": DBActuatorTypeEnum.MySQL.value, "action": DBActuatorActionEnum.RecoverBinlog.value, @@ -1874,7 +1875,7 @@ def tendb_recover_binlog_payload(self, **kwargs): "extend": { "work_dir": self.cluster["file_target_path"], "binlog_dir": self.cluster["file_target_path"], - "binlog_files": binlog_files, + "binlog_files": binlog_files_list, "tgt_instance": { "host": kwargs["ip"], "port": self.cluster["rollback_port"], diff --git a/dbm-ui/backend/ticket/builders/mysql/mysql_fixpoint_rollback.py b/dbm-ui/backend/ticket/builders/mysql/mysql_fixpoint_rollback.py index 8decd98b4e..fc2de212d1 100644 --- a/dbm-ui/backend/ticket/builders/mysql/mysql_fixpoint_rollback.py +++ b/dbm-ui/backend/ticket/builders/mysql/mysql_fixpoint_rollback.py @@ -43,7 +43,7 @@ class FixPointRollbackSerializer(serializers.Serializer): tables = serializers.ListField(help_text=_("目标table列表"), child=DBTableField()) tables_ignore = serializers.ListField(help_text=_("忽略table列表"), child=DBTableField()) - infos = serializers.ListSerializer(help_text=_("定点回档信息"), child=FixPointRollbackSerializer()) + infos = serializers.ListSerializer(help_text=_("定点构造信息"), child=FixPointRollbackSerializer()) @classmethod def validate_rollback_info(cls, info, now): @@ -86,5 +86,5 @@ def format_ticket_data(self): class MysqlFixPointRollbackFlowBuilder(BaseMySQLTicketFlowBuilder): serializer = MySQLFixPointRollbackDetailSerializer inner_flow_builder = MySQLFixPointRollbackFlowParamBuilder - inner_flow_name = _("定点回档执行") + inner_flow_name = _("定点构造执行") retry_type = FlowRetryType.MANUAL_RETRY diff --git a/dbm-ui/backend/ticket/builders/mysql/mysql_master_slave_switch.py b/dbm-ui/backend/ticket/builders/mysql/mysql_master_slave_switch.py index 3df2ce7e05..dcb00abcd1 100644 --- a/dbm-ui/backend/ticket/builders/mysql/mysql_master_slave_switch.py +++ b/dbm-ui/backend/ticket/builders/mysql/mysql_master_slave_switch.py @@ -33,6 +33,13 @@ class InfoSerializer(serializers.Serializer): cluster_ids = serializers.ListField(help_text=_("集群ID列表"), child=serializers.IntegerField()) infos = serializers.ListField(help_text=_("单据信息"), child=InfoSerializer()) + # TODO: 后续会删除is_safe参数 + is_safe = serializers.BooleanField(help_text=_("安全模式"), default=False, required=False) + is_check_process = serializers.BooleanField(help_text=_("是否检测连接"), default=False, required=False) + is_check_delay = serializers.BooleanField( + help_text=_("是否检测数据同步延时情况(互切单据延时属于强制检测,故必须传True)"), default=False, required=False + ) + is_verify_checksum = serializers.BooleanField(help_text=_("是否检测历史数据检验结果"), default=False, required=False) def validate(self, attrs): # 校验集群是否可用,集群类型为高可用 diff --git a/dbm-ui/backend/ticket/constants.py b/dbm-ui/backend/ticket/constants.py index 2c838a3538..854ffa098a 100644 --- a/dbm-ui/backend/ticket/constants.py +++ b/dbm-ui/backend/ticket/constants.py @@ -156,7 +156,7 @@ def get_choice_value(cls, label: str) -> str: MYSQL_PARTITION = EnumField("MYSQL_PARTITION", _("MySQL 分区")) MYSQL_DATA_REPAIR = EnumField("MYSQL_DATA_REPAIR", _("MySQL 数据修复")) MYSQL_FLASHBACK = EnumField("MYSQL_FLASHBACK", _("MySQL 闪回")) - MYSQL_ROLLBACK_CLUSTER = EnumField("MYSQL_ROLLBACK_CLUSTER", _("MySQL 定点回档")) + MYSQL_ROLLBACK_CLUSTER = EnumField("MYSQL_ROLLBACK_CLUSTER", _("MySQL 定点构造")) MYSQL_HA_FULL_BACKUP = EnumField("MYSQL_HA_FULL_BACKUP", _("MySQL 高可用全库备份")) MYSQL_SINGLE_TRUNCATE_DATA = EnumField("MYSQL_SINGLE_TRUNCATE_DATA", _("MySQL 单节点清档")) MYSQL_SINGLE_RENAME_DATABASE = EnumField("MYSQL_SINGLE_RENAME_DATABASE", _("MySQL 单节点DB重命名")) @@ -188,7 +188,7 @@ def get_choice_value(cls, label: str) -> str: TENDBCLUSTER_TEMPORARY_DESTROY = EnumField("TENDBCLUSTER_TEMPORARY_DESTROY", _("TenDB Cluster 临时集群销毁")) TENDBCLUSTER_NODE_REBALANCE = EnumField("TENDBCLUSTER_NODE_REBALANCE", _("TenDB Cluster 集群容量变更")) TENDBCLUSTER_FULL_BACKUP = EnumField("TENDBCLUSTER_FULL_BACKUP", _("TenDB Cluster 全库备份")) - TENDBCLUSTER_ROLLBACK_CLUSTER = EnumField("TENDBCLUSTER_ROLLBACK_CLUSTER", _("TenDB Cluster 定点回档")) + TENDBCLUSTER_ROLLBACK_CLUSTER = EnumField("TENDBCLUSTER_ROLLBACK_CLUSTER", _("TenDB Cluster 定点构造")) TENDBCLUSTER_FLASHBACK = EnumField("TENDBCLUSTER_FLASHBACK", _("TenDB Cluster 闪回")) TENDBCLUSTER_CLIENT_CLONE_RULES = EnumField("TENDBCLUSTER_CLIENT_CLONE_RULES", _("TenDB Cluster 客户端权限克隆")) TENDBCLUSTER_INSTANCE_CLONE_RULES = EnumField("TENDBCLUSTER_INSTANCE_CLONE_RULES", _("TenDB Cluster DB实例权限克隆")) diff --git a/dbm-ui/backend/ticket/flow_manager/base.py b/dbm-ui/backend/ticket/flow_manager/base.py index 4268e64aa7..22c05f4a20 100644 --- a/dbm-ui/backend/ticket/flow_manager/base.py +++ b/dbm-ui/backend/ticket/flow_manager/base.py @@ -8,7 +8,9 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import logging import operator +import traceback from abc import ABC, abstractmethod from functools import reduce from typing import Any, Optional, Union @@ -21,6 +23,8 @@ from backend.ticket.models import ClusterOperateRecord, Flow, InstanceOperateRecord from backend.utils.basic import get_target_items_from_details +logger = logging.getLogger("root") + class BaseTicketFlow(ABC): def __init__(self, flow_obj: Flow): @@ -113,9 +117,11 @@ def run_error_status_handler(self, err: Exception): self.ticket.status = constants.TicketStatus.FAILED self.ticket.save(update_fields=["status", "update_at"]) - # 如果是自动重试,则认为flow和ticket都在执行 + # 如果是自动重试,则认为flow和ticket都在执行,否则打印异常的堆栈 if err_code == FlowErrCode.AUTO_EXCLUSIVE_ERROR.value: - self.run_status_handler(flow_obj_id="") + self.run_status_handler(flow_obj_id=self.flow_obj.flow_obj_id) + else: + logger.error(traceback.format_exc()) def run_status_handler(self, flow_obj_id: str): """run成功时,更新相关状态为RUNNING中"""