Skip to content

Commit

Permalink
Small refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
noname0443 committed Oct 6, 2023
1 parent b24907c commit 3413f46
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 40 deletions.
4 changes: 2 additions & 2 deletions internal/mysql/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ type Cluster struct {
sync.Mutex
config *config.Config
logger *log.Logger
haNodes map[string]*Node
cascadeNodes map[string]*Node
local *Node
dcs dcs.DCS
haNodes map[string]*Node
cascadeNodes map[string]*Node
}

func (c *Cluster) IsHAHost(hostname string) bool {
Expand Down
16 changes: 8 additions & 8 deletions internal/mysql/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,48 +41,48 @@ type NodeConfiguration struct {
}

type ResetupStatus struct {
Status bool
UpdateTime time.Time
Status bool
}

type replicationSettings struct {
ChannelName string `db:"ChannelName"`
SourceHost string `db:"SourceHost"`
SourceUser string `db:"SourceUser"`
SourcePassword string `db:"SourcePassword"`
SourcePort int `db:"SourcePort"`
SourceSslCa string `db:"SourceSslCa"`
SourceDelay int `db:"SourceDelay"`
ReplicationStatus sql.NullString `db:"ReplicationStatus"`
SourcePort int `db:"SourcePort"`
SourceDelay int `db:"SourceDelay"`
}

// SlaveStatusStruct contains SHOW SLAVE STATUS response
type SlaveStatusStruct struct {
MasterHost string `db:"Master_Host"`
MasterPort int `db:"Master_Port"`
MasterLogFile string `db:"Master_Log_File"`
ReadMasterLogPos int64 `db:"Read_Master_Log_Pos"`
SlaveIORunning string `db:"Slave_IO_Running"`
SlaveSQLRunning string `db:"Slave_SQL_Running"`
RetrievedGtidSet string `db:"Retrieved_Gtid_Set"`
ExecutedGtidSet string `db:"Executed_Gtid_Set"`
LastIOErrno int `db:"Last_IO_Errno"`
LastSQLErrno int `db:"Last_SQL_Errno"`
MasterPort int `db:"Master_Port"`
ReadMasterLogPos int64 `db:"Read_Master_Log_Pos"`
Lag sql.NullFloat64 `db:"Seconds_Behind_Master"`
}

// ReplicaStatusStruct contains SHOW REPLICA STATUS response
type ReplicaStatusStruct struct {
SourceHost string `db:"Source_Host"`
SourcePort int `db:"Source_Port"`
SourceLogFile string `db:"Source_Log_File"`
ReadSourceLogPos int64 `db:"Read_Source_Log_Pos"`
ReplicaIORunning string `db:"Replica_IO_Running"`
ReplicaSQLRunning string `db:"Replica_SQL_Running"`
RetrievedGtidSet string `db:"Retrieved_Gtid_Set"`
ExecutedGtidSet string `db:"Executed_Gtid_Set"`
SourcePort int `db:"Source_Port"`
LastIOErrno int `db:"Last_IO_Errno"`
LastSQLErrno int `db:"Last_SQL_Errno"`
ReadSourceLogPos int64 `db:"Read_Source_Log_Pos"`
Lag sql.NullFloat64 `db:"Seconds_Behind_Source"`
}

Expand Down Expand Up @@ -207,7 +207,7 @@ func (ss *ReplicaStatusStruct) GetReplicationLag() sql.NullFloat64 {
return ss.Lag
}

//GetReplicationLag
// GetReplicationLag

// ReplicationState ...
func (ss *SlaveStatusStruct) ReplicationState() string {
Expand Down
59 changes: 33 additions & 26 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (
type Node struct {
config *config.Config
logger *log.Logger
host string
db *sqlx.DB
version *Version
host string
}

var (
Expand Down Expand Up @@ -147,8 +147,8 @@ func (n *Node) getQuery(name string) string {
func (n *Node) traceQuery(query string, arg interface{}, result interface{}, err error) {
query = queryOnliner.ReplaceAllString(query, " ")
msg := fmt.Sprintf("node %s running query '%s' with args %#v, result: %#v, error: %v", n.host, query, arg, result, err)
msg = strings.Replace(msg, n.config.MySQL.Password, "********", -1)
msg = strings.Replace(msg, n.config.MySQL.ReplicationPassword, "********", -1)
msg = strings.ReplaceAll(msg, n.config.MySQL.Password, "********")
msg = strings.ReplaceAll(msg, n.config.MySQL.ReplicationPassword, "********")
n.logger.Debug(msg)
}

Expand All @@ -158,15 +158,9 @@ func (n *Node) queryRow(queryName string, arg interface{}, result interface{}) e
}

func (n *Node) queryRowWithTimeout(queryName string, arg interface{}, result interface{}, timeout time.Duration) error {
if arg == nil {
arg = struct{}{}
}
query := n.getQuery(queryName)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
rows, err := n.db.NamedQueryContext(ctx, query, arg)
if err == nil {
defer func() { _ = rows.Close() }()
return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error {
var err error

if rows.Next() {
err = rows.StructScan(result)
} else {
Expand All @@ -175,32 +169,45 @@ func (n *Node) queryRowWithTimeout(queryName string, arg interface{}, result int
err = sql.ErrNoRows
}
}
}
n.traceQuery(query, arg, result, err)
return err

return err
}, timeout)
}

// nolint: unparam
func (n *Node) queryRows(queryName string, arg interface{}, scanner func(*sqlx.Rows) error) error {
return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error {
var err error

for rows.Next() {
err = scanner(rows)
if err != nil {
break
}
}

return err
}, n.config.DBTimeout)
}

func (n *Node) processQuery(queryName string, arg interface{}, rowsProcessor func(*sqlx.Rows) error, timeout time.Duration) error {
if arg == nil {
arg = struct{}{}
}
query := n.getQuery(queryName)
ctx, cancel := context.WithTimeout(context.Background(), n.config.DBTimeout)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

query := n.getQuery(queryName)
rows, err := n.db.NamedQueryContext(ctx, query, arg)
n.traceQuery(query, arg, rows, err)
if err != nil {
return err
}

defer func() { _ = rows.Close() }()
for rows.Next() {
err = scanner(rows)
if err != nil {
break
}
}
return err

return rowsProcessor(rows)
}

// nolint: unparam
Expand Down Expand Up @@ -265,8 +272,8 @@ func (n *Node) getRunningQueryIds(excludeUsers []string, timeout time.Duration)
type schemaname string

func escape(s string) string {
s = strings.Replace(s, `\`, `\\`, -1)
s = strings.Replace(s, `'`, `\'`, -1)
s = strings.ReplaceAll(s, `\`, `\\`)
s = strings.ReplaceAll(s, `'`, `\'`)
return s
}

Expand Down
8 changes: 4 additions & 4 deletions internal/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,24 @@ func TouchFile(fname string) error {
_, err := os.Stat(fname)
if os.IsNotExist(err) {
err := os.WriteFile(fname, []byte(""), 0644)
//file, err := os.Create(fname)
// file, err := os.Create(fname)
if err != nil {
return err
}
//defer file.Close()
// defer file.Close()
}
return nil
}

func RunParallel(f func(string) error, arguments []string) map[string]error {
type pair struct {
key string
err error
key string
}
errs := make(chan pair, len(arguments))
for _, argValue := range arguments {
go func(dbname string) {
errs <- pair{dbname, f(dbname)}
errs <- pair{key: dbname, err: f(dbname)}
}(argValue)
}
result := make(map[string]error)
Expand Down

0 comments on commit 3413f46

Please sign in to comment.