Skip to content

Commit

Permalink
Refactor replica handling in CLUSTER NODES parsing
Browse files Browse the repository at this point in the history
Store parsed replicas in a separate dict during parsing, and move
them to their primary when all lines are parsed.
This dict owns the memory for added nodes until moved.

Previously the dict contained references to both replicas and primaries,
which made it a bit harder to handle.

Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv committed Oct 14, 2024
1 parent 25d584e commit a41a55f
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 130 deletions.
217 changes: 95 additions & 122 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -504,101 +504,6 @@ static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) {
}
}

/* Associate 'node' with the entry keyed by 'master_name' in the '*nodes' dict.
 *
 * The dict is created on first use. When no entry exists for 'master_name'
 * the node is added under a copy of that name. When an entry already exists:
 *  - a master meeting an existing master is reported as an error,
 *  - a master meeting an existing slave takes over that slave (and any slaves
 *    collected in it) and replaces it as the dict value,
 *  - a slave is appended to the slave list of the existing node.
 *
 * Returns VALKEY_OK on success, otherwise VALKEY_ERR. Allocation failures set
 * an "Out of memory" error in 'cc'. */
static int cluster_master_slave_mapping_with_name(valkeyClusterContext *cc,
                                                  dict **nodes,
                                                  valkeyClusterNode *node,
                                                  sds master_name) {
    dictEntry *entry;
    valkeyClusterNode *existing;

    if (node == NULL || master_name == NULL) {
        return VALKEY_ERR;
    }

    /* Lazily create the name -> node dict. */
    if (*nodes == NULL) {
        *nodes = dictCreate(&clusterNodesRefDictType, NULL);
        if (*nodes == NULL) {
            goto oom;
        }
    }

    entry = dictFind(*nodes, master_name);
    if (entry == NULL) {
        /* First node seen for this name: store it under its own key copy. */
        sds name_copy = sdsnewlen(master_name, sdslen(master_name));
        if (name_copy == NULL) {
            goto oom;
        }
        if (dictAdd(*nodes, name_copy, node) != DICT_OK) {
            sdsfree(name_copy);
            goto oom;
        }
        return VALKEY_OK;
    }

    existing = dictGetEntryVal(entry);
    if (existing == NULL) {
        valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "dict get value null");
        return VALKEY_ERR;
    }

    if (node->role == VALKEY_ROLE_MASTER) {
        if (existing->role == VALKEY_ROLE_MASTER) {
            valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
                                  "two masters have the same name");
            return VALKEY_ERR;
        }

        if (existing->role == VALKEY_ROLE_SLAVE) {
            /* The master arrived after one of its slaves. */
            if (node->slaves == NULL) {
                node->slaves = listCreate();
                if (node->slaves == NULL) {
                    goto oom;
                }
                node->slaves->free = listClusterNodeDestructor;
            }

            /* Take over the slaves gathered in the placeholder slave. */
            if (existing->slaves != NULL) {
                while (listLength(existing->slaves) > 0) {
                    listNode *head = listFirst(existing->slaves);
                    if (listAddNodeHead(node->slaves, head->value) == NULL) {
                        goto oom;
                    }
                    /* Unlink the element without freeing the moved node. */
                    existing->slaves->free = NULL;
                    listDelNode(existing->slaves, head);
                }
                listRelease(existing->slaves);
                existing->slaves = NULL;
            }

            if (listAddNodeHead(node->slaves, existing) == NULL) {
                goto oom;
            }
            dictSetHashVal(*nodes, entry, node);
            return VALKEY_OK;
        }
    }

    if (node->role == VALKEY_ROLE_SLAVE) {
        /* Append the slave to whatever node currently holds this name. */
        if (existing->slaves == NULL) {
            existing->slaves = listCreate();
            if (existing->slaves == NULL) {
                goto oom;
            }
            existing->slaves->free = listClusterNodeDestructor;
        }
        if (listAddNodeTail(existing->slaves, node) == NULL) {
            goto oom;
        }
    }

    return VALKEY_OK;

oom:
    valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory");
    return VALKEY_ERR;
}

/**
* Parse the "cluster slots" command reply to nodes dict.
*/
Expand Down Expand Up @@ -784,6 +689,93 @@ static dict *parse_cluster_slots(valkeyClusterContext *cc, valkeyReply *reply) {
return NULL;
}

/* Store a parsed replica node in the given dict using 'primary_id' as key.
 *
 * '*replicas' is created on first use. The first replica seen for a primary
 * becomes the dict value; additional replicas for the same primary are
 * appended to the 'slaves' list of that first replica.
 *
 * Returns VALKEY_OK when the node was stored. Returns VALKEY_ERR when the
 * dict, the key, the list or the list element could not be created or added;
 * in that case 'node' has not been stored and remains owned by the caller. */
static int store_replica_node(dict **replicas, char *primary_id, valkeyClusterNode *node) {
    if (*replicas == NULL) {
        *replicas = dictCreate(&clusterNodesDictType, NULL);
        /* Check the created dict, not the address of the caller's variable
         * (which is never NULL), so a failed creation is detected before the
         * dict is used below. */
        if (*replicas == NULL)
            return VALKEY_ERR;
    }

    /* NOTE(review): the lookup key is built as an sds, presumably because
     * clusterNodesDictType expects sds keys - confirm against its definition. */
    sds key = sdsnew(primary_id);
    if (key == NULL)
        return VALKEY_ERR;

    dictEntry *de = dictFind(*replicas, key);
    if (de == NULL) {
        /* First replica for this primary; on success the dict takes the key. */
        if (dictAdd(*replicas, key, node) != DICT_OK) {
            sdsfree(key);
            return VALKEY_ERR;
        }
        return VALKEY_OK;
    }

    /* Store replica node in the existing replica node. */
    sdsfree(key);
    valkeyClusterNode *n = dictGetEntryVal(de);
    if (n->slaves == NULL) {
        n->slaves = listCreate();
        if (n->slaves == NULL)
            return VALKEY_ERR;
        n->slaves->free = listClusterNodeDestructor;
    }
    if (listAddNodeTail(n->slaves, node) == NULL)
        return VALKEY_ERR;

    return VALKEY_OK;
}

/* Hand over the replicas collected in 'replicas' to their primaries in 'nodes'.
 *
 * For every node in 'nodes' the 'replicas' dict is searched using the node's
 * name. On a hit, the replica stored as dict value is placed first in the
 * primary's 'slaves' list, followed by the replicas that were collected inside
 * it. The dict value is then set to NULL since the nodes have been moved.
 *
 * A NULL 'replicas' dict means there is nothing to move.
 * Returns VALKEY_OK on success, or VALKEY_ERR when a list or list element
 * could not be allocated. */
static int move_replica_nodes(dict *replicas, dict *nodes) {
    if (replicas == NULL)
        return VALKEY_OK;

    dictIterator iter;
    dictInitIterator(&iter, nodes);

    for (dictEntry *entry = dictNext(&iter); entry != NULL; entry = dictNext(&iter)) {
        valkeyClusterNode *primary = dictGetEntryVal(entry);

        dictEntry *found = dictFind(replicas, primary->name);
        if (found == NULL)
            continue; /* No replicas were parsed for this primary. */

        if (primary->slaves == NULL) {
            primary->slaves = listCreate();
            if (primary->slaves == NULL)
                return VALKEY_ERR;
            primary->slaves->free = listClusterNodeDestructor;
        }

        /* The first parsed replica carries the rest in its own list. */
        valkeyClusterNode *first = dictGetEntryVal(found);
        list *extra = first->slaves;
        if (extra != NULL) {
            while (listLength(extra) > 0) {
                listNode *head = listFirst(extra);
                if (listAddNodeTail(primary->slaves, head->value) == NULL)
                    return VALKEY_ERR;

                /* Unlink the element without freeing the moved cluster node,
                 * then restore the destructor for the remaining elements. */
                extra->free = NULL;
                listDelNode(extra, head);
                extra->free = listClusterNodeDestructor;
            }
            listRelease(extra);
            first->slaves = NULL;
        }

        /* The replica that was parsed first goes to the head of the list. */
        if (listAddNodeHead(primary->slaves, first) == NULL)
            return VALKEY_ERR;

        /* Everything for this primary has been moved; clear the dict value
         * to avoid freeing the moved memory. */
        dictSetHashVal(replicas, found, NULL);
    }

    return VALKEY_OK;
}

/* Parse a node from a single CLUSTER NODES line. Only parse primary nodes if
 * the 'replica_master_id' argument is NULL, otherwise replicas are parsed and
 * their master_id is given via 'replica_master_id'. */
Expand Down Expand Up @@ -944,9 +936,9 @@ static int parse_cluster_nodes_line(valkeyClusterContext *cc, char *line,
*/
static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
dict *nodes = NULL;
dict *nodes_name = NULL;
int slot_ranges_found = 0;
int add_replicas = cc->flags & VALKEYCLUSTER_FLAG_ADD_SLAVE;
dict *replicas = NULL;

if (reply->type != VALKEY_REPLY_STRING) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Unexpected reply type");
Expand Down Expand Up @@ -991,24 +983,12 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
}
slot_ranges_found += listLength(node->slots);

if (add_replicas) {
if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, node->name) != VALKEY_OK) {
goto error;
}
}
} else {
assert(node->role == VALKEY_ROLE_SLAVE);
sds id = sdsnew(master_id);
if (id == NULL) {
if (store_replica_node(&replicas, master_id, node) != VALKEY_OK) {
freeValkeyClusterNode(node);
goto oom;
}
if (cluster_master_slave_mapping_with_name(cc, &nodes_name, node, id) != VALKEY_OK) {
freeValkeyClusterNode(node);
sdsfree(id);
goto error;
}
sdsfree(id);
}
}

Expand All @@ -1017,7 +997,11 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
goto error;
}

dictRelease(nodes_name);
/* Move parsed replica nodes to related primary nodes. */
if (move_replica_nodes(replicas, nodes) != VALKEY_OK) {
goto oom;
}
dictRelease(replicas);

return nodes;

Expand All @@ -1026,18 +1010,7 @@ static dict *parse_cluster_nodes(valkeyClusterContext *cc, valkeyReply *reply) {
// passthrough

error:
if (nodes_name != NULL) {
/* Only free parsed replicas since the `nodes` dict owns primary nodes. */
dictIterator di;
dictInitIterator(&di, nodes_name);
dictEntry *de;
while ((de = dictNext(&di))) {
valkeyClusterNode *node = dictGetEntryVal(de);
if (node->role == VALKEY_ROLE_SLAVE)
freeValkeyClusterNode(node);
}
dictRelease(nodes_name);
}
dictRelease(replicas);
dictRelease(nodes);
return NULL;
}
Expand Down
8 changes: 4 additions & 4 deletions tests/ct_out_of_memory_handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,14 @@ void test_alloc_failure_handling(void) {

// Connect
{
for (int i = 0; i < 91; ++i) {
for (int i = 0; i < 88; ++i) {
prepare_allocation_test(cc, i);
result = valkeyClusterConnect2(cc);
assert(result == VALKEY_ERR);
ASSERT_STR_EQ(cc->errstr, "Out of memory");
}

prepare_allocation_test(cc, 91);
prepare_allocation_test(cc, 88);
result = valkeyClusterConnect2(cc);
assert(result == VALKEY_OK);
}
Expand Down Expand Up @@ -521,14 +521,14 @@ void test_alloc_failure_handling_async(void) {

// Connect
{
for (int i = 0; i < 89; ++i) {
for (int i = 0; i < 86; ++i) {
prepare_allocation_test(acc->cc, i);
result = valkeyClusterConnect2(acc->cc);
assert(result == VALKEY_ERR);
ASSERT_STR_EQ(acc->cc->errstr, "Out of memory");
}

prepare_allocation_test(acc->cc, 89);
prepare_allocation_test(acc->cc, 86);
result = valkeyClusterConnect2(acc->cc);
assert(result == VALKEY_OK);
}
Expand Down
8 changes: 4 additions & 4 deletions tests/ut_slotmap_update.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,14 @@ void test_parse_cluster_nodes_with_multiple_replicas(void) {
assert(strcmp(node->addr, "127.0.0.1:30004") == 0);
assert(node->role == VALKEY_ROLE_SLAVE);
node = listNodeValue(listNext(&li));
assert(strcmp(node->name, "824fe116063bc5fcf9f4ffd895bc17aee7731ac3") == 0);
assert(strcmp(node->addr, "127.0.0.1:30006") == 0);
assert(node->role == VALKEY_ROLE_SLAVE);
node = listNodeValue(listNext(&li));
assert(strcmp(node->name, "6ec23923021cf3ffec47632106199cb7f496ce01") == 0);
assert(strcmp(node->addr, "127.0.0.1:30005") == 0);
assert(node->role == VALKEY_ROLE_SLAVE);
node = listNodeValue(listNext(&li));
assert(strcmp(node->name, "824fe116063bc5fcf9f4ffd895bc17aee7731ac3") == 0);
assert(strcmp(node->addr, "127.0.0.1:30006") == 0);
assert(node->role == VALKEY_ROLE_SLAVE);
node = listNodeValue(listNext(&li));
assert(strcmp(node->name, "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1") == 0);
assert(strcmp(node->addr, "127.0.0.1:30002") == 0);
assert(node->role == VALKEY_ROLE_SLAVE);
Expand Down

0 comments on commit a41a55f

Please sign in to comment.