From 52da228f82c7a2bc702e10a3d1dfb1ed3c142c3e Mon Sep 17 00:00:00 2001 From: lukemakeit <2302063437@qq.com> Date: Mon, 25 Sep 2023 16:30:54 +0800 Subject: [PATCH] =?UTF-8?q?feat(redis):=20redis=20=E7=89=88=E6=9C=AC?= =?UTF-8?q?=E5=8E=9F=E5=9C=B0=E5=8D=87=E7=BA=A7=20#1209?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../example/redis_cluster_failover.example.md | 24 + .../example/redis_version_update.example.md | 21 + .../dbactuator/models/myredis/client.go | 26 + .../dbactuator/models/myredis/myredis.go | 41 ++ .../models/myredis/tendisplus_infoRepl.go | 68 ++- .../pkg/atomjobs/atomredis/redis_backup.go | 3 + .../atomredis/redis_cluster_failover.go | 483 ++++++++++++++++ .../pkg/atomjobs/atomredis/redis_switch.go | 4 + .../atomredis/redis_version_update.go | 486 ++++++++++++++++ .../dbactuator/pkg/jobmanager/jobmanager.go | 2 + .../db-tools/dbactuator/pkg/util/redisutil.go | 12 + .../db-tools/dbactuator/pkg/util/util.go | 23 + .../db_services/redis/redis_dts/constants.py | 20 + .../db_services/redis/redis_dts/util.py | 64 ++- dbm-ui/backend/flow/consts.py | 2 + .../bamboo/scene/common/get_file_list.py | 22 +- .../scene/redis/redis_cluster_data_copy.py | 80 +-- .../scene/redis/redis_cluster_scene_mss.py | 2 +- .../redis_cluster_version_update_online.py | 544 ++++++++++++++++++ .../backend/flow/engine/controller/redis.py | 8 + .../components/collections/redis/redis_dts.py | 8 + dbm-ui/backend/flow/urls.py | 2 + .../flow/utils/redis/redis_act_playload.py | 133 ++++- .../backend/flow/utils/redis/redis_db_meta.py | 17 +- .../flow/utils/redis/redis_proxy_util.py | 33 +- dbm-ui/backend/flow/utils/redis/redis_util.py | 86 +++ dbm-ui/backend/flow/views/redis_cluster.py | 26 + 27 files changed, 2152 insertions(+), 88 deletions(-) create mode 100644 dbm-services/redis/db-tools/dbactuator/example/redis_cluster_failover.example.md create mode 100644 dbm-services/redis/db-tools/dbactuator/example/redis_version_update.example.md create mode 100644 dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_cluster_failover.go create mode 100644 dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_version_update.go create mode 100644 dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_version_update_online.py diff --git a/dbm-services/redis/db-tools/dbactuator/example/redis_cluster_failover.example.md b/dbm-services/redis/db-tools/dbactuator/example/redis_cluster_failover.example.md new file mode 100644 index 0000000000..4b608194e9 --- /dev/null +++ b/dbm-services/redis/db-tools/dbactuator/example/redis_cluster_failover.example.md @@ -0,0 +1,24 @@ +### redis cluster failover +redis cluster failover +`./dbactuator_redis --uid={{uid}} --root_id={{root_id}} --node_id={{node_id}} --version_id={{version_id}} --atom-job-list="redis_cluster_failover" --data_dir=/path/to/data --backup_dir=/path/to/backup --payload='{{payload_base64}}'` + +`--data_dir`、`--backup_dir` 可以留空. + + +原始payload +```json +{ + "redis_password":"xxxx", + "redis_master_slave_pairs":[ + { + "master": {"ip":"a.a.a.a","port":"30000"}, + "slave": {"ip":"b.b.b.b","port":"30000"} + }, + { + "master": {"ip":"a.a.a.a","port":"30001"}, + "slave": {"ip":"b.b.b.b","port":"30001"} + } + ], + "force":false +} +``` \ No newline at end of file diff --git a/dbm-services/redis/db-tools/dbactuator/example/redis_version_update.example.md b/dbm-services/redis/db-tools/dbactuator/example/redis_version_update.example.md new file mode 100644 index 0000000000..f361736176 --- /dev/null +++ b/dbm-services/redis/db-tools/dbactuator/example/redis_version_update.example.md @@ -0,0 +1,21 @@ +### redis version update +redis版本更新 +`./dbactuator_redis --uid={{uid}} --root_id={{root_id}} --node_id={{node_id}} --version_id={{version_id}} --atom-job-list="redis_version_update" --data_dir=/path/to/data --backup_dir=/path/to/backup --payload='{{payload_base64}}'` + +`--data_dir`、`--backup_dir` 可以留空. + +前置工作: +- 将`redis-6.2.7.tar.gz`下载到`/data/install`目录下; + + +原始payload +```json +{ + "pkg":"redis-6.2.7.tar.gz", + "pkg_md5":"1fc9e5c3a044ce523844a6f2717e5ac3", + "ip":"127.0.0.1", + "ports":[30000,30001,30002], //端口不连续 + "password":"xxx", + "role":"redis_master" // or redis_slave +} +``` \ No newline at end of file diff --git a/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go b/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go index 1d0e12da77..ae5c56a900 100644 --- a/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go +++ b/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go @@ -1751,6 +1751,32 @@ func (db *RedisClient) ClusterReset() (err error) { return nil } +// ClusterFailOver 执行cluster failover,opt: force or takeover +func (db *RedisClient) ClusterFailOver(opt string) (err error) { + var ret string + if opt == "" { + ret, err = db.InstanceClient.ClusterFailover(context.Background()).Result() + } else if strings.ToLower(opt) == "force" || strings.ToLower(opt) == "takeover" { + cmds := []string{"cluster", "failover", opt} + tmpRet, err := db.DoCommand(cmds, 0) + if err != nil { + return err + } + ret = tmpRet.(string) + } + if err != nil { + err = fmt.Errorf("ClusterFailOver fail,err:%v,addr:%s", err, db.Addr) + mylog.Logger.Error(err.Error()) + return + } + if strings.ToLower(ret) != "ok" { + err = fmt.Errorf("ClusterFailOver fail,ret:%s,addr:%s", ret, db.Addr) + mylog.Logger.Error(err.Error()) + return + } + return nil +} + // GetClusterNodesStr 获取tendisplus集群cluster nodes命令结果,并返回字符串 func (db *RedisClient) GetClusterNodesStr() (ret string, err error) { ret, err = db.InstanceClient.ClusterNodes(context.TODO()).Result() diff --git a/dbm-services/redis/db-tools/dbactuator/models/myredis/myredis.go b/dbm-services/redis/db-tools/dbactuator/models/myredis/myredis.go index d797498747..cc222347aa 100644 --- a/dbm-services/redis/db-tools/dbactuator/models/myredis/myredis.go +++ b/dbm-services/redis/db-tools/dbactuator/models/myredis/myredis.go @@ -106,3 +106,44 @@ func LocalRedisConnectTest(ip string, ports []int, password string) (err error) } return } + +// CheckMultiRedisConnected 检查多个redis是否可连接 +func CheckMultiRedisConnected(addrs []string, password string) (err error) { + l01 := make([]*connTestItem, 0, len(addrs)) + for _, addr := range addrs { + ip, port, err := util.AddrToIpPort(addr) + if err != nil { + return err + } + l01 = append(l01, &connTestItem{ + IP: ip, + Port: port, + Password: password, + }) + } + // 并发测试 + wg := sync.WaitGroup{} + for _, item := range l01 { + test01 := item + wg.Add(1) + go func(test01 *connTestItem) { + defer wg.Done() + cli01, err := NewRedisClientWithTimeout(test01.addr(), test01.Password, 0, + consts.TendisTypeRedisInstance, 10*time.Second) + if err != nil { + test01.Err = err + return + } + cli01.Close() + }(test01) + } + wg.Wait() + + for _, item := range l01 { + test01 := item + if test01.Err != nil { + return test01.Err + } + } + return +} diff --git a/dbm-services/redis/db-tools/dbactuator/models/myredis/tendisplus_infoRepl.go b/dbm-services/redis/db-tools/dbactuator/models/myredis/tendisplus_infoRepl.go index 3e0d7a0053..35454b7999 100644 --- a/dbm-services/redis/db-tools/dbactuator/models/myredis/tendisplus_infoRepl.go +++ b/dbm-services/redis/db-tools/dbactuator/models/myredis/tendisplus_infoRepl.go @@ -12,15 +12,22 @@ import ( "dbm-services/redis/db-tools/dbactuator/pkg/consts" ) -// InfoReplSlave Tendisplus master中执行info replication结果中slave状态 +// InfoReplSlave master中执行info replication结果中slave状态 // 如: slave0:ip=luketest03-redis-rdsplus4-1.luketest03-svc.dmc,port=30000,state=online,offset=930327677,lag=0 type InfoReplSlave struct { - Name string `json:"name"` - IP string `json:"ip"` - Port int `json:"port"` - State string `json:"state"` - Offset int64 `json:"offset"` - Lag int64 `json:"lag"` + Name string `json:"name"` + IP string `json:"ip"` + Port int `json:"port"` + State string `json:"state"` + Offset int64 `json:"offset"` + Seq int64 `json:"seq"` + Lag int64 `json:"lag"` + BinlogLag int64 `json:"binlog_lag"` +} + +// Addr addr字符串 +func (slave *InfoReplSlave) Addr() string { + return slave.IP + ":" + strconv.Itoa(slave.Port) } func (slave *InfoReplSlave) decode(line string) error { @@ -40,10 +47,14 @@ func (slave *InfoReplSlave) decode(line string) error { slave.Port, _ = strconv.Atoi(list02[1]) } else if list02[0] == "state" { slave.State = list02[1] + } else if list02[0] == "seq" { + slave.Seq, _ = strconv.ParseInt(list02[1], 10, 64) } else if list02[0] == "offset" { slave.Offset, _ = strconv.ParseInt(list02[1], 10, 64) } else if list02[0] == "lag" { slave.Lag, _ = strconv.ParseInt(list02[1], 10, 64) + } else if list02[0] == "binlog_lag" { + slave.BinlogLag, _ = strconv.ParseInt(list02[1], 10, 64) } } return nil @@ -411,3 +422,46 @@ func (db *RedisClient) TendisSSDInfoSlaves() (ret TendisSSDInfoSlavesData, err e } return } + +// GetInfoReplSlaves 获取info replication中slave列表 +func (db *RedisClient) GetInfoReplSlaves() (slaveList []*InfoReplSlave, slaveMap map[string]*InfoReplSlave, err error) { + var replRet string + if db.DbType == consts.TendisTypeRedisCluster { + replRet, err = db.ClusterClient.Info(context.TODO(), "replication").Result() + } else { + replRet, err = db.InstanceClient.Info(context.TODO(), "replication").Result() + } + if err != nil { + err = fmt.Errorf("info replication fail,err:%v,aadr:%s", err, db.Addr) + mylog.Logger.Error(err.Error()) + return + } + infoList := strings.Split(replRet, "\n") + slaveReg := regexp.MustCompile(`^slave\d+$`) + slaveMap = make(map[string]*InfoReplSlave) + for _, infoItem := range infoList { + infoItem = strings.TrimSpace(infoItem) + if strings.HasPrefix(infoItem, "#") { + continue + } + if len(infoItem) == 0 { + continue + } + list01 := strings.SplitN(infoItem, ":", 2) + if len(list01) < 2 { + continue + } + list01[0] = strings.TrimSpace(list01[0]) + list01[1] = strings.TrimSpace(list01[1]) + if slaveReg.MatchString(list01[0]) == true { + slave01 := &InfoReplSlave{} + err = slave01.decode(infoItem) + if err != nil { + return + } + slaveList = append(slaveList, slave01) + slaveMap[slave01.Addr()] = slave01 + } + } + return +} diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_backup.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_backup.go index 45df7a4131..1fef31bae8 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_backup.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_backup.go @@ -419,6 +419,7 @@ func (task *BackupTask) newConnect() { if task.Err != nil { return } + mylog.Logger.Info("redis(%s) role:%s,dataDir:%s,dbType:%s", task.Addr(), task.Role, task.DataDir, task.DbType) // 获取数据量大小 if task.DbType == consts.TendisTypeRedisInstance { task.DataSize, task.Err = task.Cli.RedisInstanceDataSize() @@ -439,11 +440,13 @@ func (task *BackupTask) PrecheckDisk() { bakDiskUsg, err := util.GetLocalDirDiskUsg(task.BackupDir) task.Err = err if task.Err != nil { + mylog.Logger.Error("redis:%s backupDir:%s GetLocalDirDiskUsg fail,err:%v", task.Addr(), task.BackupDir, err) return } dataDiskUsg, err := util.GetLocalDirDiskUsg(task.DataDir) task.Err = err if task.Err != nil { + mylog.Logger.Error("redis:%s dataDir:%s GetLocalDirDiskUsg fail,err:%v", task.Addr(), task.DataDir, err) return } // 磁盘空间使用已有85%,则报错 diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_cluster_failover.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_cluster_failover.go new file mode 100644 index 0000000000..26e31d449e --- /dev/null +++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_cluster_failover.go @@ -0,0 +1,483 @@ +package atomredis + +import ( + "encoding/json" + "fmt" + "strconv" + "sync" + "time" + + "github.com/go-playground/validator/v10" + + "dbm-services/redis/db-tools/dbactuator/models/myredis" + "dbm-services/redis/db-tools/dbactuator/mylog" + "dbm-services/redis/db-tools/dbactuator/pkg/consts" + "dbm-services/redis/db-tools/dbactuator/pkg/jobruntime" + "dbm-services/redis/db-tools/dbactuator/pkg/util" +) + +type instItem struct { + IP string `json:"ip" validate:"required"` + Port int `json:"port" validate:"required"` +} + +// Addr TODO +func (item instItem) Addr() string { + return fmt.Sprintf("%s:%d", item.IP, item.Port) +} + +type masterSlavePair struct { + Master instItem `json:"master" validate:"required"` + Slave instItem `json:"slave" validate:"required"` +} + +// RedisClusterFailoverParam redis cluster failover param +type RedisClusterFailoverParam struct { + RedisPassword string `json:"redis_password" validate:"required"` + RedisMasterSlavePairs []masterSlavePair `json:"redis_master_slave_pairs" validate:"required"` // {"master_ip:master_port","slave_ip:slave_port"} + // 如果是强制failover,则不检查集群状态是否ok,不检查所有slave的link状态是否ok,failover时加上force 参数,failover后也不检查new_slave(old_master)同步是否跟上 + Force bool `json:"force"` +} + +// RedisClusterFailover redis cluster failover +type RedisClusterFailover struct { + runtime *jobruntime.JobGenericRuntime + params RedisClusterFailoverParam +} + +// 无实际作用,仅确保实现了 jobruntime.JobRunner 接口 +var _ jobruntime.JobRunner = (*RedisClusterFailover)(nil) + +// NewRedisClusterFailover new +func NewRedisClusterFailover() jobruntime.JobRunner { + return &RedisClusterFailover{} +} + +// Init 初始化 +func (job *RedisClusterFailover) Init(m *jobruntime.JobGenericRuntime) error { + job.runtime = m + err := json.Unmarshal([]byte(job.runtime.PayloadDecoded), &job.params) + if err != nil { + job.runtime.Logger.Error(fmt.Sprintf("json.Unmarshal failed,err:%+v", err)) + return err + } + // 参数有效性检查 + validate := validator.New() + err = validate.Struct(job.params) + if err != nil { + if _, ok := err.(*validator.InvalidValidationError); ok { + job.runtime.Logger.Error("RedisClusterFailover Init params validate failed,err:%v,params:%+v", + err, job.params) + return err + } + for _, err := range err.(validator.ValidationErrors) { + job.runtime.Logger.Error("RedisClusterFailover Init params validate failed,err:%v,params:%+v", + err, job.params) + return err + } + } + return nil +} + +// Name 原子任务名 +func (job *RedisClusterFailover) Name() string { + return "redis_cluster_failover" +} + +// Run Command 执行 +func (job *RedisClusterFailover) Run() (err error) { + if job.params.Force { + err = job.checkAllSlaveConnected() + if err != nil { + return + } + err = job.checkAllRedisInCluster() + if err != nil { + return + } + } else { + err = job.checkAllRedisConnected() + if err != nil { + return + } + err = job.checkClusterStatus() + if err != nil { + return + } + err = job.checkAllRedisInCluster() + if err != nil { + return + } + err = job.checkAllSlaveLinkStatus() + if err != nil { + return + } + } + err = job.groupRunFailoverAndWaitReplicateStateOk() + if err != nil { + return + } + return nil +} + +func (job *RedisClusterFailover) checkAllRedisConnected() (err error) { + allRedisAddr := make([]string, 0, len(job.params.RedisMasterSlavePairs)*2) + for _, ms := range job.params.RedisMasterSlavePairs { + allRedisAddr = append(allRedisAddr, ms.Master.Addr(), ms.Slave.Addr()) + } + err = myredis.CheckMultiRedisConnected(allRedisAddr, job.params.RedisPassword) + if err != nil { + job.runtime.Logger.Error("checkAllRedisConnected failed,err:%v", err) + return + } + job.runtime.Logger.Info("checkAllRedisConnected success") + return nil +} + +// checkAllSlaveConnected 检查所有 slave 是否可连接 +func (job *RedisClusterFailover) checkAllSlaveConnected() (err error) { + allSlaveAddr := make([]string, 0, len(job.params.RedisMasterSlavePairs)) + for _, ms := range job.params.RedisMasterSlavePairs { + allSlaveAddr = append(allSlaveAddr, ms.Slave.Addr()) + } + err = myredis.CheckMultiRedisConnected(allSlaveAddr, job.params.RedisPassword) + if err != nil { + job.runtime.Logger.Error("checkAllSlaveConnected failed,err:%v", err) + return + } + job.runtime.Logger.Info("checkAllSlaveConnected success") + return nil +} + +// checkClusterStatus 连接第一个 master,检查集群状态是否 ok +func (job *RedisClusterFailover) checkClusterStatus() (err error) { + firstMasterAddr := "" + for _, ms := range job.params.RedisMasterSlavePairs { + firstMasterAddr = ms.Master.Addr() + break + } + rc, err := myredis.NewRedisClientWithTimeout(firstMasterAddr, job.params.RedisPassword, 0, + consts.TendisTypeRedisInstance, 5*time.Second) + if err != nil { + return + } + defer rc.Close() + clusterStatus, err := rc.ClusterInfo() + if err != nil { + return + } + if clusterStatus.ClusterState != consts.ClusterStateOK { + err = fmt.Errorf("cluster state not ok,clusterState:%s", clusterStatus.ClusterState) + job.runtime.Logger.Error("checkClusterStatus failed,err:%v", err) + return + } + job.runtime.Logger.Info("checkClusterStatus success") + return nil +} + +// checkAllRedisInCluster 连接第一个 slave,获取cluster nodes 信息,确保所有redis节点都在集群中 +func (job *RedisClusterFailover) checkAllRedisInCluster() (err error) { + firstSlaveAddr := "" + for _, ms := range job.params.RedisMasterSlavePairs { + firstSlaveAddr = ms.Slave.Addr() + break + } + rc, err := myredis.NewRedisClientWithTimeout(firstSlaveAddr, job.params.RedisPassword, 0, + consts.TendisTypeRedisInstance, 5*time.Second) + if err != nil { + return + } + defer rc.Close() + addrToClusterNode, err := rc.GetAddrMapToNodes() + if err != nil { + return + } + for _, ms := range job.params.RedisMasterSlavePairs { + if _, ok := addrToClusterNode[ms.Master.Addr()]; !ok { + err = fmt.Errorf("redis node:%s not in cluster,firstMasterAddr:%s", ms.Master.Addr(), firstSlaveAddr) + job.runtime.Logger.Error("checkAllRedisInCluster failed,err:%v", err) + return + } + if _, ok := addrToClusterNode[ms.Slave.Addr()]; !ok { + err = fmt.Errorf("redis node:%s not in cluster,firstMasterAddr:%s", ms.Slave.Addr(), firstSlaveAddr) + job.runtime.Logger.Error("checkAllRedisInCluster failed,err:%v", err) + return + } + } + job.runtime.Logger.Info("checkAllRedisInCluster success") + return nil +} + +// checkAllSlaveLinkStatus 连接所有redis slave, +// 根据 info replication 获取其role 信息, +// 如果role 是 slave,则获取其 master addr, +// 确保其 master addr 与 job.params.RedisMasterSlavePair 中的 master addr 一致, +// 如果一致则继续检查master_link_status==up、master_last_binlog_seconds_ago<10 ,不一致报错; +// 如果 role 是 master,则获取其 slave 列表, +// 如果job.params.RedisMasterSlavePair 中的 master addr 在 slave 列表中, +// 则代表已经 cluster failover过了,否则报错; +func (job *RedisClusterFailover) checkAllSlaveLinkStatus() (err error) { + var replMasterHost, replMasterPort, replMasterAddr, replLinkStatus string + var masterLastIOSecAgo int + for _, ms := range job.params.RedisMasterSlavePairs { + rc, err := myredis.NewRedisClientWithTimeout(ms.Slave.Addr(), job.params.RedisPassword, 0, + consts.TendisTypeRedisInstance, 5*time.Second) + if err != nil { + return err + } + defer rc.Close() + infoRepl, err := rc.Info("replication") + if err != nil { + return err + } + if infoRepl["role"] == consts.RedisSlaveRole { + replMasterHost = infoRepl["master_host"] + replMasterPort = infoRepl["master_port"] + replLinkStatus = infoRepl["master_link_status"] + replMasterAddr = fmt.Sprintf("%s:%s", replMasterHost, replMasterPort) + if replMasterAddr != ms.Master.Addr() { + err = fmt.Errorf("slave:%s master addr:%s not equal to master addr:%s", ms.Slave.Addr(), + replMasterAddr, ms.Master.Addr()) + job.runtime.Logger.Error("checkAllSlaveLinkStatus failed,err:%v", err) + return err + } + if replLinkStatus != consts.MasterLinkStatusUP { + err = fmt.Errorf("slave:%s master link status:%s not equal to up", ms.Slave.Addr(), replLinkStatus) + job.runtime.Logger.Error("checkAllSlaveLinkStatus failed,err:%v", err) + return err + } + masterLastIOSecAgo, err = strconv.Atoi(infoRepl["master_last_io_seconds_ago"]) + if err != nil { + err = fmt.Errorf("slave:%s master last io seconds ago:%s convert to int failed,err:%v", + ms.Slave.Addr(), infoRepl["master_last_io_seconds_ago"], err) + job.runtime.Logger.Error("checkAllSlaveLinkStatus failed,err:%v", err) + return err + } + if masterLastIOSecAgo > 10 { + err = fmt.Errorf("slave:%s master last binlog seconds ago:%d greater than 10", ms.Slave.Addr(), masterLastIOSecAgo) + job.runtime.Logger.Error("checkAllSlaveLinkStatus failed,err:%v", err) + return err + } + } else if infoRepl["role"] == consts.RedisMasterRole { + _, slaveMap, err := rc.GetInfoReplSlaves() + if err != nil { + return err + } + if _, ok := slaveMap[ms.Master.Addr()]; !ok { + err = fmt.Errorf("old_slave:%s now is master,but old_master not in new_slaves:[%s]", + ms.Slave.Addr(), util.ToString(slaveMap)) + job.runtime.Logger.Error("checkAllSlaveLinkStatus failed,err:%v", err) + return err + } + } else { + err = fmt.Errorf("redis node:%s role:%s not equal to slave or master", ms.Slave.Addr(), infoRepl["role"]) + job.runtime.Logger.Error("checkAllSlaveLinkStatus failed,err:%v", err) + return err + } + } + job.runtime.Logger.Info("checkAllSlaveLinkStatus success") + return nil +} + +// groupRunFailoverAndWaitReplicateStateOk 按照 slave_ip 分组, +// 生成多个 redisFailOverTask保存在 groupTasks=map[string][]*redisFailOverTask中,同时记录下slave_ip对应tasks 的最大数量 +// 例如:groupTasks=map[string][]*redisFailOverTask{"slave_ip1":[task1,task2],"slave_ip2":[task3,task4,task5]} +// 利用 groupTasks,实现相同 slave_ip 上 tasks 串行执行,多个 slave_ip 上 tasks 并行执行 +func (job *RedisClusterFailover) groupRunFailoverAndWaitReplicateStateOk() (err error) { + var slaveIP string + groupTasks := make(map[string][]*redisFailOverTask) + maxTaskCount := 0 + for _, ms := range job.params.RedisMasterSlavePairs { + slaveIP, _, err = util.AddrToIpPort(ms.Slave.Addr()) + if err != nil { + job.runtime.Logger.Error(err.Error()) + return err + } + if _, ok := groupTasks[slaveIP]; !ok { + groupTasks[slaveIP] = []*redisFailOverTask{} + } + task := &redisFailOverTask{ + MasterAddr: ms.Master.Addr(), + SlaveAddr: ms.Slave.Addr(), + RedisPassword: job.params.RedisPassword, + Force: job.params.Force, + } + groupTasks[slaveIP] = append(groupTasks[slaveIP], task) + if len(groupTasks[slaveIP]) > maxTaskCount { + maxTaskCount = len(groupTasks[slaveIP]) + } + } + job.runtime.Logger.Info("groupTasks:%+v,maxTaskCount:%d", groupTasks, maxTaskCount) + for i := 0; i < maxTaskCount; i++ { + currTasks := []*redisFailOverTask{} + for slaveIP := range groupTasks { + if len(groupTasks[slaveIP]) > i { + currTasks = append(currTasks, groupTasks[slaveIP][i]) + } + } + // 串行执行 cluster failover + for _, task := range currTasks { + taskItem := task + taskItem.RunFailOverAndWaitDone() + if taskItem.Err != nil { + return taskItem.Err + } + } + // 并行执行 waitReplicateStateOK + wg := sync.WaitGroup{} + for _, task := range currTasks { + taskItem := task + wg.Add(1) + go func(item *redisFailOverTask) { + defer wg.Done() + item.WaitReplicateStateOK() + }(taskItem) + } + wg.Wait() + for _, task := range currTasks { + taskItem := task + if taskItem.Err != nil { + return taskItem.Err + } + } + } + job.runtime.Logger.Info("groupRunFailoverAndWaitReplicateStateOk success") + return nil +} + +// Retry times +func (job *RedisClusterFailover) Retry() uint { + return 2 +} + +// Rollback rollback +func (job *RedisClusterFailover) Rollback() error { + return nil +} + +type redisFailOverTask struct { + MasterAddr string `json:"master_addr"` + SlaveAddr string `json:"slave_addr"` + RedisPassword string `json:"redis_password"` + Force bool `json:"force"` + slaveCli *myredis.RedisClient + Err error `json:"err"` +} + +// newSlaveCli 新建slaveCli +func (task *redisFailOverTask) newSlaveCli() { + task.slaveCli, task.Err = myredis.NewRedisClientWithTimeout(task.SlaveAddr, task.RedisPassword, 0, + consts.TendisTypeRedisInstance, 5*time.Second) +} + +// isFailOverDone 是否fail over 完成,如果slave 已经是 master,且master 已经是'它'的slave,则认为fail over 完成 +func (task *redisFailOverTask) isFailOverDone() (done bool) { + infoRepl, err := task.slaveCli.Info("replication") + if err != nil { + return + } + var slaveMap map[string]*myredis.InfoReplSlave + if infoRepl["role"] == consts.RedisMasterRole { + if task.Force { + // 如果是强制 failover,则不检查old_master 是否变成了 new_slave + done = true + return + } + _, slaveMap, task.Err = task.slaveCli.GetInfoReplSlaves() + if task.Err != nil { + return + } + if _, ok := slaveMap[task.MasterAddr]; ok { + done = true + return + } + } + return done +} + +// RunFailOver slave 上 s执行 cluster failover +func (task *redisFailOverTask) RunFailOver() { + if task.Force { + task.Err = task.slaveCli.ClusterFailOver("takeover") + } else { + task.Err = task.slaveCli.ClusterFailOver("") + } + if task.Err != nil { + return + } + return +} + +// RunFailOverAndWaitDone 执行 cluster failover后,直到slave 成为 master,且master 成为 slave +// 最大等待 60s,每隔 2s 检查一次,每隔10秒打印一次日志,如果超时,则报错 +func (task *redisFailOverTask) RunFailOverAndWaitDone() { + task.newSlaveCli() + if task.Err != nil { + return + } + defer task.slaveCli.Close() + done := task.isFailOverDone() + if done { + mylog.Logger.Info("slave:%s is already master,do nothing", task.SlaveAddr) + return + } + task.RunFailOver() + if task.Err != nil { + return + } + for i := 0; i < 30; i++ { + time.Sleep(2 * time.Second) + done = task.isFailOverDone() + if done { + mylog.Logger.Info("slave:%s is already master,'cluster failover' success", task.SlaveAddr) + return + } + if i%5 == 0 { + mylog.Logger.Info("waitFailOverDone slave:%s not become master,wait 2s", task.SlaveAddr) + } + } + task.Err = fmt.Errorf("waitFailOverDone slave:%s not become master,wait 60s timeout", task.SlaveAddr) + mylog.Logger.Error(task.Err.Error()) + return +} + +// WaitReplicateStateOK 等待master 成为 slave,'它'的master_host:master_port 等于 SlaveAddr,且 master_link_status为up +// 最大等待 2h,每隔 2s 检查一次,每隔30秒打印一次日志,如果超时,则报错; +func (task *redisFailOverTask) WaitReplicateStateOK() { + if task.Force { + mylog.Logger.Info(fmt.Sprintf("force failover,not wait old_master:%s replicate state ok", task.MasterAddr)) + return + } + var masterCli *myredis.RedisClient + masterCli, task.Err = myredis.NewRedisClientWithTimeout(task.MasterAddr, task.RedisPassword, 0, + consts.TendisTypeRedisInstance, 5*time.Second) + if task.Err != nil { + return + } + defer masterCli.Close() + for i := 0; i < 3600; i++ { + time.Sleep(2 * time.Second) + infoRepl, err := masterCli.Info("replication") + if err != nil { + task.Err = err + return + } + if infoRepl["role"] == consts.RedisSlaveRole { + currMasterAddr := fmt.Sprintf("%s:%s", infoRepl["master_host"], infoRepl["master_port"]) + if currMasterAddr == task.SlaveAddr && infoRepl["master_link_status"] == consts.MasterLinkStatusUP { + mylog.Logger.Info("master:%s become a slave of slave:%s and master_link_status==up", + task.MasterAddr, task.SlaveAddr) + masterCli.ConfigRewrite() + return + } + } + if i%15 == 0 { + mylog.Logger.Info("waitReplicateStateOK master:%s not become slave of slave:%s and master_link_status:%s,wait 30s", + task.MasterAddr, task.SlaveAddr, infoRepl["master_link_status"]) + } + } + task.Err = fmt.Errorf("waitReplicateStateOK master:%s not become slave of slave:%s,wait 2h timeout", + task.MasterAddr, task.SlaveAddr) + mylog.Logger.Error(task.Err.Error()) + return +} diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_switch.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_switch.go index 3208a039b6..bdedc83864 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_switch.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_switch.go @@ -266,6 +266,10 @@ func (job *RedisSwitch) doSlaveOfNoOne4NewMaster(ip string, port int, pass strin job.runtime.Logger.Error("[%s] exec slaveof No oNE for failed:%s", newMasterAddr, rst) return fmt.Errorf("[%s] slaveofNooNE:%s", newMasterAddr, rst) } + _, err = newMasterConn.ConfigRewrite() + if err != nil { + return fmt.Errorf("[%s] exec ConfigRewrite for failed:%+v", newMasterAddr, err) + } job.runtime.Logger.Info("[%s] exec slaveof No oNE for result:%s", newMasterAddr, rst) return nil } diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_version_update.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_version_update.go new file mode 100644 index 0000000000..808a972405 --- /dev/null +++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_version_update.go @@ -0,0 +1,486 @@ +package atomredis + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "time" + + "github.com/go-playground/validator/v10" + + "dbm-services/redis/db-tools/dbactuator/models/myredis" + "dbm-services/redis/db-tools/dbactuator/pkg/common" + "dbm-services/redis/db-tools/dbactuator/pkg/consts" + "dbm-services/redis/db-tools/dbactuator/pkg/jobruntime" + "dbm-services/redis/db-tools/dbactuator/pkg/util" +) + +// RedisVersionUpdateParams redis 版本更新参数 +type RedisVersionUpdateParams struct { + common.MediaPkg + IP string `json:"ip" validate:"required"` + Ports []int `json:"ports" validate:"required"` + Password string `json:"password" validate:"required"` + Role string `json:"role" validate:"required"` // redis_master or redis_slave +} + +// RedisVersionUpdate TODO +type RedisVersionUpdate struct { + runtime *jobruntime.JobGenericRuntime + params RedisVersionUpdateParams + localPkgBaseName string + AddrMapCli map[string]*myredis.RedisClient `json:"addr_map_cli"` +} + +// 无实际作用,仅确保实现了 jobruntime.JobRunner 接口 +var _ jobruntime.JobRunner = (*RedisVersionUpdate)(nil) + +// NewRedisVersionUpdate new +func NewRedisVersionUpdate() jobruntime.JobRunner { + return &RedisVersionUpdate{} +} + +// Init prepare run env +func (job *RedisVersionUpdate) Init(m *jobruntime.JobGenericRuntime) error { + job.runtime = m + err := json.Unmarshal([]byte(job.runtime.PayloadDecoded), &job.params) + if err != nil { + job.runtime.Logger.Error(fmt.Sprintf("json.Unmarshal failed,err:%+v", err)) + return err + } + // 参数有效性检查 + validate := validator.New() + err = validate.Struct(job.params) + if err != nil { + if _, ok := err.(*validator.InvalidValidationError); ok { + job.runtime.Logger.Error("RedisVersionUpdate Init params validate failed,err:%v,params:%+v", + err, job.params) + return err + } + for _, err := range err.(validator.ValidationErrors) { + job.runtime.Logger.Error("RedisVersionUpdate Init params validate failed,err:%v,params:%+v", + err, job.params) + return err + } + } + if len(job.params.Ports) == 0 { + err = fmt.Errorf("RedisVersionUpdate Init ports(%+v) is empty", job.params.Ports) + job.runtime.Logger.Error(err.Error()) + return err + } + return nil +} + +// Name 原子任务名 +func (job *RedisVersionUpdate) Name() string { + return "redis_version_update" +} + +// Run Command Run +func (job *RedisVersionUpdate) Run() (err error) { + err = myredis.LocalRedisConnectTest(job.params.IP, job.params.Ports, job.params.Password) + if err != nil { + return err + } + err = job.allInstsAbleToConnect() + if err != nil { + return err + } + defer job.allInstDisconnect() + + if job.params.Role == consts.MetaRoleRedisMaster { + err = job.isAllInstanceMaster() + if err != nil { + return err + } + } else if job.params.Role == consts.MetaRoleRedisSlave { + err = job.isAllInstanceSlave() + if err != nil { + return err + } + } else { + err = fmt.Errorf("role:%s not support", job.params.Role) + job.runtime.Logger.Error(err.Error()) + return err + } + err = job.getLocalRedisPkgBaseName() + if err != nil { + return err + } + err = job.params.Check() + if err != nil { + job.runtime.Logger.Error(err.Error()) + return err + } + err = job.checkRedisLocalPkgAndTargetPkgSameType() + if err != nil { + return err + } + // 当前/usr/local/redis 指向版本不是 目标版本 + if job.localPkgBaseName != job.params.GePkgBaseName() { + err = job.untarMedia() + if err != nil { + return err + } + // 先 stop 所有 redis + for _, port := range job.params.Ports { + err = job.checkAndBackupRedis(port) + if err != nil { + return + } + err = job.stopRedis(port) + if err != nil { + return err + } + } + // 更新 /usr/local/redis 软链接 + err = job.updateFileLink() + if err != nil { + return err + } + // 再 start 所有 redis + for _, port := range job.params.Ports { + err = job.startRedis(port) + if err != nil { + return err + } + } + } + // 当前 /usr/local/redis 指向版本已经是 目标版本 + // 检查每个redis 运行版本是否是目标版本,如果不是则重启 + for _, port := range job.params.Ports { + addr := fmt.Sprintf("%s:%d", job.params.IP, port) + cli := job.AddrMapCli[addr] + ok, err := job.isRedisRuntimeVersionOK(cli) + if err != nil { + return err + } + if ok { + // 当前 redis 运行版本已经是目标版本 + continue + } + err = job.checkAndBackupRedis(port) + if err != nil { + return err + } + // 当前 redis 运行版本不是目标版本 + err = job.stopRedis(port) + if err != nil { + return err + } + err = job.startRedis(port) + if err != nil { + return err + } + } + + return nil +} + +func (job *RedisVersionUpdate) getLocalRedisPkgBaseName() (err error) { + redisSoftLink := filepath.Join(consts.UsrLocal, "redis") + _, err = os.Stat(redisSoftLink) + if err != nil && os.IsNotExist(err) { + err = fmt.Errorf("redis soft link(%s) not exist", redisSoftLink) + job.runtime.Logger.Error(err.Error()) + return err + } + realLink, err := os.Readlink(redisSoftLink) + if err != nil { + err = fmt.Errorf("readlink redis soft link(%s) failed,err:%+v", redisSoftLink, err) + job.runtime.Logger.Error(err.Error()) + return err + } + job.localPkgBaseName = filepath.Base(realLink) + job.runtime.Logger.Info("before update,%s->%s", redisSoftLink, realLink) + return nil +} + +// checkRedisLocalPkgAndTargetPkgSameType 检查reids本地包与目标包是同一类型,避免 cache redis 传的是 tendisplus 的包 +func (job *RedisVersionUpdate) checkRedisLocalPkgAndTargetPkgSameType() (err error) { + targetPkgName := job.params.GePkgBaseName() + targetDbType := util.GetRedisDbTypeByPkgName(targetPkgName) + localDbType := util.GetRedisDbTypeByPkgName(job.localPkgBaseName) + if targetDbType != localDbType { + err = fmt.Errorf("/usr/local/redis->%s cannot update to %s", job.localPkgBaseName, targetPkgName) + job.runtime.Logger.Error(err.Error()) + return err + } + return nil +} + +// allInstsAbleToConnect 检查所有实例可连接 +func (job *RedisVersionUpdate) allInstsAbleToConnect() (err error) { + instsAddrs := make([]string, 0, len(job.params.Ports)) + job.AddrMapCli = make(map[string]*myredis.RedisClient, len(job.params.Ports)) + for _, port := range job.params.Ports { + instsAddrs = append(instsAddrs, fmt.Sprintf("%s:%d", job.params.IP, port)) + } + for _, addr := range instsAddrs { + cli, err := myredis.NewRedisClientWithTimeout(addr, job.params.Password, 0, + consts.TendisTypeRedisInstance, 5*time.Second) + if err != nil { + return err + } + job.AddrMapCli[addr] = cli + } + job.runtime.Logger.Info("all redis instances able to connect,(%+v)", instsAddrs) + return nil +} + +// allInstDisconnect 所有实例断开连接 +func (job *RedisVersionUpdate) allInstDisconnect() { + for _, cli := range job.AddrMapCli { + cli.Close() + } +} + +func (job *RedisVersionUpdate) isAllInstanceMaster() (err error) { + for _, item := range job.AddrMapCli { + cli := item + repls, err := cli.Info("replication") + if err != nil { + return err + } + if repls["role"] != consts.RedisMasterRole { + err = fmt.Errorf("redis instance(%s) is not master", cli.Addr) + job.runtime.Logger.Error(err.Error()) + return err + } + // 是否要检查 master 是否还有 slave? + } + return nil +} +func (job *RedisVersionUpdate) isAllInstanceSlave() (err error) { + for _, item := range job.AddrMapCli { + cli := item + repls, err := cli.Info("replication") + if err != nil { + return err + } + if repls["role"] != consts.RedisSlaveRole { + err = fmt.Errorf("redis instance(%s) is not slave", cli.Addr) + job.runtime.Logger.Error(err.Error()) + return err + } + if repls["master_link_status"] != consts.MasterLinkStatusUP { + err = fmt.Errorf("redis instance(%s) master_link_status:%s is not UP", cli.Addr, repls["master_link_status"]) + job.runtime.Logger.Error(err.Error()) + return err + } + master_last_io_seconds_ago, err := strconv.Atoi(repls["master_last_io_seconds_ago"]) + if err != nil { + err = fmt.Errorf("redis instance(%s) master_last_io_seconds_ago:%s is not int", cli.Addr, + repls["master_last_io_seconds_ago"]) + job.runtime.Logger.Error(err.Error()) + return err + } + if master_last_io_seconds_ago > 20 { + err = fmt.Errorf("redis instance(%s) master_last_io_seconds_ago:%d is greater than 20", cli.Addr, + master_last_io_seconds_ago) + job.runtime.Logger.Error(err.Error()) + return err + } + job.runtime.Logger.Info( + "redis instance(%s) is slave,master(%s:%s),master_link_status:%s,master_last_io_seconds_ago:%d", + cli.Addr, repls["master_host"], repls["master_port"], + repls["master_link_status"], master_last_io_seconds_ago) + } + return nil +} + +// untarMedia 解压介质 +func (job *RedisVersionUpdate) untarMedia() (err error) { + err = job.params.Check() + if err != nil { + job.runtime.Logger.Error(err.Error()) + return err + } + pkgAbsPath := job.params.GetAbsolutePath() + untarCmd := fmt.Sprintf("tar -zxf %s -C %s", pkgAbsPath, consts.UsrLocal) + job.runtime.Logger.Info(untarCmd) + _, err = util.RunBashCmd(untarCmd, "", nil, 10*time.Minute) + if err != nil { + return err + } + job.runtime.Logger.Info("untar %s success", pkgAbsPath) + return nil +} + +// updateFileLink 更新 /usr/local/redis 软链接 +func (job *RedisVersionUpdate) updateFileLink() (err error) { + pkgBaseName := job.params.GePkgBaseName() + redisSoftLink := filepath.Join(consts.UsrLocal, "redis") + _, err = os.Stat(redisSoftLink) + if err == nil { + // 删除 /usr/local/redis 软链接 + err = os.Remove(redisSoftLink) + if err != nil { + err = fmt.Errorf("remove redis soft link(%s) failed,err:%+v", redisSoftLink, err) + job.runtime.Logger.Error(err.Error()) + return err + } + } + // 创建 /usr/local/redis -> /usr/local/$pkgBaseName 软链接 + err = os.Symlink(filepath.Join(consts.UsrLocal, pkgBaseName), redisSoftLink) + if err != nil { + err = fmt.Errorf("os.Symlink %s -> %s fail,err:%s", redisSoftLink, filepath.Join(consts.UsrLocal, pkgBaseName), err) + job.runtime.Logger.Error(err.Error()) + return + } + util.LocalDirChownMysql(redisSoftLink) + util.LocalDirChownMysql(redisSoftLink + "/") + job.runtime.Logger.Info("create softLink success,%s -> %s", redisSoftLink, filepath.Join(consts.UsrLocal, pkgBaseName)) + return nil +} + +// checkAndBackupRedis 如果有必要先备份reids +func (job *RedisVersionUpdate) checkAndBackupRedis(port int) (err error) { + // 如果是 master 且是 cache,则先备份 + addr := fmt.Sprintf("%s:%d", job.params.IP, port) + if job.params.Role != consts.MetaRoleRedisMaster { + job.runtime.Logger.Info("redis instance(%s) is not master,skip backup", addr) + return nil + } + cli := job.AddrMapCli[addr] + var dbType string + dbType, err = cli.GetTendisType() + if err != nil { + return err + } + if dbType != consts.TendisTypeRedisInstance { + job.runtime.Logger.Info("redis instance(%s) is not cache,skip backup", addr) + return nil + } + job.runtime.Logger.Info("redis instance(%s) is cache,start bgsave", addr) + err = cli.BgSaveAndWaitForFinish() + if err != nil { + return nil + } + return +} + +func (job *RedisVersionUpdate) stopRedis(port int) (err error) { + stopScript := filepath.Join(consts.UsrLocal, "redis", "bin", "stop-redis.sh") + // 先执行 stop-redis.sh 脚本,再检查端口是否还在使用 + job.runtime.Logger.Info(fmt.Sprintf("su %s -c \"%s\"", + consts.MysqlAaccount, stopScript+" "+strconv.Itoa(port)+" xxxx")) + _, err = util.RunLocalCmd("su", + []string{consts.MysqlAaccount, "-c", stopScript + " " + strconv.Itoa(port) + " " + job.params.Password}, + "", nil, 10*time.Minute) + if err != nil { + return err + } + maxRetryTimes := 5 + inUse := false + for maxRetryTimes >= 0 { + maxRetryTimes-- + inUse, err = util.CheckPortIsInUse(job.params.IP, strconv.Itoa(port)) + if err != nil { + job.runtime.Logger.Error(fmt.Sprintf("check %s:%d inUse failed,err:%v", job.params.IP, port, err)) + return err + } + if !inUse { + break + } + time.Sleep(2 * time.Second) + } + if inUse { + err = fmt.Errorf("stop redis instance(%s:%d) failed,port:%d still using", job.params.IP, port, port) + job.runtime.Logger.Error(err.Error()) + return err + } + job.runtime.Logger.Info("stop redis instance(%s:%d) success", job.params.IP, port) + return nil +} + +func (job *RedisVersionUpdate) startRedis(port int) (err error) { + startScript := filepath.Join(consts.UsrLocal, "redis", "bin", "start-redis.sh") + job.runtime.Logger.Info(fmt.Sprintf("su %s -c \"%s\" 2>/dev/null", + consts.MysqlAaccount, startScript+" "+strconv.Itoa(port))) + _, err = util.RunLocalCmd("su", + []string{consts.MysqlAaccount, "-c", startScript + " " + strconv.Itoa(port) + " 2>/dev/null"}, + "", nil, 10*time.Minute) + if err != nil { + return err + } + addr := fmt.Sprintf("%s:%d", job.params.IP, port) + cli, err := myredis.NewRedisClientWithTimeout(addr, job.params.Password, 0, + consts.TendisTypeRedisInstance, 10*time.Second) + if err != nil { + return err + } + job.AddrMapCli[addr] = cli + job.runtime.Logger.Info("start redis instance(%s:%d) success", job.params.IP, port) + + if job.params.Role == consts.MetaRoleRedisMaster { + return nil + } + // 多次检测直到 redis instance 成为 slave,且同步状态正常 + _, err = job.isReplStateOK(cli, 1*time.Minute) + if err != nil { + return err + } + return nil +} + +func (job *RedisVersionUpdate) isReplStateOK(cli *myredis.RedisClient, timeout time.Duration) (ok bool, err error) { + maxRetryTimes := timeout / (2 * time.Second) + if maxRetryTimes == 0 { + maxRetryTimes = 1 + } + for maxRetryTimes >= 0 { + maxRetryTimes-- + time.Sleep(2 * time.Second) + err = nil + repls, err := cli.Info("replication") + if err != nil { + return false, err + } + if repls["role"] != consts.RedisSlaveRole { + job.runtime.Logger.Info("redis instance(%s) role:%s is not slave", cli.Addr, repls["role"]) + continue + } + if repls["master_link_status"] != consts.MasterLinkStatusUP { + job.runtime.Logger.Info("redis instance(%s) master_link_status:%s is not UP", cli.Addr, repls["master_link_status"]) + continue + } + job.runtime.Logger.Info("redis instance(%s) is slave,master(%s:%s),master_link_status:%s", + cli.Addr, repls["master_host"], repls["master_port"], repls["master_link_status"]) + return true, nil + } + err = fmt.Errorf("cost %d seconds, redis instance(%s) is not slave", int(timeout.Seconds()), cli.Addr) + job.runtime.Logger.Error(err.Error()) + return false, err +} + +func (job *RedisVersionUpdate) isRedisRuntimeVersionOK(cli *myredis.RedisClient) (ok bool, err error) { + repls, err := cli.Info("server") + if err != nil { + return false, err + } + runtimeBaseVer, runtimeSubVer, err := util.VersionParse(repls["redis_version"]) + if err != nil { + return false, err + } + pkgBaseVer, pkgSubVer, err := util.VersionParse(job.params.GePkgBaseName()) + if err != nil { + return false, err + } + if runtimeBaseVer != pkgBaseVer || runtimeSubVer != pkgSubVer { + return false, nil + } + return true, nil +} + +// Retry times +func (job *RedisVersionUpdate) Retry() uint { + return 2 +} + +// Rollback rollback +func (job *RedisVersionUpdate) Rollback() error { + return nil +} diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go b/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go index 2238126edc..98e7baf1be 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go @@ -201,6 +201,8 @@ func (m *JobGenericManager) atomjobsMapperLoading() { m.atomJobMapper[atomredis.NewRedisDataStructure().Name()] = atomredis.NewRedisDataStructure m.atomJobMapper[atomredis.NewClusterMeetCheckFinish().Name()] = atomredis.NewClusterMeetCheckFinish m.atomJobMapper[atomredis.NewRedisDtsOnlineSwitch().Name()] = atomredis.NewRedisDtsOnlineSwitch + m.atomJobMapper[atomredis.NewRedisVersionUpdate().Name()] = atomredis.NewRedisVersionUpdate + m.atomJobMapper[atomredis.NewRedisClusterFailover().Name()] = atomredis.NewRedisClusterFailover // scene needs. m.atomJobMapper[atomproxy.NewTwemproxySceneCheckBackends().Name()] = atomproxy.NewTwemproxySceneCheckBackends m.atomJobMapper[atomredis.NewRedisSceneSyncCheck().Name()] = atomredis.NewRedisSceneSyncCheck diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/util/redisutil.go b/dbm-services/redis/db-tools/dbactuator/pkg/util/redisutil.go index f7fdb7c4ed..288867d6a0 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/util/redisutil.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/util/redisutil.go @@ -101,3 +101,15 @@ func StartBkDbmon() (err error) { return } + +// GetRedisDbTypeByPkgName 根据包名推断 dbtype +func GetRedisDbTypeByPkgName(pkgName string) (dbType string) { + if strings.Contains(pkgName, "tendisplus") { + dbType = consts.TendisTypeTendisplusInsance + } else if strings.Contains(pkgName, "2.8.17") && strings.Contains(pkgName, "-rocksdb-") { + dbType = consts.TendisTypeTendisSSDInsance + } else { + dbType = consts.TendisTypeRedisInstance + } + return +} diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/util/util.go b/dbm-services/redis/db-tools/dbactuator/pkg/util/util.go index 0c9f9e52b9..6a70603bb4 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/util/util.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/util/util.go @@ -2,6 +2,7 @@ package util import ( + "encoding/json" "errors" "fmt" "net" @@ -280,3 +281,25 @@ func IsDbmSysKeys(key string) bool { func NewNotFoundErr() error { return errors.New(NotFound) } + +// AddrToIpPort 将ip:port转换为ip,port +func AddrToIpPort(addr string) (ip string, port int, err error) { + ipPort := strings.Split(addr, ":") + if len(ipPort) != 2 { + err = fmt.Errorf("invalid addr:%s", addr) + return + } + ip = ipPort[0] + port, err = strconv.Atoi(ipPort[1]) + if err != nil { + err = fmt.Errorf("invalid addr:%s,err:%v", addr, err) + return + } + return +} + +// ToString string +func ToString(param interface{}) string { + ret, _ := json.Marshal(param) + return string(ret) +} diff --git a/dbm-ui/backend/db_services/redis/redis_dts/constants.py b/dbm-ui/backend/db_services/redis/redis_dts/constants.py index d18ae9176e..a04e88c67a 100644 --- a/dbm-ui/backend/db_services/redis/redis_dts/constants.py +++ b/dbm-ui/backend/db_services/redis/redis_dts/constants.py @@ -209,3 +209,23 @@ class DtsOperateType(str, StructuredEnum): sed -i "/$line/d" /etc/resolv.conf done <<< "$lines" """ + +# 删除 redis config 文件中 slaveof 配置 +REDIS_CONF_DEL_SLAVEOF = """ +source /etc/profile +ports="{}" + +while read -r port +do + # skip empty line + if [[ "$line" =~ ^[[:space:]]*$ ]]; then + continue + fi + if [[ ! -e $REDIS_DATA_DIR/redis/$port ]] + then + echo "$REDIS_DATA_DIR/redis/$port not exist" + exit -1 + fi + sed -e '/^slaveof/d' $REDIS_DATA_DIR/redis/$port/*.conf +done <<< "$ports" +""" diff --git a/dbm-ui/backend/db_services/redis/redis_dts/util.py b/dbm-ui/backend/db_services/redis/redis_dts/util.py index 57be7de5cd..84294f06e5 100644 --- a/dbm-ui/backend/db_services/redis/redis_dts/util.py +++ b/dbm-ui/backend/db_services/redis/redis_dts/util.py @@ -17,6 +17,7 @@ from typing import Dict, List, Tuple from django.db.models import Q +from django.utils.translation import ugettext as _ from backend.components import DBConfigApi, DRSApi from backend.components.dbconfig.constants import FormatType, LevelName @@ -177,8 +178,67 @@ def get_cluster_info_by_id( } except Exception as e: traceback.print_exc() - logger.error(f"get cluster info by domain failed {e}, cluster_id: {cluster_id}") - raise Exception(f"get cluster info by domain failed {e}, cluster_id: {cluster_id}") + logger.error(f"get cluster info by id failed {e}, cluster_id: {cluster_id}") + raise Exception(f"get cluster info by id failed {e}, cluster_id: {cluster_id}") + + +def common_cluster_precheck(bk_biz_id: int, cluster_id: int): + try: + cluster = Cluster.objects.get(bk_biz_id=bk_biz_id, id=cluster_id) + except Cluster.DoesNotExist: + raise Exception(_("redis集群 {} 不存在").format(cluster_id)) + + not_running_proxy = cluster.proxyinstance_set.exclude(status=InstanceStatus.RUNNING) + if not_running_proxy.exists(): + raise Exception( + _("redis集群 {} 存在 {} 个状态非 running 的 proxy").format(cluster.immute_domain, len(not_running_proxy)) + ) + + not_running_redis = cluster.storageinstance_set.exclude(status=InstanceStatus.RUNNING) + if not_running_redis.exists(): + raise Exception( + _("redis集群 {} 存在 {} 个状态非 running 的 redis").format(cluster.immute_domain, len(not_running_redis)) + ) + + cluster_info = get_cluster_info_by_id(bk_biz_id=bk_biz_id, cluster_id=cluster_id) + proxy_addrs = [r.ip_port for r in cluster.proxyinstance_set.all()] + try: + DRSApi.redis_rpc( + { + "addresses": proxy_addrs, + "db_num": 0, + "password": cluster_info["cluster_password"], + "command": "ping", + "bk_cloud_id": cluster_info["bk_cloud_id"], + } + ) + except Exception: + raise Exception(_("redis集群:{} proxy:{} ping失败").format(cluster.immute_domain, proxy_addrs)) + + redis_addrs = [r.ip_port for r in cluster.storageinstance_set.all()] + try: + DRSApi.redis_rpc( + { + "addresses": redis_addrs, + "db_num": 0, + "password": cluster_info["redis_password"], + "command": "ping", + "bk_cloud_id": cluster_info["bk_cloud_id"], + } + ) + except Exception: + raise Exception(_("redis集群:{} redis:{} ping失败").format(cluster.immute_domain, redis_addrs)) + master_insts = cluster.storageinstance_set.filter(instance_role=InstanceRole.REDIS_MASTER.value) + if not master_insts: + raise Exception(_("redis集群 {} 没有master??").format(cluster.immute_domain)) + for master_obj in master_insts: + if not master_obj.as_ejector or not master_obj.as_ejector.first(): + raise Exception( + _("redis集群{} master {} 没有 slave").format( + cluster.immute_domain, + master_obj.ip_port, + ) + ) def get_cluster_one_running_master(bk_biz_id: int, cluster_id: int) -> dict: diff --git a/dbm-ui/backend/flow/consts.py b/dbm-ui/backend/flow/consts.py index 9a909034d8..a082aeaf68 100644 --- a/dbm-ui/backend/flow/consts.py +++ b/dbm-ui/backend/flow/consts.py @@ -377,6 +377,8 @@ class RedisActuatorActionEnum(str, StructuredEnum): REMOVE_DTS_SERVER = EnumField("remove_dts_server", _("remove_dts_server")) DATA_STRUCTURE = EnumField("data_structure", _("data_structure")) CLUSTER_MEET_CHECK = EnumField("clustermeet_checkfinish", _("clustermeet_checkfinish")) + VERSION_UPDATE = EnumField("version_update", _("version_update")) + CLUSTER_FAILOVER = EnumField("cluster_failover", _("cluster_failover")) class EsActuatorActionEnum(str, StructuredEnum): diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/common/get_file_list.py b/dbm-ui/backend/flow/engine/bamboo/scene/common/get_file_list.py index c1dc2f1406..7f75002d80 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/common/get_file_list.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/common/get_file_list.py @@ -15,6 +15,7 @@ from backend.db_package.models import Package from backend.db_services.version.constants import PredixyVersion, TwemproxyVersion from backend.flow.consts import CLOUD_SSL_PATH, MediumEnum +from backend.flow.utils.redis.redis_util import get_latest_redis_package_by_version class GetFileList(object): @@ -157,12 +158,7 @@ def redis_cluster_apply_backend(self, db_version: str) -> list: """ 部署redis,所有节点需要的redis pkg包 """ - pkg_type = MediumEnum.Redis - if db_version.startswith("TendisSSD"): - pkg_type = MediumEnum.TendisSsd - if db_version.startswith("Tendisplus"): - pkg_type = MediumEnum.TendisPlus - redis_pkg = Package.get_latest_package(version=db_version, pkg_type=pkg_type, db_type=DBType.Redis) + redis_pkg = get_latest_redis_package_by_version(db_version) redis_tool_pkg = Package.get_latest_package( version=MediumEnum.Latest, pkg_type=MediumEnum.RedisTools, db_type=DBType.Redis ) @@ -176,6 +172,20 @@ def redis_cluster_apply_backend(self, db_version: str) -> list: f"{env.BKREPO_PROJECT}/{env.BKREPO_BUCKET}/{bkdbmon_pkg.path}", ] + def redis_cluster_version_update(self, db_version: str) -> list: + """ + redis集群版本升级 + """ + redis_pkg = get_latest_redis_package_by_version(db_version) + bkdbmon_pkg = Package.get_latest_package( + version=MediumEnum.Latest, pkg_type=MediumEnum.DbMon, db_type=DBType.Redis + ) + return [ + f"{env.BKREPO_PROJECT}/{env.BKREPO_BUCKET}/{self.actuator_pkg.path}", + f"{env.BKREPO_PROJECT}/{env.BKREPO_BUCKET}/{redis_pkg.path}", + f"{env.BKREPO_PROJECT}/{env.BKREPO_BUCKET}/{bkdbmon_pkg.path}", + ] + def redis_actuator_backend(self) -> list: """ 下发redis actuator diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_data_copy.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_data_copy.py index e357216bee..52478ac8de 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_data_copy.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_data_copy.py @@ -22,7 +22,6 @@ from backend.constants import IP_PORT_DIVIDER from backend.db_meta.enums import InstanceRole, InstanceStatus from backend.db_meta.models import AppCache, Cluster -from backend.db_package.models import Package from backend.db_proxy.constants import ExtensionType from backend.db_proxy.models import DBExtension from backend.db_services.redis.redis_dts.constants import ( @@ -43,6 +42,7 @@ ) from backend.db_services.redis.redis_dts.models import TbTendisDTSJob, TbTendisDtsTask from backend.db_services.redis.redis_dts.util import ( + common_cluster_precheck, complete_redis_dts_kwargs_dst_data, complete_redis_dts_kwargs_src_data, get_cluster_info_by_id, @@ -57,7 +57,7 @@ is_tendisssd_instance_type, is_twemproxy_proxy_type, ) -from backend.flow.consts import ConfigFileEnum, MediumEnum, StateType, WriteContextOpType +from backend.flow.consts import ConfigFileEnum, StateType, WriteContextOpType from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList from backend.flow.engine.bamboo.scene.redis.atom_jobs.redis_dts import ( @@ -86,6 +86,7 @@ from backend.flow.utils.redis.redis_proxy_util import ( check_cluster_proxy_backends_consistent, get_cache_backup_mode, + get_db_versions_by_cluster_type, get_twemproxy_cluster_server_shards, ) from backend.utils.time import datetime2str @@ -236,14 +237,7 @@ def data_copy_precheck(self): src_cluster = Cluster.objects.get(bk_biz_id=bk_biz_id, id=int(info["src_cluster"])) except Cluster.DoesNotExist: raise Exception("src_cluster {} does not exist".format(info["src_cluster"])) - # 源集群是否每个running的master都有slave - running_masters = src_cluster.storageinstance_set.filter( - status=InstanceStatus.RUNNING.value, instance_role=InstanceRole.REDIS_MASTER.value - ) - for master in running_masters: - if not master.as_ejector or not master.as_ejector.first(): - master_inst = "{}:{}".format(master.machine.ip, master.port) - raise Exception(_("源集群{}存在master:{}没有slave").format(info["src_cluster"], master_inst)) + common_cluster_precheck(bk_biz_id, src_cluster.id) # 检查源集群 bk_cloud_id 是否有dns nameserver self.__get_dns_nameserver(src_cluster.bk_cloud_id) @@ -252,8 +246,9 @@ def data_copy_precheck(self): try: dst_cluster = Cluster.objects.get(bk_biz_id=bk_biz_id, id=int(info["dst_cluster"])) except Cluster.DoesNotExist: - raise Exception("dst_cluster {} does not exist".format(info["dst_cluster"])) + raise Exception(_("目标集群 {} 不存在").format(info["dst_cluster"])) + common_cluster_precheck(bk_biz_id, dst_cluster.id) # 检查目标集群 bk_cloud_id 是否有dns nameserver self.__get_dns_nameserver(dst_cluster.bk_cloud_id) @@ -268,10 +263,10 @@ def data_copy_precheck(self): ) except TbTendisRollbackTasks.DoesNotExist: raise Exception( - "rollback task(src_cluster:{} recovery_time_point:{} \ - destroyed_status:0) does not exist".format( - info["src_cluster"], info["recovery_time_point"] - ) + _( + "回档任务(src_cluster:{} recovery_time_point:{} \ + destroyed_status:0) 不存在?" + ).format(info["src_cluster"], info["recovery_time_point"]) ) # 数据构造临时集群是否可访问 proxy_password_bytes = ast.literal_eval(rollback_task.temp_proxy_password) @@ -288,9 +283,7 @@ def data_copy_precheck(self): ) except Exception as e: raise Exception( - "rollback task(temp_cluster_proxy:{}) redis ping failed".format( - rollback_task.temp_cluster_proxy - ) + _("回档临时环境如何(temp_cluster_proxy:{}) ping 失败").format(rollback_task.temp_cluster_proxy) ) try: DRSApi.redis_rpc( @@ -348,24 +341,6 @@ def get_dst_cluster_install_param(self, info: dict) -> dict: install_param["resource_spec"] = info["resource_spec"] return install_param - def get_db_versions_by_cluster_type(self, cluster_type: str) -> list: - if is_redis_instance_type(cluster_type): - ret = Package.objects.filter(db_type=DBType.Redis.value, pkg_type=MediumEnum.Redis.value).values_list( - "version", flat=True - ) - return list(ret) - elif is_tendisplus_instance_type(cluster_type): - ret = Package.objects.filter(db_type=DBType.Redis.value, pkg_type=MediumEnum.TendisPlus.value).values_list( - "version", flat=True - ) - return list(ret) - elif is_tendisssd_instance_type(cluster_type): - ret = Package.objects.filter(db_type=DBType.Redis.value, pkg_type=MediumEnum.TendisSsd.value).values_list( - "version", flat=True - ) - return list(ret) - raise Exception("cluster_type:{} not a redis cluster type?".format(cluster_type)) - def shard_num_or_cluster_type_update_precheck(self): src_cluster_set: set = set() bk_biz_id = self.data["bk_biz_id"] @@ -374,11 +349,8 @@ def shard_num_or_cluster_type_update_precheck(self): raise Exception(_("源集群{}重复了").format(info["src_cluster"])) src_cluster_set.add(info["src_cluster"]) - src_cluster: Cluster = None - try: - src_cluster = Cluster.objects.get(bk_biz_id=bk_biz_id, id=int(info["src_cluster"])) - except Cluster.DoesNotExist: - raise Exception("src_cluster {} does not exist".format(info["src_cluster"])) + common_cluster_precheck(bk_biz_id, int(info["src_cluster"])) + src_cluster = Cluster.objects.get(bk_biz_id=bk_biz_id, id=int(info["src_cluster"])) # 检查源集群 bk_cloud_id 是否有dns nameserver self.__get_dns_nameserver(src_cluster.bk_cloud_id) @@ -386,51 +358,47 @@ def shard_num_or_cluster_type_update_precheck(self): if self.data["ticket_type"] == DtsBillType.REDIS_CLUSTER_SHARD_NUM_UPDATE.value: if info["current_shard_num"] == info["cluster_shard_num"]: raise Exception( - "current_shard_num:{} == cluster_shard_num:{}".format( - info["current_shard_num"], info["cluster_shard_num"] - ), + _("集群当前分片数:{} 等于 目标分片数:{}").format(info["current_shard_num"], info["cluster_shard_num"]), ) running_masters = src_cluster.storageinstance_set.filter( status=InstanceStatus.RUNNING.value, instance_role=InstanceRole.REDIS_MASTER.value ) if running_masters.count() == info["cluster_shard_num"]: raise Exception( - "src_cluster:{} running_masters:{} == cluster_shard_num:{}".format( + _("集群:{} 当前running_master个数:{} 等于 目标分片数:{}").format( src_cluster.immute_domain, running_masters.count(), info["cluster_shard_num"] ), ) if info.get("db_version", "") != "": - if info["db_version"] not in self.get_db_versions_by_cluster_type(src_cluster.cluster_type): + if info["db_version"] not in get_db_versions_by_cluster_type(src_cluster.cluster_type): raise Exception( - "src_cluster:{} db_version:{} not in src_cluster_type:{} db_versions:{}".format( + _("集群:{} 目标版本:{} 不在 集群类型:{} 版本列表:{}中").format( src_cluster.immute_domain, info["db_version"], src_cluster.cluster_type, - self.get_db_versions_by_cluster_type(src_cluster.cluster_type), + get_db_versions_by_cluster_type(src_cluster.cluster_type), ) ) elif self.data["ticket_type"] == DtsBillType.REDIS_CLUSTER_TYPE_UPDATE.value: if info["current_cluster_type"] == info["target_cluster_type"]: raise Exception( - "current_cluster_type:{} == target_cluster_type:{}".format( - info["current_cluster_type"], info["target_cluster_type"] - ), + _("当前集群类型:{} == 目标集群类型:{}").format(info["current_cluster_type"], info["target_cluster_type"]), ) if info["target_cluster_type"] == src_cluster.cluster_type: raise Exception( - "target_cluster_type:{} == src_cluster:{} cluster_type:{}".format( + _("目标集群类型:{} == 集群:{} 当前类型:{}").format( info["target_cluster_type"], src_cluster.immute_domain, src_cluster.cluster_type ), ) if info.get("db_version", "") == "": - raise Exception("src_cluster:{} db_version is empty".format(info["src_cluster"])) - if info["db_version"] not in self.get_db_versions_by_cluster_type(info["target_cluster_type"]): + raise Exception(_("集群:{} 目标版本为空").format(info["src_cluster"])) + if info["db_version"] not in get_db_versions_by_cluster_type(info["target_cluster_type"]): raise Exception( - "src_cluster:{} db_version:{} not in target_cluster_type:{} db_versions:{}".format( + _("集群:{} 目标版本:{} 不在 集群类型:{} 版本列表:{}中").format( info["src_cluster"], info["db_version"], info["target_cluster_type"], - self.get_db_versions_by_cluster_type(info["target_cluster_type"]), + get_db_versions_by_cluster_type(info["target_cluster_type"]), ) ) # 检查所有 src proxys backends 一致 diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_mss.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_mss.py index af8f262509..ed6b679aab 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_mss.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_mss.py @@ -218,7 +218,7 @@ def generate_ms_switch_flow(self, flow_data, act_kwargs, ms_switch, force=False) for master_ip in master_ips: sub_kwargs = deepcopy(act_kwargs) sub_kwargs.cluster["meta_update_ip"] = master_ip - sub_kwargs.cluster["meta_udpate_ports"] = act_kwargs.cluster["master_ports"][master_ip] + sub_kwargs.cluster["meta_update_ports"] = act_kwargs.cluster["master_ports"][master_ip] sub_kwargs.cluster["meta_update_status"] = InstanceStatus.UNAVAILABLE.value sub_kwargs.cluster["meta_func_name"] = RedisDBMeta.instances_failover_4_scene.__name__ sub_acts.append( diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_version_update_online.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_version_update_online.py new file mode 100644 index 0000000000..8c1f3e6e0b --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_version_update_online.py @@ -0,0 +1,544 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging.config +import re +from collections import defaultdict +from dataclasses import asdict +from typing import Dict, Optional + +from django.utils.translation import ugettext as _ + +from backend.components import DRSApi +from backend.configuration.constants import DBType +from backend.db_meta.api.cluster import nosqlcomm +from backend.db_meta.enums import InstanceRole, InstanceStatus +from backend.db_meta.models import AppCache, Cluster +from backend.db_package.models import Package +from backend.db_services.redis.redis_dts.constants import REDIS_CONF_DEL_SLAVEOF +from backend.db_services.redis.redis_dts.util import common_cluster_precheck, get_cluster_info_by_id +from backend.db_services.redis.util import is_redis_cluster_protocal, is_twemproxy_proxy_type +from backend.flow.consts import ( + DEFAULT_LAST_IO_SECOND_AGO, + DEFAULT_MASTER_DIFF_TIME, + MediumEnum, + SyncType, + WriteContextOpType, +) +from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder +from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList +from backend.flow.engine.bamboo.scene.redis.atom_jobs.redis_makesync import RedisMakeSyncAtomJob +from backend.flow.plugins.components.collections.redis.exec_actuator_script import ExecuteDBActuatorScriptComponent +from backend.flow.plugins.components.collections.redis.exec_shell_script import ExecuteShellScriptComponent +from backend.flow.plugins.components.collections.redis.get_redis_payload import GetRedisActPayloadComponent +from backend.flow.plugins.components.collections.redis.redis_config import RedisConfigComponent +from backend.flow.plugins.components.collections.redis.redis_db_meta import RedisDBMetaComponent +from backend.flow.plugins.components.collections.redis.trans_flies import TransFileComponent +from backend.flow.utils.redis.redis_act_playload import RedisActPayload +from backend.flow.utils.redis.redis_context_dataclass import ActKwargs, CommonContext +from backend.flow.utils.redis.redis_db_meta import RedisDBMeta +from backend.flow.utils.redis.redis_proxy_util import ( + get_cache_backup_mode, + get_db_versions_by_cluster_type, + get_twemproxy_cluster_server_shards, +) +from backend.flow.utils.redis.redis_util import get_latest_redis_package_by_version, version_equal + +logger = logging.getLogger("flow") + + +class RedisClusterVersionUpdateOnline(object): + """ + redis集群在线版本升级 + """ + + def __init__(self, root_id: str, data: Optional[Dict]): + """ + @param root_id : 任务流程定义的root_id + @param data : 单据传递过来的参数列表,是dict格式 + """ + self.root_id = root_id + self.data = data + self.precheck() + + def precheck(self): + """ + 1. 集群是否存在 + 2. 版本信息是否变化 + 3. 是否存在非 running 状态的 proxy; + 4. 是否存在非 running 状态的 redis; + 5. 连接 proxy 是否正常; + 6. 连接 redis 是否正常; + 7. 是否所有master 都有 slave; + """ + bk_biz_id = self.data["bk_biz_id"] + for input_item in self.data["infos"]: + if not input_item["target_version"]: + raise Exception(_("redis集群 {} 目标版本为空?").format(input_item["cluster_id"])) + common_cluster_precheck(bk_biz_id, input_item["cluster_id"]) + cluster = Cluster.objects.get(bk_biz_id=bk_biz_id, id=input_item["cluster_id"]) + + # 检查版本是否合法 + valid_versions = get_db_versions_by_cluster_type(cluster.cluster_type) + if input_item["target_version"] not in valid_versions: + raise Exception( + _("redis集群 {}目标版本 {} 不合法,合法的版本:{}").format( + cluster.immute_domain, + input_item["target_version"], + valid_versions, + ) + ) + + if cluster.major_version == input_item["target_version"]: + cluster_info = get_cluster_info_by_id(bk_biz_id=bk_biz_id, cluster_id=cluster.id) + self.is_minor_version_update(cluster, cluster_info["redis_password"], input_item["target_version"]) + + # 是否大版本相同情况下,小版本升级 + # 1. 获取一个running_master, 进而获取到 master_addr; + # 2. 通过 redis_rpc 函数执行 info server, 进而获取到当前 current_redis_version; + # 3. 通过 Package包获取当前 target_version 最新包的最新版本 package_latest_version; + # 4. 对比 current_redis_version 和 package_latest_version, 相同则 raise Exception; + def is_minor_version_update(self, cluster: Cluster, redis_password: str, target_version: str): + one_running_master = cluster.storageinstance_set.filter( + instance_role=InstanceRole.REDIS_MASTER.value, status=InstanceStatus.RUNNING + ).first() + if not one_running_master: + raise Exception(_("redis集群 {} 没有running_master??").format(cluster.immute_domain)) + master_addr = one_running_master.ip_port + resp = DRSApi.redis_rpc( + { + "addresses": [master_addr], + "db_num": 0, + "password": redis_password, + "command": "ping", + "bk_cloud_id": cluster.bk_cloud_id, + } + ) + current_redis_version = re.search(r"redis_version:(.*)\r\n", resp[0]["result"]).group(1) + redis_pkg = get_latest_redis_package_by_version(target_version) + if not redis_pkg: + raise Exception(_("目标版本:{} 没有找到其最新的Package信息?").format(target_version)) + package_redis_version = redis_pkg.name + is_equal, err = version_equal(current_redis_version, package_redis_version) + if err: + raise Exception( + _("redis当前版本:{} 与 package版本:{} 比较失败: {}").format( + current_redis_version, package_redis_version, cluster.immute_domain, err + ) + ) + if is_equal: + raise Exception( + _("redis集群: redis当前版本:{} 与 package版本:{} 相同,无需升级").format( + cluster.immute_domain, current_redis_version, package_redis_version + ) + ) + + def get_cluster_meta_data(self, bk_biz_id: int, cluster_id: int): + cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id) + + master_ports, slave_ports = defaultdict(list), defaultdict(list) + master_slave_pairs = [] + masterip_to_slaveip = {} + + for master_obj in cluster.storageinstance_set.filter(instance_role=InstanceRole.REDIS_MASTER.value): + master_ports[master_obj.machine.ip].append(master_obj.port) + if master_obj.as_ejector and master_obj.as_ejector.first(): + my_slave_obj = master_obj.as_ejector.get().receiver + slave_ports[my_slave_obj.machine.ip].append(my_slave_obj.port) + masterip_to_slaveip[master_obj.machine.ip] = my_slave_obj.machine.ip + master_slave_pairs.append( + { + "master": {"ip": master_obj.machine.ip, "port": master_obj.port}, + "slave": {"ip": my_slave_obj.machine.ip, "port": my_slave_obj.port}, + } + ) + return { + "immute_domain": cluster.immute_domain, + "bk_biz_id": str(cluster.bk_biz_id), + "bk_cloud_id": cluster.bk_cloud_id, + "cluster_type": cluster.cluster_type, + "cluster_name": cluster.name, + "cluster_version": cluster.major_version, + "slave_ports": dict(slave_ports), + "master_ports": dict(master_ports), + "masterip_to_slaveip": masterip_to_slaveip, + "master_slave_pairs": master_slave_pairs, + } + + def version_update_flow(self): + redis_pipeline = Builder(root_id=self.root_id, data=self.data) + trans_files = GetFileList(db_type=DBType.Redis) + sub_pipelines = [] + bk_biz_id = self.data["bk_biz_id"] + + redis_pipeline = Builder(root_id=self.root_id, data=self.data) + sub_pipelines = [] + for input_item in self.data["infos"]: + act_kwargs = ActKwargs() + act_kwargs.set_trans_data_dataclass = CommonContext.__name__ + act_kwargs.is_update_trans_data = True + cluster_meta_data = self.get_cluster_meta_data(bk_biz_id, int(input_item["cluster_id"])) + cluster_info = get_cluster_info_by_id(bk_biz_id=bk_biz_id, cluster_id=input_item["cluster_id"]) + act_kwargs.bk_cloud_id = cluster_meta_data["bk_cloud_id"] + act_kwargs.cluster.update(cluster_info) + + sub_pipeline = SubBuilder(root_id=self.root_id, data=self.data) + sub_pipeline.add_act( + act_name=_("初始化配置"), act_component_code=GetRedisActPayloadComponent.code, kwargs=asdict(act_kwargs) + ) + all_ips = [] + all_ips.extend(list(cluster_meta_data["master_ports"].keys())) + all_ips.extend(list(cluster_meta_data["slave_ports"].keys())) + all_ips = list(set(all_ips)) + + act_kwargs.exec_ip = all_ips + act_kwargs.file_list = trans_files.redis_cluster_version_update(input_item["target_version"]) + sub_pipeline.add_act( + act_name=_("主从所有IP 下发介质包"), act_component_code=TransFileComponent.code, kwargs=asdict(act_kwargs) + ) + + act_kwargs.cluster = {} + acts_list = [] + for ip, ports in cluster_meta_data["slave_ports"].items(): + act_kwargs.exec_ip = ip + act_kwargs.cluster["ip"] = ip + act_kwargs.cluster["ports"] = ports + act_kwargs.cluster["password"] = cluster_info["redis_password"] + act_kwargs.cluster["target_version"] = input_item["target_version"] + act_kwargs.cluster["role"] = InstanceRole.REDIS_SLAVE.value + act_kwargs.get_redis_payload_func = ( + RedisActPayload.redis_cluster_version_update_online_payload.__name__ + ) + acts_list.append( + { + "act_name": _("old_slave:{} 版本升级").format(ip), + "act_component_code": ExecuteDBActuatorScriptComponent.code, + "kwargs": asdict(act_kwargs), + } + ) + sub_pipeline.add_parallel_acts(acts_list=acts_list) + twemproxy_server_shards = get_twemproxy_cluster_server_shards(bk_biz_id, int(input_item["cluster_id"]), {}) + + if is_redis_cluster_protocal(cluster_meta_data["cluster_type"]): + first_master_ip = list(cluster_meta_data["master_ports"].keys())[0] + act_kwargs.exec_ip = first_master_ip + act_kwargs.cluster = { + "redis_password": cluster_info["redis_password"], + "redis_master_slave_pairs": cluster_meta_data["master_slave_pairs"], + "force": False, + } + act_kwargs.get_redis_payload_func = RedisActPayload.redis_cluster_failover.__name__ + sub_pipeline.add_act( + act_name=_("{} 集群:{}执行 cluster failover").format( + first_master_ip, cluster_meta_data["cluster_name"] + ), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(act_kwargs), + ) + # old_master 升级 + act_kwargs.cluster = {} + acts_list = [] + for ip, ports in cluster_meta_data["master_ports"].items(): + act_kwargs.exec_ip = ip + act_kwargs.cluster["ip"] = ip + act_kwargs.cluster["ports"] = ports + act_kwargs.cluster["password"] = cluster_info["redis_password"] + act_kwargs.cluster["target_version"] = input_item["target_version"] + act_kwargs.cluster["role"] = InstanceRole.REDIS_SLAVE.value + act_kwargs.get_redis_payload_func = ( + RedisActPayload.redis_cluster_version_update_online_payload.__name__ + ) + acts_list.append( + { + "act_name": _("new slave:{} 版本升级").format(ip), + "act_component_code": ExecuteDBActuatorScriptComponent.code, + "kwargs": asdict(act_kwargs), + } + ) + sub_pipeline.add_parallel_acts(acts_list=acts_list) + elif is_twemproxy_proxy_type(cluster_meta_data["cluster_type"]): + first_master_ip = list(cluster_meta_data["master_ports"].keys())[0] + act_kwargs.exec_ip = first_master_ip + act_kwargs.cluster = {} + act_kwargs.cluster["cluster_id"] = int(input_item["cluster_id"]) + act_kwargs.cluster["immute_domain"] = cluster_meta_data["immute_domain"] + act_kwargs.cluster["cluster_type"] = cluster_meta_data["cluster_type"] + act_kwargs.cluster["switch_condition"] = { + "is_check_sync": True, # 不强制切换 + "slave_master_diff_time": DEFAULT_MASTER_DIFF_TIME, + "last_io_second_ago": DEFAULT_LAST_IO_SECOND_AGO, + "can_write_before_switch": True, + "sync_type": SyncType.SYNC_MS.value, + } + # 先将 old_slave 切换成 new_master + act_kwargs.cluster["switch_info"] = cluster_meta_data["master_slave_pairs"] + act_kwargs.get_redis_payload_func = RedisActPayload.redis_twemproxy_arch_switch_4_scene.__name__ + sub_pipeline.add_act( + act_name=_("集群:{} 主从切换").format(cluster_meta_data["cluster_name"]), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(act_kwargs), + ) + + act_kwargs.cluster["instances"] = nosqlcomm.other.get_cluster_proxies( + cluster_id=act_kwargs.cluster["cluster_id"] + ) + act_kwargs.get_redis_payload_func = RedisActPayload.redis_twemproxy_backends_4_scene.__name__ + sub_pipeline.add_act( + act_name=_("Redis-{}-检查切换状态").format(first_master_ip), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(act_kwargs), + ) + + # 将 master & slave 配置中 slaveof 配置清理 + acts_list = [] + act_kwargs.cluster = {} + for master_ip, master_ports in cluster_meta_data["master_ports"].items(): + slave_ip = cluster_meta_data["masterip_to_slaveip"][master_ip] + slave_ports = cluster_meta_data["slave_ports"][slave_ip] + + act_kwargs.exec_ip = master_ip + act_kwargs.write_op = WriteContextOpType.APPEND.value + ports_str = "\n".join(str(port) for port in master_ports) + act_kwargs.cluster["shell_command"] = REDIS_CONF_DEL_SLAVEOF.format(ports_str) + acts_list.append( + { + "act_name": _("old_master:{} 删除slaveof配置").format(master_ip), + "act_component_code": ExecuteShellScriptComponent.code, + "kwargs": asdict(act_kwargs), + } + ) + + act_kwargs.exec_ip = slave_ip + act_kwargs.write_op = WriteContextOpType.APPEND.value + ports_str = "\n".join(str(port) for port in slave_ports) + act_kwargs.cluster["shell_command"] = REDIS_CONF_DEL_SLAVEOF.format(ports_str) + acts_list.append( + { + "act_name": _("old_slave:{} 删除slaveof配置").format(master_ip), + "act_component_code": ExecuteShellScriptComponent.code, + "kwargs": asdict(act_kwargs), + } + ) + sub_pipeline.add_parallel_acts(acts_list=acts_list) + + # old_master 升级 + act_kwargs.cluster = {} + act_kwargs.write_op = None + acts_list = [] + for ip, ports in cluster_meta_data["master_ports"].items(): + act_kwargs.exec_ip = ip + act_kwargs.cluster["ip"] = ip + act_kwargs.cluster["ports"] = ports + act_kwargs.cluster["password"] = cluster_info["redis_password"] + act_kwargs.cluster["target_version"] = input_item["target_version"] + act_kwargs.cluster["role"] = InstanceRole.REDIS_MASTER.value + act_kwargs.get_redis_payload_func = ( + RedisActPayload.redis_cluster_version_update_online_payload.__name__ + ) + acts_list.append( + { + "act_name": _("new slave:{} 版本升级").format(ip), + "act_component_code": ExecuteDBActuatorScriptComponent.code, + "kwargs": asdict(act_kwargs), + } + ) + sub_pipeline.add_parallel_acts(acts_list=acts_list) + + # 清档old_master + acts_list = [] + for ip, ports in cluster_meta_data["master_ports"].items(): + act_kwargs.exec_ip = ip + act_kwargs.cluster = {} + act_kwargs.cluster["domain_name"] = cluster_meta_data["immute_domain"] + act_kwargs.cluster["db_version"] = cluster_meta_data["cluster_version"] + act_kwargs.cluster["cluster_type"] = cluster_meta_data["cluster_type"] + act_kwargs.cluster["ip"] = ip + act_kwargs.cluster["ports"] = ports + act_kwargs.cluster["force"] = False + act_kwargs.cluster["db_list"] = [0] + act_kwargs.cluster["flushall"] = True + act_kwargs.get_redis_payload_func = RedisActPayload.redis_flush_data_payload.__name__ + acts_list.append( + { + "act_name": _("old_master:{} 清档").format(ip), + "act_component_code": ExecuteDBActuatorScriptComponent.code, + "kwargs": asdict(act_kwargs), + } + ) + sub_pipeline.add_parallel_acts(acts_list=acts_list) + + # old_master 做 new_slave + child_pipelines = [] + act_kwargs.cluster = {} + act_kwargs.cluster["bk_biz_id"] = bk_biz_id + act_kwargs.cluster["bk_cloud_id"] = cluster_meta_data["bk_cloud_id"] + act_kwargs.cluster["immute_domain"] = cluster_meta_data["immute_domain"] + act_kwargs.cluster["cluster_type"] = cluster_meta_data["cluster_type"] + act_kwargs.cluster["cluster_name"] = cluster_meta_data["cluster_name"] + masterip_to_slaveip = cluster_meta_data["masterip_to_slaveip"] + for master_ip, ports in cluster_meta_data["master_ports"].items(): + master_ports = cluster_meta_data["master_ports"][master_ip] + slave_ip = masterip_to_slaveip[master_ip] + slave_ports = cluster_meta_data["slave_ports"][slave_ip] + sync_param = { + "sync_type": SyncType.SYNC_MS, + "origin_1": slave_ip, + "sync_dst1": master_ip, + "ins_link": [], + "server_shards": twemproxy_server_shards.get(slave_ip, {}), + "cache_backup_mode": get_cache_backup_mode(bk_biz_id, input_item["cluster_id"]), + } + for idx, port in enumerate(master_ports): + sync_param["ins_link"].append( + { + "origin_1": str(slave_ports[idx]), + "sync_dst1": str(port), + } + ) + sync_builder = RedisMakeSyncAtomJob( + root_id=self.root_id, ticket_data=self.data, sub_kwargs=act_kwargs, params=sync_param + ) + child_pipelines.append(sync_builder) + sub_pipeline.add_parallel_sub_pipeline(child_pipelines) + + # 修改元数据指向,并娜动CC模块 + act_kwargs.cluster = {} + act_kwargs.cluster["bk_biz_id"] = bk_biz_id + act_kwargs.cluster["bk_cloud_id"] = cluster_meta_data["bk_cloud_id"] + act_kwargs.cluster["immute_domain"] = cluster_meta_data["immute_domain"] + act_kwargs.cluster["cluster_type"] = cluster_meta_data["cluster_type"] + act_kwargs.cluster["cluster_name"] = cluster_meta_data["cluster_name"] + act_kwargs.cluster["switch_condition"] = { + "is_check_sync": True, # 不强制切换 + "slave_master_diff_time": DEFAULT_MASTER_DIFF_TIME, + "last_io_second_ago": DEFAULT_LAST_IO_SECOND_AGO, + "can_write_before_switch": True, + "sync_type": SyncType.SYNC_MS.value, + } + act_kwargs.cluster["sync_relation"] = [] + masterip_to_slaveip = cluster_meta_data["masterip_to_slaveip"] + for master_ip, ports in cluster_meta_data["master_ports"].items(): + master_ports = cluster_meta_data["master_ports"][master_ip] + slave_ip = masterip_to_slaveip[master_ip] + slave_ports = cluster_meta_data["slave_ports"][slave_ip] + for idx, port in enumerate(master_ports): + act_kwargs.cluster["sync_relation"].append( + { + "ejector": { + "ip": master_ip, + "port": int(port), + }, + "receiver": { + "ip": slave_ip, + "port": int(slave_ports[idx]), + }, + } + ) + act_kwargs.cluster["meta_func_name"] = RedisDBMeta.tendis_switch_4_scene.__name__ + sub_pipeline.add_act( + act_name=_("Redis-元数据切换"), act_component_code=RedisDBMetaComponent.code, kwargs=asdict(act_kwargs) + ) + + acts_list = [] + for master_ip, master_ports in cluster_meta_data["master_ports"].items(): + act_kwargs.cluster["meta_update_ip"] = master_ip + slave_ip = masterip_to_slaveip[master_ip] + act_kwargs.cluster["meta_update_ports"] = master_ports + act_kwargs.cluster["meta_update_status"] = InstanceStatus.RUNNING.value + act_kwargs.cluster["meta_func_name"] = RedisDBMeta.instances_failover_4_scene.__name__ + acts_list.append( + { + "act_name": _("master:{}-slave:{}-元数据交换".format(master_ip, slave_ip)), + "act_component_code": RedisDBMetaComponent.code, + "kwargs": asdict(act_kwargs), + } + ) + sub_pipeline.add_parallel_acts(acts_list=acts_list) + + # 更新元数据中集群版本 + act_kwargs.cluster["bk_biz_id"] = bk_biz_id + act_kwargs.cluster["bk_cloud_id"] = cluster_meta_data["bk_cloud_id"] + act_kwargs.cluster["immute_domain"] = cluster_meta_data["immute_domain"] + act_kwargs.cluster["target_version"] = input_item["target_version"] + act_kwargs.cluster["meta_func_name"] = RedisDBMeta.redis_cluster_version_update.__name__ + sub_pipeline.add_act( + act_name=_("Redis-元数据更新集群版本"), act_component_code=RedisDBMetaComponent.code, kwargs=asdict(act_kwargs) + ) + + # 更新 dbconfig 中版本信息 + act_kwargs.cluster = { + "bk_biz_id": bk_biz_id, + "cluster_domain": cluster_meta_data["immute_domain"], + "current_version": cluster_meta_data["cluster_version"], + "target_version": input_item["target_version"], + "cluster_type": cluster_meta_data["cluster_type"], + } + act_kwargs.get_redis_payload_func = RedisActPayload.redis_cluster_version_update_dbconfig.__name__ + sub_pipeline.add_act( + act_name=_("Redis-更新dbconfig中集群版本"), + act_component_code=RedisConfigComponent.code, + kwargs=asdict(act_kwargs), + ) + + # 重装 dbmon + acts_list = [] + act_kwargs.cluster = {} + act_kwargs.cluster["servers"] = [ + { + "app": AppCache.get_app_attr(bk_biz_id, "db_app_abbr"), + "app_name": AppCache.get_app_attr(bk_biz_id, "bk_biz_name"), + "bk_biz_id": str(bk_biz_id), + "bk_cloud_id": int(cluster_meta_data["bk_cloud_id"]), + "cluster_name": cluster_meta_data["cluster_name"], + "cluster_type": cluster_meta_data["cluster_type"], + "cluster_domain": cluster_info["cluster_domain"], + } + ] + for ip, ports in cluster_meta_data["slave_ports"].items(): + act_kwargs.cluster["servers"][0]["server_ip"] = ip + act_kwargs.cluster["servers"][0]["server_ports"] = ports + act_kwargs.cluster["servers"][0]["meta_role"] = InstanceRole.REDIS_MASTER.value + act_kwargs.cluster["servers"][0]["server_shards"] = twemproxy_server_shards.get(ip, {}) + act_kwargs.cluster["servers"][0]["cache_backup_mode"] = get_cache_backup_mode( + bk_biz_id, int(input_item["cluster_id"]) + ) + act_kwargs.exec_ip = ip + act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__ + acts_list.append( + { + "act_name": _("new_master {} 重装 dbmon").format(ip), + "act_component_code": ExecuteDBActuatorScriptComponent.code, + "kwargs": asdict(act_kwargs), + } + ) + for ip, ports in cluster_meta_data["master_ports"].items(): + act_kwargs.cluster["servers"][0]["server_ip"] = ip + act_kwargs.cluster["servers"][0]["server_ports"] = ports + act_kwargs.cluster["servers"][0]["meta_role"] = InstanceRole.REDIS_SLAVE.value + act_kwargs.cluster["servers"][0]["server_shards"] = twemproxy_server_shards.get(ip, {}) + act_kwargs.cluster["servers"][0]["cache_backup_mode"] = get_cache_backup_mode( + bk_biz_id, int(input_item["cluster_id"]) + ) + act_kwargs.exec_ip = ip + act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__ + acts_list.append( + { + "act_name": _("new_slave {} 重装 dbmon").format(ip), + "act_component_code": ExecuteDBActuatorScriptComponent.code, + "kwargs": asdict(act_kwargs), + } + ) + sub_pipeline.add_parallel_acts(acts_list=acts_list) + + sub_pipelines.append( + sub_pipeline.build_sub_process(sub_name=_("集群{}版本在线升级".format(cluster_meta_data["cluster_name"]))) + ) + redis_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines) + redis_pipeline.run_pipeline() diff --git a/dbm-ui/backend/flow/engine/controller/redis.py b/dbm-ui/backend/flow/engine/controller/redis.py index 14ac5a8c0b..44bd2abc49 100644 --- a/dbm-ui/backend/flow/engine/controller/redis.py +++ b/dbm-ui/backend/flow/engine/controller/redis.py @@ -23,6 +23,7 @@ from backend.flow.engine.bamboo.scene.redis.redis_cluster_scene_cmr import RedisClusterCMRSceneFlow from backend.flow.engine.bamboo.scene.redis.redis_cluster_scene_mss import RedisClusterMSSSceneFlow from backend.flow.engine.bamboo.scene.redis.redis_cluster_shutdown import RedisClusterShutdownFlow +from backend.flow.engine.bamboo.scene.redis.redis_cluster_version_update_online import RedisClusterVersionUpdateOnline from backend.flow.engine.bamboo.scene.redis.redis_data_structure import RedisDataStructureFlow from backend.flow.engine.bamboo.scene.redis.redis_data_structure_task_delete import RedisDataStructureTaskDeleteFlow from backend.flow.engine.bamboo.scene.redis.redis_dbmon import RedisDbmonSceneFlow @@ -230,3 +231,10 @@ def redis_cluster_add_slave(self): """ flow = RedisClusterAddSlaveFlow(root_id=self.root_id, data=self.ticket_data) flow.add_slave_flow() + + def redis_cluster_version_update_online(self): + """ + redis 集群版本在线升级 + """ + flow = RedisClusterVersionUpdateOnline(root_id=self.root_id, data=self.ticket_data) + flow.version_update_flow() diff --git a/dbm-ui/backend/flow/plugins/components/collections/redis/redis_dts.py b/dbm-ui/backend/flow/plugins/components/collections/redis/redis_dts.py index 5a13d64f27..a663996182 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/redis/redis_dts.py +++ b/dbm-ui/backend/flow/plugins/components/collections/redis/redis_dts.py @@ -161,6 +161,7 @@ def check_all_src_slaves_running(self, src_data: dict) -> bool: self.log_error(_("源redis集群{}存在{}个非running状态的slave".format(src_data["cluster_addr"], unrunning_slave_cnt))) return False slaves_addr = [slave["ip"] + ":" + str(slave["port"]) for slave in src_data["slave_instances"]] + self.log_info("check_all_src_slaves_running slaves_addr:{}".format(slaves_addr)) DRSApi.redis_rpc( { "addresses": slaves_addr, @@ -240,6 +241,9 @@ def check_src_cluster_nodes_ok(self, dts_copy_type: str, src_data: dict) -> List # 获取集群cluster nodes信息 running_master = src_data["one_running_master"] master_addr = running_master["ip"] + ":" + str(running_master["port"]) + self.log_info( + "check src_cluster:{} cluster_nodes is ok,master_addr:{}".format(src_data["cluster_addr"], master_addr) + ) resp = DRSApi.redis_rpc( { "addresses": [master_addr], @@ -300,6 +304,9 @@ def check_src_cluster_state_ok(self, src_data: dict) -> bool: return True running_master = src_data["one_running_master"] master_addr = running_master["ip"] + ":" + str(running_master["port"]) + self.log_info( + "check src_cluster:{} cluster_state is ok,master_addr:{}".format(src_data["cluster_addr"], master_addr) + ) resp = DRSApi.redis_rpc( { "addresses": [master_addr], @@ -350,6 +357,7 @@ def check_dst_cluster_connected(self, bk_biz_id: int, dts_copy_type: str, dst_da cluster = Cluster.objects.get(id=dst_data["cluster_id"]) for proxy in cluster.proxyinstance_set.all(): dst_proxy_addrs.append(proxy.machine.ip + ":" + str(proxy.port)) + self.log_info("check dst_cluster:{} proxy:{} connect".format(dst_domain, dst_proxy_addrs)) DRSApi.redis_rpc( { "addresses": dst_proxy_addrs, diff --git a/dbm-ui/backend/flow/urls.py b/dbm-ui/backend/flow/urls.py index 89f2b0165f..b3e2151f3f 100644 --- a/dbm-ui/backend/flow/urls.py +++ b/dbm-ui/backend/flow/urls.py @@ -117,6 +117,7 @@ RedisClusterShardNumUpdateSceneApiView, RedisClusterShutdownSceneApiView, RedisClusterTypeUpdateSceneApiView, + RedisClusterVersionUpdateOnlineApiView, RedisDataStructureSceneApiView, RedisDataStructureTaskDeleteSceneApiView, RedisFlushDataSceneApiView, @@ -197,6 +198,7 @@ url(r"^scene/redis_data_structure$", RedisDataStructureSceneApiView.as_view()), url(r"^scene/redis_data_structure_task_delete$", RedisDataStructureTaskDeleteSceneApiView.as_view()), url(r"^scene/redis_cluster_add_slave$", RedisClusterAddSlaveApiView.as_view()), + url(r"^scene/redis_cluster_version_update_online$", RedisClusterVersionUpdateOnlineApiView.as_view()), # redis api url end # name_service start # name_service clb diff --git a/dbm-ui/backend/flow/utils/redis/redis_act_playload.py b/dbm-ui/backend/flow/utils/redis/redis_act_playload.py index 6639097dfe..490ce7535f 100644 --- a/dbm-ui/backend/flow/utils/redis/redis_act_playload.py +++ b/dbm-ui/backend/flow/utils/redis/redis_act_playload.py @@ -47,6 +47,7 @@ NameSpaceEnum, RedisActuatorActionEnum, ) +from backend.flow.utils.redis.redis_util import get_latest_redis_package_by_version from backend.ticket.constants import TicketType logger = logging.getLogger("flow") @@ -191,7 +192,8 @@ def delete_redis_config(self, clusterMap: dict): ) return data - def redis_conf_names_by_cluster_type(self, cluster_type: str) -> list: + @staticmethod + def redis_conf_names_by_cluster_type(cluster_type: str) -> list: conf_names: list = ["requirepass"] if is_redis_instance_type(cluster_type) or is_tendisplus_instance_type(cluster_type): conf_names.append("cluster-enabled") @@ -360,7 +362,7 @@ def set_proxy_config(self, clusterMap: dict) -> Any: def dts_swap_proxy_config_version(self, clusterMap: dict) -> Any: """ - 交换源集群和目标集群 dbconfig 中的proxy版本信息,有可能 twemproxy集群 切换到 predixy集群 s + 交换源集群和目标集群 dbconfig 中的proxy版本信息,有可能 twemproxy集群 切换到 predixy集群 """ proxy_conf_names = ["password", "redis_password", "port"] logger.info(_("交换源集群和目标集群 dbconfig 中的proxy版本信息")) @@ -760,21 +762,27 @@ def redis_flush_data_payload(self, **kwargs) -> dict: redis清档 """ ip = kwargs["ip"] - redis_config = self.__get_cluster_config( - self.cluster["domain_name"], self.cluster["db_version"], ConfigTypeEnum.DBConf - ) - + params = kwargs["params"] + domain_name = params.get("domain_name", self.cluster["domain_name"]) + db_version = params.get("db_version", self.cluster["db_version"]) + cluster_type = params.get("cluster_type", self.cluster["cluster_type"]) + ports = params.get("ports", self.cluster[ip]) + force = params.get("force", self.cluster["force"]) + db_list = params.get("db_list", self.cluster["db_list"]) + flushall = params.get("flushall", self.cluster["flushall"]) + + redis_config = self.__get_cluster_config(domain_name, db_version, ConfigTypeEnum.DBConf) return { "db_type": DBActuatorTypeEnum.Redis.value, "action": DBActuatorTypeEnum.Redis.value + "_" + RedisActuatorActionEnum.FlushData.value, "payload": { "ip": ip, - "db_type": self.cluster["cluster_type"], - "ports": self.cluster[ip], - "is_force": self.cluster["force"], + "db_type": cluster_type, + "ports": ports, + "is_force": force, "password": redis_config["requirepass"], - "db_list": self.cluster["db_list"], - "is_flush_all": self.cluster["flushall"], + "db_list": db_list, + "is_flush_all": flushall, }, } @@ -1489,3 +1497,106 @@ def redis_cluster_forget_4_scene(self, **kwargs) -> dict: "forget_list": params["forget_instances"], }, } + + # redis 原地升级 + def redis_cluster_version_update_online_payload(self, **kwargs) -> dict: + params = kwargs["params"] + db_version = params["target_version"] + redis_pkg = get_latest_redis_package_by_version(db_version) + return { + "db_type": DBActuatorTypeEnum.Redis.value, + "action": DBActuatorTypeEnum.Redis.value + "_" + RedisActuatorActionEnum.VERSION_UPDATE.value, + "payload": { + "pkg": redis_pkg.name, + "pkg_md5": redis_pkg.md5, + "ip": params["ip"], + "ports": params["ports"], + "password": params["password"], + "role": params["role"], + }, + } + + # redis 原地升级更新dbconfig + def redis_cluster_version_update_dbconfig(self, cluster_map: dict): + # 如果版本没变化,不需要更新 + if cluster_map["current_version"] == cluster_map["target_version"]: + return + src_resp = DBConfigApi.query_conf_item( + params={ + "bk_biz_id": str(cluster_map["bk_biz_id"]), + "level_name": LevelName.CLUSTER, + "level_value": cluster_map["cluster_domain"], + "level_info": {"module": str(DEFAULT_DB_MODULE_ID)}, + "conf_file": cluster_map["current_version"], + "conf_type": ConfigTypeEnum.DBConf, + "namespace": cluster_map["cluster_type"], + "format": FormatType.MAP, + } + ) + conf_names = self.redis_conf_names_by_cluster_type(cluster_map["cluster_type"]) + conf_items = [] + for conf_name in conf_names: + if conf_name in src_resp["content"]: + conf_items.append( + {"conf_name": conf_name, "conf_value": src_resp["content"][conf_name], "op_type": OpType.UPDATE} + ) + remove_items = [{"conf_name": conf_name, "op_type": OpType.REMOVE} for conf_name in conf_names] + upsert_param = { + "conf_file_info": { + "conf_file": "", # 需要替换成真实值 + "conf_type": ConfigTypeEnum.DBConf, + "namespace": "", # 需要替换成真实值 + }, + "conf_items": [], # 需要替换成真实值 + "level_info": {"module": str(DEFAULT_DB_MODULE_ID)}, + "confirm": DEFAULT_CONFIG_CONFIRM, + "req_type": ReqType.SAVE_AND_PUBLISH, + "bk_biz_id": str(cluster_map["bk_biz_id"]), + "level_name": LevelName.CLUSTER, + "level_value": "", # 需要替换成真实值 + } + # 先删除 + upsert_param["conf_file_info"]["namespace"] = cluster_map["cluster_type"] + upsert_param["conf_file_info"]["conf_file"] = cluster_map["current_version"] + upsert_param["conf_items"] = remove_items + upsert_param["level_value"] = cluster_map["cluster_domain"] + logger.info(_("删除集群:{} redis配置,upsert_param:{}".format(cluster_map["cluster_domain"], upsert_param))) + DBConfigApi.upsert_conf_item(upsert_param) + + # 再写入 + upsert_param["conf_file_info"]["namespace"] = cluster_map["cluster_type"] + upsert_param["conf_file_info"]["conf_file"] = cluster_map["target_version"] + upsert_param["conf_items"] = conf_items + upsert_param["level_value"] = cluster_map["cluster_domain"] + logger.info(_("更新集群:{} redis配置 为 目标集群的配置,upsert_param:{}".format(cluster_map["cluster_domain"], upsert_param))) + DBConfigApi.upsert_conf_item(upsert_param) + + # redis cluster failover + def redis_cluster_failover(self, **kwargs) -> dict: + """ + params: + { + "redis_password":"xxxx", + "redis_master_slave_pairs":[ + { + "master": {"ip":"a.a.a.a","port":"30000"}, + "slave": {"ip":"b.b.b.b","port":"30000"} + }, + { + "master": {"ip":"a.a.a.a","port":"30001"}, + "slave": {"ip":"b.b.b.b","port":"30001"} + } + ], + "force":false + } + """ + params = kwargs["params"] + return { + "db_type": DBActuatorTypeEnum.Redis.value, + "action": DBActuatorTypeEnum.Redis.value + "_" + RedisActuatorActionEnum.CLUSTER_FAILOVER.value, + "payload": { + "redis_password": params["redis_password"], + "redis_master_slave_pairs": params["redis_master_slave_pairs"], + "force": False, + }, + } diff --git a/dbm-ui/backend/flow/utils/redis/redis_db_meta.py b/dbm-ui/backend/flow/utils/redis/redis_db_meta.py index 454cacfc07..4e7a625c56 100644 --- a/dbm-ui/backend/flow/utils/redis/redis_db_meta.py +++ b/dbm-ui/backend/flow/utils/redis/redis_db_meta.py @@ -420,11 +420,11 @@ def instances_status_update(self) -> bool: machine_obj = Machine.objects.get(ip=self.cluster["meta_update_ip"]) if machine_obj.access_layer == AccessLayer.PROXY.value: ProxyInstance.objects.filter( - machine__ip=self.cluster["meta_update_ip"], port__in=self.cluster["meta_udpate_ports"] + machine__ip=self.cluster["meta_update_ip"], port__in=self.cluster["meta_update_ports"] ).update(status=self.cluster["meta_update_status"]) else: StorageInstance.objects.filter( - machine__ip=self.cluster["meta_update_ip"], port__in=self.cluster["meta_udpate_ports"] + machine__ip=self.cluster["meta_update_ip"], port__in=self.cluster["meta_update_ports"] ).update(status=self.cluster["meta_update_status"]) return True @@ -432,7 +432,7 @@ def instances_failover_4_scene(self) -> bool: """1.修改状态、2.切换角色""" self.instances_status_update() with atomic(): - for port in self.cluster["meta_udpate_ports"]: + for port in self.cluster["meta_update_ports"]: old_master = StorageInstance.objects.get(machine__ip=self.cluster["meta_update_ip"], port=port) old_slave = old_master.as_ejector.get().receiver StorageInstanceTuple.objects.get(ejector=old_master, receiver=old_slave).delete(keep_parents=True) @@ -759,3 +759,14 @@ def dts_online_switch_swap_two_cluster_storage(self): RedisCCTopoOperator(dst_cluster).transfer_instances_to_cluster_module(src_storageinstances) return True + + def redis_cluster_version_update(self): + """ + 更新集群版本(major_version) + """ + cluster = Cluster.objects.get( + bk_cloud_id=self.cluster["bk_cloud_id"], immute_domain=self.cluster["immute_domain"] + ) + cluster.major_version = self.cluster["target_version"] + cluster.save(update_fields=["major_version"]) + return True diff --git a/dbm-ui/backend/flow/utils/redis/redis_proxy_util.py b/dbm-ui/backend/flow/utils/redis/redis_proxy_util.py index dc0da319a0..d94482b602 100644 --- a/dbm-ui/backend/flow/utils/redis/redis_proxy_util.py +++ b/dbm-ui/backend/flow/utils/redis/redis_proxy_util.py @@ -13,13 +13,23 @@ from collections import defaultdict from typing import Dict, List, Tuple +from django.utils.translation import ugettext as _ + from backend.components import DBConfigApi, DRSApi from backend.components.dbconfig.constants import FormatType, LevelName +from backend.configuration.constants import DBType from backend.constants import IP_PORT_DIVIDER from backend.db_meta.enums import ClusterType, InstanceRole from backend.db_meta.models import Cluster -from backend.db_services.redis.util import is_predixy_proxy_type, is_redis_instance_type, is_twemproxy_proxy_type -from backend.flow.consts import DEFAULT_DB_MODULE_ID, ConfigFileEnum, ConfigTypeEnum +from backend.db_package.models import Package +from backend.db_services.redis.util import ( + is_predixy_proxy_type, + is_redis_instance_type, + is_tendisplus_instance_type, + is_tendisssd_instance_type, + is_twemproxy_proxy_type, +) +from backend.flow.consts import DEFAULT_DB_MODULE_ID, ConfigFileEnum, ConfigTypeEnum, MediumEnum logger = logging.getLogger("flow") @@ -307,3 +317,22 @@ def get_cache_backup_mode(bk_biz_id: int, cluster_id: int) -> str: return resp["content"].get("cache_backup_mode", "") except Exception: return "aof" + + +def get_db_versions_by_cluster_type(cluster_type: str) -> list: + if is_redis_instance_type(cluster_type): + ret = Package.objects.filter(db_type=DBType.Redis.value, pkg_type=MediumEnum.Redis.value).values_list( + "version", flat=True + ) + return list(ret) + elif is_tendisplus_instance_type(cluster_type): + ret = Package.objects.filter(db_type=DBType.Redis.value, pkg_type=MediumEnum.TendisPlus.value).values_list( + "version", flat=True + ) + return list(ret) + elif is_tendisssd_instance_type(cluster_type): + ret = Package.objects.filter(db_type=DBType.Redis.value, pkg_type=MediumEnum.TendisSsd.value).values_list( + "version", flat=True + ) + return list(ret) + raise Exception(_("集群类型:{} 不是一个 redis 集群类型?").format(cluster_type)) diff --git a/dbm-ui/backend/flow/utils/redis/redis_util.py b/dbm-ui/backend/flow/utils/redis/redis_util.py index d3c951e1fc..d20f2826dc 100644 --- a/dbm-ui/backend/flow/utils/redis/redis_util.py +++ b/dbm-ui/backend/flow/utils/redis/redis_util.py @@ -10,6 +10,10 @@ """ import re +from backend.configuration.constants import DBType +from backend.db_package.models import Package +from backend.flow.consts import MediumEnum + def domain_without_port(domain): end_port_reg = re.compile(r"(\:\d+$)|(#\d+$)") @@ -23,3 +27,85 @@ def check_domain(domain): if match: return True return False + + +def convert_version_to_uint(version): + version = version.strip() + if not version: + return 0, None + list01 = version.split(".") + billion = "" + thousand = "" + single = "" + if len(list01) == 0: + err = ValueError(f"version:{version} format not correct") + return 0, err + billion = list01[0] + thousand = list01[1] if len(list01) >= 2 else "" + single = list01[2] if len(list01) >= 3 else "" + total = 0 + if billion: + try: + b = int(billion) + total += b * 1000000 + except ValueError as e: + err = ValueError(f"convertVersionToUint int() fail, err:{e}, billion:{billion}, version:{version}") + return 0, err + if thousand: + try: + t = int(thousand) + total += t * 1000 + except ValueError as e: + err = ValueError(f"convertVersionToUint int() fail, err:{e}, thousand:{thousand}, version:{version}") + return 0, err + if single: + try: + s = int(single) + total += s + except ValueError as e: + err = ValueError(f"convertVersionToUint int() fail, err:{e}, single:{single}, version:{version}") + return 0, err + return total, None + + +# redis-6.2.7.tar.gz => (6002007, 0, None) +# redis-2.8.17-rocksdb-v1.3.10.tar.gz => (2008017, 1003010, None) +def version_parse(version): + reg01 = re.compile(r"[\d+.]+") + rets = reg01.findall(version) + if len(rets) == 0: + err = ValueError(f"TendisVersionParse version:{version} format not correct") + return 0, 0, err + base_version = 0 + sub_version = 0 + if len(rets) >= 1: + base_version, err = convert_version_to_uint(rets[0]) + if err: + return 0, 0, err + if len(rets) >= 2: + sub_version, err = convert_version_to_uint(rets[1]) + if err: + return 0, 0, err + return base_version, sub_version, None + + +# 判断两个版本是否相等 +def version_equal(version1, version2): + base_version1, sub_version1, err = version_parse(version1) + if err: + return False, err + base_version2, sub_version2, err = version_parse(version2) + if err: + return False, err + return base_version1 == base_version2 and sub_version1 == sub_version2, None + + +# 根据db_version 获取 redis 最新 Package +def get_latest_redis_package_by_version(db_version): + pkg_type = MediumEnum.Redis + if db_version.startswith("TendisSSD"): + pkg_type = MediumEnum.TendisSsd + if db_version.startswith("Tendisplus"): + pkg_type = MediumEnum.TendisPlus + redis_pkg = Package.get_latest_package(version=db_version, pkg_type=pkg_type, db_type=DBType.Redis) + return redis_pkg diff --git a/dbm-ui/backend/flow/views/redis_cluster.py b/dbm-ui/backend/flow/views/redis_cluster.py index eeb58ba37d..958b1c4c1f 100644 --- a/dbm-ui/backend/flow/views/redis_cluster.py +++ b/dbm-ui/backend/flow/views/redis_cluster.py @@ -538,3 +538,29 @@ def post(request): root_id = uuid.uuid1().hex RedisController(root_id=root_id, ticket_data=request.data).redis_cluster_add_slave() return Response({"root_id": root_id}) + + +class RedisClusterVersionUpdateOnlineApiView(FlowTestView): + """ + api: /apis/v1/flow/scene/redis_cluster_version_update_online + params: + { + "bk_biz_id": 3, + "ticket_type":"REDIS_CLUSTER_VERSION_UPDATE_ONLINE", + "created_by":"admin", + "uid":"1111", + "infos": [ + { + "cluster_id": 1, + "current_version": "Redis-5", + "target_version": "Redis-6", + } + ] + } + """ + + @staticmethod + def post(request): + root_id = uuid.uuid1().hex + RedisController(root_id=root_id, ticket_data=request.data).redis_cluster_version_update_online() + return Response({"root_id": root_id})