Skip to content

Commit

Permalink
fix(dbm-services): check table schema use one conn TencentBlueKing#8566
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq committed Dec 12, 2024
1 parent 9fe2ad6 commit 96f804b
Showing 1 changed file with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package tscc

import (
"context"
"encoding/json"
"fmt"
"log/slog"
Expand Down Expand Up @@ -49,7 +50,8 @@ const (
)

type tableSchemaConsistencyCheck struct {
ctldb *sqlx.DB
ctldb *sqlx.DB
ctlconn *sqlx.Conn
}

// Name TODO
Expand Down Expand Up @@ -110,16 +112,23 @@ func (c *tableSchemaConsistencyCheck) Run() (msg string, err error) {
slog.Info("is not primary tdbctl,no need to run")
return "", nil
}
ctlconn, err := c.ctldb.Connx(context.TODO())
if err != nil {
slog.Error("connect to ctl failed", slog.String("error", err.Error()))
return
}
c.ctlconn = ctlconn
initSQLs := []string{"set tc_admin = 0;", "use infodba_schema;"}
initSQLs = append(initSQLs, schemas...)

for _, sqlStr := range initSQLs {
if _, err = c.ctldb.Exec(sqlStr); err != nil {
if _, err = c.ctlconn.ExecContext(context.TODO(), sqlStr); err != nil {
slog.Error("exec init sql:", sqlStr, "err: %v", sqlStr, err)
return
}
}
var count int
err = c.ctldb.Get(&count, "select count(*) from tscc_pending_execute_tbl")
err = c.ctlconn.GetContext(context.TODO(), &count, "select count(*) from tscc_pending_execute_tbl")
if err != nil {
slog.Error("query pending execute check table failed", slog.String("error", err.Error()))
return
Expand All @@ -131,7 +140,7 @@ func (c *tableSchemaConsistencyCheck) Run() (msg string, err error) {
slog.Error("get check tables failed", slog.String("error", err.Error()))
return msg, err
}
_, err = c.ctldb.Exec(c.ctldb.Rebind(query), args...)
_, err = c.ctlconn.ExecContext(context.TODO(), c.ctlconn.Rebind(query), args...)
if err != nil {
return msg, err
}
Expand All @@ -143,7 +152,7 @@ func (c *tableSchemaConsistencyCheck) Run() (msg string, err error) {
for {
var tblRows []TsccPendingExecuteTbl
var subErrCnt int
err = c.ctldb.Select(&tblRows, "select * from tscc_pending_execute_tbl limit 500")
err = c.ctlconn.SelectContext(context.TODO(), &tblRows, "select * from tscc_pending_execute_tbl limit 500")
if err != nil {
slog.Error("failed to query the table to be verified", slog.String("error", err.Error()))
errChan <- err
Expand All @@ -155,8 +164,9 @@ func (c *tableSchemaConsistencyCheck) Run() (msg string, err error) {
}
for _, tblRow := range tblRows {
var result SchemaCheckResults
c.ctldb.Exec("set tc_admin = 1;")
err = c.ctldb.Select(&result, fmt.Sprintf("tdbctl check table `%s`.`%s`;", tblRow.Db, tblRow.Tbl))
c.ctlconn.ExecContext(context.TODO(), "set tc_admin = 1;")
err = c.ctlconn.SelectContext(context.TODO(), &result, fmt.Sprintf("tdbctl check table `%s`.`%s`;", tblRow.Db,
tblRow.Tbl))
if err != nil {
errChan <- err
subErrCnt++
Expand All @@ -168,7 +178,7 @@ func (c *tableSchemaConsistencyCheck) Run() (msg string, err error) {
slog.Error("exec tdbctl check table failed", slog.String("error", err.Error()))
continue
}
c.ctldb.Exec("set tc_admin = 0;")
c.ctlconn.ExecContext(context.TODO(), "set tc_admin = 0;")
inconsistentItems := result.CheckResult()
if err = c.atomUpdateCheckResult(tblRow.Db, tblRow.Tbl, inconsistentItems); err == nil {
slog.Info("update checkresult ok")
Expand Down Expand Up @@ -203,7 +213,7 @@ func (c *tableSchemaConsistencyCheck) atomUpdateCheckResult(db, tbl string, inco
err error) {
var status string
status = SchemaCheckOk
tx, err := c.ctldb.Begin()
tx, err := c.ctlconn.BeginTx(context.TODO(), nil)
if err != nil {
return err
}
Expand Down

0 comments on commit 96f804b

Please sign in to comment.