Skip to content

Commit

Permalink
Support read only replicas attaching to active replicas (Bug #229)
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnSully committed Sep 23, 2020
1 parent 22ac56d commit 676644f
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 16 deletions.
6 changes: 3 additions & 3 deletions src/networking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2260,7 +2260,7 @@ int processMultibulkBuffer(client *c) {
* 1. The client is reset unless there are reasons to avoid doing it.
* 2. In the case of master clients, the replication offset is updated.
* 3. Propagate commands we got from our master to replicas down the line. */
void commandProcessed(client *c) {
void commandProcessed(client *c, int flags) {
long long prev_offset = c->reploff;
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
Expand Down Expand Up @@ -2288,7 +2288,7 @@ void commandProcessed(client *c) {
ae.arm(c);
long long applied = c->reploff - prev_offset;
if (applied) {
if (!g_pserver->fActiveReplica)
if (!g_pserver->fActiveReplica && (flags & CMD_CALL_PROPAGATE))
{
replicationFeedSlavesFromMasterStream(g_pserver->slaves,
c->pending_querybuf, applied);
Expand All @@ -2312,7 +2312,7 @@ int processCommandAndResetClient(client *c, int flags) {
serverAssert(GlobalLocksAcquired());

if (processCommand(c, flags) == C_OK) {
commandProcessed(c);
commandProcessed(c, flags);
}
if (serverTL->current_client == NULL) deadclient = 1;
serverTL->current_client = NULL;
Expand Down
53 changes: 42 additions & 11 deletions src/replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,7 @@ void processReplconfUuid(client *c, robj *arg)
* full resync. */
void replconfCommand(client *c) {
int j;
bool fCapaCommand = false;

if ((c->argc % 2) == 0) {
/* Number of arguments must be odd to make sure that every
Expand All @@ -1176,6 +1177,7 @@ void replconfCommand(client *c) {

/* Process every option-value pair. */
for (j = 1; j < c->argc; j+=2) {
fCapaCommand = false;
if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"listening-port")) {
long port;

Expand All @@ -1200,6 +1202,8 @@ void replconfCommand(client *c) {
c->slave_capa |= SLAVE_CAPA_PSYNC2;
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;

fCapaCommand = true;
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
/* REPLCONF ACK is used by replica to inform the master the amount
* of replication stream that it processed so far. It is an
Expand Down Expand Up @@ -1242,7 +1246,16 @@ void replconfCommand(client *c) {
return;
}
}
addReply(c,shared.ok);

if (fCapaCommand) {
sds reply = sdsnew("+OK");
if (g_pserver->fActiveReplica)
reply = sdscat(reply, " active-replica");
reply = sdscat(reply, "\r\n");
addReplySds(c, reply);
} else {
addReply(c,shared.ok);
}
}

/* This function puts a replica in the online state, and should be called just
Expand Down Expand Up @@ -2557,6 +2570,30 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read
return PSYNC_NOT_SUPPORTED;
}

void parseMasterCapa(redisMaster *mi, sds strcapa)
{
if (sdslen(strcapa) < 1 || strcapa[0] != '+')
return;

char *szStart = strcapa + 1; // skip the +
char *pchEnd = szStart;

mi->isActive = false;
for (;;)
{
if (*pchEnd == ' ' || *pchEnd == '\0') {
// Parse the word
if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) {
mi->isActive = true;
}
szStart = pchEnd + 1;
}
if (*pchEnd == '\0')
break;
++pchEnd;
}
}

/* This handler fires when the non blocking connect was able to
* establish a connection with the master. */
void syncWithMaster(connection *conn) {
Expand Down Expand Up @@ -2750,16 +2787,8 @@ void syncWithMaster(connection *conn) {
*
* The master will ignore capabilities it does not understand. */
if (mi->repl_state == REPL_STATE_SEND_CAPA) {
if (g_pserver->fActiveReplica)
{
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF",
"capa","eof","capa","psync2","capa","activeExpire",NULL);
}
else
{
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF",
"capa","eof","capa","psync2",NULL);
}
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF",
"capa","eof","capa","psync2","capa","activeExpire",NULL);
if (err) goto write_error;
sdsfree(err);
mi->repl_state = REPL_STATE_RECEIVE_CAPA;
Expand All @@ -2774,6 +2803,8 @@ void syncWithMaster(connection *conn) {
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF capa: %s", err);
} else {
parseMasterCapa(mi, err);
}
sdsfree(err);
mi->repl_state = REPL_STATE_SEND_PSYNC;
Expand Down
17 changes: 15 additions & 2 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1735,14 +1735,26 @@ void clientsCron(int iel) {
freeClientsInAsyncFreeQueue(iel);
}

bool expireOwnKeys()
{
if (iAmMaster()) {
return true;
} else if (!g_pserver->fActiveReplica && (listLength(g_pserver->masters) == 1)) {
redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters));
if (mi->isActive)
return true;
}
return false;
}

/* This function handles 'background' operations we are required to do
* incrementally in Redis databases, such as active key expiring, resizing,
* rehashing. */
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (g_pserver->active_expire_enabled) {
if (iAmMaster()) {
if (expireOwnKeys()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
Expand Down Expand Up @@ -2461,6 +2473,7 @@ void initMasterInfo(redisMaster *master)
master->cached_master = NULL;
master->master_initial_offset = -1;

master->isActive = false;

master->repl_state = REPL_STATE_NONE;
master->repl_down_since = 0; /* Never connected, repl is down since EVER. */
Expand Down Expand Up @@ -3551,7 +3564,7 @@ void call(client *c, int flags) {
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;

if (c->cmd->flags & CMD_SKIP_PROPOGATE)
if ((c->cmd->flags & CMD_SKIP_PROPOGATE) && g_pserver->fActiveReplica)
propagate_flags &= ~PROPAGATE_REPL;

/* Call propagate() only if at least one of AOF / replication
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1371,6 +1371,7 @@ struct redisMaster {
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
long long master_initial_offset; /* Master PSYNC offset. */

bool isActive = false;
int repl_state; /* Replication status if the instance is a replica */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
Expand Down

0 comments on commit 676644f

Please sign in to comment.