Skip to content

Commit

Permalink
Merge branch 'ef001'
Browse files Browse the repository at this point in the history
Conflicts:
	mariadb-repmgr/repmgr.go
  • Loading branch information
Guillaume Lefranc committed Jan 28, 2015
2 parents 092cfe1 + 2006ffc commit 5e5ee13
Showing 1 changed file with 109 additions and 97 deletions.
206 changes: 109 additions & 97 deletions mariadb-repmgr/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/mariadb-tools/common"
"github.com/mariadb-tools/dbhelper"
"github.com/nsf/termbox-go"
"github.com/tanji/mariadb-tools/common"
"github.com/tanji/mariadb-tools/dbhelper"
"log"
"net"
"os/exec"
Expand All @@ -23,10 +23,10 @@ var (
variable map[string]string
slaveList []string
exit bool
vy int
)

var (
master *sqlx.DB
version = flag.Bool("version", false, "Return version")
user = flag.String("user", "", "User for MariaDB login, specified in the [user]:[password] format")
masterUrl = flag.String("host", "", "MariaDB master host IP and port (optional), specified in the host:[port] format")
Expand All @@ -51,23 +51,22 @@ var (
masterPort string
)

type MasterMonitor struct {
Host string
Port string
BinlogPos string
Strict string
}

type SlaveMonitor struct {
Host string
Port string
LogBin string
UsingGtid string
SlaveGtid string
IOThread string
SQLThread string
ReadOnly string
Delay sql.NullInt64
/* ServerMonitor bundles the database handle and the most recently refreshed
   state of one monitored server. The same type is used both for the master
   and for every slave. All status fields are kept as the strings returned by
   the dbhelper lookups. */
type ServerMonitor struct {
// Conn is the handle obtained from dbhelper.MySQLConnect for this server.
Conn *sqlx.DB
// Host and Port are the two parts of the host:port URL as returned by splitHostPort.
Host string
Port string
// IP is the address dbhelper.CheckHostAddr resolved for Host.
IP string
// BinlogPos holds the GTID_BINLOG_POS variable.
BinlogPos string
// Strict holds the GTID_STRICT_MODE variable.
Strict string
// LogBin holds the LOG_BIN variable.
LogBin string
// UsingGtid is the Using_Gtid field of the slave status.
UsingGtid string
// CurrentGtid holds the GTID_CURRENT_POS variable.
CurrentGtid string
// SlaveGtid holds the GTID_SLAVE_POS variable.
SlaveGtid string
// IOThread is the Slave_IO_Running field of the slave status.
IOThread string
// SQLThread is the Slave_SQL_Running field of the slave status.
SQLThread string
// ReadOnly holds the READ_ONLY variable.
ReadOnly string
// Delay is printed in the "Delay" column; healthCheck branches on Delay.Valid.
// NOTE(review): presumably the replication lag in seconds from the slave
// status; the assignment is not visible here, confirm.
Delay sql.NullInt64
// IsMaster: NOTE(review): presumably marks the server acting as master; no
// assignment or read is visible in this view, confirm.
IsMaster bool
}

func main() {
Expand All @@ -82,10 +81,12 @@ func main() {
if *masterUrl == "" {
log.Fatal("ERROR: No master host specified.")
}
masterHost, masterPort = splitHostPort(*masterUrl)
masterIP, err := dbhelper.CheckHostAddr(masterHost)
master := new(ServerMonitor)
master.Host, master.Port = splitHostPort(*masterUrl)
var err error
master.IP, err = dbhelper.CheckHostAddr(master.Host)
if err != nil {
log.Fatalln("ERROR: DNS resolution error for host", masterHost)
log.Fatalln("ERROR: DNS resolution error for host", master.Host)
}
if *user == "" {
log.Fatal("ERROR: No master user/pair specified.")
Expand All @@ -96,71 +97,75 @@ func main() {
}
rplUser, rplPass = splitPair(*rpluser)
if *verbose {
log.Printf("Connecting to master server %s:%s", masterHost, masterPort)
log.Printf("Connecting to master server %s:%s", master.Host, master.Port)
}

master, err = dbhelper.MySQLConnect(dbUser, dbPass, dbhelper.GetAddress(masterHost, masterPort, *socket))
master.Conn, err = dbhelper.MySQLConnect(dbUser, dbPass, dbhelper.GetAddress(master.Host, master.Port, *socket))
if err != nil {
log.Fatal("Error: could not connect to master server.")
}
defer master.Close()
defer master.Conn.Close()
// If slaves option is empty, then attempt automatic discovery.
// fmt.Println("Length of slaveList", len(slaveList))
if len(slaveList) == 0 {
slaveList = dbhelper.GetSlaveHostsDiscovery(master)
slaveList = dbhelper.GetSlaveHostsDiscovery(master.Conn)
if len(slaveList) == 0 {
log.Fatal("Error: no slaves found. Please supply a list of slaves manually.")
}
}
for _, v := range slaveList {
slaveHost, slavePort := splitHostPort(v)
slaveIP, err := dbhelper.CheckHostAddr(slaveHost)
slave := make([]ServerMonitor, len(slaveList))
for k, v := range slaveList {
slave[k].Host, slave[k].Port = splitHostPort(v)
slave[k].IP, err = dbhelper.CheckHostAddr(slave[k].Host)
if err != nil {
log.Fatalln("ERROR: DNS resolution error for host", slaveHost)
log.Fatalln("ERROR: DNS resolution error for host", slave[k].Host)
}
if validateHostPort(slaveIP, slavePort) {
if validateHostPort(slave[k].IP, slave[k].Port) {
var err error
slave, err := dbhelper.MySQLConnect(dbUser, dbPass, dbhelper.GetAddress(slaveHost, slavePort, *socket))
slave[k].Conn, err = dbhelper.MySQLConnect(dbUser, dbPass, dbhelper.GetAddress(slave[k].Host, slave[k].Port, *socket))
if err != nil {
log.Fatal(err)
}
if *verbose {
log.Printf("Checking if server %s is a slave of server %s", slaveHost, masterHost)
log.Printf("Checking if server %s is a slave of server %s", slave[k].Host, master.Host)
}
if dbhelper.IsSlaveof(slave, slaveHost, masterIP) == false {
log.Fatalf("ERROR: Server %s is not a slave of declared master %s", v, masterHost)
if dbhelper.IsSlaveof(slave[k].Conn, slave[k].Host, master.IP) == false {
log.Fatalf("ERROR: Server %s is not a slave of declared master %s", v, master.Host)
}
slave.Close()
}
defer slave[k].Conn.Close()
}

err = termbox.Init()
if err != nil {
log.Fatal(err)
}
defer termbox.Close()
termboxChan := new_tb_chan()
interval := time.Second
ticker := time.NewTicker(interval * 3)
drawMonitor()
Loop:
var command string
for exit == false {
select {
case <-ticker.C:
status = dbhelper.GetStatusAsInt(master)
variable = dbhelper.GetVariables(master)
drawMonitor()
status = dbhelper.GetStatusAsInt(master.Conn)
variable = dbhelper.GetVariables(master.Conn)
drawHeader()
master.refresh()
master.drawMaster()
vy = 6
for k, _ := range slaveList {
slave[k].refresh()
slave[k].drawSlave(&vy)
}
drawFooter(&vy)
termbox.Flush()
case event := <-termboxChan:
switch event.Type {
case termbox.EventKey:
if event.Key == termbox.KeyCtrlS {
command = "switchover"
exit = true
ticker.Stop()
close(termboxChan)
termbox.Close()
switchover()
log.Println("Quitting")
goto Loop

}
if event.Key == termbox.KeyCtrlQ {
exit = true
Expand All @@ -172,52 +177,59 @@ Loop:
}
}
}
termbox.Close()
switch command {
case "switchover":
master.switchover()
log.Println("Quitting")
}
}

func drawMonitor() {
func drawHeader() {
termbox.Clear(termbox.ColorWhite, termbox.ColorBlack)
printTb(0, 0, termbox.ColorWhite, termbox.ColorBlack|termbox.AttrReverse|termbox.AttrBold, "MariaDB Replication Monitor and Health Checker")
m := new(MasterMonitor)
m.init()
printfTb(0, 5, termbox.ColorWhite|termbox.AttrBold, termbox.ColorBlack, "%15s %6s %7s %12s %20s %20s %20s %6s %3s", "Slave Host", "Port", "Binlog", "Using GTID", "Current GTID", "Slave GTID", "Replication Health", "Delay", "RO")

}

func (master *ServerMonitor) drawMaster() {
master.refresh()
printfTb(0, 2, termbox.ColorWhite|termbox.AttrBold, termbox.ColorBlack, "%15s %6s %20s %12s", "Master Host", "Port", "Binlog Position", "Strict Mode")
printfTb(0, 3, termbox.ColorWhite, termbox.ColorBlack, "%15s %6s %20s %12s", m.Host, m.Port, m.BinlogPos, m.Strict)
printfTb(0, 5, termbox.ColorWhite|termbox.AttrBold, termbox.ColorBlack, "%15s %6s %7s %12s %20s %20s %6s %3s", "Slave Host", "Port", "Binlog", "Using GTID", "Slave GTID", "Replication Health", "Delay", "RO")
vy := 6
for _, v := range slaveList {
slave := new(SlaveMonitor)
slave.init(v)
printfTb(0, vy, termbox.ColorWhite, termbox.ColorBlack, "%15s %6s %7s %12s %20s %20s %6d %3s", slave.Host, slave.Port, slave.LogBin, slave.UsingGtid, slave.SlaveGtid, slave.healthCheck(), slave.Delay.Int64, slave.ReadOnly)
vy++
}
vy += 2
printTb(0, vy, termbox.ColorWhite, termbox.ColorBlack, " Ctrl-Q to quit, Ctrl-S to switch over")
termbox.Flush()
time.Sleep(time.Duration(1) * time.Second)
printfTb(0, 3, termbox.ColorWhite, termbox.ColorBlack, "%15s %6s %20s %12s", master.Host, master.Port, master.BinlogPos, master.Strict)
}

/* drawSlave prints this server's row of the slave table at screen row *vy and
   then moves *vy down by one so the next row is drawn underneath. The column
   widths match the table header printed on row 5. */
func (sm *ServerMonitor) drawSlave(vy *int) {
	const rowFormat = "%15s %6s %7s %12s %20s %20s %20s %6d %3s"

	row := *vy
	printfTb(0, row, termbox.ColorWhite, termbox.ColorBlack, rowFormat,
		sm.Host,
		sm.Port,
		sm.LogBin,
		sm.UsingGtid,
		sm.CurrentGtid,
		sm.SlaveGtid,
		sm.healthCheck(),
		sm.Delay.Int64,
		sm.ReadOnly,
	)
	*vy = row + 1
}

/* Init a monitored master object */
func (mm *MasterMonitor) init() {
mm.Host = masterHost
mm.Port = masterPort
mm.BinlogPos = dbhelper.GetVariableByName(master, "GTID_BINLOG_POS")
mm.Strict = dbhelper.GetVariableByName(master, "GTID_STRICT_MODE")
/* drawFooter advances *vy by one row and prints the key-binding help line on
   that new row. */
func drawFooter(vy *int) {
	const help = " Ctrl-Q to quit, Ctrl-S to switch over"

	footerRow := *vy + 1
	*vy = footerRow
	printTb(0, footerRow, termbox.ColorWhite, termbox.ColorBlack, help)
}

/* Init a monitored slave object */
func (sm *SlaveMonitor) init(url string) error {
sm.Host, sm.Port = splitHostPort(url)
slave, err := dbhelper.MySQLConnect(dbUser, dbPass, "tcp("+url+")")
defer slave.Close()
/* Initializes a server object */
func (server *ServerMonitor) init(url string) {
server.Host, server.Port = splitHostPort(url)
var err error
server.IP, err = dbhelper.CheckHostAddr(server.Host)
if err != nil {
return err
log.Fatalln("ERROR: DNS resolution error for host", server.Host)
}
slaveStatus, err := dbhelper.GetSlaveStatus(slave)
}

/* Refresh a server object */
func (sm *ServerMonitor) refresh() error {
sm.BinlogPos = dbhelper.GetVariableByName(sm.Conn, "GTID_BINLOG_POS")
sm.Strict = dbhelper.GetVariableByName(sm.Conn, "GTID_STRICT_MODE")
slaveStatus, err := dbhelper.GetSlaveStatus(sm.Conn)
if err != nil {
return err
}
sm.LogBin = dbhelper.GetVariableByName(slave, "LOG_BIN")
sm.ReadOnly = dbhelper.GetVariableByName(slave, "READ_ONLY")
sm.SlaveGtid = dbhelper.GetVariableByName(slave, "GTID_SLAVE_POS")
sm.LogBin = dbhelper.GetVariableByName(sm.Conn, "LOG_BIN")
sm.ReadOnly = dbhelper.GetVariableByName(sm.Conn, "READ_ONLY")
sm.CurrentGtid = dbhelper.GetVariableByName(sm.Conn, "GTID_CURRENT_POS")
sm.SlaveGtid = dbhelper.GetVariableByName(sm.Conn, "GTID_SLAVE_POS")
sm.UsingGtid = slaveStatus.Using_Gtid
sm.IOThread = slaveStatus.Slave_IO_Running
sm.SQLThread = slaveStatus.Slave_SQL_Running
Expand All @@ -226,7 +238,7 @@ func (sm *SlaveMonitor) init(url string) error {
}

/* Check replication health and return status string */
func (sm *SlaveMonitor) healthCheck() string {
func (sm *ServerMonitor) healthCheck() string {
if sm.Delay.Valid == false {
if sm.SQLThread == "Yes" && sm.IOThread == "No" {
return "NOT OK, IO Stopped"
Expand All @@ -243,31 +255,31 @@ func (sm *SlaveMonitor) healthCheck() string {
}
}

func switchover() {
func (master *ServerMonitor) switchover() {
log.Println("Starting switchover")
log.Println("Flushing tables on master")
err := dbhelper.FlushTablesNoLog(master)
err := dbhelper.FlushTablesNoLog(master.Conn)
if err != nil {
log.Println("WARNING: Could not flush tables on master", err)
}
log.Println("Checking long running updates on master")
if dbhelper.CheckLongRunningWrites(master, 10) > 0 {
if dbhelper.CheckLongRunningWrites(master.Conn, 10) > 0 {
log.Fatal("ERROR: Long updates running on master. Cannot switchover")
}
log.Println("Electing a new master")
candidate := electCandidate(slaveList)
candidate := master.electCandidate(slaveList)
newMasterHost, newMasterPort := splitHostPort(candidate)
log.Printf("Slave %s has been elected as a new master", candidate)
if *preScript != "" {
log.Printf("Calling pre-failover script")
out, err := exec.Command(*preScript, masterHost, newMasterHost).CombinedOutput()
out, err := exec.Command(*preScript, master.Host, newMasterHost).CombinedOutput()
if err != nil {
log.Println("ERROR:", err)
}
log.Println("Post-failover script complete", string(out))
}
log.Printf("Rejecting updates on master")
err = dbhelper.FlushTablesWithReadLock(master)
err = dbhelper.FlushTablesWithReadLock(master.Conn)
if err != nil {
log.Println("WARNING: Could not lock tables on master", err)
}
Expand All @@ -278,7 +290,7 @@ func switchover() {
}
newMaster := dbhelper.Connect(dbUser, dbPass, "tcp("+candidate+")")
log.Println("Waiting for candidate master to synchronize")
masterGtid := dbhelper.GetVariableByName(master, "GTID_BINLOG_POS")
masterGtid := dbhelper.GetVariableByName(master.Conn, "GTID_BINLOG_POS")
dbhelper.MasterPosWait(newMaster, masterGtid)
log.Println("Stopping slave thread on new master")
err = dbhelper.StopSlave(newMaster)
Expand All @@ -287,19 +299,19 @@ func switchover() {
}
cm := "CHANGE MASTER TO master_host='" + newMasterIP + "', master_port=" + newMasterPort + ", master_user='" + rplUser + "', master_password='" + rplPass + "'"
log.Println("Switching old master as a slave")
err = dbhelper.UnlockTables(master)
err = dbhelper.UnlockTables(master.Conn)
if err != nil {
log.Println("WARNING: Could not unlock tables on old master", err)
}
_, err = master.Exec(cm + ", master_use_gtid=current_pos")
_, err = master.Conn.Exec(cm + ", master_use_gtid=current_pos")
if err != nil {
log.Println("WARNING: Change master failed on old master", err)
}
err = dbhelper.StartSlave(master)
err = dbhelper.StartSlave(master.Conn)
if err != nil {
log.Println("WARNING: Start slave failed on old master", err)
}
err = dbhelper.SetReadOnly(master, true)
err = dbhelper.SetReadOnly(master.Conn, true)
if err != nil {
log.Printf("ERROR: Could not set old master as read-only", err)
}
Expand Down Expand Up @@ -393,7 +405,7 @@ func validateHostPort(h string, p string) bool {
}

/* Returns a candidate from a list of slaves. If there's only one slave it will be the de facto candidate. */
func electCandidate(l []string) string {
func (master *ServerMonitor) electCandidate(l []string) string {
ll := len(l)
if *verbose {
log.Println("Processing %s candidates", ll)
Expand All @@ -414,11 +426,11 @@ func electCandidate(l []string) string {
if dbhelper.CheckSlavePrerequisites(sl, sh) == false {
continue
}
if dbhelper.CheckBinlogFilters(master, sl) == false {
if dbhelper.CheckBinlogFilters(master.Conn, sl) == false {
log.Printf("WARNING: Binlog filters differ on master and slave %s. Skipping", v)
continue
}
if dbhelper.CheckReplicationFilters(master, sl) == false {
if dbhelper.CheckReplicationFilters(master.Conn, sl) == false {
log.Printf("WARNING: Replication filters differ on master and slave %s. Skipping", v)
continue
}
Expand All @@ -431,7 +443,7 @@ func electCandidate(l []string) string {
log.Printf("WARNING: Slave %s has more than %d seconds of replication delay (%d). Skipping", v, *maxDelay, ss.Seconds_Behind_Master.Int64)
continue
}
if *gtidCheck && dbhelper.CheckSlaveSync(sl, master) == false {
if *gtidCheck && dbhelper.CheckSlaveSync(sl, master.Conn) == false {
log.Printf("WARNING: Slave %s not in sync. Skipping", v)
continue
}
Expand Down

0 comments on commit 5e5ee13

Please sign in to comment.