diff --git a/internal/app/cache.go b/internal/app/cache.go index 43f7b9f..9ba8f7d 100644 --- a/internal/app/cache.go +++ b/internal/app/cache.go @@ -7,14 +7,10 @@ import ( "github.com/yandex/rdsync/internal/redis" ) -func (app *App) updateCache() error { +func (app *App) updateCache(refState map[string]*HostState, cache *redis.SentiCacheNode) error { var state redis.SentiCacheState masterReadOnly := false - dcsState, err := app.getShardStateFromDcs() - if err != nil { - return err - } - for fqdn, hostState := range dcsState { + for fqdn, hostState := range refState { if hostState == nil || !hostState.PingOk || hostState.Error != "" { continue } @@ -53,7 +49,7 @@ func (app *App) updateCache() error { } state.Master.Port = app.config.Redis.Port state.Master.RunID = hostState.RunID - state.Master.Quorum = len(dcsState)/2 + 1 + state.Master.Quorum = len(refState)/2 + 1 state.Master.ParallelSyncs = app.config.Redis.MaxParallelSyncs state.Master.ConfigEpoch = 0 } else { @@ -84,9 +80,9 @@ func (app *App) updateCache() error { } } if state.Master.IP == "" { - return fmt.Errorf("0 open masters within %d hosts", len(dcsState)) + return fmt.Errorf("0 open masters within %d hosts", len(refState)) } - return app.cache.Update(app.ctx, &state) + return cache.Update(app.ctx, &state) } func (app *App) cacheUpdater() { @@ -94,7 +90,10 @@ func (app *App) cacheUpdater() { for { select { case <-ticker.C: - err := app.updateCache() + dcsState, err := app.getShardStateFromDcs() + if err == nil { + err = app.updateCache(dcsState, app.cache) + } if err != nil { app.logger.Error("CacheUpdater: failed to update cache", "error", err) } diff --git a/internal/app/switchover.go b/internal/app/switchover.go index 81ac35a..bfe1f92 100644 --- a/internal/app/switchover.go +++ b/internal/app/switchover.go @@ -6,6 +6,7 @@ import ( "time" "github.com/yandex/rdsync/internal/dcs" + "github.com/yandex/rdsync/internal/redis" ) const ( @@ -451,6 +452,25 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes app.repairMaster(newMasterNode, psyncActiveNodes, shardState[newMaster]) + if app.mode == modeSentinel { + shardState, err = app.getShardStateFromDB() + if err == nil { + sentiCacheUpdateErrs := runParallel(func(host string) error { + sentiCacheNode, err := redis.NewRemoteSentiCacheNode(app.config, host, app.logger) + if err != nil { + return err + } + return app.updateCache(shardState, sentiCacheNode) + }, activeNodes) + combined := combineErrors(sentiCacheUpdateErrs) + if combined != nil { + app.logger.Warn("Unable to notify all senticache nodes on new master", "error", combined) + } + } else { + app.logger.Warn("Unable to get state for senticache nodes notify on new master", "error", err) + } + } + errs := runParallel(func(host string) error { if host == newMaster || !shardState[host].PingOk { return nil diff --git a/internal/redis/senticache.go b/internal/redis/senticache.go index 222432a..6d75b12 100644 --- a/internal/redis/senticache.go +++ b/internal/redis/senticache.go @@ -63,9 +63,9 @@ type SentiCacheNode struct { broken bool } -// NewSentiCacheNode is a SentiCacheNode constructor -func NewSentiCacheNode(config *config.Config, logger *slog.Logger) (*SentiCacheNode, error) { - addr := net.JoinHostPort(localhost, strconv.Itoa(config.SentinelMode.CachePort)) +// NewRemoteSentiCacheNode is a remote SentiCacheNode constructor +func NewRemoteSentiCacheNode(config *config.Config, host string, logger *slog.Logger) (*SentiCacheNode, error) { + addr := net.JoinHostPort(host, strconv.Itoa(config.SentinelMode.CachePort)) opts := client.Options{ Addr: addr, Username: config.SentinelMode.CacheAuthUser, @@ -78,7 +78,7 @@ func NewSentiCacheNode(config *config.Config, logger *slog.Logger) (*SentiCacheN Protocol: 2, } if config.SentinelMode.UseTLS { - tlsConf, err := getTLSConfig(config, config.SentinelMode.TLSCAPath, localhost) + tlsConf, err := getTLSConfig(config, config.SentinelMode.TLSCAPath, host) if err != nil { return nil, err } @@ -93,6 +93,11 @@ func NewSentiCacheNode(config *config.Config, logger *slog.Logger) (*SentiCacheN return &node, nil } +// NewSentiCacheNode is a local SentiCacheNode constructor +func NewSentiCacheNode(config *config.Config, logger *slog.Logger) (*SentiCacheNode, error) { + return NewRemoteSentiCacheNode(config, localhost, logger) +} + // Close closes underlying Redis connection func (s *SentiCacheNode) Close() error { return s.conn.Close()