Skip to content

Commit

Permalink
refactor(dbm-services): dbactor 代码优化 TencentBlueKing#8586
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq committed Dec 13, 2024
1 parent 3190702 commit 9f7c111
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -659,9 +659,9 @@ func (c *CutOverCtx) getSlaveSpiderServers() (slaveSpiderServers []native.Server
}

// CheckSpiderAppProcesslist TODO
func (ctx *CutOverCtx) CheckSpiderAppProcesslist() (err error) {
for addr, spider_conn := range ctx.spidersConn {
pls, err := spider_conn.ShowApplicationProcesslist(ctx.sysUsers)
func (c *CutOverCtx) CheckSpiderAppProcesslist() (err error) {
for addr, spider_conn := range c.spidersConn {
pls, err := spider_conn.ShowApplicationProcesslist(c.sysUsers)
if err != nil {
return err
}
Expand All @@ -672,8 +672,8 @@ func (ctx *CutOverCtx) CheckSpiderAppProcesslist() (err error) {
return
}

func (ctx *CutOverCtx) lockaAllSpidersWrite() (err error) {
for addr, lockConn := range ctx.spidersLockConn {
func (c *CutOverCtx) lockaAllSpidersWrite() (err error) {
for addr, lockConn := range c.spidersLockConn {
_, err = lockConn.ExecContext(context.Background(), "set lock_wait_timeout = 10;")
if err != nil {
return fmt.Errorf("set lock_wait_timeout at %s failed,err:%w", addr, err)
Expand All @@ -693,8 +693,8 @@ func (ctx *CutOverCtx) lockaAllSpidersWrite() (err error) {
}

// Unlock TODO
func (ctx *CutOverCtx) Unlock() (err error) {
for addr, lockConn := range ctx.spidersLockConn {
func (c *CutOverCtx) Unlock() (err error) {
for addr, lockConn := range c.spidersLockConn {
err = cmutil.Retry(cmutil.RetryConfig{
Times: 3,
DelayTime: 1 * time.Second,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,14 @@ func (c *CheckTdbctlWithSpideSchemaComp) RunCheck() (err error) {
for tb := range map1 {
if _, exist := map2[tb]; !exist {
msg := fmt.Sprintf("%s frm文件在 tdbctl 中不存在,请确认\n", tb)
globalErrs = append(globalErrs, fmt.Errorf(msg))
globalErrs = append(globalErrs, fmt.Errorf("%s", msg))
logger.Error(msg)
}
}
if dbdirSpiderCount != dbdirTdbctlCount {
msg := fmt.Sprintf("【%s】库上的表数量不一致,【tdbctl】节点上表总数量:%d, 【spider】节点上表的总数量 count:%d\n", db, dbdirTdbctlCount,
dbdirSpiderCount)
globalErrs = append(globalErrs, fmt.Errorf(msg))
globalErrs = append(globalErrs, fmt.Errorf("%s", msg))
logger.Error(msg)
}
spiderDbSchemaCountMap[db] = dbdirSpiderCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (i *InitClusterRoutingComp) OnlyInitTdbctl() (err error) {
execSQLs = append(execSQLs, "set tc_admin=1;")
execSQLs = append(execSQLs, i.getTdbctlRouterSqls()...)
execSQLs = append(execSQLs, "tdbctl enable primary;")
if _, err := i.tdbCtlConn.ExecMore(execSQLs); err != nil {
if _, err = i.tdbCtlConn.ExecMore(execSQLs); err != nil {
logger.Error("tdbctl create node failed:[%s]", err.Error())
return err
}
Expand Down Expand Up @@ -286,7 +286,7 @@ func (i *InitClusterRoutingComp) AddAppendDeployInitRouter() (err error) {
execSQLs = append(execSQLs, i.getRemoteRouterSqls()...)
execSQLs = append(execSQLs, i.getSpiderRouterSqls()...)
execSQLs = append(execSQLs, "tdbctl enable primary;")
if _, err := i.tdbCtlConn.ExecMore(execSQLs); err != nil {
if _, err = i.tdbCtlConn.ExecMore(execSQLs); err != nil {
logger.Error("tdbctl create node failed:[%s]", err.Error())
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"fmt"
"time"

"github.com/jmoiron/sqlx"

"dbm-services/common/go-pubpkg/cmutil"
"dbm-services/common/go-pubpkg/logger"
"dbm-services/mysql/db-tools/dbactuator/pkg/components"
Expand Down Expand Up @@ -47,6 +49,7 @@ type CheckObject struct {
}
type tableSchemaCheckCtx struct {
tdbCtlConn *native.TdbctlDbWork
xconn *sqlx.Conn
version string
}

Expand Down Expand Up @@ -92,16 +95,21 @@ func (r *TableSchemaCheckComp) Init() (err error) {
return err
}
r.tdbCtlConn = &native.TdbctlDbWork{DbWorker: *conn}
r.xconn, err = r.tdbCtlConn.GetSqlxDb().Connx(context.Background())
if err != nil {
logger.Error("get xconn error: %v")
return err
}
// init checksum table schema
if _, err = r.tdbCtlConn.ExecMore([]string{"set tc_admin = 0;", "use infodba_schema;",
TsccSchemaChecksum}); err != nil {
logger.Error("init tscc_schema_checksum error: %v", err)
return
return err
}
r.version, err = r.tdbCtlConn.SelectVersion()
if err != nil {
logger.Error("get version error: %v", err)
return
return err
}
return err
}
Expand Down Expand Up @@ -146,18 +154,13 @@ func (r *TableSchemaCheckComp) checkAll() (err error) {
}

func (r *TableSchemaCheckComp) atomUpdateDbTables(dbName string) (err error) {
xconn, err := r.tdbCtlConn.GetSqlxDb().Connx(context.Background())
if err != nil {
return err
}
defer xconn.Close()
_, err = xconn.ExecContext(context.Background(), "set tc_admin = 1;")
_, err = r.xconn.ExecContext(context.Background(), "set tc_admin = 1;")
if err != nil {
logger.Error("set tc_admin = 1 failed error: %v", err)
return err
}
var result native.SchemaCheckResults
if err = xconn.SelectContext(context.Background(), &result, fmt.Sprintf("TDBCTL CHECK DATABASE `%s`;",
if err = r.xconn.SelectContext(context.Background(), &result, fmt.Sprintf("TDBCTL CHECK DATABASE `%s`;",
dbName)); err != nil {
logger.Error("check table schema: %s", err.Error())
return err
Expand Down Expand Up @@ -193,21 +196,15 @@ func (r *TableSchemaCheckComp) atomUpdateTables(dbName string, tables []string)
if len(tables) == 0 {
return nil
}
xconn, err := r.tdbCtlConn.GetSqlxDb().Connx(context.Background())
if err != nil {
logger.Error("get sqlx conn failed: %s", err.Error())
return err
}
defer xconn.Close()
_, err = xconn.ExecContext(context.Background(), "set tc_admin = 1;")
_, err = r.xconn.ExecContext(context.Background(), "set tc_admin = 1;")
if err != nil {
logger.Error("set tc_admin = 1 failed error: %v", err)
return err
}
for _, table := range tables {
// check table schema
var result native.SchemaCheckResults
if err = xconn.SelectContext(context.Background(), &result, fmt.Sprintf("tdbctl check `%s`.`%s`;", dbName,
if err = r.xconn.SelectContext(context.Background(), &result, fmt.Sprintf("tdbctl check `%s`.`%s`;", dbName,
table)); err != nil {
logger.Error("check table schema error: %s", err.Error())
return err
Expand All @@ -221,7 +218,7 @@ func (r *TableSchemaCheckComp) atomUpdateTables(dbName string, tables []string)

func (r *TableSchemaCheckComp) atomUpdateCheckResult(db, tbl string, inconsistentItems []native.SchemaCheckResult) (
err error) {
_, err = r.tdbCtlConn.Exec("set tc_admin=0;")
_, err = r.xconn.ExecContext(context.TODO(), "set tc_admin=0;")
if err != nil {
logger.Error("set tc_admin=0 failed error: %v", err)
return err
Expand All @@ -237,7 +234,8 @@ func (r *TableSchemaCheckComp) atomUpdateCheckResult(db, tbl string, inconsisten
return
}
}
if _, err = r.tdbCtlConn.Exec("replace into infodba_schema.tscc_schema_checksum values(?,?,?,?,?)", db,
if _, err = r.xconn.ExecContext(context.TODO(), "replace into infodba_schema.tscc_schema_checksum values(?,?,?,?,?)",
db,
tbl, status,
checkResult,
time.Now()); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type TableSchemaRepairParam struct {
}
type tableSchemaRepairCtx struct {
tdbCtlConn *native.TdbctlDbWork
taskdir string
svrNameServersMap map[SVRNAME]native.Server
primarySpts []native.Server
spiderSpts []native.Server
Expand Down Expand Up @@ -108,7 +107,7 @@ func (r *TableSchemaRepairComp) RunAutoFix() (err error) {
logger.Error("get abnormal schema checksum failed, err: %v", err)
return err
}
if len(abnormalChecksums) <= 0 {
if len(abnormalChecksums) == 0 {
logger.Info("no abnormal table structure check record was found,bye~")
return nil
}
Expand Down

0 comments on commit 9f7c111

Please sign in to comment.