Introduce quorum-replicas-to-write setting
secwall committed Dec 1, 2024
1 parent a3d1aa9 commit d87f305
Showing 10 changed files with 106 additions and 56 deletions.
2 changes: 1 addition & 1 deletion internal/app/failover.go
@@ -19,7 +19,7 @@ func countRunningHAReplicas(shardState map[string]*HostState) int {
}

func (app *App) getFailoverQuorum(activeNodes []string) int {
fq := len(activeNodes) - app.getMinReplicasToWrite(activeNodes)
fq := len(activeNodes) - app.getNumReplicasToWrite(activeNodes)
if fq < 1 || app.config.Redis.AllowDataLoss {
fq = 1
}
2 changes: 1 addition & 1 deletion internal/app/maintenance.go
@@ -9,7 +9,7 @@ import (

func (app *App) enterMaintenance(maintenance *Maintenance, master string) error {
node := app.shard.Get(master)
err, rewriteErr := node.SetMinReplicas(app.ctx, 0)
err, rewriteErr := node.SetNumQuorumReplicas(app.ctx, 0)
if err != nil {
return err
}
8 changes: 2 additions & 6 deletions internal/app/master.go
@@ -7,12 +7,8 @@ import (
"github.com/yandex/rdsync/internal/dcs"
)

func (app *App) getMinReplicasToWrite(activeNodes []string) int {
minReplicas := len(activeNodes) / 2
if app.config.Redis.MaxReplicasToWrite >= 0 && minReplicas > app.config.Redis.MaxReplicasToWrite {
minReplicas = app.config.Redis.MaxReplicasToWrite
}
return minReplicas
func (app *App) getNumReplicasToWrite(activeNodes []string) int {
return len(activeNodes) / 2
}

func (app *App) getCurrentMaster(shardState map[string]*HostState) (string, error) {
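With the MaxReplicasToWrite cap removed, the write-quorum size is always half of the active nodes, rounded down. A small, self-contained Go sketch of the arithmetic from failover.go and master.go above (illustrative only; the AllowDataLoss branch is omitted):

```go
package main

import "fmt"

// numReplicasToWrite mirrors getNumReplicasToWrite above: half of the
// active nodes, rounded down (the master itself is not counted).
func numReplicasToWrite(activeNodes []string) int {
	return len(activeNodes) / 2
}

// failoverQuorum mirrors getFailoverQuorum, minus the AllowDataLoss branch.
func failoverQuorum(activeNodes []string) int {
	fq := len(activeNodes) - numReplicasToWrite(activeNodes)
	if fq < 1 {
		fq = 1
	}
	return fq
}

func main() {
	nodes := []string{"redis1", "redis2", "redis3"}
	fmt.Println(numReplicasToWrite(nodes)) // 1: one replica must ack each quorum write
	fmt.Println(failoverQuorum(nodes))     // 2: two nodes must agree before failover
}
```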
23 changes: 16 additions & 7 deletions internal/app/repair.go
@@ -52,17 +52,26 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri
}

func (app *App) repairMaster(node *redis.Node, activeNodes []string, state *HostState) {
expectedMinReplicas := app.getMinReplicasToWrite(activeNodes)
actualMinReplicas, err := node.GetMinReplicas(app.ctx)
if state.IsReadOnly {
err, rewriteErr := node.SetReadWrite(app.ctx)
if err != nil {
app.logger.Error("Unable to set master read-write", "fqdn", node.FQDN(), "error", err)
}
if rewriteErr != nil {
app.logger.Error("Unable to rewrite config on master", "fqdn", node.FQDN(), "error", rewriteErr)
}
}
expectedNumReplicas := app.getNumReplicasToWrite(activeNodes)
actualNumReplicas, err := node.GetNumQuorumReplicas(app.ctx)
if err != nil {
app.logger.Error("Unable to get actual min replicas on master", "fqdn", node.FQDN(), "error", err)
app.logger.Error("Unable to get actual num quorum replicas on master", "fqdn", node.FQDN(), "error", err)
return
}
if actualMinReplicas != expectedMinReplicas {
app.logger.Info(fmt.Sprintf("Changing min replicas from %d to %d on master", actualMinReplicas, expectedMinReplicas), "fqdn", node.FQDN())
err, rewriteErr := node.SetMinReplicas(app.ctx, expectedMinReplicas)
if actualNumReplicas != expectedNumReplicas {
app.logger.Info(fmt.Sprintf("Changing num quorum replicas from %d to %d on master", actualNumReplicas, expectedNumReplicas), "fqdn", node.FQDN())
err, rewriteErr := node.SetNumQuorumReplicas(app.ctx, expectedNumReplicas)
if err != nil {
app.logger.Error("Unable to set min replicas on master", "fqdn", node.FQDN(), "error", err)
app.logger.Error("Unable to set num quorum replicas on master", "fqdn", node.FQDN(), "error", err)
}
if rewriteErr != nil {
app.logger.Error("Unable to rewrite config on master", "fqdn", node.FQDN(), "error", rewriteErr)
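The setters used here return two errors: the first covers the CONFIG SET itself, the second the follow-up CONFIG REWRITE that persists the value. A hedged caller sketch (the helper name, slog logger, and import path are assumptions, not code from this commit):

```go
package app

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/yandex/rdsync/internal/redis"
)

// zeroQuorumReplicas is a hypothetical helper showing how callers in this diff
// treat the (err, rewriteErr) pair: a failed CONFIG SET aborts, while a failed
// CONFIG REWRITE is only logged, since the runtime value has already changed.
func zeroQuorumReplicas(ctx context.Context, node *redis.Node, logger *slog.Logger) error {
	err, rewriteErr := node.SetNumQuorumReplicas(ctx, 0)
	if err != nil {
		return fmt.Errorf("unable to set num quorum replicas to 0 on %s: %w", node.FQDN(), err)
	}
	if rewriteErr != nil {
		logger.Error("Unable to rewrite config", "fqdn", node.FQDN(), "error", rewriteErr)
	}
	return nil
}
```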
11 changes: 9 additions & 2 deletions internal/app/switchover.go
@@ -355,15 +355,22 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes
app.waitPoisonPill(app.config.Redis.WaitPoisonPillTimeout)
}

if len(aliveActiveNodes) == 1 || app.config.Redis.AllowDataLoss || app.config.Redis.MaxReplicasToWrite == 0 {
if len(aliveActiveNodes) == 1 || app.config.Redis.AllowDataLoss {
node := app.shard.Get(newMaster)
err, errConf := node.SetMinReplicas(app.ctx, 0)
err, errConf := node.SetReadWrite(app.ctx)
if err != nil {
return fmt.Errorf("unable to set %s available for write before promote: %s", newMaster, err.Error())
}
if errConf != nil {
return fmt.Errorf("unable to rewrite config on %s before promote: %s", newMaster, errConf.Error())
}
err, errConf = node.SetNumQuorumReplicas(app.ctx, 0)
if err != nil {
return fmt.Errorf("unable to set num quorum replicas to 0 on %s: %s", newMaster, err.Error())
}
if errConf != nil {
return fmt.Errorf("unable to rewrite config on %s before promote: %s", newMaster, errConf.Error())
}
}

if app.config.Redis.TurnBeforeSwitchover {
2 changes: 0 additions & 2 deletions internal/config/config.go
@@ -34,7 +34,6 @@ type RedisConfig struct {
WaitPromoteForceTimeout time.Duration `yaml:"wait_promote_force_timeout"`
WaitPoisonPillTimeout time.Duration `yaml:"wait_poison_pill_timeout"`
MaxParallelSyncs int `yaml:"max_parallel_syncs"`
MaxReplicasToWrite int `yaml:"max_replicas_to_write"`
AllowDataLoss bool `yaml:"allow_data_loss"`
TurnBeforeSwitchover bool `yaml:"turn_before_switchover"`
RestartCommand string `yaml:"restart_command"`
@@ -112,7 +111,6 @@ func DefaultRedisConfig() RedisConfig {
WaitPromoteForceTimeout: 10 * time.Second,
WaitPoisonPillTimeout: 30 * time.Second,
MaxParallelSyncs: 1,
MaxReplicasToWrite: -1,
AllowDataLoss: false,
TurnBeforeSwitchover: false,
RestartCommand: "systemctl restart redis-server",
48 changes: 36 additions & 12 deletions internal/redis/node.go
@@ -257,9 +257,9 @@ func (n *Node) DisconnectClients(ctx context.Context, ctype string) error {
return disconnectCmd.Err()
}

// GetMinReplicas returns number of connected replicas to accept writes on node
func (n *Node) GetMinReplicas(ctx context.Context) (int, error) {
cmd := client.NewStringSliceCmd(ctx, n.config.Renames.Config, "get", "min-replicas-to-write")
// GetNumQuorumReplicas returns number of connected replicas to accept writes on node
func (n *Node) GetNumQuorumReplicas(ctx context.Context) (int, error) {
cmd := client.NewStringSliceCmd(ctx, n.config.Renames.Config, "get", "quorum-replicas-to-write")
err := n.conn.Process(ctx, cmd)
if err != nil {
return 0, err
@@ -269,18 +269,18 @@ func (n *Node) GetMinReplicas(ctx context.Context) (int, error) {
return 0, err
}
if len(vals) != 2 {
return 0, fmt.Errorf("unexpected config get result for min-replicas-to-write: %v", vals)
return 0, fmt.Errorf("unexpected config get result for quorum-replicas-to-write: %v", vals)
}
ret, err := strconv.ParseInt(vals[1], 10, 32)
if err != nil {
return 0, fmt.Errorf("unable to parse min-replicas-to-write value: %s", err.Error())
return 0, fmt.Errorf("unable to parse quorum-replicas-to-write value: %s", err.Error())
}
return int(ret), nil
}

// SetMinReplicas sets desired number of connected replicas to accept writes on node
func (n *Node) SetMinReplicas(ctx context.Context, value int) (error, error) {
setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "min-replicas-to-write", strconv.Itoa(value))
// SetNumQuorumReplicas sets desired number of connected replicas to accept writes on node
func (n *Node) SetNumQuorumReplicas(ctx context.Context, value int) (error, error) {
setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "quorum-replicas-to-write", strconv.Itoa(value))
err := setCmd.Err()
if err != nil {
return err, nil
@@ -368,19 +368,33 @@ func (n *Node) SetAppendonly(ctx context.Context, value bool) error {

// IsReadOnly returns ReadOnly status for node
func (n *Node) IsReadOnly(ctx context.Context) (bool, error) {
minReplicas, err := n.GetMinReplicas(ctx)
cmd := client.NewStringSliceCmd(ctx, n.config.Renames.Config, "get", "min-replicas-to-write")
err := n.conn.Process(ctx, cmd)
if err != nil {
return false, err
}
vals, err := cmd.Result()
if err != nil {
return false, err
}
return minReplicas == highMinReplicas, nil
if len(vals) != 2 {
return false, fmt.Errorf("unexpected config get result for min-replicas-to-write: %v", vals)
}
ret, err := strconv.ParseInt(vals[1], 10, 32)
if err != nil {
return false, fmt.Errorf("unable to parse min-replicas-to-write value: %s", err.Error())
}
return int(ret) > 0, nil
}

// SetReadOnly makes node read-only by setting min replicas to unreasonably high value and disconnecting clients
func (n *Node) SetReadOnly(ctx context.Context, disconnect bool) (error, error) {
err, rewriteErr := n.SetMinReplicas(ctx, highMinReplicas)
setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "min-replicas-to-write", strconv.Itoa(highMinReplicas))
err := setCmd.Err()
if err != nil {
return err, rewriteErr
return err, nil
}
rewriteErr := n.configRewrite(ctx)
if disconnect {
err = n.DisconnectClients(ctx, "normal")
if err != nil {
@@ -394,6 +408,16 @@ func (n *Node) SetReadOnly(ctx context.Context, disconnect bool) (error, error)
return nil, rewriteErr
}

// SetReadWrite makes node writable again by resetting min-replicas-to-write to zero
func (n *Node) SetReadWrite(ctx context.Context) (error, error) {
setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "min-replicas-to-write", "0")
err := setCmd.Err()
if err != nil {
return err, nil
}
return nil, n.configRewrite(ctx)
}

// SetOnline allows non-localhost connections
func (n *Node) SetOnline(ctx context.Context) error {
if !n.IsLocal() {
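After this change the two knobs are independent: min-replicas-to-write only encodes the read-only fence (IsReadOnly, SetReadOnly, SetReadWrite), while quorum-replicas-to-write is the threshold WAITQUORUM counts against (GetNumQuorumReplicas, SetNumQuorumReplicas). A hypothetical reconciliation sketch combining both, with CONFIG REWRITE errors dropped for brevity (not code from this commit):

```go
package app

import (
	"context"

	"github.com/yandex/rdsync/internal/redis"
)

// ensureWritableWithQuorum is illustrative only: clear the read-only fence if
// it is set, then align the quorum threshold with the desired value.
func ensureWritableWithQuorum(ctx context.Context, node *redis.Node, quorum int) error {
	readOnly, err := node.IsReadOnly(ctx) // true iff min-replicas-to-write > 0
	if err != nil {
		return err
	}
	if readOnly {
		if err, _ := node.SetReadWrite(ctx); err != nil { // CONFIG SET min-replicas-to-write 0
			return err
		}
	}
	// CONFIG SET quorum-replicas-to-write <quorum>
	if err, _ := node.SetNumQuorumReplicas(ctx, quorum); err != nil {
		return err
	}
	return nil
}
```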
46 changes: 31 additions & 15 deletions redis_patches/0004_Add_waitquorum_command.patch
@@ -68,7 +68,7 @@ index 000000000..dca11ab67
+ }
+}
diff --git a/src/config.c b/src/config.c
index a7231868c..60e197c36 100644
index a7231868c..6939dc893 100644
--- a/src/config.c
+++ b/src/config.c
@@ -3093,6 +3093,78 @@ static void rewriteConfigOfflineMode(standardConfig *config, const char *name, s
@@ -150,7 +150,15 @@ index a7231868c..60e197c36 100644
standardConfig static_configs[] = {
/* Bool configs */
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
@@ -3309,6 +3381,7 @@ standardConfig static_configs[] = {
@@ -3230,6 +3302,7 @@ standardConfig static_configs[] = {
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
+ createIntConfig("quorum-replicas-to-write", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.quorum_replicas_to_write, 0, INTEGER_CONFIG, NULL, NULL),

/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
@@ -3309,6 +3382,7 @@ standardConfig static_configs[] = {
createSpecialConfig("replicaof", "slaveof", IMMUTABLE_CONFIG | MULTI_ARG_CONFIG, setConfigReplicaOfOption, getConfigReplicaOfOption, rewriteConfigReplicaOfOption, NULL),
createSpecialConfig("latency-tracking-info-percentiles", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigLatencyTrackingInfoPercentilesOutputOption, getConfigLatencyTrackingInfoPercentilesOutputOption, rewriteConfigLatencyTrackingInfoPercentilesOutputOption, NULL),
createSpecialConfig("offline", NULL, MODIFIABLE_CONFIG, setOfflineMode, getOfflineMode, rewriteConfigOfflineMode, applyBind),
@@ -171,7 +179,7 @@ index 7696e8c28..82c6f2b97 100644
c->reploff = 0;
c->read_reploff = 0;
diff --git a/src/replication.c b/src/replication.c
index 47172dba3..6bfb21ef4 100644
index 47172dba3..04f0c29bb 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1285,7 +1285,8 @@ void replconfCommand(client *c) {
Expand Down Expand Up @@ -220,7 +228,7 @@ index 47172dba3..6bfb21ef4 100644
replicationRequestAckFromSlaves();
}

+/* WAITQUORUM for min-replicas-to-write quorum replicas to acknowledge the processing of our latest
+/* WAITQUORUM for quorum-replicas-to-write quorum replicas to acknowledge the processing of our latest
+ * write command (and all the previous commands). */
+void waitquorumCommand(client *c) {
+ long ackreplicas;
@@ -233,14 +241,14 @@ index 47172dba3..6bfb21ef4 100644
+
+ /* First try without blocking at all. */
+ ackreplicas = replicationCountQuorumAcksByOffset(c->woff);
+ if (ackreplicas >= server.repl_min_slaves_to_write || c->flags & CLIENT_DENY_BLOCKING) {
+ if (ackreplicas >= server.quorum_replicas_to_write || c->flags & CLIENT_DENY_BLOCKING) {
+ addReplyLongLong(c,ackreplicas);
+ return;
+ }
+
+ /* Otherwise block the client and put it into our list of clients
+ * waiting for ack from slaves. */
+ blockForReplication(c,0,offset,server.repl_min_slaves_to_write);
+ blockForReplication(c,0,offset,server.quorum_replicas_to_write);
+ c->bstate.quorum = 1;
+
+ /* Make sure that the server will send an ACK request to all the slaves
@@ -360,7 +368,7 @@ index 2aa63df77..4b1f4242a 100644
}
}
diff --git a/src/server.h b/src/server.h
index 05bdf5c0c..58dbd02b4 100644
index 320d5eeba..18c85d3dc 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1030,6 +1030,7 @@ typedef struct blockingState {
Expand All @@ -379,15 +387,20 @@ index 05bdf5c0c..58dbd02b4 100644
int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
@@ -1599,6 +1601,7 @@ struct redisServer {
@@ -1597,8 +1599,10 @@ struct redisServer {
list *clients_pending_write; /* There is to write or install handler. */
list *clients_pending_read; /* Client has pending read socket buffers. */
list *slaves, *monitors; /* List of slaves and MONITORs */
client *current_client; /* The client that triggered the command execution (External or AOF). */
client *executing_client; /* The client executing the current command (possibly script or module). */
+ dict *quorum_replicas; /* Replicas that should participate in quorum commit */
- client *current_client; /* The client that triggered the command execution (External or AOF). */
- client *executing_client; /* The client executing the current command (possibly script or module). */
+ client *current_client; /* The client that triggered the command execution (External or AOF). */
+ client *executing_client; /* The client executing the current command (possibly script or module). */
+ dict *quorum_replicas; /* Replicas that should participate in quorum commit */
+ int quorum_replicas_to_write; /* Num replicas to accept write before returning from WAITQUORUM command */

#ifdef LOG_REQ_RES
char *req_res_logfile; /* Path of log file for logging all requests and their replies. If NULL, no logging will be performed */
@@ -2804,11 +2807,13 @@ void resizeReplicationBacklog(void);
@@ -2804,11 +2808,13 @@ void resizeReplicationBacklog(void);
void replicationSetMaster(char *ip, int port);
void replicationUnsetMaster(void);
void refreshGoodSlavesCount(void);
Expand All @@ -401,7 +414,7 @@ index 05bdf5c0c..58dbd02b4 100644
void replicationSendNewlineToMaster(void);
long long replicationGetSlaveOffset(void);
char *replicationGetSlaveName(client *c);
@@ -3654,6 +3659,7 @@ void bitposCommand(client *c);
@@ -3654,6 +3660,7 @@ void bitposCommand(client *c);
void replconfCommand(client *c);
void waitCommand(client *c);
void waitaofCommand(client *c);
Expand All @@ -410,7 +423,7 @@ index 05bdf5c0c..58dbd02b4 100644
void georadiusbymemberroCommand(client *c);
void georadiusCommand(client *c);
diff --git a/tests/unit/yandex-cloud-patches.tcl b/tests/unit/yandex-cloud-patches.tcl
index b8c3ba453..0ac84b270 100644
index b8c3ba453..43ea52352 100644
--- a/tests/unit/yandex-cloud-patches.tcl
+++ b/tests/unit/yandex-cloud-patches.tcl
@@ -21,3 +21,67 @@ start_server {config "minimal.conf" tags {"external:skip"}} {
Expand Down Expand Up @@ -438,7 +451,7 @@ index b8c3ba453..0ac84b270 100644
+ }
+
+ wait_replica_online $master
+ $master config set min-replicas-to-write 1
+ $master config set quorum-replicas-to-write 1
+ $master config set quorum-replicas $slave_host:$slave_port
+
+ test {WAITQUORUM should acknowledge 1 additional copy of the data} {
Expand Down Expand Up @@ -481,3 +494,6 @@ index b8c3ba453..0ac84b270 100644
+ }
+}
+}
--
2.47.1
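The patch above wires a WAITQUORUM command into Redis: like WAIT, it blocks until replicas acknowledge the client's last write offset, but the required count comes from quorum-replicas-to-write and only replicas registered in quorum-replicas are counted. A hypothetical client-side sketch using go-redis (the command name and its integer reply follow the patch; the client wiring is an assumption, and in practice rdsync manages the quorum settings):

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Set here only to make the example self-contained; quorum-replicas
	// (the host:port list of counted replicas) must also be configured.
	rdb.ConfigSet(ctx, "quorum-replicas-to-write", "1")

	rdb.Set(ctx, "key", "value", 0)

	// WAITQUORUM returns the number of quorum replicas that acknowledged the write.
	acked, err := rdb.Do(ctx, "WAITQUORUM").Int64()
	fmt.Println(acked, err)
}
```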

10 changes: 5 additions & 5 deletions tests/features/01_cluster_maintenance.feature
@@ -50,7 +50,7 @@ Feature: Cluster mode maintenance tests
["redis1","redis2","redis3"]
"""

Scenario: Cluster mode maintenance enter sets min-replicas-to-write to 0 on master
Scenario: Cluster mode maintenance enter sets quorum-replicas-to-write to 0 on master
Given clustered shard is up and running
Then zookeeper node "/test/active_nodes" should match json_exactly within "30" seconds
"""
@@ -59,11 +59,11 @@ Feature: Cluster mode maintenance tests
When I wait for "60" seconds
And I run command on redis host "redis1"
"""
CONFIG GET min-replicas-to-write
CONFIG GET quorum-replicas-to-write
"""
Then redis cmd result should match regexp
"""
.*min-replicas-to-write 1.*
.*quorum-replicas-to-write 1.*
"""
When I run command on host "redis1"
"""
@@ -83,11 +83,11 @@ Feature: Cluster mode maintenance tests
And zookeeper node "/test/active_nodes" should not exist
When I run command on redis host "redis1"
"""
CONFIG GET min-replicas-to-write
CONFIG GET quorum-replicas-to-write
"""
Then redis cmd result should match regexp
"""
.*min-replicas-to-write *0.*
.*quorum-replicas-to-write *0.*
"""

Scenario: Cluster mode maintenance leave updates master host in DCS after manual change