Skip to content

Commit

Permalink
merge with unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
lipzhu committed Feb 22, 2024
2 parents 1b4dccb + 5b9fc46 commit c12e446
Show file tree
Hide file tree
Showing 64 changed files with 2,131 additions and 1,168 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/external.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ jobs:
run: |
./runtest \
--host 127.0.0.1 --port 6379 \
--verbose \
--tags -slow
- name: Archive redis log
if: ${{ failure() }}
Expand All @@ -49,6 +50,7 @@ jobs:
run: |
./runtest \
--host 127.0.0.1 --port 6379 \
--verbose \
--cluster-mode \
--tags -slow
- name: Archive redis log
Expand All @@ -73,6 +75,7 @@ jobs:
run: |
./runtest \
--host 127.0.0.1 --port 6379 \
--verbose \
--tags "-slow -needs:debug"
- name: Archive redis log
if: ${{ failure() }}
Expand Down
5 changes: 4 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ ifeq ($(OPTIMIZATION),-O3)
endif
REDIS_LDFLAGS+=-O3 -flto
endif
ifneq ($(OPTIMIZATION),-O0)
REDIS_CFLAGS+=-fno-omit-frame-pointer
endif
DEPENDENCY_TARGETS=hiredis linenoise lua hdr_histogram fpconv
NODEPS:=clean distclean

Expand Down Expand Up @@ -345,7 +348,7 @@ endif

REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)
Expand Down
10 changes: 2 additions & 8 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -1903,12 +1903,6 @@ int ACLCheckAllPerm(client *c, int *idxptr) {
return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, idxptr);
}

int totalSubscriptions(void) {
return dictSize(server.pubsub_patterns) +
dictSize(server.pubsub_channels) +
server.shard_channel_count;
}

/* If 'new' can access all channels 'original' could then return NULL;
Otherwise return a list of channels that the new user can access */
list *getUpcomingChannelList(user *new, user *original) {
Expand Down Expand Up @@ -2017,7 +2011,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) {
* permissions specified via the upcoming argument, and kill them if so. */
void ACLKillPubsubClientsIfNeeded(user *new, user *original) {
/* Do nothing if there are no subscribers. */
if (totalSubscriptions() == 0)
if (pubsubTotalSubscriptions() == 0)
return;

list *channels = getUpcomingChannelList(new, original);
Expand Down Expand Up @@ -2450,7 +2444,7 @@ sds ACLLoadFromFile(const char *filename) {

/* If there are some subscribers, we need to check if we need to drop some clients. */
rax *user_channels = NULL;
if (totalSubscriptions() > 0) {
if (pubsubTotalSubscriptions() > 0) {
user_channels = raxNew();
}

Expand Down
30 changes: 15 additions & 15 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ int openNewIncrAofForAppend(void) {
* is already synced at this point so fsync doesn't matter. */
if (server.aof_fd != -1) {
aof_background_fsync_and_close(server.aof_fd);
server.aof_last_fsync = server.unixtime;
server.aof_last_fsync = server.mstime;
}
server.aof_fd = newfd;

Expand Down Expand Up @@ -954,7 +954,7 @@ void stopAppendOnly(void) {
if (redis_fsync(server.aof_fd) == -1) {
serverLog(LL_WARNING,"Fail to fsync the AOF file: %s",strerror(errno));
} else {
server.aof_last_fsync = server.unixtime;
server.aof_last_fsync = server.mstime;
}
close(server.aof_fd);

Expand Down Expand Up @@ -998,7 +998,7 @@ int startAppendOnly(void) {
return C_ERR;
}
}
server.aof_last_fsync = server.unixtime;
server.aof_last_fsync = server.mstime;
/* If AOF fsync error in bio job, we just ignore it and log the event. */
int aof_bio_fsync_status;
atomicGet(server.aof_bio_fsync_status, aof_bio_fsync_status);
Expand Down Expand Up @@ -1074,7 +1074,7 @@ void flushAppendOnlyFile(int force) {
* the data in page cache cannot be flushed in time. */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_last_incr_fsync_offset != server.aof_last_incr_size &&
server.unixtime > server.aof_last_fsync &&
server.mstime - server.aof_last_fsync >= 1000 &&
!(sync_in_progress = aofFsyncInProgress())) {
goto try_fsync;

Expand Down Expand Up @@ -1109,9 +1109,9 @@ void flushAppendOnlyFile(int force) {
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponing, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime;
server.aof_flush_postponed_start = server.mstime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
} else if (server.mstime - server.aof_flush_postponed_start < 2000) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
Expand Down Expand Up @@ -1260,15 +1260,15 @@ void flushAppendOnlyFile(int force) {
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
server.aof_last_fsync = server.unixtime;
server.aof_last_fsync = server.mstime;
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
} else if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync) {
server.mstime - server.aof_last_fsync >= 1000) {
if (!sync_in_progress) {
aof_background_fsync(server.aof_fd);
server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
}
server.aof_last_fsync = server.unixtime;
server.aof_last_fsync = server.mstime;
}
}

Expand Down Expand Up @@ -2244,7 +2244,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
int j;
long key_count = 0;
long long updated_time = 0;
dbIterator *dbit = NULL;
kvstoreIterator *kvs_it = NULL;

/* Record timestamp at the beginning of rewriting AOF. */
if (server.aof_timestamp_enabled) {
Expand All @@ -2258,15 +2258,15 @@ int rewriteAppendOnlyFileRio(rio *aof) {
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db + j;
if (dbSize(db, DB_MAIN) == 0) continue;
if (kvstoreSize(db->keys) == 0) continue;

/* SELECT the new DB */
if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(aof,j) == 0) goto werr;

dbit = dbIteratorInit(db, DB_MAIN);
kvs_it = kvstoreIteratorInit(db->keys);
/* Iterate this DB writing every entry */
while((de = dbIteratorNext(dbit)) != NULL) {
while((de = kvstoreIteratorNext(kvs_it)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;
Expand Down Expand Up @@ -2331,12 +2331,12 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (server.rdb_key_save_delay)
debugDelay(server.rdb_key_save_delay);
}
dbReleaseIterator(dbit);
kvstoreIteratorRelease(kvs_it);
}
return C_OK;

werr:
if (dbit) dbReleaseIterator(dbit);
if (kvs_it) kvstoreIteratorRelease(kvs_it);
return C_ERR;
}

Expand Down
26 changes: 9 additions & 17 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,12 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
list *l;
int j;

c->bstate.timeout = timeout;
if (!(c->flags & CLIENT_REPROCESSING_COMMAND)) {
/* If the client is re-processing the command, we do not set the timeout
* because we need to retain the client's original timeout. */
c->bstate.timeout = timeout;
}

for (j = 0; j < numkeys; j++) {
/* If the key already exists in the dictionary ignore it. */
if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) {
Expand All @@ -392,7 +397,6 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
listAddNodeTail(l,c);
dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l));


/* We need to add the key to blocking_keys_unblock_on_nokey, if the client
* wants to be awakened if key is deleted (like XREADGROUP) */
if (unblock_on_nokey) {
Expand Down Expand Up @@ -703,6 +707,9 @@ static void moduleUnblockClientOnKey(client *c, robj *key) {
* we want to remove the pending flag to indicate we already responded to the
* command with timeout reply. */
void unblockClientOnTimeout(client *c) {
/* The client has been unlocked (in the moduleUnblocked list), return ASAP. */
if (c->bstate.btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return;

replyToBlockedClientTimedOut(c);
if (c->flags & CLIENT_PENDING_COMMAND)
c->flags &= ~CLIENT_PENDING_COMMAND;
Expand All @@ -720,21 +727,6 @@ void unblockClientOnError(client *c, const char *err_str) {
unblockClient(c, 1);
}

/* sets blocking_keys to the total number of keys which has at least one client blocked on them
* sets blocking_keys_on_nokey to the total number of keys which has at least one client
* blocked on them to be written or deleted */
void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey) {
unsigned long bkeys=0, bkeys_on_nokey=0;
for (int j = 0; j < server.dbnum; j++) {
bkeys += dictSize(server.db[j].blocking_keys);
bkeys_on_nokey += dictSize(server.db[j].blocking_keys_unblock_on_nokey);
}
if (blocking_keys)
*blocking_keys = bkeys;
if (bloking_keys_on_nokey)
*bloking_keys_on_nokey = bkeys_on_nokey;
}

void blockedBeforeSleep(void) {
/* Handle precise timeouts of blocked clients. */
handleBlockedClientsTimeout();
Expand Down
18 changes: 18 additions & 0 deletions src/cli_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,3 +424,21 @@ sds cliVersion(void) {
}
return version;
}

/* This is a wrapper to call redisConnect or redisConnectWithTimeout. */
redisContext *redisConnectWrapper(const char *ip, int port, const struct timeval tv) {
if (tv.tv_sec == 0 && tv.tv_usec == 0) {
return redisConnect(ip, port);
} else {
return redisConnectWithTimeout(ip, port, tv);
}
}

/* This is a wrapper to call redisConnectUnix or redisConnectUnixWithTimeout. */
redisContext *redisConnectUnixWrapper(const char *path, const struct timeval tv) {
if (tv.tv_sec == 0 && tv.tv_usec == 0) {
return redisConnectUnix(path);
} else {
return redisConnectUnixWithTimeout(path, tv);
}
}
3 changes: 3 additions & 0 deletions src/cli_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,7 @@ sds escapeJsonString(sds s, const char *p, size_t len);

sds cliVersion(void);

redisContext *redisConnectWrapper(const char *ip, int port, const struct timeval tv);
redisContext *redisConnectUnixWrapper(const char *path, const struct timeval tv);

#endif /* __CLICOMMON_H */
10 changes: 5 additions & 5 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ static int shouldReturnTlsInfo(void) {
}

unsigned int countKeysInSlot(unsigned int slot) {
return dictSize(server.db->dict[slot]);
return kvstoreDictSize(server.db->keys, slot);
}

void clusterCommandHelp(client *c) {
Expand Down Expand Up @@ -917,16 +917,16 @@ void clusterCommand(client *c) {
unsigned int keys_in_slot = countKeysInSlot(slot);
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c,numkeys);
dictIterator *iter = NULL;
kvstoreDictIterator *kvs_di = NULL;
dictEntry *de = NULL;
iter = dictGetIterator(server.db->dict[slot]);
kvs_di = kvstoreGetDictIterator(server.db->keys, slot);
for (unsigned int i = 0; i < numkeys; i++) {
de = dictNext(iter);
de = kvstoreDictIteratorNext(kvs_di);
serverAssert(de != NULL);
sds sdskey = dictGetKey(de);
addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
}
dictReleaseIterator(iter);
kvstoreReleaseDictIterator(kvs_di);
} else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
!strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
/* CLUSTER SLAVES <NODE ID> */
Expand Down
Loading

0 comments on commit c12e446

Please sign in to comment.