From c6874ee6aa87a1cac92e0991479d0a1d07d86cac Mon Sep 17 00:00:00 2001 From: noname0443 Date: Wed, 11 Oct 2023 16:12:04 +0300 Subject: [PATCH] Small refactoring (#43) * Changed 'Disabled' const * Small refactoring * A couple of methods transformed to functions --- internal/mysql/cluster.go | 4 +- internal/mysql/data.go | 2 +- internal/mysql/node.go | 89 +++++++++++++++++++++------------------ internal/mysql/util.go | 6 ++- internal/util/consts.go | 2 +- 5 files changed, 56 insertions(+), 47 deletions(-) diff --git a/internal/mysql/cluster.go b/internal/mysql/cluster.go index bf9c7842..9c6f25b8 100644 --- a/internal/mysql/cluster.go +++ b/internal/mysql/cluster.go @@ -17,10 +17,10 @@ type Cluster struct { sync.Mutex config *config.Config logger *log.Logger - haNodes map[string]*Node - cascadeNodes map[string]*Node local *Node dcs dcs.DCS + haNodes map[string]*Node + cascadeNodes map[string]*Node } func (c *Cluster) IsHAHost(hostname string) bool { diff --git a/internal/mysql/data.go b/internal/mysql/data.go index d38b21ca..fedc605c 100644 --- a/internal/mysql/data.go +++ b/internal/mysql/data.go @@ -41,8 +41,8 @@ type NodeConfiguration struct { } type ResetupStatus struct { - Status bool UpdateTime time.Time + Status bool } type replicationSettings struct { diff --git a/internal/mysql/node.go b/internal/mysql/node.go index e6343880..bc59abd0 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -31,9 +31,9 @@ import ( type Node struct { config *config.Config logger *log.Logger - host string db *sqlx.DB version *Version + host string } var ( @@ -147,8 +147,8 @@ func (n *Node) getQuery(name string) string { func (n *Node) traceQuery(query string, arg interface{}, result interface{}, err error) { query = queryOnliner.ReplaceAllString(query, " ") msg := fmt.Sprintf("node %s running query '%s' with args %#v, result: %#v, error: %v", n.host, query, arg, result, err) - msg = strings.Replace(msg, n.config.MySQL.Password, "********", -1) - msg = strings.Replace(msg, n.config.MySQL.ReplicationPassword, "********", -1) + msg = strings.ReplaceAll(msg, n.config.MySQL.Password, "********") + msg = strings.ReplaceAll(msg, n.config.MySQL.ReplicationPassword, "********") n.logger.Debug(msg) } @@ -158,15 +158,9 @@ func (n *Node) queryRow(queryName string, arg interface{}, result interface{}) e } func (n *Node) queryRowWithTimeout(queryName string, arg interface{}, result interface{}, timeout time.Duration) error { - if arg == nil { - arg = struct{}{} - } - query := n.getQuery(queryName) - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - rows, err := n.db.NamedQueryContext(ctx, query, arg) - if err == nil { - defer func() { _ = rows.Close() }() + return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error { + var err error + if rows.Next() { err = rows.StructScan(result) } else { @@ -175,32 +169,45 @@ func (n *Node) queryRowWithTimeout(queryName string, arg interface{}, result int err = sql.ErrNoRows } } - } - n.traceQuery(query, arg, result, err) - return err + + return err + }, timeout) } // nolint: unparam func (n *Node) queryRows(queryName string, arg interface{}, scanner func(*sqlx.Rows) error) error { + return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error { + var err error + + for rows.Next() { + err = scanner(rows) + if err != nil { + break + } + } + + return err + }, n.config.DBTimeout) +} + +func (n *Node) processQuery(queryName string, arg interface{}, rowsProcessor func(*sqlx.Rows) error, timeout time.Duration) error { if arg == nil { arg = struct{}{} } - query := n.getQuery(queryName) - ctx, cancel := context.WithTimeout(context.Background(), n.config.DBTimeout) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() + + query := n.getQuery(queryName) rows, err := n.db.NamedQueryContext(ctx, query, arg) n.traceQuery(query, arg, rows, err) if err != nil { return err } + defer func() { _ = rows.Close() }() - for rows.Next() { - err = scanner(rows) - if err != nil { - break - } - } - return err + + return rowsProcessor(rows) } // nolint: unparam @@ -265,8 +272,8 @@ func (n *Node) getRunningQueryIds(excludeUsers []string, timeout time.Duration) type schemaname string func escape(s string) string { - s = strings.Replace(s, `\`, `\\`, -1) - s = strings.Replace(s, `'`, `\'`, -1) + s = strings.ReplaceAll(s, `\`, `\\`) + s = strings.ReplaceAll(s, `'`, `\'`) return s } @@ -372,18 +379,6 @@ func (n *Node) GetDiskUsage() (used uint64, total uint64, err error) { return } -func (n *Node) isTestFileSystemReadonly(f string) (bool, error) { - data, err := os.ReadFile(f) - if err != nil { - return false, err - } - value, err := strconv.ParseBool(strings.TrimSpace(string(data))) - if err != nil { - return false, fmt.Errorf("error while parce test file: %s", err) - } - return value, nil -} - func getFlagsFromProcMounts(file, filesystem string) (string, error) { for _, line := range strings.Split(file, "\n") { components := strings.Split(line, " ") @@ -407,7 +402,7 @@ func getFlagsFromProcMounts(file, filesystem string) (string, error) { func (n *Node) IsFileSystemReadonly() (bool, error) { if n.config.TestFilesystemReadonlyFile != "" { - return n.isTestFileSystemReadonly(n.config.TestFilesystemReadonlyFile) + return isTestFileSystemReadonly(n.config.TestFilesystemReadonlyFile) } if !n.IsLocal() { return false, ErrNotLocalNode @@ -431,6 +426,18 @@ func (n *Node) IsFileSystemReadonly() (bool, error) { } } +func isTestFileSystemReadonly(f string) (bool, error) { + data, err := os.ReadFile(f) + if err != nil { + return false, err + } + value, err := strconv.ParseBool(strings.TrimSpace(string(data))) + if err != nil { + return false, fmt.Errorf("error while parce test file: %s", err) + } + return value, nil +} + func (n *Node) GetDaemonStartTime() (time.Time, error) { if !n.IsLocal() { return time.Time{}, ErrNotLocalNode @@ -900,7 +907,7 @@ func (n *Node) UpdateExternalCAFile() error { } if data != string(oldDataByte) { n.logger.Infof("saving new CA file to %s", fileName) - err := n.SaveCAFile(data, fileName) + err := SaveCAFile(data, fileName) if err != nil { return err } @@ -920,7 +927,7 @@ func (n *Node) UpdateExternalCAFile() error { return nil } -func (n *Node) SaveCAFile(data string, path string) error { +func SaveCAFile(data string, path string) error { rootCertPool := x509.NewCertPool() byteData := []byte(data) if ok := rootCertPool.AppendCertsFromPEM(byteData); !ok { diff --git a/internal/mysql/util.go b/internal/mysql/util.go index a2ea92cf..1ea18fa3 100644 --- a/internal/mysql/util.go +++ b/internal/mysql/util.go @@ -13,8 +13,10 @@ var dubiousErrorNumbers = []uint16{ 1698, // Symbol: ER_ACCESS_DENIED_NO_PASSWORD_ERROR; SQLSTATE: 28000 } -const channelDoesNotExists = 3074 // Symbol: ER_REPLICA_CHANNEL_DOES_NOT_EXIST; SQLSTATE: HY000 -const tableDoesNotExists = 1146 // Symbol: ER_NO_SUCH_TABLE; SQLSTATE: 42S02 +const ( + channelDoesNotExists = 3074 // Symbol: ER_REPLICA_CHANNEL_DOES_NOT_EXIST; SQLSTATE: HY000 + tableDoesNotExists = 1146 // Symbol: ER_NO_SUCH_TABLE; SQLSTATE: 42S02 +) // IsErrorDubious check that error may be caused by misconfiguration, mysync/scripts bugs // and not related to MySQL/network failure diff --git a/internal/util/consts.go b/internal/util/consts.go index ef421eb7..d62b3b1b 100644 --- a/internal/util/consts.go +++ b/internal/util/consts.go @@ -3,6 +3,6 @@ package util type ExternalReplicationType string const ( - Disabled ExternalReplicationType = "" + Disabled ExternalReplicationType = "off" MyExternalReplication ExternalReplicationType = "external" )