diff --git a/src/networking.cpp b/src/networking.cpp index f881dd1a6..0616534ee 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2260,7 +2260,7 @@ int processMultibulkBuffer(client *c) { * 1. The client is reset unless there are reasons to avoid doing it. * 2. In the case of master clients, the replication offset is updated. * 3. Propagate commands we got from our master to replicas down the line. */ -void commandProcessed(client *c) { +void commandProcessed(client *c, int flags) { long long prev_offset = c->reploff; if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ @@ -2288,7 +2288,7 @@ void commandProcessed(client *c) { ae.arm(c); long long applied = c->reploff - prev_offset; if (applied) { - if (!g_pserver->fActiveReplica) + if (!g_pserver->fActiveReplica && (flags & CMD_CALL_PROPAGATE)) { replicationFeedSlavesFromMasterStream(g_pserver->slaves, c->pending_querybuf, applied); @@ -2312,7 +2312,7 @@ int processCommandAndResetClient(client *c, int flags) { serverAssert(GlobalLocksAcquired()); if (processCommand(c, flags) == C_OK) { - commandProcessed(c); + commandProcessed(c, flags); } if (serverTL->current_client == NULL) deadclient = 1; serverTL->current_client = NULL; diff --git a/src/replication.cpp b/src/replication.cpp index 38bbed40f..beab02809 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1166,6 +1166,7 @@ void processReplconfUuid(client *c, robj *arg) * full resync. */ void replconfCommand(client *c) { int j; + bool fCapaCommand = false; if ((c->argc % 2) == 0) { /* Number of arguments must be odd to make sure that every @@ -1176,6 +1177,7 @@ void replconfCommand(client *c) { /* Process every option-value pair. */ for (j = 1; j < c->argc; j+=2) { + fCapaCommand = false; if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"listening-port")) { long port; @@ -1200,6 +1202,8 @@ void replconfCommand(client *c) { c->slave_capa |= SLAVE_CAPA_PSYNC2; else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire")) c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE; + + fCapaCommand = true; } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) { /* REPLCONF ACK is used by replica to inform the master the amount * of replication stream that it processed so far. It is an @@ -1242,7 +1246,16 @@ void replconfCommand(client *c) { return; } } - addReply(c,shared.ok); + + if (fCapaCommand) { + sds reply = sdsnew("+OK"); + if (g_pserver->fActiveReplica) + reply = sdscat(reply, " active-replica"); + reply = sdscat(reply, "\r\n"); + addReplySds(c, reply); + } else { + addReply(c,shared.ok); + } } /* This function puts a replica in the online state, and should be called just @@ -2557,6 +2570,30 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read return PSYNC_NOT_SUPPORTED; } +void parseMasterCapa(redisMaster *mi, sds strcapa) +{ + if (sdslen(strcapa) < 1 || strcapa[0] != '+') + return; + + char *szStart = strcapa + 1; // skip the + + char *pchEnd = szStart; + + mi->isActive = false; + for (;;) + { + if (*pchEnd == ' ' || *pchEnd == '\0') { + // Parse the word + if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) { + mi->isActive = true; + } + szStart = pchEnd + 1; + } + if (*pchEnd == '\0') + break; + ++pchEnd; + } +} + /* This handler fires when the non blocking connect was able to * establish a connection with the master. */ void syncWithMaster(connection *conn) { @@ -2750,16 +2787,8 @@ void syncWithMaster(connection *conn) { * * The master will ignore capabilities it does not understand. */ if (mi->repl_state == REPL_STATE_SEND_CAPA) { - if (g_pserver->fActiveReplica) - { - err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF", - "capa","eof","capa","psync2","capa","activeExpire",NULL); - } - else - { - err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF", - "capa","eof","capa","psync2",NULL); - } + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF", + "capa","eof","capa","psync2","capa","activeExpire",NULL); if (err) goto write_error; sdsfree(err); mi->repl_state = REPL_STATE_RECEIVE_CAPA; @@ -2774,6 +2803,8 @@ void syncWithMaster(connection *conn) { if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF capa: %s", err); + } else { + parseMasterCapa(mi, err); } sdsfree(err); mi->repl_state = REPL_STATE_SEND_PSYNC; diff --git a/src/server.cpp b/src/server.cpp index 3813e9d85..17aec83d7 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1735,6 +1735,18 @@ void clientsCron(int iel) { freeClientsInAsyncFreeQueue(iel); } +bool expireOwnKeys() +{ + if (iAmMaster()) { + return true; + } else if (!g_pserver->fActiveReplica && (listLength(g_pserver->masters) == 1)) { + redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters)); + if (mi->isActive) + return true; + } + return false; +} + /* This function handles 'background' operations we are required to do * incrementally in Redis databases, such as active key expiring, resizing, * rehashing. */ @@ -1742,7 +1754,7 @@ void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ if (g_pserver->active_expire_enabled) { - if (iAmMaster()) { + if (expireOwnKeys()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { expireSlaveKeys(); @@ -2461,6 +2473,7 @@ void initMasterInfo(redisMaster *master) master->cached_master = NULL; master->master_initial_offset = -1; + master->isActive = false; master->repl_state = REPL_STATE_NONE; master->repl_down_since = 0; /* Never connected, repl is down since EVER. */ @@ -3551,7 +3564,7 @@ void call(client *c, int flags) { !(flags & CMD_CALL_PROPAGATE_AOF)) propagate_flags &= ~PROPAGATE_AOF; - if (c->cmd->flags & CMD_SKIP_PROPOGATE) + if ((c->cmd->flags & CMD_SKIP_PROPOGATE) && g_pserver->fActiveReplica) propagate_flags &= ~PROPAGATE_REPL; /* Call propagate() only if at least one of AOF / replication diff --git a/src/server.h b/src/server.h index 2fb066546..02aa630ab 100644 --- a/src/server.h +++ b/src/server.h @@ -1371,6 +1371,7 @@ struct redisMaster { char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ long long master_initial_offset; /* Master PSYNC offset. */ + bool isActive = false; int repl_state; /* Replication status if the instance is a replica */ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ off_t repl_transfer_read; /* Amount of RDB read from master during sync. */