Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce quorum-replicas-to-write setting #105

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/app/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func countRunningHAReplicas(shardState map[string]*HostState) int {
}

func (app *App) getFailoverQuorum(activeNodes []string) int {
fq := len(activeNodes) - app.getMinReplicasToWrite(activeNodes)
fq := len(activeNodes) - app.getNumReplicasToWrite(activeNodes)
if fq < 1 || app.config.Redis.AllowDataLoss {
fq = 1
}
Expand Down
2 changes: 1 addition & 1 deletion internal/app/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func (app *App) enterMaintenance(maintenance *Maintenance, master string) error {
node := app.shard.Get(master)
err, rewriteErr := node.SetMinReplicas(app.ctx, 0)
err, rewriteErr := node.SetNumQuorumReplicas(app.ctx, 0)
if err != nil {
return err
}
Expand Down
8 changes: 2 additions & 6 deletions internal/app/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@ import (
"github.com/yandex/rdsync/internal/dcs"
)

func (app *App) getMinReplicasToWrite(activeNodes []string) int {
minReplicas := len(activeNodes) / 2
if app.config.Redis.MaxReplicasToWrite >= 0 && minReplicas > app.config.Redis.MaxReplicasToWrite {
minReplicas = app.config.Redis.MaxReplicasToWrite
}
return minReplicas
func (app *App) getNumReplicasToWrite(activeNodes []string) int {
return len(activeNodes) / 2
}

func (app *App) getCurrentMaster(shardState map[string]*HostState) (string, error) {
Expand Down
23 changes: 16 additions & 7 deletions internal/app/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,26 @@ func (app *App) repairShard(shardState map[string]*HostState, activeNodes []stri
}

func (app *App) repairMaster(node *redis.Node, activeNodes []string, state *HostState) {
expectedMinReplicas := app.getMinReplicasToWrite(activeNodes)
actualMinReplicas, err := node.GetMinReplicas(app.ctx)
if state.IsReadOnly {
err, rewriteErr := node.SetReadWrite(app.ctx)
if err != nil {
app.logger.Error("Unable to set master read-write", "fqdn", node.FQDN(), "error", err)
}
if rewriteErr != nil {
app.logger.Error("Unable to rewrite config on master", "fqdn", node.FQDN(), "error", rewriteErr)
}
}
expectedNumReplicas := app.getNumReplicasToWrite(activeNodes)
actualNumReplicas, err := node.GetNumQuorumReplicas(app.ctx)
if err != nil {
app.logger.Error("Unable to get actual min replicas on master", "fqdn", node.FQDN(), "error", err)
app.logger.Error("Unable to get actual num quorum replicas on master", "fqdn", node.FQDN(), "error", err)
return
}
if actualMinReplicas != expectedMinReplicas {
app.logger.Info(fmt.Sprintf("Changing min replicas from %d to %d on master", actualMinReplicas, expectedMinReplicas), "fqdn", node.FQDN())
err, rewriteErr := node.SetMinReplicas(app.ctx, expectedMinReplicas)
if actualNumReplicas != expectedNumReplicas {
app.logger.Info(fmt.Sprintf("Changing num quorum replicas from %d to %d on master", actualNumReplicas, expectedNumReplicas), "fqdn", node.FQDN())
err, rewriteErr := node.SetNumQuorumReplicas(app.ctx, expectedNumReplicas)
if err != nil {
app.logger.Error("Unable to set min replicas on master", "fqdn", node.FQDN(), "error", err)
app.logger.Error("Unable to set num quorum replicas on master", "fqdn", node.FQDN(), "error", err)
}
if rewriteErr != nil {
app.logger.Error("Unable to rewrite config on master", "fqdn", node.FQDN(), "error", rewriteErr)
Expand Down
11 changes: 9 additions & 2 deletions internal/app/switchover.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,15 +355,22 @@ func (app *App) performSwitchover(shardState map[string]*HostState, activeNodes
app.waitPoisonPill(app.config.Redis.WaitPoisonPillTimeout)
}

if len(aliveActiveNodes) == 1 || app.config.Redis.AllowDataLoss || app.config.Redis.MaxReplicasToWrite == 0 {
if len(aliveActiveNodes) == 1 || app.config.Redis.AllowDataLoss {
node := app.shard.Get(newMaster)
err, errConf := node.SetMinReplicas(app.ctx, 0)
err, errConf := node.SetReadWrite(app.ctx)
if err != nil {
return fmt.Errorf("unable to set %s available for write before promote: %s", newMaster, err.Error())
}
if errConf != nil {
return fmt.Errorf("unable to rewrite config on %s before promote: %s", newMaster, errConf.Error())
}
err, errConf = node.SetNumQuorumReplicas(app.ctx, 0)
if err != nil {
return fmt.Errorf("unable to set num quorum replicas to 0 on %s: %s", newMaster, err.Error())
}
if errConf != nil {
return fmt.Errorf("unable to rewrite config on %s before promote: %s", newMaster, errConf.Error())
}
}

if app.config.Redis.TurnBeforeSwitchover {
Expand Down
2 changes: 0 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type RedisConfig struct {
WaitPromoteForceTimeout time.Duration `yaml:"wait_promote_force_timeout"`
WaitPoisonPillTimeout time.Duration `yaml:"wait_poison_pill_timeout"`
MaxParallelSyncs int `yaml:"max_parallel_syncs"`
MaxReplicasToWrite int `yaml:"max_replicas_to_write"`
AllowDataLoss bool `yaml:"allow_data_loss"`
TurnBeforeSwitchover bool `yaml:"turn_before_switchover"`
RestartCommand string `yaml:"restart_command"`
Expand Down Expand Up @@ -112,7 +111,6 @@ func DefaultRedisConfig() RedisConfig {
WaitPromoteForceTimeout: 10 * time.Second,
WaitPoisonPillTimeout: 30 * time.Second,
MaxParallelSyncs: 1,
MaxReplicasToWrite: -1,
AllowDataLoss: false,
TurnBeforeSwitchover: false,
RestartCommand: "systemctl restart redis-server",
Expand Down
48 changes: 36 additions & 12 deletions internal/redis/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ func (n *Node) DisconnectClients(ctx context.Context, ctype string) error {
return disconnectCmd.Err()
}

// GetMinReplicas returns number of connected replicas to accept writes on node
func (n *Node) GetMinReplicas(ctx context.Context) (int, error) {
cmd := client.NewStringSliceCmd(ctx, n.config.Renames.Config, "get", "min-replicas-to-write")
// GetNumQuorumReplicas returns the quorum-replicas-to-write value configured on node (number of replicas that must acknowledge a write for WAITQUORUM)
func (n *Node) GetNumQuorumReplicas(ctx context.Context) (int, error) {
cmd := client.NewStringSliceCmd(ctx, n.config.Renames.Config, "get", "quorum-replicas-to-write")
err := n.conn.Process(ctx, cmd)
if err != nil {
return 0, err
Expand All @@ -269,18 +269,18 @@ func (n *Node) GetMinReplicas(ctx context.Context) (int, error) {
return 0, err
}
if len(vals) != 2 {
return 0, fmt.Errorf("unexpected config get result for min-replicas-to-write: %v", vals)
return 0, fmt.Errorf("unexpected config get result for quorum-replicas-to-write: %v", vals)
}
ret, err := strconv.ParseInt(vals[1], 10, 32)
if err != nil {
return 0, fmt.Errorf("unable to parse min-replicas-to-write value: %s", err.Error())
return 0, fmt.Errorf("unable to parse quorum-replicas-to-write value: %s", err.Error())
}
return int(ret), nil
}

// SetMinReplicas sets desired number of connected replicas to accept writes on node
func (n *Node) SetMinReplicas(ctx context.Context, value int) (error, error) {
setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "min-replicas-to-write", strconv.Itoa(value))
// SetNumQuorumReplicas sets the quorum-replicas-to-write value on node (number of replicas that must acknowledge a write for WAITQUORUM)
func (n *Node) SetNumQuorumReplicas(ctx context.Context, value int) (error, error) {
setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "quorum-replicas-to-write", strconv.Itoa(value))
err := setCmd.Err()
if err != nil {
return err, nil
Expand Down Expand Up @@ -368,19 +368,33 @@ func (n *Node) SetAppendonly(ctx context.Context, value bool) error {

// IsReadOnly returns ReadOnly status for node
func (n *Node) IsReadOnly(ctx context.Context) (bool, error) {
minReplicas, err := n.GetMinReplicas(ctx)
cmd := client.NewStringSliceCmd(ctx, n.config.Renames.Config, "get", "min-replicas-to-write")
err := n.conn.Process(ctx, cmd)
if err != nil {
return false, err
}
vals, err := cmd.Result()
if err != nil {
return false, err
}
return minReplicas == highMinReplicas, nil
if len(vals) != 2 {
return false, fmt.Errorf("unexpected config get result for min-replicas-to-write: %v", vals)
}
ret, err := strconv.ParseInt(vals[1], 10, 32)
if err != nil {
return false, fmt.Errorf("unable to parse min-replicas-to-write value: %s", err.Error())
}
return int(ret) > 0, nil
}

// SetReadOnly makes node read-only by setting min replicas to unreasonably high value and disconnecting clients
func (n *Node) SetReadOnly(ctx context.Context, disconnect bool) (error, error) {
err, rewriteErr := n.SetMinReplicas(ctx, highMinReplicas)
setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "min-replicas-to-write", strconv.Itoa(highMinReplicas))
err := setCmd.Err()
if err != nil {
return err, rewriteErr
return err, nil
}
rewriteErr := n.configRewrite(ctx)
if disconnect {
err = n.DisconnectClients(ctx, "normal")
if err != nil {
Expand All @@ -394,6 +408,16 @@ func (n *Node) SetReadOnly(ctx context.Context, disconnect bool) (error, error)
return nil, rewriteErr
}

// SetReadWrite makes node writable again by setting min-replicas-to-write back to zero.
// It returns two errors: the first is the failure of the config set command
// (in which case the config rewrite is not attempted), the second is the
// result of rewriting the config after a successful set.
func (n *Node) SetReadWrite(ctx context.Context) (error, error) {
setCmd := n.conn.Do(ctx, n.config.Renames.Config, "set", "min-replicas-to-write", "0")
err := setCmd.Err()
if err != nil {
return err, nil
}
return nil, n.configRewrite(ctx)
}

// SetOnline allows non-localhost connections
func (n *Node) SetOnline(ctx context.Context) error {
if !n.IsLocal() {
Expand Down
46 changes: 31 additions & 15 deletions redis_patches/0004_Add_waitquorum_command.patch
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ index 000000000..dca11ab67
+ }
+}
diff --git a/src/config.c b/src/config.c
index a7231868c..60e197c36 100644
index a7231868c..6939dc893 100644
--- a/src/config.c
+++ b/src/config.c
@@ -3093,6 +3093,78 @@ static void rewriteConfigOfflineMode(standardConfig *config, const char *name, s
Expand Down Expand Up @@ -150,7 +150,15 @@ index a7231868c..60e197c36 100644
standardConfig static_configs[] = {
/* Bool configs */
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
@@ -3309,6 +3381,7 @@ standardConfig static_configs[] = {
@@ -3230,6 +3302,7 @@ standardConfig static_configs[] = {
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
+ createIntConfig("quorum-replicas-to-write", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.quorum_replicas_to_write, 0, INTEGER_CONFIG, NULL, NULL),

/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
@@ -3309,6 +3382,7 @@ standardConfig static_configs[] = {
createSpecialConfig("replicaof", "slaveof", IMMUTABLE_CONFIG | MULTI_ARG_CONFIG, setConfigReplicaOfOption, getConfigReplicaOfOption, rewriteConfigReplicaOfOption, NULL),
createSpecialConfig("latency-tracking-info-percentiles", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigLatencyTrackingInfoPercentilesOutputOption, getConfigLatencyTrackingInfoPercentilesOutputOption, rewriteConfigLatencyTrackingInfoPercentilesOutputOption, NULL),
createSpecialConfig("offline", NULL, MODIFIABLE_CONFIG, setOfflineMode, getOfflineMode, rewriteConfigOfflineMode, applyBind),
Expand All @@ -171,7 +179,7 @@ index 7696e8c28..82c6f2b97 100644
c->reploff = 0;
c->read_reploff = 0;
diff --git a/src/replication.c b/src/replication.c
index 47172dba3..6bfb21ef4 100644
index 47172dba3..04f0c29bb 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1285,7 +1285,8 @@ void replconfCommand(client *c) {
Expand Down Expand Up @@ -220,7 +228,7 @@ index 47172dba3..6bfb21ef4 100644
replicationRequestAckFromSlaves();
}

+/* WAITQUORUM for min-replicas-to-write quorum replicas to acknowledge the processing of our latest
+/* WAITQUORUM for quorum-replicas-to-write quorum replicas to acknowledge the processing of our latest
+ * write command (and all the previous commands). */
+void waitquorumCommand(client *c) {
+ long ackreplicas;
Expand All @@ -233,14 +241,14 @@ index 47172dba3..6bfb21ef4 100644
+
+ /* First try without blocking at all. */
+ ackreplicas = replicationCountQuorumAcksByOffset(c->woff);
+ if (ackreplicas >= server.repl_min_slaves_to_write || c->flags & CLIENT_DENY_BLOCKING) {
+ if (ackreplicas >= server.quorum_replicas_to_write || c->flags & CLIENT_DENY_BLOCKING) {
+ addReplyLongLong(c,ackreplicas);
+ return;
+ }
+
+ /* Otherwise block the client and put it into our list of clients
+ * waiting for ack from slaves. */
+ blockForReplication(c,0,offset,server.repl_min_slaves_to_write);
+ blockForReplication(c,0,offset,server.quorum_replicas_to_write);
+ c->bstate.quorum = 1;
+
+ /* Make sure that the server will send an ACK request to all the slaves
Expand Down Expand Up @@ -360,7 +368,7 @@ index 2aa63df77..4b1f4242a 100644
}
}
diff --git a/src/server.h b/src/server.h
index 05bdf5c0c..58dbd02b4 100644
index 320d5eeba..18c85d3dc 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1030,6 +1030,7 @@ typedef struct blockingState {
Expand All @@ -379,15 +387,20 @@ index 05bdf5c0c..58dbd02b4 100644
int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
@@ -1599,6 +1601,7 @@ struct redisServer {
@@ -1597,8 +1599,10 @@ struct redisServer {
list *clients_pending_write; /* There is to write or install handler. */
list *clients_pending_read; /* Client has pending read socket buffers. */
list *slaves, *monitors; /* List of slaves and MONITORs */
client *current_client; /* The client that triggered the command execution (External or AOF). */
client *executing_client; /* The client executing the current command (possibly script or module). */
+ dict *quorum_replicas; /* Replicas that should participate in quorum commit */
- client *current_client; /* The client that triggered the command execution (External or AOF). */
- client *executing_client; /* The client executing the current command (possibly script or module). */
+ client *current_client; /* The client that triggered the command execution (External or AOF). */
+ client *executing_client; /* The client executing the current command (possibly script or module). */
+ dict *quorum_replicas; /* Replicas that should participate in quorum commit */
+ int quorum_replicas_to_write; /* Num replicas to accept write before returning from WAITQUORUM command */

#ifdef LOG_REQ_RES
char *req_res_logfile; /* Path of log file for logging all requests and their replies. If NULL, no logging will be performed */
@@ -2804,11 +2807,13 @@ void resizeReplicationBacklog(void);
@@ -2804,11 +2808,13 @@ void resizeReplicationBacklog(void);
void replicationSetMaster(char *ip, int port);
void replicationUnsetMaster(void);
void refreshGoodSlavesCount(void);
Expand All @@ -401,7 +414,7 @@ index 05bdf5c0c..58dbd02b4 100644
void replicationSendNewlineToMaster(void);
long long replicationGetSlaveOffset(void);
char *replicationGetSlaveName(client *c);
@@ -3654,6 +3659,7 @@ void bitposCommand(client *c);
@@ -3654,6 +3660,7 @@ void bitposCommand(client *c);
void replconfCommand(client *c);
void waitCommand(client *c);
void waitaofCommand(client *c);
Expand All @@ -410,7 +423,7 @@ index 05bdf5c0c..58dbd02b4 100644
void georadiusbymemberroCommand(client *c);
void georadiusCommand(client *c);
diff --git a/tests/unit/yandex-cloud-patches.tcl b/tests/unit/yandex-cloud-patches.tcl
index b8c3ba453..0ac84b270 100644
index b8c3ba453..43ea52352 100644
--- a/tests/unit/yandex-cloud-patches.tcl
+++ b/tests/unit/yandex-cloud-patches.tcl
@@ -21,3 +21,67 @@ start_server {config "minimal.conf" tags {"external:skip"}} {
Expand Down Expand Up @@ -438,7 +451,7 @@ index b8c3ba453..0ac84b270 100644
+ }
+
+ wait_replica_online $master
+ $master config set min-replicas-to-write 1
+ $master config set quorum-replicas-to-write 1
+ $master config set quorum-replicas $slave_host:$slave_port
+
+ test {WAITQUORUM should acknowledge 1 additional copy of the data} {
Expand Down Expand Up @@ -481,3 +494,6 @@ index b8c3ba453..0ac84b270 100644
+ }
+}
+}
--
2.47.1

10 changes: 5 additions & 5 deletions tests/features/01_cluster_maintenance.feature
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Feature: Cluster mode maintenance tests
["redis1","redis2","redis3"]
"""

Scenario: Cluster mode maintenance enter sets min-replicas-to-write to 0 on master
Scenario: Cluster mode maintenance enter sets quorum-replicas-to-write to 0 on master
Given clustered shard is up and running
Then zookeeper node "/test/active_nodes" should match json_exactly within "30" seconds
"""
Expand All @@ -59,11 +59,11 @@ Feature: Cluster mode maintenance tests
When I wait for "60" seconds
And I run command on redis host "redis1"
"""
CONFIG GET min-replicas-to-write
CONFIG GET quorum-replicas-to-write
"""
Then redis cmd result should match regexp
"""
.*min-replicas-to-write 1.*
.*quorum-replicas-to-write 1.*
"""
When I run command on host "redis1"
"""
Expand All @@ -83,11 +83,11 @@ Feature: Cluster mode maintenance tests
And zookeeper node "/test/active_nodes" should not exist
When I run command on redis host "redis1"
"""
CONFIG GET min-replicas-to-write
CONFIG GET quorum-replicas-to-write
"""
Then redis cmd result should match regexp
"""
.*min-replicas-to-write *0.*
.*quorum-replicas-to-write *0.*
"""

Scenario: Cluster mode maintenance leave updates master host in DCS after manual change
Expand Down
Loading
Loading