Skip to content

Commit

Permalink
Rename Slave -> Replica
Browse files Browse the repository at this point in the history
Rename slaves_waiting_psync to replicas_waiting_psync
Rename methods for waiting rax
Rename variables

Signed-off-by: naglera <[email protected]>
  • Loading branch information
naglera committed Jun 18, 2024
1 parent e8e67b4 commit dd17e3e
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 48 deletions.
4 changes: 2 additions & 2 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1847,11 +1847,11 @@ client *lookupClientByID(uint64_t id) {
}

/* Return a client by ID, or NULL if the client ID is not in the set
* of slaves waiting psync clients. */
* of replicas waiting psync clients. */
client *lookupRdbClientByID(uint64_t id) {
id = htonu64(id);
void *c = NULL;
raxFind(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),&c);
raxFind(server.replicas_waiting_psync,(unsigned char*)&id,sizeof(id),&c);
return c;
}

Expand Down
2 changes: 1 addition & 1 deletion src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3600,7 +3600,7 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
* to inform it with the save end offset.*/
sendCurrentOffsetToReplica(slave);
/* Make sure repl traffic is appended to the replication backlog */
addSlaveToPsyncWaitingRax(slave);
addReplicaToPsyncWaitingRax(slave);
} else {
server.rdb_pipe_numconns++;
}
Expand Down
54 changes: 27 additions & 27 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ void rebaseReplicationBuffer(long long base_repl_offset) {
* On COB overrun, association is deleted and the RDB connection
* is dropped.
*/
void addSlaveToPsyncWaitingRax(client* slave) {
void addReplicaToPsyncWaitingRax(client* replica) {
listNode *ln = NULL;
replBufBlock *tail = NULL;
if (server.repl_backlog == NULL) {
Expand All @@ -227,52 +227,52 @@ void addSlaveToPsyncWaitingRax(client* slave) {
tail->refcount++;
}
}
serverLog(LL_DEBUG, "Add slave %s to waiting psync rax, with cid %llu, %s ", replicationGetSlaveName(slave), (long long unsigned int)slave->id,
serverLog(LL_DEBUG, "Add replica %s to waiting psync rax, with cid %llu, %s ", replicationGetSlaveName(replica), (long long unsigned int)replica->id,
tail? "with repl-backlog tail": "repl-backlog is empty");
slave->ref_repl_buf_node = tail? ln: NULL;
replica->ref_repl_buf_node = tail? ln: NULL;
/* Prevent rdb client from being freed before psync is established. */
slave->flags |= CLIENT_PROTECTED_RDB_CHANNEL;
uint64_t id = htonu64(slave->id);
raxInsert(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),slave,NULL);
replica->flags |= CLIENT_PROTECTED_RDB_CHANNEL;
uint64_t id = htonu64(replica->id);
raxInsert(server.replicas_waiting_psync,(unsigned char*)&id,sizeof(id),replica,NULL);
}

/* Attach waiting psync replicas with new replication backlog head. */
void addSlaveToPsyncWaitingRaxRetrospect(void) {
void addReplicaToPsyncWaitingRaxRetrospect(void) {
listNode *ln = listFirst(server.repl_buffer_blocks);
replBufBlock *head = ln ? listNodeValue(ln) : NULL;
raxIterator iter;

if (head == NULL) return;
/* Update waiting psync slaves to wait on new buffer block */
raxStart(&iter,server.slaves_waiting_psync);
/* Update waiting psync replicas to wait on new buffer block */
raxStart(&iter,server.replicas_waiting_psync);
raxSeek(&iter, "^", NULL, 0);
while(raxNext(&iter)) {
client* slave = iter.data;
if (slave->ref_repl_buf_node) continue;
slave->ref_repl_buf_node = ln;
client* replica = iter.data;
if (replica->ref_repl_buf_node) continue;
replica->ref_repl_buf_node = ln;
head->refcount++;
serverLog(LL_DEBUG, "Retrospect attach slave %llu to repl buf block", (long long unsigned int)slave->id);
serverLog(LL_DEBUG, "Retrospect attach replica %llu to repl buf block", (long long unsigned int)replica->id);
}
raxStop(&iter);
}

void removeSlaveFromPsyncWaitingRax(client* slave) {
void removeReplicaFromPsyncWaitingRax(client* replica) {
listNode *ln;
replBufBlock *o;
/* Get replBufBlock pointed by this replica */
client *peer_slave = lookupRdbClientByID(slave->associated_rdb_client_id);
ln = peer_slave->ref_repl_buf_node;
client *peer_replica = lookupRdbClientByID(replica->associated_rdb_client_id);
ln = peer_replica->ref_repl_buf_node;
o = ln ? listNodeValue(ln) : NULL;
if (o != NULL) {
serverAssert(o->refcount > 0);
o->refcount--;
}
peer_slave->ref_repl_buf_node = NULL;
peer_slave->flags &= ~CLIENT_PROTECTED_RDB_CHANNEL;
serverLog(LL_DEBUG, "Remove psync waiting slave %s with cid %llu, repl buffer block %s",
replicationGetSlaveName(slave), (long long unsigned int)slave->associated_rdb_client_id, o? "ref count decreased": "doesn't exist");
uint64_t id = htonu64(peer_slave->id);
raxRemove(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),NULL);
peer_replica->ref_repl_buf_node = NULL;
peer_replica->flags &= ~CLIENT_PROTECTED_RDB_CHANNEL;
serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu, repl buffer block %s",
replicationGetSlaveName(replica), (long long unsigned int)replica->associated_rdb_client_id, o? "ref count decreased": "doesn't exist");
uint64_t id = htonu64(peer_replica->id);
raxRemove(server.replicas_waiting_psync,(unsigned char*)&id,sizeof(id),NULL);
}

void resetReplicationBuffer(void) {
Expand Down Expand Up @@ -392,7 +392,7 @@ void incrementalTrimReplicationBacklog(size_t max_blocks) {
void freeReplicaReferencedReplBuffer(client *replica) {
if (replica->flags & CLIENT_REPL_RDB_CHANNEL) {
uint64_t id = htonu64(replica->id);
if(raxRemove(server.slaves_waiting_psync,(unsigned char*)&id,sizeof(id),NULL)) {
if(raxRemove(server.replicas_waiting_psync,(unsigned char*)&id,sizeof(id),NULL)) {
serverLog(LL_DEBUG, "Remove psync waiting slave %s with cid %llu from replicas rax.",
replicationGetSlaveName(replica), (long long unsigned int)replica->associated_rdb_client_id);
}
Expand Down Expand Up @@ -473,9 +473,9 @@ void feedReplicationBuffer(char *s, size_t len) {
server.master_repl_offset += copy;
server.repl_backlog->histlen += copy;
}
if (empty_backlog && raxSize(server.slaves_waiting_psync) > 0) {
if (empty_backlog && raxSize(server.replicas_waiting_psync) > 0) {
/* Increase refcount for pending replicas. */
addSlaveToPsyncWaitingRaxRetrospect();
addReplicaToPsyncWaitingRaxRetrospect();
}

/* For output buffer of replicas. */
Expand Down Expand Up @@ -889,7 +889,7 @@ int masterTryPartialResynchronization(client *c, long long psync_offset) {
c->flags |= CLIENT_SLAVE;
if (c->flags & CLIENT_REPL_MAIN_CHANNEL && lookupRdbClientByID(c->associated_rdb_client_id)) {
c->replstate = SLAVE_STATE_BG_RDB_LOAD;
removeSlaveFromPsyncWaitingRax(c);
removeReplicaFromPsyncWaitingRax(c);
} else {
c->replstate = SLAVE_STATE_ONLINE;
}
Expand Down Expand Up @@ -4750,7 +4750,7 @@ void replicationCron(void) {
if (listLength(server.repl_buffer_blocks) > 0) {
replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks));
serverAssert(o->refcount > 0 &&
o->refcount <= (int)listLength(server.slaves) + 1 + (int)raxSize(server.slaves_waiting_psync));
o->refcount <= (int)listLength(server.slaves) + 1 + (int)raxSize(server.replicas_waiting_psync));
}

/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
Expand Down
6 changes: 3 additions & 3 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2638,7 +2638,7 @@ void initServer(void) {
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.slaves_waiting_psync = raxNew();
server.replicas_waiting_psync = raxNew();
server.wait_before_rdb_client_free = DEFAULT_WAIT_BEFORE_RDB_CLIENT_FREE;
server.clients_pending_write = listCreate();
server.clients_pending_read = listCreate();
Expand Down Expand Up @@ -6071,13 +6071,13 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
slaveid,slaveip,slave->slave_listening_port,state,
slave->repl_ack_off, lag,
slave->flags & CLIENT_REPL_RDB_CHANNEL ? "rdb-conn":
slave->replstate == SLAVE_STATE_BG_RDB_LOAD ? "main-conn": "normal-slave");
slave->replstate == SLAVE_STATE_BG_RDB_LOAD ? "main-conn": "replica");
slaveid++;
}
}
/* clang-format off */
info = sdscatprintf(info, FMTARGS(
"slaves_waiting_psync:%llu\r\n", (unsigned long long)raxSize(server.slaves_waiting_psync),
"replicas_waiting_psync:%llu\r\n", (unsigned long long)raxSize(server.replicas_waiting_psync),
"master_failover_state:%s\r\n", getFailoverStateString(),
"master_replid:%s\r\n", server.replid,
"master_replid2:%s\r\n", server.replid2,
Expand Down
4 changes: 2 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,7 @@ struct valkeyServer {
list *clients_pending_write; /* There is to write or install handler. */
list *clients_pending_read; /* Client has pending read socket buffers. */
list *slaves, *monitors; /* List of slaves and MONITORs */
rax *slaves_waiting_psync; /* Radix tree using rdb-client id as keys and rdb-client as values.
rax *replicas_waiting_psync; /* Radix tree using rdb-client id as keys and rdb-client as values.
* This rax contains slaves for the period from the beginning of
* their RDB connection to the end of their main connection's
* partial synchronization. */
Expand Down Expand Up @@ -2931,7 +2931,7 @@ void abortFailover(const char *err);
const char *getFailoverStateString(void);
void abortRdbConnectionSync(void);
int sendCurrentOffsetToReplica(client* replica);
void addSlaveToPsyncWaitingRax(client* slave);
void addReplicaToPsyncWaitingRax(client* slave);

/* Generic persistence functions */
void startLoadingFile(size_t size, char* filename, int rdbflags);
Expand Down
24 changes: 12 additions & 12 deletions tests/integration/repl-rdb-channel.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,8 @@ start_server {tags {"repl rdb-channel external:skip"}} {
$replica2 debug sleep-after-fork [expr {2 * [expr {10 ** 6}]}]
test "Test rdb-channel connection peering - start with empty backlog (retrospect)" {
$replica1 slaveof $master_host $master_port
set res [wait_for_log_messages 0 {"*Add slave * repl-backlog is empty*"} $loglines 2000 1]
set res [wait_for_log_messages 0 {"*Retrospect attach slave*"} $loglines 2000 1]
set res [wait_for_log_messages 0 {"*Add replica * repl-backlog is empty*"} $loglines 2000 1]
set res [wait_for_log_messages 0 {"*Retrospect attach replica*"} $loglines 2000 1]
set loglines [lindex $res 1]
incr $loglines
verify_replica_online $master 0 700
Expand All @@ -486,12 +486,12 @@ start_server {tags {"repl rdb-channel external:skip"}} {
fail "Replica is not synced"
}
$replica1 slaveof no one
assert [string match *slaves_waiting_psync:0* [$master info replication]]
assert [string match *replicas_waiting_psync:0* [$master info replication]]
}

test "Test rdb-channel connection peering - start with backlog" {
$replica2 slaveof $master_host $master_port
set res [wait_for_log_messages 0 {"*Add slave * with repl-backlog tail*"} $loglines 2000 1]
set res [wait_for_log_messages 0 {"*Add replica * with repl-backlog tail*"} $loglines 2000 1]
set loglines [lindex $res 1]
incr $loglines
verify_replica_online $master 0 700
Expand All @@ -500,7 +500,7 @@ start_server {tags {"repl rdb-channel external:skip"}} {
} else {
fail "Replica is not synced"
}
assert [string match *slaves_waiting_psync:0* [$master info replication]]
assert [string match *replicas_waiting_psync:0* [$master info replication]]
}

stop_write_load $load_handle0
Expand Down Expand Up @@ -555,14 +555,14 @@ start_server {tags {"repl rdb-channel external:skip"}} {
# Force the replica to sleep for 3 seconds so the master main process will wake up, while the replica is unresponsive.
set sleep_handle [start_bg_server_sleep $replica_host $replica_port 3]
wait_for_condition 50 100 {
[string match {*slaves_waiting_psync:1*} [$master info replication]]
[string match {*replicas_waiting_psync:1*} [$master info replication]]
} else {
fail "Master freed RDB client before psync was established"
}

verify_replica_online $master 0 500
wait_for_condition 50 100 {
[string match {*slaves_waiting_psync:0*} [$master info replication]]
[string match {*replicas_waiting_psync:0*} [$master info replication]]
} else {
fail "Master did not free repl buf block after psync establishment"
}
Expand Down Expand Up @@ -594,7 +594,7 @@ start_server {tags {"repl rdb-channel external:skip"}} {
# We expect the grace time to be over before the replica wake up, so sync will fail.
set sleep_handle [start_bg_server_sleep $replica_host $replica_port 8]
wait_for_condition 50 100 {
[string match {*slaves_waiting_psync:1*} [$master info replication]]
[string match {*replicas_waiting_psync:1*} [$master info replication]]
} else {
fail "Master should wait before freeing repl block"
}
Expand All @@ -605,7 +605,7 @@ start_server {tags {"repl rdb-channel external:skip"}} {
# Should succeed on retry
verify_replica_online $master 0 500
wait_for_condition 50 100 {
[string match {*slaves_waiting_psync:0*} [$master info replication]]
[string match {*replicas_waiting_psync:0*} [$master info replication]]
} else {
fail "Master did not free repl buf block after psync establishment"
}
Expand Down Expand Up @@ -653,7 +653,7 @@ start_server {tags {"repl rdb-channel external:skip"}} {
set sleep_handle [start_bg_server_sleep $replica_host $replica_port 5]
wait_for_log_messages -1 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 2000 1
wait_for_condition 50 100 {
[string match {*slaves_waiting_psync:0*} [$master info replication]]
[string match {*replicas_waiting_psync:0*} [$master info replication]]
} else {
fail "Master did not free repl buf block after sync failure"
}
Expand All @@ -675,7 +675,7 @@ start_server {tags {"repl rdb-channel external:skip"}} {

wait_for_log_messages -1 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 2000 1
wait_for_condition 50 100 {
[string match {*slaves_waiting_psync:0*} [$master info replication]]
[string match {*replicas_waiting_psync:0*} [$master info replication]]
} else {
fail "Master did not free repl buf block after sync failure"
}
Expand Down Expand Up @@ -782,4 +782,4 @@ start_server {tags {"repl rdb-channel external:skip"}} {
}
}
}
}
}
2 changes: 1 addition & 1 deletion tests/integration/replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ foreach mdl {no yes} rdbchannel {no yes} {
wait_for_ofs_sync $master [lindex $slaves 1]
wait_for_ofs_sync $master [lindex $slaves 2]

assert [string match *slaves_waiting_psync:0* [$master info replication]]
assert [string match *replicas_waiting_psync:0* [$master info replication]]

# Check digests
set digest [$master debug digest]
Expand Down

0 comments on commit dd17e3e

Please sign in to comment.