Skip to content

Commit

Permalink
Cleanup. Allow monitor console to restart itself
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume Lefranc committed Jan 28, 2015
1 parent 5e5ee13 commit 8feb02b
Showing 1 changed file with 83 additions and 98 deletions.
181 changes: 83 additions & 98 deletions mariadb-repmgr/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
)

var (
status map[string]int64
prevStatus map[string]int64
variable map[string]string
slaveList []string
exit bool
vy int
slaveList []string
exit bool
vy int
dbUser string
dbPass string
rplUser string
rplPass string
)

var (
Expand All @@ -42,15 +43,6 @@ var (
gtidCheck = flag.Bool("gtidcheck", false, "Check that GTID sequence numbers are identical before initiating failover")
)

/* Credentials and master address shared by the whole program */
var (
// Database account used for every dbhelper.MySQLConnect call in this file.
// NOTE(review): presumably filled from the -user flag via splitPair; the assignment is not visible here, confirm.
dbUser string
dbPass string
// Replication account: set from splitPair(*rpluser) in main and embedded in the
// CHANGE MASTER TO statement built during switchover.
rplUser string
rplPass string
// masterHost is handed to the post-failover script as its first argument.
// NOTE(review): no assignment to masterHost or masterPort is visible in this view; verify they are still populated.
masterHost string
masterPort string
)

type ServerMonitor struct {
Conn *sqlx.DB
Host string
Expand Down Expand Up @@ -81,13 +73,6 @@ func main() {
if *masterUrl == "" {
log.Fatal("ERROR: No master host specified.")
}
master := new(ServerMonitor)
master.Host, master.Port = splitHostPort(*masterUrl)
var err error
master.IP, err = dbhelper.CheckHostAddr(master.Host)
if err != nil {
log.Fatalln("ERROR: DNS resolution error for host", master.Host)
}
if *user == "" {
log.Fatal("ERROR: No master user/pair specified.")
}
Expand All @@ -96,15 +81,15 @@ func main() {
log.Fatal("ERROR: No replication user/pair specified.")
}
rplUser, rplPass = splitPair(*rpluser)

master := new(ServerMonitor)
if *verbose {
log.Printf("Connecting to master server %s:%s", master.Host, master.Port)
}
master.init(*masterUrl)

master.Conn, err = dbhelper.MySQLConnect(dbUser, dbPass, dbhelper.GetAddress(master.Host, master.Port, *socket))
if err != nil {
log.Fatal("Error: could not connect to master server.")
}
defer master.Conn.Close()

// If slaves option is empty, then attempt automatic discovery.
// fmt.Println("Length of slaveList", len(slaveList))
if len(slaveList) == 0 {
Expand All @@ -113,29 +98,20 @@ func main() {
log.Fatal("Error: no slaves found. Please supply a list of slaves manually.")
}
}
var err error
slave := make([]ServerMonitor, len(slaveList))
for k, v := range slaveList {
slave[k].Host, slave[k].Port = splitHostPort(v)
slave[k].IP, err = dbhelper.CheckHostAddr(slave[k].Host)
if err != nil {
log.Fatalln("ERROR: DNS resolution error for host", slave[k].Host)
for k, url := range slaveList {
slave[k].init(url)
if *verbose {
log.Printf("Checking if server %s is a slave of server %s", slave[k].Host, master.Host)
}
if validateHostPort(slave[k].IP, slave[k].Port) {
var err error
slave[k].Conn, err = dbhelper.MySQLConnect(dbUser, dbPass, dbhelper.GetAddress(slave[k].Host, slave[k].Port, *socket))
if err != nil {
log.Fatal(err)
}
if *verbose {
log.Printf("Checking if server %s is a slave of server %s", slave[k].Host, master.Host)
}
if dbhelper.IsSlaveof(slave[k].Conn, slave[k].Host, master.IP) == false {
log.Fatalf("ERROR: Server %s is not a slave of declared master %s", v, master.Host)
}
if dbhelper.IsSlaveof(slave[k].Conn, slave[k].Host, master.IP) == false {
log.Fatalf("ERROR: Server %s is not a slave of declared master %s", url, master.Host)
}
defer slave[k].Conn.Close()
}

MainLoop:
err = termbox.Init()
if err != nil {
log.Fatal(err)
Expand All @@ -147,8 +123,6 @@ func main() {
for exit == false {
select {
case <-ticker.C:
status = dbhelper.GetStatusAsInt(master.Conn)
variable = dbhelper.GetVariables(master.Conn)
drawHeader()
master.refresh()
master.drawMaster()
Expand Down Expand Up @@ -180,41 +154,29 @@ func main() {
termbox.Close()
switch command {
case "switchover":
master.switchover()
log.Println("Quitting")
nmUrl, nsKey := master.switchover()
log.Printf("Info: new master is %s, demoted master is %s", nmUrl, slaveList[nsKey])
master.init(nmUrl)
slave[nsKey].init(slaveList[nsKey])
log.Println("Restarting monitor console in 5 seconds. Press Ctrl-C to exit")
time.Sleep(5 * time.Second)
exit = false
goto MainLoop
}
}

/* Clears the termbox buffer, then prints the title at (0, 0) and the slave table column headings at (0, 5) */
func drawHeader() {
	const (
		title     = "MariaDB Replication Monitor and Health Checker"
		columnFmt = "%15s %6s %7s %12s %20s %20s %20s %6s %3s"
	)
	// Title is rendered reversed and bold; column headings are bold only.
	titleBg := termbox.ColorBlack | termbox.AttrReverse | termbox.AttrBold
	headingFg := termbox.ColorWhite | termbox.AttrBold

	termbox.Clear(termbox.ColorWhite, termbox.ColorBlack)
	printTb(0, 0, termbox.ColorWhite, titleBg, title)
	printfTb(0, 5, headingFg, termbox.ColorBlack, columnFmt,
		"Slave Host", "Port", "Binlog", "Using GTID", "Current GTID", "Slave GTID", "Replication Health", "Delay", "RO")
}

/* Calls refresh() on the master, then prints its heading row at y=2 and its values row at y=3 */
func (sm *ServerMonitor) drawMaster() {
	// Both rows share one layout so the values line up under their headings.
	const rowFmt = "%15s %6s %20s %12s"

	sm.refresh()
	printfTb(0, 2, termbox.ColorWhite|termbox.AttrBold, termbox.ColorBlack, rowFmt,
		"Master Host", "Port", "Binlog Position", "Strict Mode")
	printfTb(0, 3, termbox.ColorWhite, termbox.ColorBlack, rowFmt,
		sm.Host, sm.Port, sm.BinlogPos, sm.Strict)
}

/* Prints one slave row at line *vy and then moves *vy down by one so the next row lands below it */
func (sm *ServerMonitor) drawSlave(vy *int) {
	const rowFmt = "%15s %6s %7s %12s %20s %20s %20s %6d %3s"

	printfTb(0, *vy, termbox.ColorWhite, termbox.ColorBlack, rowFmt,
		sm.Host,
		sm.Port,
		sm.LogBin,
		sm.UsingGtid,
		sm.CurrentGtid,
		sm.SlaveGtid,
		sm.healthCheck(),
		sm.Delay.Int64,
		sm.ReadOnly,
	)
	*vy++
}

/* Advances *vy by one line (leaving a blank row) and prints the key binding hint there */
func drawFooter(vy *int) {
	const hint = " Ctrl-Q to quit, Ctrl-S to switch over"

	*vy++
	printTb(0, *vy, termbox.ColorWhite, termbox.ColorBlack, hint)
}

/* Initializes a server object from a host:port url: fills Host and Port, resolves
   the host into IP and opens the database connection into Conn. Any connection
   already held by the object is closed first. DNS or connection failures are
   fatal: the process exits through log.Fatalln. */
func (server *ServerMonitor) init(url string) {
	// init is also used to re-point an existing monitor at another server
	// (after a switchover), so release the previous handle instead of leaking it.
	if server.Conn != nil {
		if err := server.Conn.Close(); err != nil {
			log.Println("WARNING: Could not close previous connection to", server.Host, err)
		}
	}
	server.Host, server.Port = splitHostPort(url)
	var err error
	server.IP, err = dbhelper.CheckHostAddr(server.Host)
	if err != nil {
		log.Fatalln("ERROR: DNS resolution error for host", server.Host, err)
	}
	server.Conn, err = dbhelper.MySQLConnect(dbUser, dbPass, dbhelper.GetAddress(server.Host, server.Port, *socket))
	if err != nil {
		// The password is deliberately kept out of the log output.
		log.Fatalln("Error: could not connect to server", url, err, dbUser)
	}
}

Expand Down Expand Up @@ -255,7 +217,8 @@ func (sm *ServerMonitor) healthCheck() string {
}
}

func (master *ServerMonitor) switchover() {
/* Triggers a master switchover. Returns the new master's URL and the slaveList index that now holds the demoted master's URL */
func (master *ServerMonitor) switchover() (string, int) {
log.Println("Starting switchover")
log.Println("Flushing tables on master")
err := dbhelper.FlushTablesNoLog(master.Conn)
Expand All @@ -267,12 +230,13 @@ func (master *ServerMonitor) switchover() {
log.Fatal("ERROR: Long updates running on master. Cannot switchover")
}
log.Println("Electing a new master")
candidate := master.electCandidate(slaveList)
newMasterHost, newMasterPort := splitHostPort(candidate)
log.Printf("Slave %s has been elected as a new master", candidate)
newMasterUrl := master.electCandidate(slaveList)
log.Printf("Slave %s has been elected as a new master", newMasterUrl)
newMaster := new(ServerMonitor)
newMaster.init(newMasterUrl)
if *preScript != "" {
log.Printf("Calling pre-failover script")
out, err := exec.Command(*preScript, master.Host, newMasterHost).CombinedOutput()
out, err := exec.Command(*preScript, master.Host, newMaster.Host).CombinedOutput()
if err != nil {
log.Println("ERROR:", err)
}
Expand All @@ -284,20 +248,15 @@ func (master *ServerMonitor) switchover() {
log.Println("WARNING: Could not lock tables on master", err)
}
log.Println("Switching master")
newMasterIP, err := dbhelper.CheckHostAddr(newMasterHost)
if err != nil {
log.Fatalln("ERROR: DNS resolution error for host", newMasterHost)
}
newMaster := dbhelper.Connect(dbUser, dbPass, "tcp("+candidate+")")
log.Println("Waiting for candidate master to synchronize")
masterGtid := dbhelper.GetVariableByName(master.Conn, "GTID_BINLOG_POS")
dbhelper.MasterPosWait(newMaster, masterGtid)
dbhelper.MasterPosWait(newMaster.Conn, masterGtid)
log.Println("Stopping slave thread on new master")
err = dbhelper.StopSlave(newMaster)
err = dbhelper.StopSlave(newMaster.Conn)
if err != nil {
log.Println("WARNING: Stopping slave failed on new master")
}
cm := "CHANGE MASTER TO master_host='" + newMasterIP + "', master_port=" + newMasterPort + ", master_user='" + rplUser + "', master_password='" + rplPass + "'"
cm := "CHANGE MASTER TO master_host='" + newMaster.IP + "', master_port=" + newMaster.Port + ", master_user='" + rplUser + "', master_password='" + rplPass + "'"
log.Println("Switching old master as a slave")
err = dbhelper.UnlockTables(master.Conn)
if err != nil {
Expand All @@ -316,55 +275,58 @@ func (master *ServerMonitor) switchover() {
log.Printf("ERROR: Could not set old master as read-only", err)
}
log.Println("Resetting slave on new master and set read/write mode on")
err = dbhelper.ResetSlave(newMaster, true)
err = dbhelper.ResetSlave(newMaster.Conn, true)
if err != nil {
log.Println("WARNING: Reset slave failed on new master")
}
err = dbhelper.SetReadOnly(newMaster, false)
err = dbhelper.SetReadOnly(newMaster.Conn, false)
if err != nil {
log.Println("ERROR: Could not set new master as read-write")
}
log.Println("Switching other slaves to the new master")
for _, v := range slaveList {
if v == candidate {
var oldMasterKey int
for k, url := range slaveList {
if url == newMasterUrl {
slaveList[k] = *masterUrl
oldMasterKey = k
continue
}
slaveHost, slavePort := splitHostPort(v)
slaveHost, slavePort := splitHostPort(url)
slave, err := dbhelper.MySQLConnect(dbUser, dbPass, dbhelper.GetAddress(slaveHost, slavePort, *socket))
if err != nil {
log.Printf("ERROR: Could not connect to slave %s, %s", v, err)
log.Printf("ERROR: Could not connect to slave %s, %s", url, err)
} else {
log.Printf("Waiting for slave %s to sync", v)
dbhelper.MasterPosWait(newMaster, masterGtid)
log.Printf("Change master on slave %s", v)
log.Printf("Waiting for slave %s to sync", url)
dbhelper.MasterPosWait(newMaster.Conn, masterGtid)
log.Printf("Change master on slave %s", url)
err := dbhelper.StopSlave(slave)
if err != nil {
log.Printf("WARNING: Could not stop slave on server %s, %s", v, err)
log.Printf("WARNING: Could not stop slave on server %s, %s", url, err)
}
_, err = slave.Exec(cm)
if err != nil {
log.Printf("ERROR: Change master failed on slave %s, %s", v, err)
log.Printf("ERROR: Change master failed on slave %s, %s", url, err)
}
err = dbhelper.StartSlave(slave)
if err != nil {
log.Printf("ERROR: could not start slave on server %s, %s", v, err)
log.Printf("ERROR: could not start slave on server %s, %s", url, err)
}
err = dbhelper.SetReadOnly(slave, true)
if err != nil {
log.Printf("ERROR: Could not set slave %s as read-only, %s", v, err)
log.Printf("ERROR: Could not set slave %s as read-only, %s", url, err)
}
}
}
if *postScript != "" {
log.Printf("Calling post-failover script")
out, err := exec.Command(*postScript, masterHost, newMasterHost).CombinedOutput()
out, err := exec.Command(*postScript, master.Host, newMaster.Host).CombinedOutput()
if err != nil {
log.Println("ERROR:", err)
}
log.Println("Post-failover script complete", string(out))
}
log.Println("Switchover complete")
return
return newMasterUrl, oldMasterKey
}

/* Returns two host and port items from a pair, e.g. host:port */
Expand Down Expand Up @@ -473,6 +435,29 @@ func getSeqFromGtid(gtid string) uint64 {
return s
}

/* Clears the termbox buffer, then prints the title at (0, 0) and the slave table column headings at (0, 5) */
func drawHeader() {
	const (
		title     = "MariaDB Replication Monitor and Health Checker"
		columnFmt = "%15s %6s %7s %12s %20s %20s %20s %6s %3s"
	)
	// Title is rendered reversed and bold; column headings are bold only.
	titleBg := termbox.ColorBlack | termbox.AttrReverse | termbox.AttrBold
	headingFg := termbox.ColorWhite | termbox.AttrBold

	termbox.Clear(termbox.ColorWhite, termbox.ColorBlack)
	printTb(0, 0, termbox.ColorWhite, titleBg, title)
	printfTb(0, 5, headingFg, termbox.ColorBlack, columnFmt,
		"Slave Host", "Port", "Binlog", "Using GTID", "Current GTID", "Slave GTID", "Replication Health", "Delay", "RO")
}

/* Calls refresh() on the master, then prints its heading row at y=2 and its values row at y=3 */
func (sm *ServerMonitor) drawMaster() {
	// Both rows share one layout so the values line up under their headings.
	const rowFmt = "%15s %6s %20s %12s"

	sm.refresh()
	printfTb(0, 2, termbox.ColorWhite|termbox.AttrBold, termbox.ColorBlack, rowFmt,
		"Master Host", "Port", "Binlog Position", "Strict Mode")
	printfTb(0, 3, termbox.ColorWhite, termbox.ColorBlack, rowFmt,
		sm.Host, sm.Port, sm.BinlogPos, sm.Strict)
}

/* Prints one slave row at line *vy and then moves *vy down by one so the next row lands below it */
func (sm *ServerMonitor) drawSlave(vy *int) {
	const rowFmt = "%15s %6s %7s %12s %20s %20s %20s %6d %3s"

	printfTb(0, *vy, termbox.ColorWhite, termbox.ColorBlack, rowFmt,
		sm.Host,
		sm.Port,
		sm.LogBin,
		sm.UsingGtid,
		sm.CurrentGtid,
		sm.SlaveGtid,
		sm.healthCheck(),
		sm.Delay.Int64,
		sm.ReadOnly,
	)
	*vy++
}

/* Advances *vy by one line (leaving a blank row) and prints the key binding hint there */
func drawFooter(vy *int) {
	const hint = " Ctrl-Q to quit, Ctrl-S to switch over"

	*vy++
	printTb(0, *vy, termbox.ColorWhite, termbox.ColorBlack, hint)
}

func printTb(x, y int, fg, bg termbox.Attribute, msg string) {
for _, c := range msg {
termbox.SetCell(x, y, c, fg, bg)
Expand Down

0 comments on commit 8feb02b

Please sign in to comment.