Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Give up manager role if DCS is reachable by unavailable primary #64

Merged
merged 1 commit into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type App struct {
ctx context.Context
mode appMode
nodeFailTime map[string]time.Time
splitTime map[string]time.Time
state appState
critical atomic.Value
logger *slog.Logger
Expand Down Expand Up @@ -72,9 +73,6 @@ func NewApp(configFile, logLevel string) (*App, error) {
return nil, err
}
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevelN}))
if err != nil {
return nil, err
}
mode, err := parseMode(conf.Mode)
if err != nil {
return nil, err
Expand All @@ -83,6 +81,7 @@ func NewApp(configFile, logLevel string) (*App, error) {
ctx: baseContext(),
mode: mode,
nodeFailTime: make(map[string]time.Time),
splitTime: make(map[string]time.Time),
state: stateInit,
logger: logger,
config: conf,
Expand Down
39 changes: 39 additions & 0 deletions internal/app/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -136,9 +137,47 @@ func (app *App) stateManager() appState {
}
if !shardState[master].PingOk {
app.logger.Error(fmt.Sprintf("Master %s probably failed, do not perform any kind of repair", master))
secwall marked this conversation as resolved.
Show resolved Hide resolved
if master != app.config.Hostname && shardStateDcs[master].PingOk {
if app.splitTime[master].IsZero() {
app.splitTime[master] = time.Now()
}
if app.config.Redis.FailoverTimeout > 0 {
failedTime := time.Since(app.splitTime[master])
if failedTime < app.config.Redis.FailoverTimeout {
app.logger.Error(fmt.Sprintf("According to DCS %s is still alive, will wait for %v before giving up on manager role",
master, app.config.Redis.FailoverTimeout-failedTime))
return stateManager
}
}
app.logger.Error(fmt.Sprintf("According to DCS master %s is alive, but we see it as failed. Giving up on manager role", master))
delete(app.splitTime, master)
app.dcs.ReleaseLock(pathManagerLock)
waitCtx, cancel := context.WithTimeout(app.ctx, app.config.Redis.FailoverTimeout)
defer cancel()
ticker := time.NewTicker(app.config.TickInterval)
var manager dcs.LockOwner
Out:
for {
select {
case <-ticker.C:
err = app.dcs.Get(pathManagerLock, &manager)
if err != nil {
app.logger.Error(fmt.Sprintf("Failed to get %s", pathManagerLock), "error", err)
} else if manager.Hostname != app.config.Hostname {
app.logger.Info(fmt.Sprintf("New manager: %s", manager.Hostname))
break Out
}
case <-waitCtx.Done():
app.logger.Error("No node took manager lock for failover timeout")
break Out
}
}
return stateCandidate
}
return stateManager
}
delete(app.nodeFailTime, master)
delete(app.splitTime, master)
app.repairShard(shardState, activeNodes, master)

if updateActive {
Expand Down
51 changes: 51 additions & 0 deletions tests/features/06_cluster_lost.feature
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,54 @@ Feature: Cluster mode survives dcs conn loss
And host "redis2" is attached to the network
And host "redis3" is attached to the network
Then redis host "redis1" should become available within "60" seconds

# Covers the partial-partition case: the manager keeps its DCS (zookeeper)
# session but loses TCP connectivity to the primary's redis port. Since DCS
# still reports the primary alive, the manager must give up the manager lock
# instead of repairing around a primary the rest of the cluster can reach.
Scenario: Cluster mode partially partitioned manager gives up on manager role
Given clustered shard is up and running
Then redis host "redis1" should be master
And redis host "redis2" should become replica of "redis1" within "15" seconds
And replication on redis host "redis2" should run fine within "15" seconds
And redis host "redis3" should become replica of "redis1" within "15" seconds
And replication on redis host "redis3" should run fine within "15" seconds
And zookeeper node "/test/active_nodes" should match json_exactly within "30" seconds
"""
["redis1","redis2","redis3"]
"""
# Stop rdsync on the primary so the manager lock moves to one of the replicas.
When I run command on host "redis1" with timeout "20" seconds
"""
supervisorctl stop rdsync
"""
Then command return code should be "0"
And zookeeper node "/test/manager" should match regexp within "30" seconds
"""
.*redis[23].*
"""
When I run command on host "redis1" with timeout "20" seconds
"""
supervisorctl start rdsync
"""
# Record which replica became manager, then cut only its redis-port (6379)
# traffic: it still reaches DCS but can no longer see the primary's redis.
When I get zookeeper node "/test/manager"
And I save zookeeper query result as "new_manager"
And port "6379" on host "{{.new_manager.hostname}}" is blocked
And I wait for "60" seconds
# No failover must happen: redis1 stays master throughout the partition.
Then redis host "redis1" should be master
When I run command on host "{{.new_manager.hostname}}"
"""
grep ERROR /var/log/rdsync.log
"""
Then command output should match regexp
"""
.*Giving up on manager role.*
"""
# The partitioned node must also observe another node taking the lock.
When I run command on host "{{.new_manager.hostname}}"
"""
grep INFO /var/log/rdsync.log
"""
Then command output should match regexp
"""
.*New manager.*
"""
# After the partition heals, the shard converges back to all three nodes.
When port "6379" on host "{{.new_manager.hostname}}" is unblocked
Then zookeeper node "/test/active_nodes" should match json_exactly within "30" seconds
"""
["redis1","redis2","redis3"]
"""
51 changes: 51 additions & 0 deletions tests/features/06_sentinel_lost.feature
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,54 @@ Feature: Sentinel mode survives dcs conn loss
And host "redis2" is attached to the network
And host "redis3" is attached to the network
Then redis host "redis1" should become available within "60" seconds

# Sentinel-mode twin of the cluster partial-partition scenario: the manager
# keeps its DCS (zookeeper) session but loses TCP connectivity to the
# primary's redis port. Since DCS still reports the primary alive, the
# manager must release the manager lock instead of triggering a failover.
Scenario: Sentinel mode partially partitioned manager gives up on manager role
Given sentinel shard is up and running
Then redis host "redis1" should be master
And redis host "redis2" should become replica of "redis1" within "15" seconds
And replication on redis host "redis2" should run fine within "15" seconds
And redis host "redis3" should become replica of "redis1" within "15" seconds
And replication on redis host "redis3" should run fine within "15" seconds
And zookeeper node "/test/active_nodes" should match json_exactly within "30" seconds
"""
["redis1","redis2","redis3"]
"""
# Stop rdsync on the primary so the manager lock moves to one of the replicas.
When I run command on host "redis1" with timeout "20" seconds
"""
supervisorctl stop rdsync
"""
Then command return code should be "0"
And zookeeper node "/test/manager" should match regexp within "30" seconds
"""
.*redis[23].*
"""
When I run command on host "redis1" with timeout "20" seconds
"""
supervisorctl start rdsync
"""
# Record which replica became manager, then cut only its redis-port (6379)
# traffic: it still reaches DCS but can no longer see the primary's redis.
When I get zookeeper node "/test/manager"
And I save zookeeper query result as "new_manager"
And port "6379" on host "{{.new_manager.hostname}}" is blocked
And I wait for "60" seconds
# No failover must happen: redis1 stays master throughout the partition.
Then redis host "redis1" should be master
When I run command on host "{{.new_manager.hostname}}"
"""
grep ERROR /var/log/rdsync.log
"""
Then command output should match regexp
"""
.*Giving up on manager role.*
"""
# The partitioned node must also observe another node taking the lock.
When I run command on host "{{.new_manager.hostname}}"
"""
grep INFO /var/log/rdsync.log
"""
Then command output should match regexp
"""
.*New manager.*
"""
# After the partition heals, the shard converges back to all three nodes.
When port "6379" on host "{{.new_manager.hostname}}" is unblocked
Then zookeeper node "/test/active_nodes" should match json_exactly within "30" seconds
"""
["redis1","redis2","redis3"]
"""
10 changes: 10 additions & 0 deletions tests/rdsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,14 @@ func (tctx *testContext) stepHostIsAttachedToTheNetwork(host string) error {
return tctx.composer.AttachToNet(host)
}

// stepPortOnHostIsBlocked backs the `port "N" on host "X" is blocked` step.
// Note the argument swap: godog captures (port, host) in step order, while
// the composer API takes (service, port).
func (tctx *testContext) stepPortOnHostIsBlocked(port int, host string) error {
	return tctx.composer.BlockPort(host, port)
}

// stepPortOnHostIsUnBlocked backs the `port "N" on host "X" is unblocked`
// step, undoing stepPortOnHostIsBlocked. Same (port, host) -> (service, port)
// argument swap as the blocking step.
func (tctx *testContext) stepPortOnHostIsUnBlocked(port int, host string) error {
	return tctx.composer.UnBlockPort(host, port)
}

func (tctx *testContext) stepHostIsAdded(host string) error {
err := tctx.composer.Start(host)
if err != nil {
Expand Down Expand Up @@ -1010,6 +1018,8 @@ func InitializeScenario(s *godog.ScenarioContext) {
s.Step(`^host "([^"]*)" is detached from the network$`, tctx.stepHostIsDetachedFromTheNetwork)
s.Step(`^host "([^"]*)" is started$`, tctx.stepHostIsStarted)
s.Step(`^host "([^"]*)" is attached to the network$`, tctx.stepHostIsAttachedToTheNetwork)
s.Step(`^port "(\d+)" on host "([^"]*)" is blocked$`, tctx.stepPortOnHostIsBlocked)
s.Step(`^port "(\d+)" on host "([^"]*)" is unblocked$`, tctx.stepPortOnHostIsUnBlocked)
s.Step(`^host "([^"]*)" is added`, tctx.stepHostIsAdded)
s.Step(`^host "([^"]*)" is deleted$`, tctx.stepHostIsDeleted)

Expand Down
46 changes: 46 additions & 0 deletions tests/testutil/docker_composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type Composer interface {
DetachFromNet(service string) error
// Attachs container/VM to network
AttachToNet(service string) error
// Blocks port on host
BlockPort(service string, port int) error
// Unblocks port on host
UnBlockPort(service string, port int) error
// Executes command inside container/VM with given timeout.
// Returns command retcode and output (stdoud and stderr are mixed)
RunCommand(service, cmd string, timeout time.Duration) (retcode int, output string, err error)
Expand Down Expand Up @@ -334,6 +338,48 @@ func (dc *DockerComposer) DetachFromNet(service string) error {
return nil
}

// BlockPort blocks port for host
// BlockPort drops TCP traffic for the given port inside the service's
// container by appending iptables/ip6tables DROP rules on eth0: inbound
// connections to the port (INPUT --dport) and outbound connections toward
// the same port on peers (OUTPUT --dport). Undo with UnBlockPort.
//
// Returns an error naming the failed rule if any command cannot be applied;
// rules appended before the failure are left in place.
func (dc *DockerComposer) BlockPort(service string, port int) error {
	if _, ok := dc.containers[service]; !ok {
		return fmt.Errorf("no such service: %s", service)
	}
	cmds := []string{
		fmt.Sprintf("iptables -A INPUT -i eth0 -p tcp --dport %d -j DROP", port),
		fmt.Sprintf("iptables -A OUTPUT -o eth0 -p tcp --dport %d -j DROP", port),
		fmt.Sprintf("ip6tables -A INPUT -i eth0 -p tcp --dport %d -j DROP", port),
		fmt.Sprintf("ip6tables -A OUTPUT -o eth0 -p tcp --dport %d -j DROP", port),
	}
	for _, cmd := range cmds {
		// Wrap with the failing rule so test logs show exactly which
		// firewall command broke, not just the raw docker exec error.
		if _, _, err := dc.RunCommand(service, cmd, defaultDockerTimeout); err != nil {
			return fmt.Errorf("blocking port %d on %s (%q): %w", port, service, cmd, err)
		}
	}
	return nil
}

// UnBlockPort removes blocking rules for port on host
// UnBlockPort removes the DROP rules installed by BlockPort for the given
// port inside the service's container.
//
// Removal is best-effort: unlike the original early-return, every delete is
// attempted even if an earlier one fails, so a partially applied block is
// still cleaned up as far as possible. The first error (with the failing
// rule attached) is returned.
func (dc *DockerComposer) UnBlockPort(service string, port int) error {
	if _, ok := dc.containers[service]; !ok {
		return fmt.Errorf("no such service: %s", service)
	}
	cmds := []string{
		fmt.Sprintf("iptables -D INPUT -i eth0 -p tcp --dport %d -j DROP", port),
		fmt.Sprintf("iptables -D OUTPUT -o eth0 -p tcp --dport %d -j DROP", port),
		fmt.Sprintf("ip6tables -D INPUT -i eth0 -p tcp --dport %d -j DROP", port),
		fmt.Sprintf("ip6tables -D OUTPUT -o eth0 -p tcp --dport %d -j DROP", port),
	}
	var firstErr error
	for _, cmd := range cmds {
		// Keep going on failure: leaving stale DROP rules behind would
		// poison every later scenario that reuses this container.
		if _, _, err := dc.RunCommand(service, cmd, defaultDockerTimeout); err != nil && firstErr == nil {
			firstErr = fmt.Errorf("unblocking port %d on %s (%q): %w", port, service, cmd, err)
		}
	}
	return firstErr
}

func newUntarReaderCloser(reader io.ReadCloser) (io.ReadCloser, error) {
tarReader := tar.NewReader(reader)
_, err := tarReader.Next()
Expand Down
Loading