Skip to content

Commit

Permalink
Add aof mode control with options (on, off, on replicas)
Browse files Browse the repository at this point in the history
  • Loading branch information
secwall committed Nov 26, 2024
1 parent ed9ad7d commit 8068cab
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 1 deletion.
34 changes: 34 additions & 0 deletions internal/app/aof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package app

import (
"os"
)

func (app *App) adjustAofMode(master string) error {
if app.aofMode == modeUnspecified {
return nil
}
local := app.shard.Local()
targetMode := true
if app.aofMode == modeOff {
targetMode = false
} else if app.aofMode == modeOnReplicas && local.FQDN() == master && app.checkHAReplicasRunning() {
targetMode = false
}
currentMode, err := local.GetAppendonly(app.ctx)
if err != nil {
return err
}
if currentMode != targetMode {
err = local.SetAppendonly(app.ctx, targetMode)
if err != nil {
return err
}
}
if app.config.Redis.AofPath != "" && !targetMode {
if _, err := os.Stat(app.config.Redis.AofPath); err == nil {
return os.RemoveAll(app.config.Redis.AofPath)
}
}
return nil
}
6 changes: 6 additions & 0 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
type App struct {
ctx context.Context
mode appMode
aofMode aofMode
nodeFailTime map[string]time.Time
splitTime map[string]time.Time
state appState
Expand Down Expand Up @@ -77,9 +78,14 @@ func NewApp(configFile, logLevel string) (*App, error) {
if err != nil {
return nil, err
}
aofMode, err := parseAofMode(conf.AofMode)
if err != nil {
return nil, err
}
app := &App{
ctx: baseContext(),
mode: mode,
aofMode: aofMode,
nodeFailTime: make(map[string]time.Time),
splitTime: make(map[string]time.Time),
state: stateInit,
Expand Down
4 changes: 4 additions & 0 deletions internal/app/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (app *App) repairLocalNode(master string) bool {
}

if !offline {
err = app.adjustAofMode(master)
if err != nil {
app.logger.Error("Unable to adjust aof config on local node", "error", err)
}
return true
}

Expand Down
37 changes: 37 additions & 0 deletions internal/app/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,43 @@ func parseMode(mode string) (appMode, error) {
return modeSentinel, fmt.Errorf("unknown mode: %s", mode)
}

type aofMode int

const (
modeUnspecified aofMode = iota
modeOn
modeOff
modeOnReplicas
)

func (m aofMode) String() string {
switch m {
case modeUnspecified:
return "Unspecified"
case modeOn:
return "On"
case modeOff:
return "Off"
case modeOnReplicas:
return "OnReplicas"
}
return "Unknown"
}

func parseAofMode(mode string) (aofMode, error) {
switch mode {
case "Unspecified":
return modeUnspecified, nil
case "On":
return modeOn, nil
case "Off":
return modeOff, nil
case "OnReplicas":
return modeOnReplicas, nil
}
return modeUnspecified, fmt.Errorf("unknown aof mode: %s", mode)
}

const (
// manager's lock
pathManagerLock = "manager"
Expand Down
4 changes: 4 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type RedisConfig struct {
AllowDataLoss bool `yaml:"allow_data_loss"`
TurnBeforeSwitchover bool `yaml:"turn_before_switchover"`
RestartCommand string `yaml:"restart_command"`
AofPath string `yaml:"aof_path"`
}

// RedisRenamesConfig contains redis command renames
Expand Down Expand Up @@ -72,6 +73,7 @@ type Config struct {
LogLevel string `yaml:"loglevel"`
Hostname string `yaml:"hostname"`
Mode string `yaml:"mode"`
AofMode string `yaml:"aof_mode"`
InfoFile string `yaml:"info_file"`
MaintenanceFile string `yaml:"maintenance_file"`
DaemonLockFile string `yaml:"daemon_lock_file"`
Expand Down Expand Up @@ -114,6 +116,7 @@ func DefaultRedisConfig() RedisConfig {
AllowDataLoss: false,
TurnBeforeSwitchover: false,
RestartCommand: "systemctl restart redis-server",
AofPath: "",
}
}

Expand Down Expand Up @@ -177,6 +180,7 @@ func DefaultConfig() (Config, error) {
return Config{}, err
}
config := Config{
AofMode: "Unspecified",
LogLevel: "Info",
Hostname: hostname,
Mode: "Sentinel",
Expand Down
31 changes: 31 additions & 0 deletions internal/redis/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,37 @@ func (n *Node) EmptyQuorumReplicas(ctx context.Context) error {
return nil
}

// GetAppendonly returns a setting of appendonly config
func (n *Node) GetAppendonly(ctx context.Context) (bool, error) {
cmd := client.NewStringSliceCmd(ctx, n.config.Renames.Config, "get", "appendonly")
err := n.conn.Process(ctx, cmd)
if err != nil {
return false, err
}
vals, err := cmd.Result()
if err != nil {
return false, err
}
if len(vals) != 2 {
return false, fmt.Errorf("unexpected config get result for repl-paused: %v", vals)
}
return vals[1] == "yes", nil
}

// SetOffline disallows non-localhost connections and drops all existing clients (except rdsync ones)
func (n *Node) SetAppendonly(ctx context.Context, value bool) error {
strValue := "yes"
if !value {
strValue = "no"
}
setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "appendonly", strValue)
err := setCmd.Err()
if err != nil {
return err
}
return n.configRewrite(ctx)
}

// IsReadOnly returns ReadOnly status for node
func (n *Node) IsReadOnly(ctx context.Context) (bool, error) {
minReplicas, err := n.GetMinReplicas(ctx)
Expand Down
3 changes: 3 additions & 0 deletions tests/features/00_cluster_smoke.feature
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ Feature: Cluster mode smoke tests
"""
["redis1","redis2","redis3"]
"""
And path "/var/lib/redis/appendonlydir" does not exist on "redis1"
And path "/var/lib/redis/appendonlydir" exists on "redis2"
And path "/var/lib/redis/appendonlydir" exists on "redis3"

Scenario: Cluster mode duplicate ip resolve does not break rdsync
Given clustered shard is up and running
Expand Down
3 changes: 3 additions & 0 deletions tests/features/00_sentinel_smoke.feature
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ Feature: Sentinel mode smoke tests
And senticache host "redis1" should have master "redis1" within "30" seconds
And senticache host "redis2" should have master "redis1" within "30" seconds
And senticache host "redis3" should have master "redis1" within "30" seconds
And path "/var/lib/redis/appendonlydir" does not exist on "redis1"
And path "/var/lib/redis/appendonlydir" exists on "redis2"
And path "/var/lib/redis/appendonlydir" exists on "redis3"

Scenario: Sentinel mode duplicate ip resolve does not break rdsync
Given sentinel shard is up and running
Expand Down
3 changes: 3 additions & 0 deletions tests/features/02_cluster_switchover_from.feature
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ Feature: Cluster mode switchover from old master
When I get zookeeper node "/test/master"
And I save zookeeper query result as "new_master"
Then redis host "{{.new_master}}" should be master
When I wait for "30" seconds
Then path "/var/lib/redis/appendonlydir" exists on "redis1"
Then path "/var/lib/redis/appendonlydir" does not exist on "{{.new_master}}"

Scenario: Cluster mode switchover (from) with unhealthy replicas is rejected
Given clustered shard is up and running
Expand Down
3 changes: 3 additions & 0 deletions tests/features/02_sentinel_switchover_from.feature
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ Feature: Sentinel mode switchover from old master
And senticache host "redis1" should have master "{{.new_master}}" within "30" seconds
And senticache host "redis2" should have master "{{.new_master}}" within "30" seconds
And senticache host "redis3" should have master "{{.new_master}}" within "30" seconds
When I wait for "30" seconds
Then path "/var/lib/redis/appendonlydir" exists on "redis1"
Then path "/var/lib/redis/appendonlydir" does not exist on "{{.new_master}}"

Scenario: Sentinel mode switchover with unhealthy replicas is rejected
Given sentinel shard is up and running
Expand Down
2 changes: 2 additions & 0 deletions tests/images/redis/rdsync_cluster.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mode: Cluster
aof_mode: OnReplicas
loglevel: Debug
pprof_addr: ":8081"
info_file: /var/run/rdsync.info
Expand All @@ -7,6 +8,7 @@ daemon_lock_file: /var/run/rdsync.lock
redis:
auth_password: functestpassword
restart_command: supervisorctl restart redis
aof_path: /var/lib/redis/appendonlydir
zookeeper:
session_timeout: 3s
namespace: /test
Expand Down
2 changes: 2 additions & 0 deletions tests/images/redis/rdsync_sentinel.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mode: Sentinel
aof_mode: OnReplicas
loglevel: Debug
pprof_addr: ":8081"
info_file: /var/run/rdsync.info
Expand All @@ -7,6 +8,7 @@ daemon_lock_file: /var/run/rdsync.lock
redis:
auth_password: functestpassword
restart_command: supervisorctl restart redis
aof_path: /var/lib/redis/appendonlydir
sentinel_mode:
announce_hostname: true
cluster_name: functest
Expand Down
38 changes: 37 additions & 1 deletion tests/rdsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,15 @@ func (tctx *testContext) stepSentinelShardIsUpAndRunning() error {
func (tctx *testContext) stepPersistenceDisabled() error {
for _, service := range tctx.composer.Services() {
if strings.HasPrefix(service, redisName) {
_, err := tctx.runRedisCmd(service, []string{"CONFIG", "SET", "appendonly", "no"})
_, _, err := tctx.composer.RunCommand(service, "sed -i /OnReplicas/d /etc/rdsync.yaml", 10*time.Second)
if err != nil {
return err
}
_, _, err = tctx.composer.RunCommand(service, "supervisorctl restart rdsync", 10*time.Second)
if err != nil {
return err
}
_, err = tctx.runRedisCmd(service, []string{"CONFIG", "SET", "appendonly", "no"})
if err != nil {
return err
}
Expand Down Expand Up @@ -598,6 +606,30 @@ func (tctx *testContext) stepHostShouldHaveFileWithin(node string, path string,
return err
}

func (tctx *testContext) stepPathExists(path, host string) error {
cmd := fmt.Sprintf("stat %s", path)
retCode, _, err := tctx.composer.RunCommand(host, cmd, 10*time.Second)
if err != nil {
return fmt.Errorf("failed to check path %s on %s: %s", path, host, err)
}
if retCode != 0 {
return fmt.Errorf("expected %s to exist on %s but it's not", path, host)
}
return nil
}

func (tctx *testContext) stepPathDoesNotExist(path, host string) error {
cmd := fmt.Sprintf("stat %s", path)
retCode, _, err := tctx.composer.RunCommand(host, cmd, 10*time.Second)
if err != nil {
return fmt.Errorf("failed to check path %s on %s: %s", path, host, err)
}
if retCode == 0 {
return fmt.Errorf("expected %s to be absent on %s but it exists", path, host)
}
return nil
}

func (tctx *testContext) stepIRunCommandOnHost(host string, body *godog.DocString) error {
cmd := strings.TrimSpace(body.Content)
var err error
Expand Down Expand Up @@ -1027,6 +1059,10 @@ func InitializeScenario(s *godog.ScenarioContext) {
s.Step(`^host "([^"]*)" should have file "([^"]*)"$`, tctx.stepHostShouldHaveFile)
s.Step(`^host "([^"]*)" should have file "([^"]*)" within "(\d+)" seconds$`, tctx.stepHostShouldHaveFileWithin)

// path checking
s.Step(`^path "([^"]*)" does not exist on "([^"]*)"$`, tctx.stepPathDoesNotExist)
s.Step(`^path "([^"]*)" exists on "([^"]*)"$`, tctx.stepPathExists)

// command execution
s.Step(`^I run command on host "([^"]*)"$`, tctx.stepIRunCommandOnHost)
s.Step(`^I run command on host "([^"]*)" with timeout "(\d+)" seconds$`, tctx.stepIRunCommandOnHostWithTimeout)
Expand Down

0 comments on commit 8068cab

Please sign in to comment.