Unified db rehash method for both standalone and cluster

soloestoy committed Dec 8, 2023
1 parent 826b39e commit f25e226
Showing 6 changed files with 107 additions and 91 deletions.
16 changes: 13 additions & 3 deletions src/db.c
@@ -669,9 +669,21 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
         if (async) {
             emptyDbAsync(&dbarray[j]);
         } else {
+            dbDictMetadata *metadata;
             for (int k = 0; k < dbarray[j].dict_count; k++) {
                 dictEmpty(dbarray[j].dict[k],callback);
+                metadata = (dbDictMetadata *)dictMetadata(dbarray[j].dict[k]);
+                if (metadata->rehashing_node) {
+                    listDelNode(server.rehashing, metadata->rehashing_node);
+                    metadata->rehashing_node = NULL;
+                }
+
                 dictEmpty(dbarray[j].expires[k],callback);
+                metadata = (dbDictMetadata *)dictMetadata(dbarray[j].expires[k]);
+                if (metadata->rehashing_node) {
+                    listDelNode(server.rehashing, metadata->rehashing_node);
+                    metadata->rehashing_node = NULL;
+                }
             }
         }
         /* Because all keys of database are removed, reset average ttl. */
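The unlink sequence above is the recurring pattern of this commit: before a dict is emptied (or, in lazyfree.c below, handed off for async freeing), its listNode in the global server.rehashing list must be removed through the back-pointer stored in the dict's metadata. A minimal sketch of the pattern factored into a helper, assuming the server.rehashing list and dbDictMetadata introduced by this commit; the helper name is hypothetical, not part of the change:

    /* Hypothetical helper: detach a dict from the global rehashing list.
     * Safe to call whether or not the dict is currently rehashing. */
    static void dbDictDetachRehashing(dict *d) {
        dbDictMetadata *metadata = (dbDictMetadata *)dictMetadata(d);
        if (metadata->rehashing_node) {
            /* O(1): the metadata stores the dict's own listNode, so no scan
             * of server.rehashing is needed. */
            listDelNode(server.rehashing, metadata->rehashing_node);
            metadata->rehashing_node = NULL;
        }
    }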
@@ -682,8 +694,6 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
             dbarray[j].sub_dict[subdict].key_count = 0;
             dbarray[j].sub_dict[subdict].resize_cursor = -1;
             if (server.cluster_enabled) {
-                if (dbarray[j].sub_dict[subdict].rehashing)
-                    listEmpty(dbarray[j].sub_dict[subdict].rehashing);
                 dbarray[j].sub_dict[subdict].bucket_count = 0;
                 unsigned long long *slot_size_index = dbarray[j].sub_dict[subdict].slot_size_index;
                 memset(slot_size_index, 0, sizeof(unsigned long long) * (CLUSTER_SLOTS + 1));
@@ -1443,7 +1453,7 @@ size_t dbMemUsage(redisDb *db, dbKeyType keyType) {
     unsigned long long keys_count = dbSize(db, keyType);
     mem += keys_count * dictEntryMemUsage() +
            dbBuckets(db, keyType) * sizeof(dictEntry*) +
-           db->dict_count * sizeof(dict);
+           db->dict_count * (sizeof(dict) + dictMetadataSize(db->dict[0]));
     if (keyType == DB_MAIN) {
         mem+=keys_count * sizeof(robj);
     }
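With the metadata included, each dict's fixed overhead grows by dictMetadataSize(). A rough worked example, assuming a 64-bit build where sizeof(dict) is about 56 bytes and the DB metadata is a single listNode pointer (8 bytes); the sizes are illustrative, not measured:

    /* Illustrative only: fixed per-dict overhead in cluster mode, where a DB
     * holds one dict per slot (dict_count == 16384) for main and expires each. */
    size_t per_dict = sizeof(dict) + sizeof(dbDictMetadata); /* e.g. 56 + 8 = 64 bytes */
    size_t fixed_overhead = 16384 * per_dict;                /* ~1 MiB per key space */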
6 changes: 5 additions & 1 deletion src/dict.c
@@ -181,7 +181,11 @@ static void _dictReset(dict *d, int htidx)
 /* Create a new hash table */
 dict *dictCreate(dictType *type)
 {
-    dict *d = zmalloc(sizeof(*d));
+    size_t metasize = type->dictMetadataBytes ? type->dictMetadataBytes(NULL) : 0;
+    dict *d = zmalloc(sizeof(*d)+metasize);
+    if (metasize > 0) {
+        memset(dictMetadata(d), 0, metasize);
+    }
     _dictInit(d,type);
     return d;
 }
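Any dictType can opt into trailing metadata by providing dictMetadataBytes; dictCreate then over-allocates and zeroes the extra region. A minimal sketch of a caller-defined type, assuming only the dict.h API shown in this commit (the demo names are made up for illustration):

    #include <string.h>
    #include "dict.h"

    typedef struct demoMeta { long counter; } demoMeta;

    static size_t demoMetaBytes(dict *d) {
        (void)d; /* the size is fixed; note dictCreate may pass NULL here */
        return sizeof(demoMeta);
    }

    static uint64_t demoHash(const void *key) {
        return dictGenHashFunction(key, strlen((const char *)key));
    }

    dictType demoDictType = {
        .hashFunction = demoHash,
        .dictMetadataBytes = demoMetaBytes, /* opt in to zeroed trailing metadata */
    };

    void demo(void) {
        dict *d = dictCreate(&demoDictType);
        demoMeta *m = (demoMeta *)dictMetadata(d); /* region starts zeroed */
        m->counter++;
        dictRelease(d);
    }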
9 changes: 8 additions & 1 deletion src/dict.h
@@ -60,6 +60,9 @@ typedef struct dictType {
     /* Invoked at the end of dict initialization/rehashing of all the entries from old to new ht. Both ht still exists
      * and are cleaned up after this callback. */
     void (*rehashingCompleted)(dict *d);
+    /* Allow a dict to carry extra caller-defined metadata. The
+     * extra memory is initialized to 0 when a dict is allocated. */
+    size_t (*dictMetadataBytes)(dict *d);
     /* Flags */
     /* The 'no_value' flag, if set, indicates that values are not used, i.e. the
      * dict is a set. When this flag is set, it's not possible to access the
@@ -88,6 +91,7 @@ struct dict {
     /* Keep small vars at end for optimal (minimal) struct padding */
     int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
     signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
+    void *metadata[];
 };

 /* If safe is set to 1 this is a safe iterator, that means, you can call
@@ -140,6 +144,10 @@ typedef struct {
         (d)->type->keyCompare((d), key1, key2) : \
         (key1) == (key2))

+#define dictMetadata(d) (&(d)->metadata)
+#define dictMetadataSize(d) ((d)->type->dictMetadataBytes \
+                             ? (d)->type->dictMetadataBytes(d) : 0)
+
 #define dictHashKey(d, key) ((d)->type->hashFunction(key))
 #define dictBuckets(d) (DICTHT_SIZE((d)->ht_size_exp[0])+DICTHT_SIZE((d)->ht_size_exp[1]))
 #define dictSize(d) ((d)->ht_used[0]+(d)->ht_used[1])
@@ -166,7 +174,6 @@ dict *dictCreate(dictType *type);
 dict **dictCreateMultiple(dictType *type, int count);
 int dictExpand(dict *d, unsigned long size);
 int dictTryExpand(dict *d, unsigned long size);
-void *dictMetadata(dict *d);
 int dictAdd(dict *d, void *key, void *val);
 dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing);
 void *dictFindPositionForInsert(dict *d, const void *key, dictEntry **existing);
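The exported dictMetadata() function becomes a macro over the new flexible array member, so the metadata lives in the same allocation, directly after the struct's fixed fields. A self-contained sketch of the layout trick, using stand-in names rather than the real dict:

    #include <assert.h>
    #include <stddef.h>
    #include <stdlib.h>
    #include <string.h>

    /* Stand-in mirroring the dict layout (illustrative only). */
    struct box {
        long fixed_field;
        void *metadata[]; /* flexible array member: contributes no size of its own */
    };

    int main(void) {
        size_t metasize = 32;
        struct box *b = malloc(sizeof(*b) + metasize); /* one allocation for both parts */
        memset(&b->metadata, 0, metasize);             /* zeroed, as dictCreate now does */
        /* The metadata region begins where the fixed fields end. */
        assert((char *)&b->metadata == (char *)b + offsetof(struct box, metadata));
        free(b);
        return 0;
    }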
14 changes: 14 additions & 0 deletions src/lazyfree.c
@@ -179,6 +179,20 @@ void freeObjAsync(robj *key, robj *obj, int dbid) {
  * create a new empty set of hash tables and scheduling the old ones for
  * lazy freeing. */
 void emptyDbAsync(redisDb *db) {
+    dbDictMetadata *metadata;
+    for (int i = 0; i < db->dict_count; i++) {
+        metadata = (dbDictMetadata *)dictMetadata(db->dict[i]);
+        if (metadata->rehashing_node) {
+            listDelNode(server.rehashing, metadata->rehashing_node);
+            metadata->rehashing_node = NULL;
+        }
+
+        metadata = (dbDictMetadata *)dictMetadata(db->expires[i]);
+        if (metadata->rehashing_node) {
+            listDelNode(server.rehashing, metadata->rehashing_node);
+            metadata->rehashing_node = NULL;
+        }
+    }
     dict **oldDict = db->dict;
     dict **oldExpires = db->expires;
     atomicIncr(lazyfree_objects,dbSize(db, DB_MAIN));
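Ordering matters in emptyDbAsync: the old dict and expires arrays are about to be handed to the lazy-free thread, so any server.rehashing node still pointing at them has to be unlinked first. A sketch of the use-after-free this prevents, in comment form (illustrative, not code from the tree):

    /* Without the unlink above:
     *   1. d = db->dict[i] is mid-rehash, so server.rehashing holds a node -> d;
     *   2. emptyDbAsync(db) queues d for freeing on the lazy-free thread;
     *   3. on the next cron tick, incrementallyRehash() pops that node and
     *      calls dictRehashMilliseconds() on a dict that may already be freed. */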
146 changes: 61 additions & 85 deletions src/server.c
@@ -419,52 +419,61 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) {
     }
 }

-/* Updates the bucket count in cluster-mode for the given dictionary in a DB. bucket count
- * incremented with the new ht size during the rehashing phase.
- * And also adds dictionary to the rehashing list in cluster mode, which allows us
+/* Adds dictionary to the rehashing list, which allows us
  * to quickly find rehash targets during incremental rehashing.
- *
- * In non-cluster mode, bucket count can be retrieved directly from single dict bucket and
- * we don't need this list as there is only one dictionary per DB. */
-void dictRehashingStarted(dict *d) {
-    if (!server.cluster_enabled) return;
+ *
+ * Updates the bucket count in cluster-mode for the given dictionary in a DB; the bucket
+ * count is incremented with the new ht size during the rehashing phase. In non-cluster
+ * mode, the bucket count can be retrieved directly from the single dict bucket. */
+void dictRehashingStarted(dict *d, dbKeyType keyType) {
+    dbDictMetadata *metadata = (dbDictMetadata *)dictMetadata(d);
+    listAddNodeTail(server.rehashing, d);
+    metadata->rehashing_node = listLast(server.rehashing);

+    if (!server.cluster_enabled) return;
     unsigned long long from, to;
     dictRehashingInfo(d, &from, &to);
-    server.db[0].sub_dict[DB_MAIN].bucket_count += to; /* Started rehashing (Add the new ht size) */
-    if (from == 0) return; /* No entries are to be moved. */
-    if (server.activerehashing) {
-        listAddNodeTail(server.db[0].sub_dict[DB_MAIN].rehashing, d);
-    }
+    server.db[0].sub_dict[keyType].bucket_count += to; /* Started rehashing (Add the new ht size) */
 }

-/* Updates the bucket count for the given dictionary in a DB. It removes
+/* Removes dictionary from the rehashing list.
+ *
+ * Updates the bucket count for the given dictionary in a DB. It removes
  * the old ht size of the dictionary from the total sum of buckets for a DB. */
-void dictRehashingCompleted(dict *d) {
+void dictRehashingCompleted(dict *d, dbKeyType keyType) {
+    dbDictMetadata *metadata = (dbDictMetadata *)dictMetadata(d);
+    if (metadata->rehashing_node) {
+        listDelNode(server.rehashing, metadata->rehashing_node);
+        metadata->rehashing_node = NULL;
+    }
+
     if (!server.cluster_enabled) return;
     unsigned long long from, to;
     dictRehashingInfo(d, &from, &to);
-    server.db[0].sub_dict[DB_MAIN].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
+    server.db[0].sub_dict[keyType].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
 }

-void dictRehashingStartedForExpires(dict *d) {
-    if (!server.cluster_enabled) return;
+void dbDictRehashingStarted(dict *d) {
+    dictRehashingStarted(d, DB_MAIN);
+}

-    unsigned long long from, to;
-    dictRehashingInfo(d, &from, &to);
-    server.db[0].sub_dict[DB_EXPIRES].bucket_count += to; /* Started rehashing (Add the new ht size) */
-    if (from == 0) return; /* No entries are to be moved. */
-    if (server.activerehashing) {
-        listAddNodeTail(server.db[0].sub_dict[DB_EXPIRES].rehashing, d);
-    }
+void dbDictRehashingCompleted(dict *d) {
+    dictRehashingCompleted(d, DB_MAIN);
 }

-void dictRehashingCompletedForExpires(dict *d) {
-    if (!server.cluster_enabled) return;
+void dbExpiresRehashingStarted(dict *d) {
+    dictRehashingStarted(d, DB_EXPIRES);
+}

-    unsigned long long from, to;
-    dictRehashingInfo(d, &from, &to);
-    server.db[0].sub_dict[DB_EXPIRES].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
+void dbExpiresRehashingCompleted(dict *d) {
+    dictRehashingCompleted(d, DB_EXPIRES);
 }

+/* Returns the size of the DB dict metadata in bytes. */
+size_t dbDictMetadataSize(dict *d) {
+    UNUSED(d);
+    /* NOTICE: this also affects overhead_ht_main and overhead_ht_expires in getMemoryOverheadData. */
+    return sizeof(dbDictMetadata);
+}
+
 /* Generic hash table type where keys are Redis Objects, Values
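For orientation, the life cycle of these hooks, condensed into comment form (a sketch of dict.c's behavior as described by the callback documentation above, not literal code from the tree):

    /* dictExpand(d, size)                  -- a resize begins
     *     -> d->type->rehashingStarted(d)     e.g. dbDictRehashingStarted:
     *        append d to server.rehashing, stash the node in d's metadata,
     *        and (cluster mode) add the new ht size to bucket_count;
     * ... incremental steps, e.g. via dictRehashMilliseconds(d, ...) ...
     * last bucket migrated                 -- the resize ends
     *     -> d->type->rehashingCompleted(d)   e.g. dbDictRehashingCompleted:
     *        unlink the node, clear the back-pointer, and (cluster mode)
     *        subtract the old ht size from bucket_count. */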
@@ -522,8 +531,9 @@ dictType dbDictType = {
     dictSdsDestructor, /* key destructor */
     dictObjectDestructor, /* val destructor */
     dictExpandAllowed, /* allow to expand */
-    dictRehashingStarted,
-    dictRehashingCompleted,
+    dbDictRehashingStarted,
+    dbDictRehashingCompleted,
+    dbDictMetadataSize,
 };

 /* Db->expires */
@@ -535,8 +545,9 @@ dictType dbExpiresDictType = {
     NULL, /* key destructor */
     NULL, /* val destructor */
     dictExpandAllowed, /* allow to expand */
-    dictRehashingStartedForExpires,
-    dictRehashingCompletedForExpires,
+    dbExpiresRehashingStarted,
+    dbExpiresRehashingCompleted,
+    dbDictMetadataSize,
 };

 /* Command table. sds string -> command struct pointer. */
@@ -683,45 +694,22 @@ void tryResizeHashTables(int dbid) {
  *
  * The function returns 1 if some rehashing was performed, otherwise 0
  * is returned. */
-int incrementallyRehash(int dbid) {
-    /* Rehash main and expire dictionary. */
-    if (server.cluster_enabled) {
-        listNode *node, *nextNode;
-        monotime timer;
-        elapsedStart(&timer);
-        /* Our goal is to rehash as many slot specific dictionaries as we can before reaching predefined threshold,
-         * while removing those that already finished rehashing from the queue. */
-        for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
-            serverLog(LL_DEBUG,"Rehashing list length: %lu", listLength(server.db[dbid].sub_dict[subdict].rehashing));
-            while ((node = listFirst(server.db[dbid].sub_dict[subdict].rehashing))) {
-                if (dictIsRehashing((dict *) listNodeValue(node))) {
-                    dictRehashMilliseconds(listNodeValue(node), INCREMENTAL_REHASHING_THRESHOLD_MS);
-                    if (elapsedMs(timer) >= INCREMENTAL_REHASHING_THRESHOLD_MS) {
-                        return 1; /* Reached the time limit. */
-                    }
-                } else { /* It is possible that rehashing has already completed for this dictionary, simply remove it from the queue. */
-                    nextNode = listNextNode(node);
-                    listDelNode(server.db[dbid].sub_dict[subdict].rehashing, node);
-                    node = nextNode;
-                }
-            }
-        }
-        /* When cluster mode is disabled, only one dict is used for the entire DB and rehashing list isn't populated. */
-    } else {
-        /* Rehash main dict. */
-        dict *main_dict = server.db[dbid].dict[0];
-        if (dictIsRehashing(main_dict)) {
-            dictRehashMilliseconds(main_dict, INCREMENTAL_REHASHING_THRESHOLD_MS);
-            return 1; /* already used our millisecond for this loop... */
-        }
-        /* Rehash expires. */
-        dict *expires_dict = server.db[dbid].expires[0];
-        if (dictIsRehashing(expires_dict)) {
-            dictRehashMilliseconds(expires_dict, INCREMENTAL_REHASHING_THRESHOLD_MS);
-            return 1; /* already used our millisecond for this loop... */
+int incrementallyRehash(void) {
+    serverLog(LL_DEBUG,"Rehashing list length: %lu", listLength(server.rehashing));
+    if (listLength(server.rehashing) == 0) return 0;
+
+    /* Our goal is to rehash as many dictionaries as we can before reaching the predefined
+     * threshold; after each dictionary completes rehashing, it removes itself from the list. */
+    listNode *node;
+    monotime timer;
+    elapsedStart(&timer);
+    while ((node = listFirst(server.rehashing))) {
+        dictRehashMilliseconds(listNodeValue(node), INCREMENTAL_REHASHING_THRESHOLD_MS);
+        if (elapsedMs(timer) >= INCREMENTAL_REHASHING_THRESHOLD_MS) {
+            break; /* Reached the time limit. */
         }
     }
-    return 0;
+    return 1;
 }

 /* This function is called once a background process of some kind terminates,
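The time-boxing idiom used in the new incrementallyRehash stands on its own. A self-contained sketch, assuming nothing from Redis beyond a POSIX monotonic clock (all names below are illustrative):

    #include <time.h>

    #define BUDGET_MS 1 /* mirrors INCREMENTAL_REHASHING_THRESHOLD_MS */

    static long long now_ms(void) {
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return ts.tv_sec * 1000LL + ts.tv_nsec / 1000000LL;
    }

    /* Drain queued work until the time budget is spent. Callers re-enter on
     * the next cron tick, so long backlogs are worked off incrementally
     * instead of blocking the event loop. */
    void drain_with_budget(int (*do_one_chunk)(void)) {
        long long start = now_ms();
        while (do_one_chunk()) {
            if (now_ms() - start >= BUDGET_MS)
                break; /* budget spent; resume on the next tick */
        }
    }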
@@ -1162,7 +1150,6 @@ void databasesCron(void) {
      * DB we'll be able to start from the successive in the next
      * cron loop iteration. */
     static unsigned int resize_db = 0;
-    static unsigned int rehash_db = 0;
     int dbs_per_call = CRON_DBS_PER_CALL;
     int j;

@@ -1177,18 +1164,7 @@

         /* Rehash */
         if (server.activerehashing) {
-            for (j = 0; j < dbs_per_call; j++) {
-                int work_done = incrementallyRehash(rehash_db);
-                if (work_done) {
-                    /* If the function did some work, stop here, we'll do
-                     * more at the next cron loop. */
-                    break;
-                } else {
-                    /* If this db didn't need rehash, we'll try the next one. */
-                    rehash_db++;
-                    rehash_db %= server.dbnum;
-                }
-            }
+            incrementallyRehash();
         }
     }
 }
@@ -2653,7 +2629,6 @@ void makeThreadKillable(void) {

 void initDbState(redisDb *db){
     for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
-        db->sub_dict[subdict].rehashing = listCreate();
         db->sub_dict[subdict].non_empty_slots = 0;
         db->sub_dict[subdict].key_count = 0;
         db->sub_dict[subdict].resize_cursor = -1;
@@ -2753,6 +2728,7 @@ void initServer(void) {
         initDbState(&server.db[j]);
         listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
     }
+    server.rehashing = listCreate();
     evictionPoolAlloc(); /* Initialize the LRU keys pool. */
     server.pubsub_channels = dictCreate(&keylistDictType);
     server.pubsub_patterns = dictCreate(&keylistDictType);
7 changes: 6 additions & 1 deletion src/server.h
@@ -970,7 +970,6 @@ typedef struct replBufBlock {
 } replBufBlock;

 typedef struct dbDictState {
-    list *rehashing; /* List of dictionaries in this DB that are currently rehashing. */
     int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used for cluster-enabled). */
     int non_empty_slots; /* The number of non-empty slots. */
     unsigned long long key_count; /* Total number of keys in this DB. */
@@ -983,6 +982,11 @@ typedef enum dbKeyType {
     DB_EXPIRES
 } dbKeyType;

+/* Dict metadata for the database, used to record the position in the rehashing list. */
+typedef struct dbDictMetadata {
+    listNode *rehashing_node; /* list node in the rehashing list */
+} dbDictMetadata;
+
 /* Redis database representation. There are multiple databases identified
  * by integers from 0 (the default database) up to the max configured
  * database. The database number is the 'id' field in the structure. */
@@ -1568,6 +1572,7 @@ struct redisServer {
     int hz; /* serverCron() calls frequency in hertz */
     int in_fork_child; /* indication that this is a fork child */
     redisDb *db;
+    list *rehashing; /* List of dictionaries in DBs that are currently rehashing. */
     dict *commands; /* Command table */
     dict *orig_commands; /* Command table before command renaming. */
     aeEventLoop *el;
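Taken together, server.rehashing and dbDictMetadata.rehashing_node form an intrusive back-pointer: each dict remembers its own node in the global list, so removal never scans the list. A self-contained sketch of the idea with stand-in types (nothing below is from the tree):

    #include <stdlib.h>

    typedef struct node { struct node *prev, *next; void *value; } node;
    typedef struct dlist { node *head, *tail; } dlist;

    typedef struct item {
        node *self; /* back-pointer: NULL while not on the list */
    } item;

    /* O(1) unlink: the item carries its own node, so no list scan is needed. */
    static void detach(dlist *l, item *it) {
        if (!it->self) return; /* not enlisted: nothing to do */
        node *n = it->self;
        if (n->prev) n->prev->next = n->next; else l->head = n->next;
        if (n->next) n->next->prev = n->prev; else l->tail = n->prev;
        free(n);
        it->self = NULL;
    }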
