From 3413f46eb238695db0237a6ccd5db4147cf19614 Mon Sep 17 00:00:00 2001 From: Evgeniy Date: Fri, 6 Oct 2023 12:34:30 +0300 Subject: [PATCH] Small refactoring --- internal/mysql/cluster.go | 4 +-- internal/mysql/data.go | 16 +++++------ internal/mysql/node.go | 59 ++++++++++++++++++++++----------------- internal/util/util.go | 8 +++--- 4 files changed, 47 insertions(+), 40 deletions(-) diff --git a/internal/mysql/cluster.go b/internal/mysql/cluster.go index bf9c7842..9c6f25b8 100644 --- a/internal/mysql/cluster.go +++ b/internal/mysql/cluster.go @@ -17,10 +17,10 @@ type Cluster struct { sync.Mutex config *config.Config logger *log.Logger - haNodes map[string]*Node - cascadeNodes map[string]*Node local *Node dcs dcs.DCS + haNodes map[string]*Node + cascadeNodes map[string]*Node } func (c *Cluster) IsHAHost(hostname string) bool { diff --git a/internal/mysql/data.go b/internal/mysql/data.go index d38b21ca..5be47782 100644 --- a/internal/mysql/data.go +++ b/internal/mysql/data.go @@ -41,8 +41,8 @@ type NodeConfiguration struct { } type ResetupStatus struct { - Status bool UpdateTime time.Time + Status bool } type replicationSettings struct { @@ -50,39 +50,39 @@ type replicationSettings struct { SourceHost string `db:"SourceHost"` SourceUser string `db:"SourceUser"` SourcePassword string `db:"SourcePassword"` - SourcePort int `db:"SourcePort"` SourceSslCa string `db:"SourceSslCa"` - SourceDelay int `db:"SourceDelay"` ReplicationStatus sql.NullString `db:"ReplicationStatus"` + SourcePort int `db:"SourcePort"` + SourceDelay int `db:"SourceDelay"` } // SlaveStatusStruct contains SHOW SLAVE STATUS response type SlaveStatusStruct struct { MasterHost string `db:"Master_Host"` - MasterPort int `db:"Master_Port"` MasterLogFile string `db:"Master_Log_File"` - ReadMasterLogPos int64 `db:"Read_Master_Log_Pos"` SlaveIORunning string `db:"Slave_IO_Running"` SlaveSQLRunning string `db:"Slave_SQL_Running"` RetrievedGtidSet string `db:"Retrieved_Gtid_Set"` ExecutedGtidSet string `db:"Executed_Gtid_Set"` LastIOErrno int `db:"Last_IO_Errno"` LastSQLErrno int `db:"Last_SQL_Errno"` + MasterPort int `db:"Master_Port"` + ReadMasterLogPos int64 `db:"Read_Master_Log_Pos"` Lag sql.NullFloat64 `db:"Seconds_Behind_Master"` } // ReplicaStatusStruct contains SHOW REPLICA STATUS response type ReplicaStatusStruct struct { SourceHost string `db:"Source_Host"` - SourcePort int `db:"Source_Port"` SourceLogFile string `db:"Source_Log_File"` - ReadSourceLogPos int64 `db:"Read_Source_Log_Pos"` ReplicaIORunning string `db:"Replica_IO_Running"` ReplicaSQLRunning string `db:"Replica_SQL_Running"` RetrievedGtidSet string `db:"Retrieved_Gtid_Set"` ExecutedGtidSet string `db:"Executed_Gtid_Set"` + SourcePort int `db:"Source_Port"` LastIOErrno int `db:"Last_IO_Errno"` LastSQLErrno int `db:"Last_SQL_Errno"` + ReadSourceLogPos int64 `db:"Read_Source_Log_Pos"` Lag sql.NullFloat64 `db:"Seconds_Behind_Source"` } @@ -207,7 +207,7 @@ func (ss *ReplicaStatusStruct) GetReplicationLag() sql.NullFloat64 { return ss.Lag } -//GetReplicationLag +// GetReplicationLag // ReplicationState ... func (ss *SlaveStatusStruct) ReplicationState() string { diff --git a/internal/mysql/node.go b/internal/mysql/node.go index e6343880..6a694364 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -31,9 +31,9 @@ import ( type Node struct { config *config.Config logger *log.Logger - host string db *sqlx.DB version *Version + host string } var ( @@ -147,8 +147,8 @@ func (n *Node) getQuery(name string) string { func (n *Node) traceQuery(query string, arg interface{}, result interface{}, err error) { query = queryOnliner.ReplaceAllString(query, " ") msg := fmt.Sprintf("node %s running query '%s' with args %#v, result: %#v, error: %v", n.host, query, arg, result, err) - msg = strings.Replace(msg, n.config.MySQL.Password, "********", -1) - msg = strings.Replace(msg, n.config.MySQL.ReplicationPassword, "********", -1) + msg = strings.ReplaceAll(msg, n.config.MySQL.Password, "********") + msg = strings.ReplaceAll(msg, n.config.MySQL.ReplicationPassword, "********") n.logger.Debug(msg) } @@ -158,15 +158,9 @@ func (n *Node) queryRow(queryName string, arg interface{}, result interface{}) e } func (n *Node) queryRowWithTimeout(queryName string, arg interface{}, result interface{}, timeout time.Duration) error { - if arg == nil { - arg = struct{}{} - } - query := n.getQuery(queryName) - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - rows, err := n.db.NamedQueryContext(ctx, query, arg) - if err == nil { - defer func() { _ = rows.Close() }() + return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error { + var err error + if rows.Next() { err = rows.StructScan(result) } else { @@ -175,32 +169,45 @@ func (n *Node) queryRowWithTimeout(queryName string, arg interface{}, result int err = sql.ErrNoRows } } - } - n.traceQuery(query, arg, result, err) - return err + + return err + }, timeout) } // nolint: unparam func (n *Node) queryRows(queryName string, arg interface{}, scanner func(*sqlx.Rows) error) error { + return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error { + var err error + + for rows.Next() { + err = scanner(rows) + if err != nil { + break + } + } + + return err + }, n.config.DBTimeout) +} + +func (n *Node) processQuery(queryName string, arg interface{}, rowsProcessor func(*sqlx.Rows) error, timeout time.Duration) error { if arg == nil { arg = struct{}{} } - query := n.getQuery(queryName) - ctx, cancel := context.WithTimeout(context.Background(), n.config.DBTimeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() + + query := n.getQuery(queryName) rows, err := n.db.NamedQueryContext(ctx, query, arg) n.traceQuery(query, arg, rows, err) if err != nil { return err } + defer func() { _ = rows.Close() }() - for rows.Next() { - err = scanner(rows) - if err != nil { - break - } - } - return err + + return rowsProcessor(rows) } // nolint: unparam @@ -265,8 +272,8 @@ func (n *Node) getRunningQueryIds(excludeUsers []string, timeout time.Duration) type schemaname string func escape(s string) string { - s = strings.Replace(s, `\`, `\\`, -1) - s = strings.Replace(s, `'`, `\'`, -1) + s = strings.ReplaceAll(s, `\`, `\\`) + s = strings.ReplaceAll(s, `'`, `\'`) return s } diff --git a/internal/util/util.go b/internal/util/util.go index aa011125..e03fc668 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -37,24 +37,24 @@ func TouchFile(fname string) error { _, err := os.Stat(fname) if os.IsNotExist(err) { err := os.WriteFile(fname, []byte(""), 0644) - //file, err := os.Create(fname) + // file, err := os.Create(fname) if err != nil { return err } - //defer file.Close() + // defer file.Close() } return nil } func RunParallel(f func(string) error, arguments []string) map[string]error { type pair struct { - key string err error + key string } errs := make(chan pair, len(arguments)) for _, argValue := range arguments { go func(dbname string) { - errs <- pair{dbname, f(dbname)} + errs <- pair{key: dbname, err: f(dbname)} }(argValue) } result := make(map[string]error)