Skip to content

Commit

Permalink
fix(mysql): mysql-monitor移除context timeout TencentBlueKing#8730
Browse files Browse the repository at this point in the history
  • Loading branch information
xfwduke committed Dec 20, 2024
1 parent b553a46 commit d8b1a92
Show file tree
Hide file tree
Showing 52 changed files with 440 additions and 305 deletions.
2 changes: 2 additions & 0 deletions dbm-services/common/reverse-api/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
cmd
cmd/*
23 changes: 23 additions & 0 deletions dbm-services/common/reverse-api/apis/common/list_nginx_addrs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package common

import (
"dbm-services/common/reverse-api/config"
"dbm-services/common/reverse-api/internal"
"encoding/json"

"github.com/pkg/errors"
)

func ListNginxAddrs(bkCloudId int) ([]string, error) {
data, err := internal.ReverseCall(config.ReverseApiCommonListNginxAddrs, bkCloudId)
if err != nil {
return nil, errors.Wrap(err, "failed to call ListNginxAddrs")
}

var addrs []string
if err := json.Unmarshal(data, &addrs); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal ListNginxAddrs")
}

return addrs, nil
}
56 changes: 56 additions & 0 deletions dbm-services/common/reverse-api/apis/mysql/list_instance_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package mysql

import (
"dbm-services/common/reverse-api/config"
"dbm-services/common/reverse-api/internal"
"encoding/json"

"github.com/pkg/errors"
)

const (
AccessLayerStorage string = "storage"
AccessLayerProxy string = "proxy"
)

type instanceAddr struct {
Ip string `json:"ip"`
Port int `json:"port"`
}

type commonInstanceInfo struct {
instanceAddr
ImmuteDomain string `json:"immute_domain"`
Phase string `json:"phase"`
Status string `json:"status"`
AccessLayer string `json:"access_layer"`
MachineType string `json:"machine_type"`
}

type StorageInstanceInfo struct {
commonInstanceInfo
IsStandBy bool `json:"is_stand_by"`
InstanceRole string `json:"instance_role"`
InstanceInnerRole string `json:"instance_inner_role"`
Receivers []instanceAddr `json:"receivers"`
Ejectors []instanceAddr `json:"ejectors"`
}

type ProxyInstanceInfo struct {
commonInstanceInfo
StorageInstanceList []instanceAddr `json:"storage_instance_list"`
}

func ListInstanceInfo(bkCloudId int, ports ...int) ([]byte, string, error) {
data, err := internal.ReverseCall(config.ReverseApiMySQLListInstanceInfo, bkCloudId, ports...)
if err != nil {
return nil, "", errors.Wrap(err, "failed to call ListInstanceInfo")
}
var r []commonInstanceInfo
err = json.Unmarshal(data, &r)
if err != nil {
return nil, "", errors.Wrap(err, "failed to unmarshal ListInstanceInfo")
}

return data, r[0].AccessLayer, nil
}
6 changes: 6 additions & 0 deletions dbm-services/common/reverse-api/config/apis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package config

const (
ReverseApiCommonListNginxAddrs = "common/list_nginx_addrs"
ReverseApiMySQLListInstanceInfo = "mysql/list_instance_info"
)
11 changes: 11 additions & 0 deletions dbm-services/common/reverse-api/config/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package config

const CommonConfigDir = "/home/mysql/common_config"
const NginxProxyAddrsFileName = "nginx_proxy.list"
const ReverseApiBase = "apis/proxypass/reverse_api"

type ReverseApiName string

func (c ReverseApiName) String() string {
return string(c)
}
3 changes: 3 additions & 0 deletions dbm-services/common/reverse-api/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module dbm-services/common/reverse-api

go 1.21.11
1 change: 1 addition & 0 deletions dbm-services/common/reverse-api/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package reverse_api
11 changes: 11 additions & 0 deletions dbm-services/common/reverse-api/internal/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package internal

import "encoding/json"

type apiResponse struct {
Result bool `json:"result"`
Code int `json:"code"`
Message string `json:"message"`
Errors string `json:"errors"`
Data json.RawMessage `json:"data"`
}
103 changes: 103 additions & 0 deletions dbm-services/common/reverse-api/internal/reverse_call.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package internal

import (
"bufio"
"dbm-services/common/reverse-api/config"
"encoding/json"
errs "errors"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"

"github.com/pkg/errors"
)

func ReverseCall(api config.ReverseApiName, bkCloudId int, ports ...int) (data []byte, err error) {
addrs, err := readNginxProxyAddrs()
if err != nil {
return nil, errors.Wrap(err, "failed to read nginx proxy addresses")
}

var errCollect []error
for _, addr := range addrs {
apiPath, _ := url.JoinPath(config.ReverseApiBase, api.String(), "/")
ep := url.URL{
Scheme: "http",
Host: addr,
Path: apiPath,
}

req, err := http.NewRequest(http.MethodGet, ep.String(), nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create request")
}

q := req.URL.Query()
q.Add("bk_cloud_id", strconv.Itoa(bkCloudId))
for _, port := range ports {
q.Add("port", strconv.Itoa(port))
}
req.URL.RawQuery = q.Encode()

data, err = do(req)
if err == nil {
return data, nil
}
errCollect = append(errCollect, err)
}

return nil, errs.Join(errCollect...)
}

func do(request *http.Request) (data []byte, err error) {
resp, err := http.DefaultClient.Do(request)
if err != nil {
return nil, errors.Wrap(err, "failed to send request")
}
defer func() {
_ = resp.Body.Close()
}()

b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "failed to read response body")
}

if resp.StatusCode != http.StatusOK {
return nil, errors.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(b))
}

var r apiResponse
err = json.Unmarshal(b, &r)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal response body")
}

if !r.Result {
return nil, errors.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, r.Errors)
}

return r.Data, nil
}

func readNginxProxyAddrs() (addrs []string, err error) {
f, err := os.Open(filepath.Join(config.CommonConfigDir, config.NginxProxyAddrsFileName))
if err != nil {
return nil, errors.Wrap(err, "failed to open nginx proxy addrs")
}
defer func() {
_ = f.Close()
}()

scanner := bufio.NewScanner(f)
for scanner.Scan() {
addrs = append(addrs, scanner.Text())
}
if err := scanner.Err(); err != nil {
return nil, errors.Wrap(err, "failed to read nginx proxy addrs")
}
return addrs, nil
}
1 change: 1 addition & 0 deletions dbm-services/go.work
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use (
mongodb/db-tools/mongo-toolkit-go
mongodb/db-tools/dbactuator
common/db-dns/dns-api/pkg
common/reverse-api
)

replace github.com/go-sql-driver/mysql => github.com/go-sql-driver/mysql v1.7.1
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"encoding/json"
"log/slog"

"github.com/pkg/errors"
)
Expand All @@ -15,6 +16,7 @@ type JobDefine struct {
Creator string `json:"creator"`
Enable bool `json:"enable"`
WorkDir string `json:"work_dir"`
Overlap bool `json:"overlap"`
}

// CreateOrReplace TODO
Expand All @@ -26,6 +28,8 @@ func (m *Manager) CreateOrReplace(job JobDefine, permanent bool) (int, error) {
Job: job,
Permanent: permanent,
}
slog.Info("CreateOrReplace", slog.Any("job", job))

resp, err := m.do("/create_or_replace", "POST", body)
if err != nil {
return 0, errors.Wrap(err, "manager call /create_or_replace")
Expand Down
45 changes: 30 additions & 15 deletions dbm-services/mysql/db-tools/mysql-crond/pkg/config/job_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ExternalJob struct {
Schedule string `yaml:"schedule" json:"schedule" binding:"required" validate:"required"`
Creator string `yaml:"creator" json:"creator" binding:"required" validate:"required"`
WorkDir string `yaml:"work_dir" json:"work_dir"`
Overlap bool `yaml:"overlap" json:"overlap"` // 是否允许作业重叠执行, 默认 false
// JobID 这个 id 主要用于追溯哪个 cron job (如果有) 调起本 external job
JobID cron.EntryID `yaml:"-" json:"-"`
ch chan struct{}
Expand Down Expand Up @@ -87,23 +88,35 @@ func (j *ExternalJob) run() {

// Run TODO
func (j *ExternalJob) Run() {
select {
case v := <-j.ch:
slog.Info(
"run job",
slog.String("name", j.Name),
slog.Bool("overlap", j.Overlap),
)

if j.Overlap {
j.run()
j.ch <- v
default:
slog.Warn("skip job", slog.String("name", j.Name))
err := SendEvent(
mysqlCrondEventName,
fmt.Sprintf("%s skipt for last round use too much time", j.Name),
map[string]interface{}{
"job_name": j.Name,
},
)
if err != nil {
slog.Error("send event", slog.String("error", err.Error()))
} else {
select {
case v := <-j.ch:
j.run()
j.ch <- v
default:
slog.Warn("skip job", slog.String("name", j.Name))
err := SendEvent(
mysqlCrondEventName,
fmt.Sprintf("%s skipt for last round use too much time", j.Name),
map[string]interface{}{
"job_name": j.Name,
},
)
if err != nil {
slog.Error("send event", slog.String("error", err.Error()))
}
}
}

slog.Info("finish job", slog.String("name", j.Name))
}

// SetupChannel TODO
Expand Down Expand Up @@ -165,7 +178,9 @@ func InitJobsConfig() error {
panic(err)
}

j.SetupChannel()
if !j.Overlap {
j.SetupChannel()
}
}
return nil
}
6 changes: 6 additions & 0 deletions dbm-services/mysql/db-tools/mysql-crond/pkg/crond/crond.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package crond

import (
"dbm-services/mysql/db-tools/mysql-crond/pkg/third_party"
"log/slog"
"sync"

Expand Down Expand Up @@ -91,6 +92,11 @@ func Start() error {
slog.Info("add heart beat job", slog.Int("entry id", int(entryID)))
}

// 第三方
for _, rg := range third_party.ThirdPartyRegisters {
rg(cronJob)
}

cronJob.Start()
slog.Info("crond start")
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,9 @@ func CreateOrReplace(j *config.ExternalJob, permanent bool) (int, error) {
)
return 0, err
}
slog.Info(
"create or replace job",
slog.Any("job", j),
)
return entryID, nil
}
Loading

0 comments on commit d8b1a92

Please sign in to comment.