Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dbm-services): 重启服务后重载重启前的运行中的模拟执行任务 #8123 #8176

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions dbm-services/go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,8 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/boombuler/barcode v1.0.0 h1:s1TvRnXwL2xJRaccrdcBQMZxq6X7DvsMogtmJeHDdrc=
github.com/bshuster-repo/logrus-logstash-hook v0.4.1 h1:pgAtgj+A31JBVtEHu2uHuEx0n+2ukqUJnS2vVe5pQNA=
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd h1:rFt+Y/IK1aEZkEHchZRSq9OQbsSzIT/OrI8YFFmRIng=
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b h1:otBG+dV+YK+Soembjv71DPz3uX/V/6MMlSyD9JBQ6kQ=
Expand Down Expand Up @@ -1116,8 +1118,6 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
github.com/form3tech-oss/jwt-go v3.2.5+incompatible h1:/l4kBbb4/vGSsdtB5nUe8L7B9mImVMaBPw9L/0TBHU8=
github.com/fsouza/fake-gcs-server v1.17.0 h1:OeH75kBZcZa3ZE+zz/mFdJ2btt9FgqfjI7gIh9+5fvk=
Expand Down Expand Up @@ -1199,8 +1199,6 @@ github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg=
github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
github.com/googleapis/enterprise-certificate-proxy v0.2.3 h1:yk9/cqRKtT9wXZSsRH9aurXEpJX+U6FLtpYTdC3R06k=
Expand Down Expand Up @@ -1453,8 +1451,6 @@ github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e h1:aoZm08cpOy4WuID//EZDgc
github.com/posener/complete v1.2.3 h1:NP0eAhjcjImqslEwo/1hq7gpajME0fTLTezBKDqfXqo=
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021 h1:0XM1XL/OFFJjXsYXlG30spTkV/E9+gmd5GD1w2HE8xM=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
Expand Down
7 changes: 7 additions & 0 deletions dbm-services/mysql/db-simulation/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type AppConfig struct {
TdbctlPodResource TdbctlPodResource `yaml:"tdbctlPodResource"`
SimulationNodeLables []LabelItem `yaml:"simulationNodeLables"`
SimulationtaintLables []LabelItem `yaml:"simulationtaintLables"`
Redis RedisDb `yaml:"redis"`
}

// BkRepoConfig bkrepo config
Expand Down Expand Up @@ -106,6 +107,12 @@ type ImgConfig struct {
Image string `yaml:"image"`
}

// RedisDb redis
type RedisDb struct {
Addr string `yaml:"addr"`
Password string `yaml:"password"`
}

func init() {
viper.AutomaticEnv()
GAppConfig.ListenAddr = "0.0.0.0:80"
Expand Down
241 changes: 223 additions & 18 deletions dbm-services/mysql/db-simulation/app/service/simulation_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,36 @@
package service

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"os"
"regexp"
"slices"
"strings"
"time"

"github.com/bsm/redislock"
"github.com/redis/go-redis/v9"
"github.com/samber/lo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

util "dbm-services/common/go-pubpkg/cmutil"
"dbm-services/common/go-pubpkg/logger"
"dbm-services/mysql/db-simulation/app"
"dbm-services/mysql/db-simulation/app/config"
"dbm-services/mysql/db-simulation/model"
)

// DelPod 控制运行模拟执行后是否删除拉起的Pod的开关
// 用于保留现场排查问题
var DelPod = true

// HeartbeatInterval 心跳时间间隔
var HeartbeatInterval = 15

// BaseParam 请求模拟执行的基础参数
type BaseParam struct {
//nolint
Expand All @@ -49,6 +61,24 @@ type BaseParam struct {
ExcuteObjects []ExcuteSQLFileObjV2 `json:"execute_objects" binding:"gt=0,dive,required"`
}

// BuildTendbPodName build tendb pod name
func (b BaseParam) BuildTendbPodName() string {
podName := fmt.Sprintf("tendb-%s-%s", strings.ToLower(b.MySQLVersion),
replaceUnderSource(b.TaskId))
return podName
}

// BuildSpiderPodName build spider pod name
func (b BaseParam) BuildSpiderPodName() string {
podName := fmt.Sprintf("spider-%s-%s", strings.ToLower(b.MySQLVersion),
replaceUnderSource(b.TaskId))
return podName
}

func replaceUnderSource(str string) string {
return strings.ReplaceAll(str, "_", "-")
}

// BuildStartArgs mysql pod start args
func (b BaseParam) BuildStartArgs() []string {
if len(b.MySQLStartConfigs) == 0 {
Expand Down Expand Up @@ -114,13 +144,12 @@ var SpiderTaskChan chan SimulationTask
// CtrlChan 并发控制
var ctrlChan chan struct{}

var rdb *redis.Client

func init() {
TaskChan = make(chan SimulationTask, 100)
SpiderTaskChan = make(chan SimulationTask, 100)
ctrlChan = make(chan struct{}, 30)
}

func init() {
timer := time.NewTicker(60 * time.Second)
go func() {
for {
Expand All @@ -134,6 +163,128 @@ func init() {
}
}
}()
logger.Info("redis addr %s", config.GAppConfig.Redis.Addr)
rdb = redis.NewClient(&redis.Options{
Addr: config.GAppConfig.Redis.Addr,
Password: config.GAppConfig.Redis.Password,
DB: 0,
})
go func() {
// 等待一个周期的原因,避免重载执行正常的任务
time.Sleep(time.Duration(HeartbeatInterval) * time.Second)
reloadRunningTaskFromdb()
rdb.Close()
}()
}

// ReloadParam reload running task from db
type ReloadParam struct {
BaseParam
SpiderVersion *string `json:"spider_version,omitempty"`
}

// reloadRunningTaskFromdb 重载重启服务前运行的任务 加锁是避免多个服务同时启动,避免相同任务被重载
func reloadRunningTaskFromdb() {
key := "simulation:reload:lock"
locker := redislock.New(rdb)
ctx := context.Background()
rlock, err := locker.Obtain(ctx, key, 60*time.Second, nil)
if err != nil {
logger.Error("obtain lock failed %v", err)
return
}
defer func() {
// nolint
rlock.Release(ctx)
}()
var tks []model.TbSimulationTask
if err := model.DB.Model(model.TbSimulationTask{}).Where(
//nolint
"phase not in (?) and create_time > DATE_SUB(NOW(),INTERVAL 6 HOUR) and time_to_sec(timediff(heartbeat_time,now())) > ? ",
[]string{model.PhaseDone, model.PhaseReloading}, HeartbeatInterval).Scan(&tks).Error; err != nil {
logger.Error("get running task failed %s", err.Error())
return
}
if len(tks) == 0 {
logger.Info("no need reload running task")
return
}
for _, tk := range tks {
var err error
var req model.TbRequestRecord
var p ReloadParam
var podName string
if err = model.DB.Model(model.TbRequestRecord{}).Where("request_id = ? ", tk.RequestID).Find(&req).
Error; err != nil {
logger.Error("get request content failed %s", err.Error())
//nolint
model.CompleteTask(tk.TaskId, tk.MySQLVersion, model.TaskFailed, "", "", "")
continue
}
if err = json.Unmarshal([]byte(req.RequestBody), &p); err != nil {
logger.Error("get request content failed %s", err.Error())
//nolint
model.CompleteTask(tk.TaskId, tk.MySQLVersion, model.TaskFailed, "", "", "")
continue
}
if p.SpiderVersion == nil {
podName = p.BuildTendbPodName()
} else {
podName = p.BuildSpiderPodName()
}
// delete old pod
var gracePeriodSeconds int64
if slices.Contains([]string{model.PhaseCreatePod, model.PhaseLoadSchema, model.PhaseRunning}, tk.Phase) {
err = Kcs.Cli.CoreV1().Pods(Kcs.Namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriodSeconds,
})
if err != nil {
logger.Error("delete pod failed %s", err.Error())
//nolint
model.CompleteTask(tk.TaskId, tk.MySQLVersion, model.TaskFailed, "", "", "")
continue
}
}
// wait pod delete
time.Sleep(3 * time.Second)
logger.Info("loading task %s", tk.TaskId)
model.UpdatePhase(tk.TaskId, tk.MySQLVersion, model.PhaseReloading)
if p.SpiderVersion != nil {
SpiderTaskChan <- p.BuildTsk(tk.RequestID)
} else {
TaskChan <- p.BuildTsk(tk.RequestID)
}
}
}

// BuildTsk build read load task
func (r ReloadParam) BuildTsk(requestId string) (tsk SimulationTask) {
tsk = SimulationTask{
RequestId: requestId,
DbPodSets: NewDbPodSets(),
BaseParam: &r.BaseParam,
Version: r.MySQLVersion,
}
version := r.MySQLVersion
img, err := GetImgFromMySQLVersion(version)
if err != nil {
logger.Error("GetImgFromMySQLVersion %s failed:%s", version, err.Error())
return
}
tsk.DbImage = img
if r.SpiderVersion != nil {
tsk.SpiderImage, tsk.TdbCtlImage = GetSpiderAndTdbctlImg(*r.SpiderVersion, LatestVersion)
}
tsk.BaseInfo = &MySQLPodBaseInfo{
PodName: fmt.Sprintf("tendb-%s-%s", strings.ToLower(version),
replaceUnderSource(r.TaskId)),
Lables: map[string]string{"task_id": replaceUnderSource(r.TaskId),
"request_id": requestId},
RootPwd: r.TaskId[0:4],
Args: r.BuildStartArgs(),
Charset: r.MySQLCharSet,
}
return tsk
}

func run(task SimulationTask, tkType string) {
Expand All @@ -154,6 +305,21 @@ func run(task SimulationTask, tkType string) {
return
}
}()
doneChan := make(chan struct{})
ticker := time.NewTicker(time.Duration(HeartbeatInterval) * time.Second)
go func() {
for {
select {
case <-ticker.C:
model.UpdateHeartbeat(task.TaskId)
case <-doneChan:
logger.Info("simulation run done")
return
}
}
}()
// 关闭协程
defer func() { ticker.Stop(); doneChan <- struct{}{} }()
xlogger := task.getXlogger()
// create Pod
model.UpdatePhase(task.TaskId, task.MySQLVersion, model.PhaseCreatePod)
Expand Down Expand Up @@ -201,21 +367,7 @@ func (t *SimulationTask) getDbsExcludeSysDb() (err error) {
func (t *SimulationTask) SimulationRun(containerName string, xlogger *logger.Logger) (sstdout, sstderr string,
err error) {
logger.Info("will execute in %s", containerName)
doneChan := make(chan struct{})
ticker := time.NewTicker(30 * time.Second)
go func() {
for {
select {
case <-ticker.C:
model.UpdateHeartbeat(t.TaskId, sstderr, sstdout)
case <-doneChan:
logger.Info("simulation run done")
return
}
}
}()
// 关闭协程
defer func() { ticker.Stop(); doneChan <- struct{}{} }()

model.UpdatePhase(t.TaskId, t.MySQLVersion, model.PhaseLoadSchema)
// Load schema SQL
sstdout, sstderr, err = t.loadSchemaSQL(containerName)
Expand Down Expand Up @@ -383,3 +535,56 @@ func (t *SimulationTask) executeMultFilesObject(e ExcuteSQLFileObjV2, containerN
}
return
}

// GetImgFromMySQLVersion 根据版本获取模拟执行运行的镜像配置
func GetImgFromMySQLVersion(version string) (img string, err error) {
img, errx := model.GetImageName("mysql", version)
if errx == nil {
logger.Info("get image from db img config: %s", img)
return img, nil
}
switch {
case regexp.MustCompile("5.5").MatchString(version):
return config.GAppConfig.Image.Tendb55Img, nil
case regexp.MustCompile("5.6").MatchString(version):
return config.GAppConfig.Image.Tendb56Img, nil
case regexp.MustCompile("5.7").MatchString(version):
return config.GAppConfig.Image.Tendb57Img, nil
case regexp.MustCompile("8.0").MatchString(version):
return config.GAppConfig.Image.Tendb80Img, nil
default:
return "", fmt.Errorf("not match any version")
}
}

// GetSpiderAndTdbctlImg TODO
func GetSpiderAndTdbctlImg(spiderVersion, tdbctlVersion string) (spiderImg, tdbctlImg string) {
return getSpiderImg(spiderVersion), getTdbctlImg(tdbctlVersion)
}

const (
// LatestVersion latest version
LatestVersion = "latest"
)

func getSpiderImg(version string) (img string) {
if lo.IsEmpty(version) {
version = LatestVersion
}
img, errx := model.GetImageName("spider", version)
if errx == nil {
return img
}
return config.GAppConfig.Image.SpiderImg
}

func getTdbctlImg(version string) (img string) {
if lo.IsEmpty(version) {
version = LatestVersion
}
img, errx := model.GetImageName("tdbctl", version)
if errx == nil {
return img
}
return config.GAppConfig.Image.TdbCtlImg
}
4 changes: 4 additions & 0 deletions dbm-services/mysql/db-simulation/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ require (
)

require (
github.com/bsm/redislock v0.9.4 // indirect
github.com/bytedance/sonic v1.10.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
Expand Down Expand Up @@ -61,6 +64,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/redis/go-redis/v9 v9.0.3 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand Down
Loading