Skip to content

Commit

Permalink
fix(backend): 调整mysql高可用单据参数 #1507
Browse files Browse the repository at this point in the history
  • Loading branch information
iSecloud authored and zhangzhw8 committed Oct 23, 2023
1 parent e9330da commit 3e6f4cb
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -400,12 +400,13 @@ def query_binlog_from_bklog(
"port": binlogs[0]["port"],
"file_list_details": [],
}
collector_fields = ["file_mtime", "start_time", "stop_time", "size", "task_id"]
collector_fields = ["file_mtime", "start_time", "stop_time", "size", "task_id", "filename"]
for log in binlogs:
if str2datetime(log["stop_time"]) > end_time or str2datetime(log["stop_time"]) < start_time:
continue

detail = {field: log[field] for field in collector_fields}
detail["file_name"] = detail.pop("filename")
binlog_record["file_list_details"].append(detail)

return binlog_record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
from backend.db_package.models import Package
from backend.flow.consts import MediumEnum, RollbackType
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import install_mysql_in_cluster_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import (
build_surrounding_apps_sub_flow,
install_mysql_in_cluster_sub_flow,
)
from backend.flow.engine.bamboo.scene.mysql.common.exceptions import NormalTenDBFlowException
from backend.flow.engine.bamboo.scene.mysql.mysql_rollback_data_sub_flow import (
rollback_local_and_backupid,
Expand Down Expand Up @@ -128,6 +131,18 @@ def rollback_data_flow(self):
),
)

sub_pipeline.add_sub_pipeline(
sub_flow=build_surrounding_apps_sub_flow(
bk_cloud_id=cluster_class.bk_cloud_id,
master_ip_list=None,
slave_ip_list=[self.data["rollback_ip"]],
root_id=self.root_id,
parent_global_data=copy.deepcopy(self.data),
is_init=True,
cluster_type=ClusterType.TenDBHA.value,
)
)

exec_act_kwargs = ExecActuatorKwargs(
cluster=cluster,
bk_cloud_id=cluster_class.bk_cloud_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta
from backend.utils import time
from backend.utils.time import str2datetime

logger = logging.getLogger("flow")

Expand Down Expand Up @@ -237,8 +238,8 @@ def rollback_remote_and_time(root_id: str, ticket_data: dict, cluster_info: dict
kwargs=asdict(exec_act_kwargs),
write_payload_var="change_master_info",
)
backup_time = time.strptime(backupinfo["backup_time"], "%Y-%m-%d %H:%M:%S")
rollback_time = time.strptime(cluster_info["rollback_time"], "%Y-%m-%d %H:%M:%S")
backup_time = str2datetime(backupinfo["backup_time"], "%Y-%m-%d %H:%M:%S")
rollback_time = str2datetime(cluster_info["rollback_time"], "%Y-%m-%d %H:%M:%S")
rollback_handler = FixPointRollbackHandler(cluster_info["cluster_id"])
backup_binlog = rollback_handler.query_binlog_from_bklog(
start_time=backup_time,
Expand Down
3 changes: 2 additions & 1 deletion dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,7 @@ def tendb_recover_binlog_payload(self, **kwargs):
else:
binlog_files = self.cluster["binlog_files"]
backup_time = self.cluster["backup_time"]
binlog_files_list = binlog_files.split(",")
payload = {
"db_type": DBActuatorTypeEnum.MySQL.value,
"action": DBActuatorActionEnum.RecoverBinlog.value,
Expand All @@ -1874,7 +1875,7 @@ def tendb_recover_binlog_payload(self, **kwargs):
"extend": {
"work_dir": self.cluster["file_target_path"],
"binlog_dir": self.cluster["file_target_path"],
"binlog_files": binlog_files,
"binlog_files": binlog_files_list,
"tgt_instance": {
"host": kwargs["ip"],
"port": self.cluster["rollback_port"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class FixPointRollbackSerializer(serializers.Serializer):
tables = serializers.ListField(help_text=_("目标table列表"), child=DBTableField())
tables_ignore = serializers.ListField(help_text=_("忽略table列表"), child=DBTableField())

infos = serializers.ListSerializer(help_text=_("定点回档信息"), child=FixPointRollbackSerializer())
infos = serializers.ListSerializer(help_text=_("定点构造信息"), child=FixPointRollbackSerializer())

@classmethod
def validate_rollback_info(cls, info, now):
Expand Down Expand Up @@ -86,5 +86,5 @@ def format_ticket_data(self):
class MysqlFixPointRollbackFlowBuilder(BaseMySQLTicketFlowBuilder):
serializer = MySQLFixPointRollbackDetailSerializer
inner_flow_builder = MySQLFixPointRollbackFlowParamBuilder
inner_flow_name = _("定点回档执行")
inner_flow_name = _("定点构造执行")
retry_type = FlowRetryType.MANUAL_RETRY
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class InfoSerializer(serializers.Serializer):
cluster_ids = serializers.ListField(help_text=_("集群ID列表"), child=serializers.IntegerField())

infos = serializers.ListField(help_text=_("单据信息"), child=InfoSerializer())
# TODO: 后续会删除is_safe参数
is_safe = serializers.BooleanField(help_text=_("安全模式"), default=False, required=False)
is_check_process = serializers.BooleanField(help_text=_("是否检测连接"), default=False, required=False)
is_check_delay = serializers.BooleanField(
help_text=_("是否检测数据同步延时情况(互切单据延时属于强制检测,故必须传True)"), default=False, required=False
)
is_verify_checksum = serializers.BooleanField(help_text=_("是否检测历史数据检验结果"), default=False, required=False)

def validate(self, attrs):
# 校验集群是否可用,集群类型为高可用
Expand Down
4 changes: 2 additions & 2 deletions dbm-ui/backend/ticket/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def get_choice_value(cls, label: str) -> str:
MYSQL_PARTITION = EnumField("MYSQL_PARTITION", _("MySQL 分区"))
MYSQL_DATA_REPAIR = EnumField("MYSQL_DATA_REPAIR", _("MySQL 数据修复"))
MYSQL_FLASHBACK = EnumField("MYSQL_FLASHBACK", _("MySQL 闪回"))
MYSQL_ROLLBACK_CLUSTER = EnumField("MYSQL_ROLLBACK_CLUSTER", _("MySQL 定点回档"))
MYSQL_ROLLBACK_CLUSTER = EnumField("MYSQL_ROLLBACK_CLUSTER", _("MySQL 定点构造"))
MYSQL_HA_FULL_BACKUP = EnumField("MYSQL_HA_FULL_BACKUP", _("MySQL 高可用全库备份"))
MYSQL_SINGLE_TRUNCATE_DATA = EnumField("MYSQL_SINGLE_TRUNCATE_DATA", _("MySQL 单节点清档"))
MYSQL_SINGLE_RENAME_DATABASE = EnumField("MYSQL_SINGLE_RENAME_DATABASE", _("MySQL 单节点DB重命名"))
Expand Down Expand Up @@ -188,7 +188,7 @@ def get_choice_value(cls, label: str) -> str:
TENDBCLUSTER_TEMPORARY_DESTROY = EnumField("TENDBCLUSTER_TEMPORARY_DESTROY", _("TenDB Cluster 临时集群销毁"))
TENDBCLUSTER_NODE_REBALANCE = EnumField("TENDBCLUSTER_NODE_REBALANCE", _("TenDB Cluster 集群容量变更"))
TENDBCLUSTER_FULL_BACKUP = EnumField("TENDBCLUSTER_FULL_BACKUP", _("TenDB Cluster 全库备份"))
TENDBCLUSTER_ROLLBACK_CLUSTER = EnumField("TENDBCLUSTER_ROLLBACK_CLUSTER", _("TenDB Cluster 定点回档"))
TENDBCLUSTER_ROLLBACK_CLUSTER = EnumField("TENDBCLUSTER_ROLLBACK_CLUSTER", _("TenDB Cluster 定点构造"))
TENDBCLUSTER_FLASHBACK = EnumField("TENDBCLUSTER_FLASHBACK", _("TenDB Cluster 闪回"))
TENDBCLUSTER_CLIENT_CLONE_RULES = EnumField("TENDBCLUSTER_CLIENT_CLONE_RULES", _("TenDB Cluster 客户端权限克隆"))
TENDBCLUSTER_INSTANCE_CLONE_RULES = EnumField("TENDBCLUSTER_INSTANCE_CLONE_RULES", _("TenDB Cluster DB实例权限克隆"))
Expand Down
10 changes: 8 additions & 2 deletions dbm-ui/backend/ticket/flow_manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging
import operator
import traceback
from abc import ABC, abstractmethod
from functools import reduce
from typing import Any, Optional, Union
Expand All @@ -21,6 +23,8 @@
from backend.ticket.models import ClusterOperateRecord, Flow, InstanceOperateRecord
from backend.utils.basic import get_target_items_from_details

logger = logging.getLogger("root")


class BaseTicketFlow(ABC):
def __init__(self, flow_obj: Flow):
Expand Down Expand Up @@ -113,9 +117,11 @@ def run_error_status_handler(self, err: Exception):
self.ticket.status = constants.TicketStatus.FAILED
self.ticket.save(update_fields=["status", "update_at"])

# 如果是自动重试,则认为flow和ticket都在执行
# 如果是自动重试,则认为flow和ticket都在执行,否则打印异常的堆栈
if err_code == FlowErrCode.AUTO_EXCLUSIVE_ERROR.value:
self.run_status_handler(flow_obj_id="")
self.run_status_handler(flow_obj_id=self.flow_obj.flow_obj_id)
else:
logger.error(traceback.format_exc())

def run_status_handler(self, flow_obj_id: str):
"""run成功时,更新相关状态为RUNNING中"""
Expand Down

0 comments on commit 3e6f4cb

Please sign in to comment.