Skip to content

Commit

Permalink
Small refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
noname0443 committed Oct 9, 2023
1 parent b24907c commit ff1b2f0
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 29 deletions.
4 changes: 2 additions & 2 deletions internal/mysql/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ type Cluster struct {
sync.Mutex
config *config.Config
logger *log.Logger
haNodes map[string]*Node
cascadeNodes map[string]*Node
local *Node
dcs dcs.DCS
haNodes map[string]*Node
cascadeNodes map[string]*Node
}

func (c *Cluster) IsHAHost(hostname string) bool {
Expand Down
2 changes: 1 addition & 1 deletion internal/mysql/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type NodeConfiguration struct {
}

type ResetupStatus struct {
Status bool
UpdateTime time.Time
Status bool
}

type replicationSettings struct {
Expand Down
59 changes: 33 additions & 26 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (
type Node struct {
config *config.Config
logger *log.Logger
host string
db *sqlx.DB
version *Version
host string
}

var (
Expand Down Expand Up @@ -147,8 +147,8 @@ func (n *Node) getQuery(name string) string {
func (n *Node) traceQuery(query string, arg interface{}, result interface{}, err error) {
query = queryOnliner.ReplaceAllString(query, " ")
msg := fmt.Sprintf("node %s running query '%s' with args %#v, result: %#v, error: %v", n.host, query, arg, result, err)
msg = strings.Replace(msg, n.config.MySQL.Password, "********", -1)
msg = strings.Replace(msg, n.config.MySQL.ReplicationPassword, "********", -1)
msg = strings.ReplaceAll(msg, n.config.MySQL.Password, "********")
msg = strings.ReplaceAll(msg, n.config.MySQL.ReplicationPassword, "********")
n.logger.Debug(msg)
}

Expand All @@ -158,15 +158,9 @@ func (n *Node) queryRow(queryName string, arg interface{}, result interface{}) e
}

func (n *Node) queryRowWithTimeout(queryName string, arg interface{}, result interface{}, timeout time.Duration) error {
if arg == nil {
arg = struct{}{}
}
query := n.getQuery(queryName)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
rows, err := n.db.NamedQueryContext(ctx, query, arg)
if err == nil {
defer func() { _ = rows.Close() }()
return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error {
var err error

if rows.Next() {
err = rows.StructScan(result)
} else {
Expand All @@ -175,32 +169,45 @@ func (n *Node) queryRowWithTimeout(queryName string, arg interface{}, result int
err = sql.ErrNoRows
}
}
}
n.traceQuery(query, arg, result, err)
return err

return err
}, timeout)
}

// nolint: unparam
func (n *Node) queryRows(queryName string, arg interface{}, scanner func(*sqlx.Rows) error) error {
return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error {
var err error

for rows.Next() {
err = scanner(rows)
if err != nil {
break
}
}

return err
}, n.config.DBTimeout)
}

func (n *Node) processQuery(queryName string, arg interface{}, rowsProcessor func(*sqlx.Rows) error, timeout time.Duration) error {
if arg == nil {
arg = struct{}{}
}
query := n.getQuery(queryName)
ctx, cancel := context.WithTimeout(context.Background(), n.config.DBTimeout)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

query := n.getQuery(queryName)
rows, err := n.db.NamedQueryContext(ctx, query, arg)
n.traceQuery(query, arg, rows, err)
if err != nil {
return err
}

defer func() { _ = rows.Close() }()
for rows.Next() {
err = scanner(rows)
if err != nil {
break
}
}
return err

return rowsProcessor(rows)
}

// nolint: unparam
Expand Down Expand Up @@ -265,8 +272,8 @@ func (n *Node) getRunningQueryIds(excludeUsers []string, timeout time.Duration)
type schemaname string

func escape(s string) string {
s = strings.Replace(s, `\`, `\\`, -1)
s = strings.Replace(s, `'`, `\'`, -1)
s = strings.ReplaceAll(s, `\`, `\\`)
s = strings.ReplaceAll(s, `'`, `\'`)
return s
}

Expand Down

0 comments on commit ff1b2f0

Please sign in to comment.