From d87f305dbc6500971a7635c3632f104c45a1acfa Mon Sep 17 00:00:00 2001 From: secwall Date: Sun, 1 Dec 2024 03:37:17 +0100 Subject: [PATCH] Introduce quorum-replicas-to-write setting --- internal/app/failover.go | 2 +- internal/app/maintenance.go | 2 +- internal/app/master.go | 8 +--- internal/app/repair.go | 23 ++++++--- internal/app/switchover.go | 11 ++++- internal/config/config.go | 2 - internal/redis/node.go | 48 ++++++++++++++----- .../0004_Add_waitquorum_command.patch | 46 ++++++++++++------ tests/features/01_cluster_maintenance.feature | 10 ++-- .../features/01_sentinel_maintenance.feature | 10 ++-- 10 files changed, 106 insertions(+), 56 deletions(-) diff --git a/internal/app/failover.go b/internal/app/failover.go index cbce5d4..9447cae 100644 --- a/internal/app/failover.go +++ b/internal/app/failover.go @@ -19,7 +19,7 @@ func countRunningHAReplicas(shardState map[string]*HostState) int { } func (app *App) getFailoverQuorum(activeNodes []string) int { - fq := len(activeNodes) - app.getMinReplicasToWrite(activeNodes) + fq := len(activeNodes) - app.getNumReplicasToWrite(activeNodes) if fq < 1 || app.config.Redis.AllowDataLoss { fq = 1 } diff --git a/internal/app/maintenance.go b/internal/app/maintenance.go index 7443e6f..033d752 100644 --- a/internal/app/maintenance.go +++ b/internal/app/maintenance.go @@ -9,7 +9,7 @@ import ( func (app *App) enterMaintenance(maintenance *Maintenance, master string) error { node := app.shard.Get(master) - err, rewriteErr := node.SetMinReplicas(app.ctx, 0) + err, rewriteErr := node.SetNumQuorumReplicas(app.ctx, 0) if err != nil { return err } diff --git a/internal/app/master.go b/internal/app/master.go index cae31b2..22be307 100644 --- a/internal/app/master.go +++ b/internal/app/master.go @@ -7,12 +7,8 @@ import ( "github.com/yandex/rdsync/internal/dcs" ) -func (app *App) getMinReplicasToWrite(activeNodes []string) int { - minReplicas := len(activeNodes) / 2 - if app.config.Redis.MaxReplicasToWrite >= 0 && minReplicas > app.config.Redis.MaxReplicasToWrite { - minReplicas = app.config.Redis.MaxReplicasToWrite - } - return minReplicas +func (app *App) getNumReplicasToWrite(activeNodes []string) int { + return len(activeNodes) / 2 } func (app *App) getCurrentMaster(shardState map[string]*HostState) (string, error) { diff --git a/internal/app/repair.go b/internal/app/repair.go index a53a255..ac30af0 100644 --- a/internal/app/repair.go +++ b/internal/app/repair.go @@ -52,17 +52,26 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri } func (app *App) repairMaster(node *redis.Node, activeNodes []string, state *HostState) { - expectedMinReplicas := app.getMinReplicasToWrite(activeNodes) - actualMinReplicas, err := node.GetMinReplicas(app.ctx) + if state.IsReadOnly { + err, rewriteErr := node.SetReadWrite(app.ctx) + if err != nil { + app.logger.Error("Unable to set master read-write", "fqdn", node.FQDN(), "error", err) + } + if rewriteErr != nil { + app.logger.Error("Unable to rewrite config on master", "fqdn", node.FQDN(), "error", rewriteErr) + } + } + expectedNumReplicas := app.getNumReplicasToWrite(activeNodes) + actualNumReplicas, err := node.GetNumQuorumReplicas(app.ctx) if err != nil { - app.logger.Error("Unable to get actual min replicas on master", "fqdn", node.FQDN(), "error", err) + app.logger.Error("Unable to get actual num quorum replicas on master", "fqdn", node.FQDN(), "error", err) return } - if actualMinReplicas != expectedMinReplicas { - app.logger.Info(fmt.Sprintf("Changing min replicas from %d to %d on master", actualMinReplicas, expectedMinReplicas), "fqdn", node.FQDN()) - err, rewriteErr := node.SetMinReplicas(app.ctx, expectedMinReplicas) + if actualNumReplicas != expectedNumReplicas { + app.logger.Info(fmt.Sprintf("Changing num quorum replicas from %d to %d on master", actualNumReplicas, expectedNumReplicas), "fqdn", node.FQDN()) + err, rewriteErr := node.SetNumQuorumReplicas(app.ctx, expectedNumReplicas) if err != nil { - app.logger.Error("Unable to set min replicas on master", "fqdn", node.FQDN(), "error", err) + app.logger.Error("Unable to set num quorum replicas on master", "fqdn", node.FQDN(), "error", err) } if rewriteErr != nil { app.logger.Error("Unable to rewrite config on master", "fqdn", node.FQDN(), "error", rewriteErr) diff --git a/internal/app/switchover.go b/internal/app/switchover.go index bfe1f92..24f126a 100644 --- a/internal/app/switchover.go +++ b/internal/app/switchover.go @@ -355,15 +355,22 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes app.waitPoisonPill(app.config.Redis.WaitPoisonPillTimeout) } - if len(aliveActiveNodes) == 1 || app.config.Redis.AllowDataLoss || app.config.Redis.MaxReplicasToWrite == 0 { + if len(aliveActiveNodes) == 1 || app.config.Redis.AllowDataLoss { node := app.shard.Get(newMaster) - err, errConf := node.SetMinReplicas(app.ctx, 0) + err, errConf := node.SetReadWrite(app.ctx) if err != nil { return fmt.Errorf("unable to set %s available for write before promote: %s", newMaster, err.Error()) } if errConf != nil { return fmt.Errorf("unable to rewrite config on %s before promote: %s", newMaster, errConf.Error()) } + err, errConf = node.SetNumQuorumReplicas(app.ctx, 0) + if err != nil { + return fmt.Errorf("unable to set num quorum replicas to 0 on %s: %s", newMaster, err.Error()) + } + if errConf != nil { + return fmt.Errorf("unable to rewrite config on %s before promote: %s", newMaster, errConf.Error()) + } } if app.config.Redis.TurnBeforeSwitchover { diff --git a/internal/config/config.go b/internal/config/config.go index f615713..a79e9e4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -34,7 +34,6 @@ type RedisConfig struct { WaitPromoteForceTimeout time.Duration `yaml:"wait_promote_force_timeout"` WaitPoisonPillTimeout time.Duration `yaml:"wait_poison_pill_timeout"` MaxParallelSyncs int `yaml:"max_parallel_syncs"` - MaxReplicasToWrite int `yaml:"max_replicas_to_write"` AllowDataLoss bool `yaml:"allow_data_loss"` TurnBeforeSwitchover bool `yaml:"turn_before_switchover"` RestartCommand string `yaml:"restart_command"` @@ -112,7 +111,6 @@ func DefaultRedisConfig() RedisConfig { WaitPromoteForceTimeout: 10 * time.Second, WaitPoisonPillTimeout: 30 * time.Second, MaxParallelSyncs: 1, - MaxReplicasToWrite: -1, AllowDataLoss: false, TurnBeforeSwitchover: false, RestartCommand: "systemctl restart redis-server", diff --git a/internal/redis/node.go b/internal/redis/node.go index 32bbb94..911e500 100644 --- a/internal/redis/node.go +++ b/internal/redis/node.go @@ -257,9 +257,9 @@ func (n *Node) DisconnectClients(ctx context.Context, ctype string) error { return disconnectCmd.Err() } -// GetMinReplicas returns number of connected replicas to accept writes on node -func (n *Node) GetMinReplicas(ctx context.Context) (int, error) { - cmd := client.NewStringSliceCmd(ctx, n.config.Renames.Config, "get", "min-replicas-to-write") +// GetNumQuorumReplicas returns number of connected replicas to accept writes on node +func (n *Node) GetNumQuorumReplicas(ctx context.Context) (int, error) { + cmd := client.NewStringSliceCmd(ctx, n.config.Renames.Config, "get", "quorum-replicas-to-write") err := n.conn.Process(ctx, cmd) if err != nil { return 0, err @@ -269,18 +269,18 @@ func (n *Node) GetMinReplicas(ctx context.Context) (int, error) { return 0, err } if len(vals) != 2 { - return 0, fmt.Errorf("unexpected config get result for min-replicas-to-write: %v", vals) + return 0, fmt.Errorf("unexpected config get result for quorum-replicas-to-write: %v", vals) } ret, err := strconv.ParseInt(vals[1], 10, 32) if err != nil { - return 0, fmt.Errorf("unable to parse min-replicas-to-write value: %s", err.Error()) + return 0, fmt.Errorf("unable to parse quorum-replicas-to-write value: %s", err.Error()) } return int(ret), nil } -// SetMinReplicas sets desired number of connected replicas to accept writes on node -func (n *Node) SetMinReplicas(ctx context.Context, value int) (error, error) { - setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "min-replicas-to-write", strconv.Itoa(value)) +// SetNumQuorumReplicas sets desired number of connected replicas to accept writes on node +func (n *Node) SetNumQuorumReplicas(ctx context.Context, value int) (error, error) { + setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "quorum-replicas-to-write", strconv.Itoa(value)) err := setCmd.Err() if err != nil { return err, nil @@ -368,19 +368,33 @@ func (n *Node) SetAppendonly(ctx context.Context, value bool) error { // IsReadOnly returns ReadOnly status for node func (n *Node) IsReadOnly(ctx context.Context) (bool, error) { - minReplicas, err := n.GetMinReplicas(ctx) + cmd := client.NewStringSliceCmd(ctx, n.config.Renames.Config, "get", "min-replicas-to-write") + err := n.conn.Process(ctx, cmd) + if err != nil { + return false, err + } + vals, err := cmd.Result() if err != nil { return false, err } - return minReplicas == highMinReplicas, nil + if len(vals) != 2 { + return false, fmt.Errorf("unexpected config get result for min-replicas-to-write: %v", vals) + } + ret, err := strconv.ParseInt(vals[1], 10, 32) + if err != nil { + return false, fmt.Errorf("unable to parse min-replicas-to-write value: %s", err.Error()) + } + return int(ret) > 0, nil } // SetReadOnly makes node read-only by setting min replicas to unreasonably high value and disconnecting clients func (n *Node) SetReadOnly(ctx context.Context, disconnect bool) (error, error) { - err, rewriteErr := n.SetMinReplicas(ctx, highMinReplicas) + setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "min-replicas-to-write", strconv.Itoa(highMinReplicas)) + err := setCmd.Err() if err != nil { - return err, rewriteErr + return err, nil } + rewriteErr := n.configRewrite(ctx) if disconnect { err = n.DisconnectClients(ctx, "normal") if err != nil { @@ -394,6 +408,16 @@ func (n *Node) SetReadOnly(ctx context.Context, disconnect bool) (error, error) return nil, rewriteErr } +// SetReadOnly makes node returns min-replicas-to-write to zero +func (n *Node) SetReadWrite(ctx context.Context) (error, error) { + setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "min-replicas-to-write", "0") + err := setCmd.Err() + if err != nil { + return err, nil + } + return nil, n.configRewrite(ctx) +} + // SetOnline allows non-localhost connections func (n *Node) SetOnline(ctx context.Context) error { if !n.IsLocal() { diff --git a/redis_patches/0004_Add_waitquorum_command.patch b/redis_patches/0004_Add_waitquorum_command.patch index d539e12..583f4e7 100644 --- a/redis_patches/0004_Add_waitquorum_command.patch +++ b/redis_patches/0004_Add_waitquorum_command.patch @@ -68,7 +68,7 @@ index 000000000..dca11ab67 + } +} diff --git a/src/config.c b/src/config.c -index a7231868c..60e197c36 100644 +index a7231868c..6939dc893 100644 --- a/src/config.c +++ b/src/config.c @@ -3093,6 +3093,78 @@ static void rewriteConfigOfflineMode(standardConfig *config, const char *name, s @@ -150,7 +150,15 @@ index a7231868c..60e197c36 100644 standardConfig static_configs[] = { /* Bool configs */ createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL), -@@ -3309,6 +3381,7 @@ standardConfig static_configs[] = { +@@ -3230,6 +3302,7 @@ standardConfig static_configs[] = { + createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod), + createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL), + createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL), ++ createIntConfig("quorum-replicas-to-write", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.quorum_replicas_to_write, 0, INTEGER_CONFIG, NULL, NULL), + + /* Unsigned int configs */ + createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), +@@ -3309,6 +3382,7 @@ standardConfig static_configs[] = { createSpecialConfig("replicaof", "slaveof", IMMUTABLE_CONFIG | MULTI_ARG_CONFIG, setConfigReplicaOfOption, getConfigReplicaOfOption, rewriteConfigReplicaOfOption, NULL), createSpecialConfig("latency-tracking-info-percentiles", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigLatencyTrackingInfoPercentilesOutputOption, getConfigLatencyTrackingInfoPercentilesOutputOption, rewriteConfigLatencyTrackingInfoPercentilesOutputOption, NULL), createSpecialConfig("offline", NULL, MODIFIABLE_CONFIG, setOfflineMode, getOfflineMode, rewriteConfigOfflineMode, applyBind), @@ -171,7 +179,7 @@ index 7696e8c28..82c6f2b97 100644 c->reploff = 0; c->read_reploff = 0; diff --git a/src/replication.c b/src/replication.c -index 47172dba3..6bfb21ef4 100644 +index 47172dba3..04f0c29bb 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1285,7 +1285,8 @@ void replconfCommand(client *c) { @@ -220,7 +228,7 @@ index 47172dba3..6bfb21ef4 100644 replicationRequestAckFromSlaves(); } -+/* WAITQUORUM for min-replicas-to-write quorum replicas to acknowledge the processing of our latest ++/* WAITQUORUM for quorum-replicas-to-write quorum replicas to acknowledge the processing of our latest + * write command (and all the previous commands). */ +void waitquorumCommand(client *c) { + long ackreplicas; @@ -233,14 +241,14 @@ index 47172dba3..6bfb21ef4 100644 + + /* First try without blocking at all. */ + ackreplicas = replicationCountQuorumAcksByOffset(c->woff); -+ if (ackreplicas >= server.repl_min_slaves_to_write || c->flags & CLIENT_DENY_BLOCKING) { ++ if (ackreplicas >= server.quorum_replicas_to_write || c->flags & CLIENT_DENY_BLOCKING) { + addReplyLongLong(c,ackreplicas); + return; + } + + /* Otherwise block the client and put it into our list of clients + * waiting for ack from slaves. */ -+ blockForReplication(c,0,offset,server.repl_min_slaves_to_write); ++ blockForReplication(c,0,offset,server.quorum_replicas_to_write); + c->bstate.quorum = 1; + + /* Make sure that the server will send an ACK request to all the slaves @@ -360,7 +368,7 @@ index 2aa63df77..4b1f4242a 100644 } } diff --git a/src/server.h b/src/server.h -index 05bdf5c0c..58dbd02b4 100644 +index 320d5eeba..18c85d3dc 100644 --- a/src/server.h +++ b/src/server.h @@ -1030,6 +1030,7 @@ typedef struct blockingState { @@ -379,15 +387,20 @@ index 05bdf5c0c..58dbd02b4 100644 int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */ int repldbfd; /* Replication DB file descriptor. */ off_t repldboff; /* Replication DB file offset. */ -@@ -1599,6 +1601,7 @@ struct redisServer { +@@ -1597,8 +1599,10 @@ struct redisServer { + list *clients_pending_write; /* There is to write or install handler. */ + list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ - client *current_client; /* The client that triggered the command execution (External or AOF). */ - client *executing_client; /* The client executing the current command (possibly script or module). */ -+ dict *quorum_replicas; /* Replicas that should participate in quorum commit */ +- client *current_client; /* The client that triggered the command execution (External or AOF). */ +- client *executing_client; /* The client executing the current command (possibly script or module). */ ++ client *current_client; /* The client that triggered the command execution (External or AOF). */ ++ client *executing_client; /* The client executing the current command (possibly script or module). */ ++ dict *quorum_replicas; /* Replicas that should participate in quorum commit */ ++ int quorum_replicas_to_write; /* Num replicas to accept write before returning from WAITQUORUM command */ #ifdef LOG_REQ_RES char *req_res_logfile; /* Path of log file for logging all requests and their replies. If NULL, no logging will be performed */ -@@ -2804,11 +2807,13 @@ void resizeReplicationBacklog(void); +@@ -2804,11 +2808,13 @@ void resizeReplicationBacklog(void); void replicationSetMaster(char *ip, int port); void replicationUnsetMaster(void); void refreshGoodSlavesCount(void); @@ -401,7 +414,7 @@ index 05bdf5c0c..58dbd02b4 100644 void replicationSendNewlineToMaster(void); long long replicationGetSlaveOffset(void); char *replicationGetSlaveName(client *c); -@@ -3654,6 +3659,7 @@ void bitposCommand(client *c); +@@ -3654,6 +3660,7 @@ void bitposCommand(client *c); void replconfCommand(client *c); void waitCommand(client *c); void waitaofCommand(client *c); @@ -410,7 +423,7 @@ index 05bdf5c0c..58dbd02b4 100644 void georadiusbymemberroCommand(client *c); void georadiusCommand(client *c); diff --git a/tests/unit/yandex-cloud-patches.tcl b/tests/unit/yandex-cloud-patches.tcl -index b8c3ba453..0ac84b270 100644 +index b8c3ba453..43ea52352 100644 --- a/tests/unit/yandex-cloud-patches.tcl +++ b/tests/unit/yandex-cloud-patches.tcl @@ -21,3 +21,67 @@ start_server {config "minimal.conf" tags {"external:skip"}} { @@ -438,7 +451,7 @@ index b8c3ba453..0ac84b270 100644 + } + + wait_replica_online $master -+ $master config set min-replicas-to-write 1 ++ $master config set quorum-replicas-to-write 1 + $master config set quorum-replicas $slave_host:$slave_port + + test {WAITQUORUM should acknowledge 1 additional copy of the data} { @@ -481,3 +494,6 @@ index b8c3ba453..0ac84b270 100644 + } +} +} +-- +2.47.1 + diff --git a/tests/features/01_cluster_maintenance.feature b/tests/features/01_cluster_maintenance.feature index c2e7f43..09bf138 100644 --- a/tests/features/01_cluster_maintenance.feature +++ b/tests/features/01_cluster_maintenance.feature @@ -50,7 +50,7 @@ Feature: Cluster mode maintenance tests ["redis1","redis2","redis3"] """ - Scenario: Cluster mode maintenance enter sets min-replicas-to-write to 0 on master + Scenario: Cluster mode maintenance enter sets quorum-replicas-to-write to 0 on master Given clustered shard is up and running Then zookeeper node "/test/active_nodes" should match json_exactly within "30" seconds """ @@ -59,11 +59,11 @@ Feature: Cluster mode maintenance tests When I wait for "60" seconds And I run command on redis host "redis1" """ - CONFIG GET min-replicas-to-write + CONFIG GET quorum-replicas-to-write """ Then redis cmd result should match regexp """ - .*min-replicas-to-write 1.* + .*quorum-replicas-to-write 1.* """ When I run command on host "redis1" """ @@ -83,11 +83,11 @@ Feature: Cluster mode maintenance tests And zookeeper node "/test/active_nodes" should not exist When I run command on redis host "redis1" """ - CONFIG GET min-replicas-to-write + CONFIG GET quorum-replicas-to-write """ Then redis cmd result should match regexp """ - .*min-replicas-to-write *0.* + .*quorum-replicas-to-write *0.* """ Scenario: Cluster mode maintenance leave updates master host in DCS after manual change diff --git a/tests/features/01_sentinel_maintenance.feature b/tests/features/01_sentinel_maintenance.feature index 732385a..e5e359b 100644 --- a/tests/features/01_sentinel_maintenance.feature +++ b/tests/features/01_sentinel_maintenance.feature @@ -56,7 +56,7 @@ Feature: Sentinel mode maintenance tests ["redis1","redis2","redis3"] """ - Scenario: Sentinel mode maintenance enter sets min-replicas-to-write to 0 on master + Scenario: Sentinel mode maintenance enter sets quorum-replicas-to-write to 0 on master Given sentinel shard is up and running Then zookeeper node "/test/active_nodes" should match json_exactly within "30" seconds """ @@ -65,11 +65,11 @@ Feature: Sentinel mode maintenance tests When I wait for "60" seconds And I run command on redis host "redis1" """ - CONFIG GET min-replicas-to-write + CONFIG GET quorum-replicas-to-write """ Then redis cmd result should match regexp """ - .*min-replicas-to-write 1.* + .*quorum-replicas-to-write 1.* """ When I run command on host "redis1" """ @@ -89,11 +89,11 @@ Feature: Sentinel mode maintenance tests And zookeeper node "/test/active_nodes" should not exist When I run command on redis host "redis1" """ - CONFIG GET min-replicas-to-write + CONFIG GET quorum-replicas-to-write """ Then redis cmd result should match regexp """ - .*min-replicas-to-write *0.* + .*quorum-replicas-to-write *0.* """ Scenario: Sentinel mode maintenance leave updates master host in DCS after manual change