diff --git a/internal/app/active_nodes.go b/internal/app/active_nodes.go index 227507b..4e1c05b 100644 --- a/internal/app/active_nodes.go +++ b/internal/app/active_nodes.go @@ -140,7 +140,7 @@ func (app *App) calcActiveNodes(state, stateDcs map[string]*HostState, oldActive app.logger.Warn(fmt.Sprintf("Calc active nodes: lost master %s", host)) continue } - if (masterState.PingOk && masterState.PingStable) && !(replicaState.MasterLinkState && masterNode.MatchHost(replicaState.MasterHost)) { + if (masterState.PingOk && masterState.PingStable) && !replicates(&masterState, replicaState, host, masterNode, false) { app.logger.Error(fmt.Sprintf("Calc active nodes: %s is not replicating from alive master, deleting from active...", host)) continue } diff --git a/internal/app/checks.go b/internal/app/checks.go index eb9b147..4784b2d 100644 --- a/internal/app/checks.go +++ b/internal/app/checks.go @@ -32,9 +32,8 @@ func (app *App) checkHAReplicasRunning() bool { app.logger.Warn("Host is ahead in replication history", "fqdn", host) aheadHosts++ } - if hostState.PingOk && !hostState.IsMaster && hostState.ReplicaState != nil { - rs := hostState.ReplicaState - if rs.MasterLinkState && local.MatchHost(rs.MasterHost) { + if hostState.PingOk && !hostState.IsMaster { + if replicates(localState, hostState.ReplicaState, host, local, false) { availableReplicas++ } } diff --git a/internal/app/master.go b/internal/app/master.go index ceaf0e2..36db14d 100644 --- a/internal/app/master.go +++ b/internal/app/master.go @@ -83,6 +83,7 @@ func (app *App) changeMaster(host, master string) error { } node := app.shard.Get(host) + masterState := app.getHostState(master) masterNode := app.shard.Get(master) state := app.getHostState(host) @@ -90,28 +91,28 @@ func (app *App) changeMaster(host, master string) error { return fmt.Errorf("changeMaster: replica %s is dead - unable to init repair", host) } - app.repairReplica(node, state, master) + app.repairReplica(node, masterState, state, master, host) deadline := time.Now().Add(app.config.Redis.WaitReplicationTimeout) for time.Now().Before(deadline) { state = app.getHostState(host) rs := state.ReplicaState - if rs != nil && rs.MasterLinkState && masterNode.MatchHost(rs.MasterHost) { + if rs != nil && replicates(masterState, rs, host, masterNode, false) { break } if !state.PingOk { return fmt.Errorf("changeMaster: replica %s died while waiting to start replication from %s", host, master) } - masterState := app.getHostState(master) + masterState = app.getHostState(master) if !masterState.PingOk { return fmt.Errorf("changeMaster: %s died while waiting to start replication to %s", master, host) } app.logger.Info(fmt.Sprintf("ChangeMaster: waiting for %s to start replication from %s", host, master)) - app.repairReplica(node, state, master) + app.repairReplica(node, masterState, state, master, host) time.Sleep(time.Second) } rs := state.ReplicaState - if rs != nil && rs.MasterLinkState && masterNode.MatchHost(rs.MasterHost) { + if rs != nil && replicates(masterState, rs, host, masterNode, false) { app.logger.Info(fmt.Sprintf("ChangeMaster: %s started replication from %s", host, master)) } else { return fmt.Errorf("%s was unable to start replication from %s", host, master) diff --git a/internal/app/repair.go b/internal/app/repair.go index 3be3cb7..bfcbf4c 100644 --- a/internal/app/repair.go +++ b/internal/app/repair.go @@ -11,6 +11,7 @@ import ( func (app *App) repairShard(shardState map[string]*HostState, activeNodes []string, master string) { replicas := make([]string, 0) syncing := 0 + masterState := shardState[master] masterNode := app.shard.Get(master) for host, state := range shardState { if !state.PingOk { @@ -20,7 +21,7 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri app.repairMaster(masterNode, activeNodes, state) } else { rs := state.ReplicaState - if rs != nil && rs.MasterSyncInProgress && masterNode.MatchHost(rs.MasterHost) { + if rs != nil && rs.MasterSyncInProgress && replicates(masterState, rs, host, masterNode, true) { syncing++ } replicas = append(replicas, host) @@ -39,9 +40,9 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri } } rs := state.ReplicaState - if rs == nil || state.IsReplPaused || !(rs.MasterLinkState || rs.MasterSyncInProgress) || !masterNode.MatchHost(rs.MasterHost) { + if rs == nil || state.IsReplPaused || !replicates(masterState, rs, host, masterNode, true) { if syncing < app.config.Redis.MaxParallelSyncs { - app.repairReplica(node, state, master) + app.repairReplica(node, masterState, state, master, host) syncing++ } else { app.logger.Error(fmt.Sprintf("Leaving replica %s broken: currently syncing %d/%d", host, syncing, app.config.Redis.MaxParallelSyncs)) @@ -75,10 +76,10 @@ func (app *App) repairMaster(node *redis.Node, activeNodes []string, state *Host } } -func (app *App) repairReplica(node *redis.Node, state *HostState, master string) { +func (app *App) repairReplica(node *redis.Node, masterState, state *HostState, master, replicaFQDN string) { masterNode := app.shard.Get(master) rs := state.ReplicaState - if rs == nil || !masterNode.MatchHost(rs.MasterHost) { + if !replicates(masterState, rs, replicaFQDN, masterNode, true) { switch app.mode { case modeSentinel: err := node.SentinelMakeReplica(app.ctx, master) diff --git a/internal/app/replication.go b/internal/app/replication.go new file mode 100644 index 0000000..5c56de5 --- /dev/null +++ b/internal/app/replication.go @@ -0,0 +1,17 @@ +package app + +import ( + "slices" + + "github.com/yandex/rdsync/internal/redis" +) + +func replicates(masterState *HostState, replicaState *ReplicaState, replicaFQDN string, masterNode *redis.Node, allowSync bool) bool { + if replicaState == nil || !(replicaState.MasterLinkState || (allowSync && !replicaState.MasterSyncInProgress)) { + return false + } + if slices.Contains(masterState.ConnectedReplicas, replicaFQDN) { + return true + } + return masterNode.MatchHost(replicaState.MasterHost) +} diff --git a/internal/app/state.go b/internal/app/state.go index 9b41fcb..bcf15b0 100644 --- a/internal/app/state.go +++ b/internal/app/state.go @@ -1,7 +1,9 @@ package app import ( + "fmt" "strconv" + "strings" "time" "github.com/yandex/rdsync/internal/dcs" @@ -91,6 +93,30 @@ func (app *App) getHostState(fqdn string) *HostState { } if role == "master" { state.IsMaster = true + numReplicasStr, ok := info["connected_slaves"] + if !ok { + state.Error = "Master has no connected_slaves in info" + return &state + } + numReplicas, err := strconv.ParseInt(numReplicasStr, 10, 64) + if err != nil { + state.Error = err.Error() + return &state + } + var i int64 + for i < numReplicas { + replicaID := fmt.Sprintf("slave%d", i) + replicaValue, ok := info[replicaID] + if !ok { + state.Error = fmt.Sprintf("Master has no %s in info", replicaID) + return &state + } + // ip is first value in slaveN info + start := strings.Index(replicaValue, "=") + end := strings.Index(replicaValue, ",") + state.ConnectedReplicas = append(state.ConnectedReplicas, replicaValue[start+1:end]) + i++ + } } else { state.IsMaster = false rs := ReplicaState{} diff --git a/internal/app/types.go b/internal/app/types.go index f214ba8..b032f6d 100644 --- a/internal/app/types.go +++ b/internal/app/types.go @@ -111,6 +111,7 @@ type HostState struct { ReplicationID string `json:"replication_id"` ReplicationID2 string `json:"replication_id2"` Error string `json:"error"` + ConnectedReplicas []string `json:"connected_replicas"` ReplicaState *ReplicaState `json:"replica_state"` SentiCacheState *SentiCacheState `json:"senticache_state"` }