diff --git a/dbm-services/mysql/db-simulation/app/config/config.go b/dbm-services/mysql/db-simulation/app/config/config.go index 1a5500d997..09dba9f325 100644 --- a/dbm-services/mysql/db-simulation/app/config/config.go +++ b/dbm-services/mysql/db-simulation/app/config/config.go @@ -37,6 +37,7 @@ type AppConfig struct { TdbctlPodResource TdbctlPodResource `yaml:"tdbctlPodResource"` SimulationNodeLables []LabelItem `yaml:"simulationNodeLables"` SimulationtaintLables []LabelItem `yaml:"simulationtaintLables"` + Redis RedisDb `yaml:"redis"` } // BkRepoConfig bkrepo config @@ -106,6 +107,12 @@ type ImgConfig struct { Image string `yaml:"image"` } +// RedisDb redis +type RedisDb struct { + Addr string `yaml:"addr"` + Password string `yaml:"password"` +} + func init() { viper.AutomaticEnv() GAppConfig.ListenAddr = "0.0.0.0:80" diff --git a/dbm-services/mysql/db-simulation/app/service/simulation_task.go b/dbm-services/mysql/db-simulation/app/service/simulation_task.go index ad1c88f3ce..c6cdccd45b 100644 --- a/dbm-services/mysql/db-simulation/app/service/simulation_task.go +++ b/dbm-services/mysql/db-simulation/app/service/simulation_task.go @@ -11,17 +11,26 @@ package service import ( + "context" "crypto/sha256" + "encoding/json" "errors" "fmt" "os" "regexp" + "slices" "strings" "time" + "github.com/go-redis/redis/v8" + "github.com/samber/lo" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "dbm-services/common/go-pubpkg/cmutil" util "dbm-services/common/go-pubpkg/cmutil" "dbm-services/common/go-pubpkg/logger" "dbm-services/mysql/db-simulation/app" + "dbm-services/mysql/db-simulation/app/config" "dbm-services/mysql/db-simulation/model" ) @@ -29,6 +38,9 @@ import ( // 用于保留现场排查问题 var DelPod = true +// HeartbeatInterval 心跳时间间隔 +var HeartbeatInterval = 15 + // BaseParam 请求模拟执行的基础参数 type BaseParam struct { //nolint @@ -49,6 +61,24 @@ type BaseParam struct { ExcuteObjects []ExcuteSQLFileObjV2 `json:"execute_objects" binding:"gt=0,dive,required"` } +// BuildTendbPodName build tendb pod name +func (b BaseParam) BuildTendbPodName() string { + podName := fmt.Sprintf("tendb-%s-%s", strings.ToLower(b.MySQLVersion), + replaceUnderSource(b.TaskId)) + return podName +} + +// BuildSpiderPodName build spider pod name +func (b BaseParam) BuildSpiderPodName() string { + podName := fmt.Sprintf("spider-%s-%s", strings.ToLower(b.MySQLVersion), + replaceUnderSource(b.TaskId)) + return podName +} + +func replaceUnderSource(str string) string { + return strings.ReplaceAll(str, "_", "-") +} + // BuildStartArgs mysql pod start args func (b BaseParam) BuildStartArgs() []string { if len(b.MySQLStartConfigs) == 0 { @@ -114,13 +144,12 @@ var SpiderTaskChan chan SimulationTask // CtrlChan 并发控制 var ctrlChan chan struct{} +var rdb *redis.Client + func init() { TaskChan = make(chan SimulationTask, 100) SpiderTaskChan = make(chan SimulationTask, 100) ctrlChan = make(chan struct{}, 30) -} - -func init() { timer := time.NewTicker(60 * time.Second) go func() { for { @@ -134,6 +163,133 @@ func init() { } } }() + logger.Info("redis addr %s", config.GAppConfig.Redis.Addr) + rdb = redis.NewClient(&redis.Options{ + Addr: config.GAppConfig.Redis.Addr, + Password: config.GAppConfig.Redis.Password, + DB: 0, + }) + go func() { + time.Sleep(time.Duration(HeartbeatInterval) * time.Second) + reloadRunningTaskFromdb() + rdb.Close() + }() +} + +// ReloadParam TODO +type ReloadParam struct { + BaseParam + SpiderVersion *string `json:"spider_version,omitempty"` +} + +// reloadRunningTaskFromdb 重载重启服务前运行的任务 +func reloadRunningTaskFromdb() { + // 加锁是避免多个服务同时启动,避免相同任务被重载 + randkey := cmutil.RandomString(10) + key := "simulation:reload:lock" + ok, err := rdb.SetNX(context.TODO(), key, randkey, 60*time.Second).Result() + if err != nil { + logger.Error("set lock failed %s", err.Error()) + return + } + if !ok { + logger.Info("set lock failed") + return + } + defer func() { + luaStript := `if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end` + v, err := rdb.Eval(context.TODO(), luaStript, []string{key}, []interface{}{randkey}).Int() + if err != nil { + logger.Error("del lock failed %s", err.Error()) + return + } + if v != 1 { + logger.Error("unlock failed,key is %s,val %s", key, randkey) + } + }() + var tks []model.TbSimulationTask + if err := model.DB.Model(model.TbSimulationTask{}).Where( + //nolint + "phase not in (?) and create_time > DATE_SUB(NOW(),INTERVAL 6 HOUR) and heartbeat_time < DATE_SUB(NOW(),INTERVAL ? SECOND)", + []string{model.PhaseDone, model.PhaseReloading}, HeartbeatInterval).Scan(&tks).Error; err != nil { + logger.Error("get running task failed %s", err.Error()) + return + } + if len(tks) == 0 { + logger.Info("no need reload running task") + return + } + for _, tk := range tks { + var err error + var req model.TbRequestRecord + if err = model.DB.Model(model.TbRequestRecord{}).Where("request_id = ? ", tk.RequestID).Find(&req). + Error; err != nil { + logger.Error("get request content failed %s", err.Error()) + //nolint + model.CompleteTask(tk.TaskId, tk.MySQLVersion, model.TaskFailed, "", "", "") + continue + } + var p ReloadParam + if err = json.Unmarshal([]byte(req.RequestBody), &p); err != nil { + logger.Error("get request content failed %s", err.Error()) + //nolint + model.CompleteTask(tk.TaskId, tk.MySQLVersion, model.TaskFailed, "", "", "") + continue + } + var podName string + if p.SpiderVersion == nil { + podName = p.BuildTendbPodName() + } else { + podName = p.BuildSpiderPodName() + } + // delete old pod + if slices.Contains([]string{model.PhaseCreatePod, model.PhaseLoadSchema, model.PhaseRunning}, tk.Phase) { + err = Kcs.Cli.CoreV1().Pods(Kcs.Namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{}) + if err != nil { + logger.Error("delete pod failed %s", err.Error()) + //nolint + model.CompleteTask(tk.TaskId, tk.MySQLVersion, model.TaskFailed, "", "", "") + continue + } + } + model.UpdatePhase(tk.TaskId, tk.MySQLVersion, model.PhaseReloading) + tsk := p.BuildTsk(tk.RequestID) + if p.SpiderVersion != nil { + SpiderTaskChan <- tsk + } else { + TaskChan <- tsk + } + } +} + +// BuildTsk build read load task +func (r ReloadParam) BuildTsk(requestId string) (tsk SimulationTask) { + tsk = SimulationTask{ + RequestId: requestId, + DbPodSets: NewDbPodSets(), + BaseParam: &r.BaseParam, + Version: r.MySQLVersion, + } + version := r.MySQLVersion + img, err := GetImgFromMySQLVersion(version) + if err != nil { + logger.Error("GetImgFromMySQLVersion %s failed:%s", version, err.Error()) + return + } + tsk.DbImage = img + if r.SpiderVersion != nil { + tsk.SpiderImage, tsk.TdbCtlImage = GetSpiderAndTdbctlImg(*r.SpiderVersion, LatestVersion) + } + tsk.BaseInfo = &MySQLPodBaseInfo{ + PodName: fmt.Sprintf("tendb-%s-%s", strings.ToLower(version), + replaceUnderSource(r.TaskId)), + Lables: map[string]string{"task_id": replaceUnderSource(r.TaskId), + "request_id": requestId}, + RootPwd: r.TaskId[0:4], + Args: r.BuildStartArgs(), + Charset: r.MySQLCharSet, + } + return tsk } func run(task SimulationTask, tkType string) { @@ -154,6 +310,21 @@ func run(task SimulationTask, tkType string) { return } }() + doneChan := make(chan struct{}) + ticker := time.NewTicker(time.Duration(HeartbeatInterval) * time.Second) + go func() { + for { + select { + case <-ticker.C: + model.UpdateHeartbeat(task.TaskId) + case <-doneChan: + logger.Info("simulation run done") + return + } + } + }() + // 关闭协程 + defer func() { ticker.Stop(); doneChan <- struct{}{} }() xlogger := task.getXlogger() // create Pod model.UpdatePhase(task.TaskId, task.MySQLVersion, model.PhaseCreatePod) @@ -201,21 +372,7 @@ func (t *SimulationTask) getDbsExcludeSysDb() (err error) { func (t *SimulationTask) SimulationRun(containerName string, xlogger *logger.Logger) (sstdout, sstderr string, err error) { logger.Info("will execute in %s", containerName) - doneChan := make(chan struct{}) - ticker := time.NewTicker(30 * time.Second) - go func() { - for { - select { - case <-ticker.C: - model.UpdateHeartbeat(t.TaskId, sstderr, sstdout) - case <-doneChan: - logger.Info("simulation run done") - return - } - } - }() - // 关闭协程 - defer func() { ticker.Stop(); doneChan <- struct{}{} }() + model.UpdatePhase(t.TaskId, t.MySQLVersion, model.PhaseLoadSchema) // Load schema SQL sstdout, sstderr, err = t.loadSchemaSQL(containerName) @@ -383,3 +540,56 @@ func (t *SimulationTask) executeMultFilesObject(e ExcuteSQLFileObjV2, containerN } return } + +// GetImgFromMySQLVersion 根据版本获取模拟执行运行的镜像配置 +func GetImgFromMySQLVersion(version string) (img string, err error) { + img, errx := model.GetImageName("mysql", version) + if errx == nil { + logger.Info("get image from db img config: %s", img) + return img, nil + } + switch { + case regexp.MustCompile("5.5").MatchString(version): + return config.GAppConfig.Image.Tendb55Img, nil + case regexp.MustCompile("5.6").MatchString(version): + return config.GAppConfig.Image.Tendb56Img, nil + case regexp.MustCompile("5.7").MatchString(version): + return config.GAppConfig.Image.Tendb57Img, nil + case regexp.MustCompile("8.0").MatchString(version): + return config.GAppConfig.Image.Tendb80Img, nil + default: + return "", fmt.Errorf("not match any version") + } +} + +// GetSpiderAndTdbctlImg TODO +func GetSpiderAndTdbctlImg(spiderVersion, tdbctlVersion string) (spiderImg, tdbctlImg string) { + return getSpiderImg(spiderVersion), getTdbctlImg(tdbctlVersion) +} + +const ( + // LatestVersion latest version + LatestVersion = "latest" +) + +func getSpiderImg(version string) (img string) { + if lo.IsEmpty(version) { + version = LatestVersion + } + img, errx := model.GetImageName("spider", version) + if errx == nil { + return img + } + return config.GAppConfig.Image.SpiderImg +} + +func getTdbctlImg(version string) (img string) { + if lo.IsEmpty(version) { + version = LatestVersion + } + img, errx := model.GetImageName("tdbctl", version) + if errx == nil { + return img + } + return config.GAppConfig.Image.TdbCtlImg +} diff --git a/dbm-services/mysql/db-simulation/handler/dbsimulation.go b/dbm-services/mysql/db-simulation/handler/dbsimulation.go index 610b4d0268..22e6bd5a3b 100644 --- a/dbm-services/mysql/db-simulation/handler/dbsimulation.go +++ b/dbm-services/mysql/db-simulation/handler/dbsimulation.go @@ -61,7 +61,7 @@ func TendbSimulation(r *gin.Context) { return } version := param.MySQLVersion - img, err := getImgFromMySQLVersion(version) + img, err := service.GetImgFromMySQLVersion(version) if err != nil { logger.Error("GetImgFromMySQLVersion %s failed:%s", version, err.Error()) SendResponse(r, err, nil, requestID) @@ -103,7 +103,7 @@ func TendbClusterSimulation(r *gin.Context) { return } version := param.MySQLVersion - img, err := getImgFromMySQLVersion(version) + img, err := service.GetImgFromMySQLVersion(version) if err != nil { logger.Error("GetImgFromMySQLVersion %s failed:%s", version, err.Error()) SendResponse(r, err, nil, RequestID) @@ -126,7 +126,7 @@ func TendbClusterSimulation(r *gin.Context) { logger.Info("the pwd %s", rootPwd) } tsk.DbImage = img - tsk.SpiderImage, tsk.TdbCtlImage = getSpiderAndTdbctlImg(param.SpiderVersion, LatestVersion) + tsk.SpiderImage, tsk.TdbCtlImage = service.GetSpiderAndTdbctlImg(param.SpiderVersion, service.LatestVersion) tsk.BaseInfo = &service.MySQLPodBaseInfo{ PodName: fmt.Sprintf("spider-%s-%s", strings.ToLower(version), replaceUnderSource(param.TaskId)), diff --git a/dbm-services/mysql/db-simulation/handler/handler.go b/dbm-services/mysql/db-simulation/handler/handler.go index b237a2c8b5..54b86b5a48 100644 --- a/dbm-services/mysql/db-simulation/handler/handler.go +++ b/dbm-services/mysql/db-simulation/handler/handler.go @@ -14,14 +14,11 @@ package handler import ( "fmt" "net/http" - "regexp" "strings" "github.com/gin-gonic/gin" - "github.com/samber/lo" "dbm-services/common/go-pubpkg/logger" - "dbm-services/mysql/db-simulation/app/config" "dbm-services/mysql/db-simulation/app/service" "dbm-services/mysql/db-simulation/model" ) @@ -57,12 +54,12 @@ func CreateTmpSpiderPodCluster(r *gin.Context) { Charset: "utf8mb4", } var err error - ps.DbImage, err = getImgFromMySQLVersion(param.BackendVersion) + ps.DbImage, err = service.GetImgFromMySQLVersion(param.BackendVersion) if err != nil { logger.Error(err.Error()) return } - ps.SpiderImage, ps.TdbCtlImage = getSpiderAndTdbctlImg(param.SpiderVersion, LatestVersion) + ps.SpiderImage, ps.TdbCtlImage = service.GetSpiderAndTdbctlImg(param.SpiderVersion, service.LatestVersion) if err := ps.CreateClusterPod(); err != nil { logger.Error(err.Error()) return @@ -148,55 +145,3 @@ func SendResponse(r *gin.Context, err error, data interface{}, requestid string) RequestID: requestid, }) } - -// getImgFromMySQLVersion 根据版本获取模拟执行运行的镜像配置 -func getImgFromMySQLVersion(version string) (img string, err error) { - img, errx := model.GetImageName("mysql", version) - if errx == nil { - logger.Info("get image from db img config: %s", img) - return img, nil - } - switch { - case regexp.MustCompile("5.5").MatchString(version): - return config.GAppConfig.Image.Tendb55Img, nil - case regexp.MustCompile("5.6").MatchString(version): - return config.GAppConfig.Image.Tendb56Img, nil - case regexp.MustCompile("5.7").MatchString(version): - return config.GAppConfig.Image.Tendb57Img, nil - case regexp.MustCompile("8.0").MatchString(version): - return config.GAppConfig.Image.Tendb80Img, nil - default: - return "", fmt.Errorf("not match any version") - } -} - -func getSpiderAndTdbctlImg(spiderVersion, tdbctlVersion string) (spiderImg, tdbctlImg string) { - return getSpiderImg(spiderVersion), getTdbctlImg(tdbctlVersion) -} - -const ( - // LatestVersion latest version - LatestVersion = "latest" -) - -func getSpiderImg(version string) (img string) { - if lo.IsEmpty(version) { - version = LatestVersion - } - img, errx := model.GetImageName("spider", version) - if errx == nil { - return img - } - return config.GAppConfig.Image.SpiderImg -} - -func getTdbctlImg(version string) (img string) { - if lo.IsEmpty(version) { - version = LatestVersion - } - img, errx := model.GetImageName("tdbctl", version) - if errx == nil { - return img - } - return config.GAppConfig.Image.TdbCtlImg -} diff --git a/dbm-services/mysql/db-simulation/model/tb_simulation_task.go b/dbm-services/mysql/db-simulation/model/tb_simulation_task.go index b8f1c777c9..3c05773e39 100644 --- a/dbm-services/mysql/db-simulation/model/tb_simulation_task.go +++ b/dbm-services/mysql/db-simulation/model/tb_simulation_task.go @@ -48,6 +48,8 @@ const ( PhaseLoadSchema = "SchemaLoading" // PhaseRunning TODO PhaseRunning = "Running" + // PhaseReloading 重载任务 + PhaseReloading = "Reloading" // PhaseDone TODO PhaseDone = "Done" ) @@ -72,11 +74,9 @@ func CompleteTask(task_id, version, status, stderr, stdout, syserrMsg string) (e } // UpdateHeartbeat update task heartbeat -func UpdateHeartbeat(taskid, stderr, stdout string) { +func UpdateHeartbeat(taskid string) { err := DB.Model(TbSimulationTask{}).Where("task_id = ?", taskid).Updates( TbSimulationTask{ - Stdout: stdout, - Stderr: stderr, HeartbeatTime: time.Now(), }).Error if err != nil { @@ -108,11 +108,12 @@ func CreateTask(taskid, requestid, version string, billTaskId string) (err error return err } return DB.Create(&TbSimulationTask{ - TaskId: taskid, - RequestID: requestid, - BillTaskId: billTaskId, - MySQLVersion: version, - Phase: PhaseWaitting, - CreateTime: time.Now(), + TaskId: taskid, + RequestID: requestid, + BillTaskId: billTaskId, + MySQLVersion: version, + Phase: PhaseWaitting, + HeartbeatTime: time.Now(), + CreateTime: time.Now(), }).Error } diff --git a/helm-charts/bk-dbm/templates/configmaps/dbsimulation-configmap.yaml b/helm-charts/bk-dbm/templates/configmaps/dbsimulation-configmap.yaml index f0420558ed..5e6e87d8b0 100644 --- a/helm-charts/bk-dbm/templates/configmaps/dbsimulation-configmap.yaml +++ b/helm-charts/bk-dbm/templates/configmaps/dbsimulation-configmap.yaml @@ -58,6 +58,9 @@ data: name: "{{ $dbsimulationDB.name }}" host: "{{ $dbsimulationDB.host }}" port: "{{ $dbsimulationDB.port }}" + redis: + addr: "{{ .Values.externalRedis.host }}:{{ .Values.externalRedis.port }}" + password: {{ .Values.externalRedis.password }} debug: false {{- if index .Values "db-simulation" "tdbctlPodResource" }} tdbctlPodResource: