Skip to content

Commit

Permalink
feat(dbm-services): 重启服务后重载重启前的运行中的模拟执行任务 TencentBlueKing#8123
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq committed Nov 26, 2024
1 parent 5968e7a commit 3ad7be7
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 87 deletions.
7 changes: 7 additions & 0 deletions dbm-services/mysql/db-simulation/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type AppConfig struct {
TdbctlPodResource TdbctlPodResource `yaml:"tdbctlPodResource"`
SimulationNodeLables []LabelItem `yaml:"simulationNodeLables"`
SimulationtaintLables []LabelItem `yaml:"simulationtaintLables"`
Redis RedisDb `yaml:"redis"`
}

// BkRepoConfig bkrepo config
Expand Down Expand Up @@ -106,6 +107,12 @@ type ImgConfig struct {
Image string `yaml:"image"`
}

// RedisDb redis
type RedisDb struct {
Addr string `yaml:"addr"`
Password string `yaml:"password"`
}

func init() {
viper.AutomaticEnv()
GAppConfig.ListenAddr = "0.0.0.0:80"
Expand Down
251 changes: 233 additions & 18 deletions dbm-services/mysql/db-simulation/app/service/simulation_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,36 @@
package service

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"os"
"regexp"
"slices"
"strings"
"time"

"github.com/go-redis/redis/v8"
"github.com/samber/lo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"dbm-services/common/go-pubpkg/cmutil"
util "dbm-services/common/go-pubpkg/cmutil"
"dbm-services/common/go-pubpkg/logger"
"dbm-services/mysql/db-simulation/app"
"dbm-services/mysql/db-simulation/app/config"
"dbm-services/mysql/db-simulation/model"
)

// DelPod 控制运行模拟执行后是否删除拉起的Pod的开关
// 用于保留现场排查问题
var DelPod = true

// HeartbeatInterval 心跳时间间隔
var HeartbeatInterval = 15

// BaseParam 请求模拟执行的基础参数
type BaseParam struct {
//nolint
Expand All @@ -49,6 +61,24 @@ type BaseParam struct {
ExcuteObjects []ExcuteSQLFileObjV2 `json:"execute_objects" binding:"gt=0,dive,required"`
}

// BuildTendbPodName build tendb pod name
func (b BaseParam) BuildTendbPodName() string {
podName := fmt.Sprintf("tendb-%s-%s", strings.ToLower(b.MySQLVersion),
replaceUnderSource(b.TaskId))
return podName
}

// BuildSpiderPodName build spider pod name
func (b BaseParam) BuildSpiderPodName() string {
podName := fmt.Sprintf("spider-%s-%s", strings.ToLower(b.MySQLVersion),
replaceUnderSource(b.TaskId))
return podName
}

func replaceUnderSource(str string) string {
return strings.ReplaceAll(str, "_", "-")
}

// BuildStartArgs mysql pod start args
func (b BaseParam) BuildStartArgs() []string {
if len(b.MySQLStartConfigs) == 0 {
Expand Down Expand Up @@ -114,13 +144,12 @@ var SpiderTaskChan chan SimulationTask
// CtrlChan 并发控制
var ctrlChan chan struct{}

var rdb *redis.Client

func init() {
TaskChan = make(chan SimulationTask, 100)
SpiderTaskChan = make(chan SimulationTask, 100)
ctrlChan = make(chan struct{}, 30)
}

func init() {
timer := time.NewTicker(60 * time.Second)
go func() {
for {
Expand All @@ -134,6 +163,138 @@ func init() {
}
}
}()
logger.Info("redis addr %s", config.GAppConfig.Redis.Addr)
rdb = redis.NewClient(&redis.Options{
Addr: config.GAppConfig.Redis.Addr,
Password: config.GAppConfig.Redis.Password,
DB: 0,
})
go func() {
// 等待一个周期的原因,避免重载执行正常的任务
time.Sleep(time.Duration(HeartbeatInterval) * time.Second)
reloadRunningTaskFromdb()
rdb.Close()
}()
}

// ReloadParam TODO
type ReloadParam struct {
BaseParam
SpiderVersion *string `json:"spider_version,omitempty"`
}

// reloadRunningTaskFromdb 重载重启服务前运行的任务 加锁是避免多个服务同时启动,避免相同任务被重载
func reloadRunningTaskFromdb() {
randkey := cmutil.RandomString(10)
key := "simulation:reload:lock"
ok, err := rdb.SetNX(context.TODO(), key, randkey, 60*time.Second).Result()
if err != nil {
logger.Error("set lock failed %s", err.Error())
return
}
if !ok {
logger.Info("set lock failed")
return
}
defer func() {
luaStript := `if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end`
v, err := rdb.Eval(context.TODO(), luaStript, []string{key}, []interface{}{randkey}).Int()
if err != nil {
logger.Error("del lock failed %s", err.Error())
return
}
if v != 1 {
logger.Error("unlock failed,key is %s,val %s", key, randkey)
}
}()
var tks []model.TbSimulationTask
if err := model.DB.Model(model.TbSimulationTask{}).Where(
//nolint
"phase not in (?) and create_time > DATE_SUB(NOW(),INTERVAL 6 HOUR) and time_to_sec(timediff(heartbeat_time,now())) > ? ",
[]string{model.PhaseDone, model.PhaseReloading}, HeartbeatInterval).Scan(&tks).Error; err != nil {
logger.Error("get running task failed %s", err.Error())
return
}
if len(tks) == 0 {
logger.Info("no need reload running task")
return
}
for _, tk := range tks {
var err error
var req model.TbRequestRecord
var p ReloadParam
var podName string
if err = model.DB.Model(model.TbRequestRecord{}).Where("request_id = ? ", tk.RequestID).Find(&req).
Error; err != nil {
logger.Error("get request content failed %s", err.Error())
//nolint
model.CompleteTask(tk.TaskId, tk.MySQLVersion, model.TaskFailed, "", "", "")
continue
}
if err = json.Unmarshal([]byte(req.RequestBody), &p); err != nil {
logger.Error("get request content failed %s", err.Error())
//nolint
model.CompleteTask(tk.TaskId, tk.MySQLVersion, model.TaskFailed, "", "", "")
continue
}
if p.SpiderVersion == nil {
podName = p.BuildTendbPodName()
} else {
podName = p.BuildSpiderPodName()
}
// delete old pod
var gracePeriodSeconds int64
if slices.Contains([]string{model.PhaseCreatePod, model.PhaseLoadSchema, model.PhaseRunning}, tk.Phase) {
err = Kcs.Cli.CoreV1().Pods(Kcs.Namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriodSeconds,
})
if err != nil {
logger.Error("delete pod failed %s", err.Error())
//nolint
model.CompleteTask(tk.TaskId, tk.MySQLVersion, model.TaskFailed, "", "", "")
continue
}
}
// wait pod delete
time.Sleep(3 * time.Second)
logger.Info("loading task %s", tk.TaskId)
model.UpdatePhase(tk.TaskId, tk.MySQLVersion, model.PhaseReloading)
if p.SpiderVersion != nil {
SpiderTaskChan <- p.BuildTsk(tk.RequestID)
} else {
TaskChan <- p.BuildTsk(tk.RequestID)
}
}
}

// BuildTsk build read load task
func (r ReloadParam) BuildTsk(requestId string) (tsk SimulationTask) {
tsk = SimulationTask{
RequestId: requestId,
DbPodSets: NewDbPodSets(),
BaseParam: &r.BaseParam,
Version: r.MySQLVersion,
}
version := r.MySQLVersion
img, err := GetImgFromMySQLVersion(version)
if err != nil {
logger.Error("GetImgFromMySQLVersion %s failed:%s", version, err.Error())
return
}
tsk.DbImage = img
if r.SpiderVersion != nil {
tsk.SpiderImage, tsk.TdbCtlImage = GetSpiderAndTdbctlImg(*r.SpiderVersion, LatestVersion)
}
tsk.BaseInfo = &MySQLPodBaseInfo{
PodName: fmt.Sprintf("tendb-%s-%s", strings.ToLower(version),
replaceUnderSource(r.TaskId)),
Lables: map[string]string{"task_id": replaceUnderSource(r.TaskId),
"request_id": requestId},
RootPwd: r.TaskId[0:4],
Args: r.BuildStartArgs(),
Charset: r.MySQLCharSet,
}
return tsk
}

func run(task SimulationTask, tkType string) {
Expand All @@ -154,6 +315,21 @@ func run(task SimulationTask, tkType string) {
return
}
}()
doneChan := make(chan struct{})
ticker := time.NewTicker(time.Duration(HeartbeatInterval) * time.Second)
go func() {
for {
select {
case <-ticker.C:
model.UpdateHeartbeat(task.TaskId)
case <-doneChan:
logger.Info("simulation run done")
return
}
}
}()
// 关闭协程
defer func() { ticker.Stop(); doneChan <- struct{}{} }()
xlogger := task.getXlogger()
// create Pod
model.UpdatePhase(task.TaskId, task.MySQLVersion, model.PhaseCreatePod)
Expand Down Expand Up @@ -201,21 +377,7 @@ func (t *SimulationTask) getDbsExcludeSysDb() (err error) {
func (t *SimulationTask) SimulationRun(containerName string, xlogger *logger.Logger) (sstdout, sstderr string,
err error) {
logger.Info("will execute in %s", containerName)
doneChan := make(chan struct{})
ticker := time.NewTicker(30 * time.Second)
go func() {
for {
select {
case <-ticker.C:
model.UpdateHeartbeat(t.TaskId, sstderr, sstdout)
case <-doneChan:
logger.Info("simulation run done")
return
}
}
}()
// 关闭协程
defer func() { ticker.Stop(); doneChan <- struct{}{} }()

model.UpdatePhase(t.TaskId, t.MySQLVersion, model.PhaseLoadSchema)
// Load schema SQL
sstdout, sstderr, err = t.loadSchemaSQL(containerName)
Expand Down Expand Up @@ -383,3 +545,56 @@ func (t *SimulationTask) executeMultFilesObject(e ExcuteSQLFileObjV2, containerN
}
return
}

// GetImgFromMySQLVersion 根据版本获取模拟执行运行的镜像配置
func GetImgFromMySQLVersion(version string) (img string, err error) {
img, errx := model.GetImageName("mysql", version)
if errx == nil {
logger.Info("get image from db img config: %s", img)
return img, nil
}
switch {
case regexp.MustCompile("5.5").MatchString(version):
return config.GAppConfig.Image.Tendb55Img, nil
case regexp.MustCompile("5.6").MatchString(version):
return config.GAppConfig.Image.Tendb56Img, nil
case regexp.MustCompile("5.7").MatchString(version):
return config.GAppConfig.Image.Tendb57Img, nil
case regexp.MustCompile("8.0").MatchString(version):
return config.GAppConfig.Image.Tendb80Img, nil
default:
return "", fmt.Errorf("not match any version")
}
}

// GetSpiderAndTdbctlImg TODO
func GetSpiderAndTdbctlImg(spiderVersion, tdbctlVersion string) (spiderImg, tdbctlImg string) {
return getSpiderImg(spiderVersion), getTdbctlImg(tdbctlVersion)
}

const (
// LatestVersion latest version
LatestVersion = "latest"
)

func getSpiderImg(version string) (img string) {
if lo.IsEmpty(version) {
version = LatestVersion
}
img, errx := model.GetImageName("spider", version)
if errx == nil {
return img
}
return config.GAppConfig.Image.SpiderImg
}

func getTdbctlImg(version string) (img string) {
if lo.IsEmpty(version) {
version = LatestVersion
}
img, errx := model.GetImageName("tdbctl", version)
if errx == nil {
return img
}
return config.GAppConfig.Image.TdbCtlImg
}
6 changes: 3 additions & 3 deletions dbm-services/mysql/db-simulation/handler/dbsimulation.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TendbSimulation(r *gin.Context) {
return
}
version := param.MySQLVersion
img, err := getImgFromMySQLVersion(version)
img, err := service.GetImgFromMySQLVersion(version)
if err != nil {
logger.Error("GetImgFromMySQLVersion %s failed:%s", version, err.Error())
SendResponse(r, err, nil, requestID)
Expand Down Expand Up @@ -103,7 +103,7 @@ func TendbClusterSimulation(r *gin.Context) {
return
}
version := param.MySQLVersion
img, err := getImgFromMySQLVersion(version)
img, err := service.GetImgFromMySQLVersion(version)
if err != nil {
logger.Error("GetImgFromMySQLVersion %s failed:%s", version, err.Error())
SendResponse(r, err, nil, RequestID)
Expand All @@ -126,7 +126,7 @@ func TendbClusterSimulation(r *gin.Context) {
logger.Info("the pwd %s", rootPwd)
}
tsk.DbImage = img
tsk.SpiderImage, tsk.TdbCtlImage = getSpiderAndTdbctlImg(param.SpiderVersion, LatestVersion)
tsk.SpiderImage, tsk.TdbCtlImage = service.GetSpiderAndTdbctlImg(param.SpiderVersion, service.LatestVersion)
tsk.BaseInfo = &service.MySQLPodBaseInfo{
PodName: fmt.Sprintf("spider-%s-%s", strings.ToLower(version),
replaceUnderSource(param.TaskId)),
Expand Down
Loading

0 comments on commit 3ad7be7

Please sign in to comment.