Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use advertised replica names for replication check #5

Merged
merged 1 commit into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/app/active_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (app *App) calcActiveNodes(state, stateDcs map[string]*HostState, oldActive
app.logger.Warn(fmt.Sprintf("Calc active nodes: lost master %s", host))
continue
}
if (masterState.PingOk && masterState.PingStable) && !(replicaState.MasterLinkState && masterNode.MatchHost(replicaState.MasterHost)) {
if (masterState.PingOk && masterState.PingStable) && !replicates(&masterState, replicaState, host, masterNode, false) {
app.logger.Error(fmt.Sprintf("Calc active nodes: %s is not replicating from alive master, deleting from active...", host))
continue
}
Expand Down
5 changes: 2 additions & 3 deletions internal/app/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ func (app *App) checkHAReplicasRunning() bool {
app.logger.Warn("Host is ahead in replication history", "fqdn", host)
aheadHosts++
}
if hostState.PingOk && !hostState.IsMaster && hostState.ReplicaState != nil {
rs := hostState.ReplicaState
if rs.MasterLinkState && local.MatchHost(rs.MasterHost) {
if hostState.PingOk && !hostState.IsMaster {
if replicates(localState, hostState.ReplicaState, host, local, false) {
availableReplicas++
}
}
Expand Down
11 changes: 6 additions & 5 deletions internal/app/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,35 +83,36 @@ func (app *App) changeMaster(host, master string) error {
}

node := app.shard.Get(host)
masterState := app.getHostState(master)
masterNode := app.shard.Get(master)
state := app.getHostState(host)

if !state.PingOk {
return fmt.Errorf("changeMaster: replica %s is dead - unable to init repair", host)
}

app.repairReplica(node, state, master)
app.repairReplica(node, masterState, state, master, host)

deadline := time.Now().Add(app.config.Redis.WaitReplicationTimeout)
for time.Now().Before(deadline) {
state = app.getHostState(host)
rs := state.ReplicaState
if rs != nil && rs.MasterLinkState && masterNode.MatchHost(rs.MasterHost) {
if rs != nil && replicates(masterState, rs, host, masterNode, false) {
break
}
if !state.PingOk {
return fmt.Errorf("changeMaster: replica %s died while waiting to start replication from %s", host, master)
}
masterState := app.getHostState(master)
masterState = app.getHostState(master)
if !masterState.PingOk {
return fmt.Errorf("changeMaster: %s died while waiting to start replication to %s", master, host)
}
app.logger.Info(fmt.Sprintf("ChangeMaster: waiting for %s to start replication from %s", host, master))
app.repairReplica(node, state, master)
app.repairReplica(node, masterState, state, master, host)
time.Sleep(time.Second)
}
rs := state.ReplicaState
if rs != nil && rs.MasterLinkState && masterNode.MatchHost(rs.MasterHost) {
if rs != nil && replicates(masterState, rs, host, masterNode, false) {
app.logger.Info(fmt.Sprintf("ChangeMaster: %s started replication from %s", host, master))
} else {
return fmt.Errorf("%s was unable to start replication from %s", host, master)
Expand Down
12 changes: 7 additions & 5 deletions internal/app/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
func (app *App) repairShard(shardState map[string]*HostState, activeNodes []string, master string) {
replicas := make([]string, 0)
syncing := 0
masterState := shardState[master]
masterNode := app.shard.Get(master)
for host, state := range shardState {
if !state.PingOk {
Expand All @@ -20,7 +21,7 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri
app.repairMaster(masterNode, activeNodes, state)
} else {
rs := state.ReplicaState
if rs != nil && rs.MasterSyncInProgress && masterNode.MatchHost(rs.MasterHost) {
if rs != nil && rs.MasterSyncInProgress && replicates(masterState, rs, host, masterNode, true) {
syncing++
}
replicas = append(replicas, host)
Expand All @@ -39,9 +40,9 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri
}
}
rs := state.ReplicaState
if rs == nil || state.IsReplPaused || !(rs.MasterLinkState || rs.MasterSyncInProgress) || !masterNode.MatchHost(rs.MasterHost) {
if rs == nil || state.IsReplPaused || !replicates(masterState, rs, host, masterNode, true) {
if syncing < app.config.Redis.MaxParallelSyncs {
app.repairReplica(node, state, master)
app.repairReplica(node, masterState, state, master, host)
syncing++
} else {
app.logger.Error(fmt.Sprintf("Leaving replica %s broken: currently syncing %d/%d", host, syncing, app.config.Redis.MaxParallelSyncs))
Expand Down Expand Up @@ -75,10 +76,11 @@ func (app *App) repairMaster(node *redis.Node, activeNodes []string, state *Host
}
}

func (app *App) repairReplica(node *redis.Node, state *HostState, master string) {
func (app *App) repairReplica(node *redis.Node, masterState, state *HostState, master, replicaFQDN string) {
masterNode := app.shard.Get(master)
rs := state.ReplicaState
if rs == nil || !masterNode.MatchHost(rs.MasterHost) {
if !replicates(masterState, rs, replicaFQDN, masterNode, true) {
app.logger.Info("Initiating replica repair", "fqdn", replicaFQDN)
switch app.mode {
case modeSentinel:
err := node.SentinelMakeReplica(app.ctx, master)
Expand Down
17 changes: 17 additions & 0 deletions internal/app/replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package app

import (
"slices"

"github.com/yandex/rdsync/internal/redis"
)

func replicates(masterState *HostState, replicaState *ReplicaState, replicaFQDN string, masterNode *redis.Node, allowSync bool) bool {
if replicaState == nil || !(replicaState.MasterLinkState || allowSync) {
return false
}
if slices.Contains(masterState.ConnectedReplicas, replicaFQDN) {
return true
}
return masterNode.MatchHost(replicaState.MasterHost)
}
81 changes: 56 additions & 25 deletions internal/app/state.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package app

import (
"fmt"
"strconv"
"strings"
"time"

"github.com/yandex/rdsync/internal/dcs"
)

func (app *App) setStateError(state *HostState, fqdn, error string) {
app.logger.Error("GetHostState error", "fqdn", fqdn, "error", error)
state.Error = error
}

func (app *App) getHostState(fqdn string) *HostState {
node := app.shard.Get(fqdn)
var state HostState
Expand All @@ -20,7 +27,7 @@ func (app *App) getHostState(fqdn string) *HostState {
}
info, err := node.GetInfo(app.ctx)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
if len(info) == 0 {
state.PingOk = false
state.PingStable = false
Expand All @@ -31,134 +38,158 @@ func (app *App) getHostState(fqdn string) *HostState {
var ok bool
state.RunID, ok = info["run_id"]
if !ok {
state.Error = "No run_id in info"
app.setStateError(&state, fqdn, "No run_id in info")
return &state
}
state.ReplicationID, ok = info["master_replid"]
if !ok {
state.Error = "No master_replid in info"
app.setStateError(&state, fqdn, "No master_replid in info")
return &state
}
state.ReplicationID2, ok = info["master_replid2"]
if !ok {
state.Error = "No master_replid2 in info"
app.setStateError(&state, fqdn, "No master_replid2 in info")
return &state
}
masterOffset, ok := info["master_repl_offset"]
if !ok {
state.Error = "No master_repl_offset in info"
app.setStateError(&state, fqdn, "No master_repl_offset in info")
return &state
}
state.MasterReplicationOffset, err = strconv.ParseInt(masterOffset, 10, 64)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
secondOffset, ok := info["second_repl_offset"]
if !ok {
state.Error = "No second_repl_offset in info"
app.setStateError(&state, fqdn, "No second_repl_offset in info")
return &state
}
state.SecondReplicationOffset, err = strconv.ParseInt(secondOffset, 10, 64)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
replBacklogFirstByte, ok := info["repl_backlog_first_byte_offset"]
if !ok {
state.Error = "No repl_backlog_first_byte_offset in info"
app.setStateError(&state, fqdn, "No repl_backlog_first_byte_offset in info")
return &state
}
state.ReplicationBacklogStart, err = strconv.ParseInt(replBacklogFirstByte, 10, 64)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
replBacklogHistlen, ok := info["repl_backlog_histlen"]
if !ok {
state.Error = "No repl_backlog_histlen in info"
app.setStateError(&state, fqdn, "No repl_backlog_histlen in info")
return &state
}
state.ReplicationBacklogSize, err = strconv.ParseInt(replBacklogHistlen, 10, 64)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
role, ok := info["role"]
if !ok {
state.Error = "No role in info"
app.setStateError(&state, fqdn, "No role in info")
return &state
}
if role == "master" {
state.IsMaster = true
numReplicasStr, ok := info["connected_slaves"]
if !ok {
app.setStateError(&state, fqdn, "Master has no connected_slaves in info")
return &state
}
numReplicas, err := strconv.ParseInt(numReplicasStr, 10, 64)
if err != nil {
app.setStateError(&state, fqdn, err.Error())
return &state
}
var i int64
for i < numReplicas {
replicaID := fmt.Sprintf("slave%d", i)
replicaValue, ok := info[replicaID]
if !ok {
app.setStateError(&state, fqdn, fmt.Sprintf("Master has no %s in info", replicaID))
return &state
}
// ip is first value in slaveN info
start := strings.Index(replicaValue, "=")
end := strings.Index(replicaValue, ",")
state.ConnectedReplicas = append(state.ConnectedReplicas, replicaValue[start+1:end])
i++
}
} else {
state.IsMaster = false
rs := ReplicaState{}
rs.MasterHost, ok = info["master_host"]
if !ok {
state.Error = "Replica but no master_host in info"
app.setStateError(&state, fqdn, "Replica but no master_host in info")
return &state
}
linkState, ok := info["master_link_status"]
if !ok {
state.Error = "Replica but no master_link_status in info"
app.setStateError(&state, fqdn, "Replica but no master_link_status in info")
return &state
}
rs.MasterLinkState = (linkState == "up")
syncInProgress, ok := info["master_sync_in_progress"]
if !ok {
state.Error = "Replica but no master_sync_in_progress in info"
app.setStateError(&state, fqdn, "Replica but no master_sync_in_progress in info")
return &state
}
rs.MasterSyncInProgress = (syncInProgress != "0")
if !rs.MasterLinkState && !rs.MasterSyncInProgress {
downSeconds, ok := info["master_link_down_since_seconds"]
if !ok {
state.Error = "Replica with link down but no master_link_down_since_seconds in info"
app.setStateError(&state, fqdn, "Replica with link down but no master_link_down_since_seconds in info")
return &state
}
rs.MasterLinkDownTime, err = strconv.ParseInt(downSeconds, 10, 64)
rs.MasterLinkDownTime *= 1000
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
}
replicaOffset, ok := info["slave_repl_offset"]
if !ok {
state.Error = "Replica but no slave_repl_offset in info"
app.setStateError(&state, fqdn, "Replica but no slave_repl_offset in info")
return &state
}
rs.ReplicationOffset, err = strconv.ParseInt(replicaOffset, 10, 64)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
state.ReplicaState = &rs
}
state.IsReadOnly, err = node.IsReadOnly(app.ctx)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
state.IsOffline, err = node.IsOffline(app.ctx)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
state.IsReplPaused, err = node.IsReplPaused(app.ctx)
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
err = node.RefreshAddrs()
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
state.IP, err = node.GetIP()
if err != nil {
state.Error = err.Error()
app.setStateError(&state, fqdn, err.Error())
return &state
}
return &state
Expand Down
1 change: 1 addition & 0 deletions internal/app/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type HostState struct {
ReplicationID string `json:"replication_id"`
ReplicationID2 string `json:"replication_id2"`
Error string `json:"error"`
ConnectedReplicas []string `json:"connected_replicas"`
ReplicaState *ReplicaState `json:"replica_state"`
SentiCacheState *SentiCacheState `json:"senticache_state"`
}
Expand Down
1 change: 1 addition & 0 deletions tests/images/redis/default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ protected-mode no
offline yes
repl-disable-tcp-nodelay yes
repl-diskless-sync yes
repl-diskless-sync-delay 8
no-appendfsync-on-rewrite yes
appendonly yes
masterauth "functestpassword"
Expand Down