From e3431b6e7e366547d46f25d513de0e1abfae5215 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Wed, 12 Jun 2024 11:44:27 +0200 Subject: [PATCH 1/2] Remove support for splitting multi-key commands per slot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- libvalkeycluster/tests/ct_commands.c | 29 +- .../tests/ct_out_of_memory_handling.c | 87 +-- libvalkeycluster/tests/ct_pipeline.c | 36 - libvalkeycluster/tests/ut_parse_cmd.c | 2 +- src/command.c | 57 -- src/command.h | 6 +- src/valkeycluster.c | 614 +----------------- 7 files changed, 49 insertions(+), 782 deletions(-) diff --git a/libvalkeycluster/tests/ct_commands.c b/libvalkeycluster/tests/ct_commands.c index 5281f7f6..fa5a742e 100644 --- a/libvalkeycluster/tests/ct_commands.c +++ b/libvalkeycluster/tests/ct_commands.c @@ -10,24 +10,23 @@ void test_exists(valkeyClusterContext *cc) { valkeyReply *reply; - reply = (valkeyReply *)valkeyClusterCommand(cc, "SET key1 Hello"); + reply = valkeyClusterCommand(cc, "SET {key}1 Hello"); CHECK_REPLY_OK(cc, reply); freeReplyObject(reply); - reply = (valkeyReply *)valkeyClusterCommand(cc, "EXISTS key1"); + reply = valkeyClusterCommand(cc, "EXISTS {key}1"); CHECK_REPLY_INT(cc, reply, 1); freeReplyObject(reply); - reply = (valkeyReply *)valkeyClusterCommand(cc, "EXISTS nosuchkey"); + reply = valkeyClusterCommand(cc, "EXISTS nosuch{key}"); CHECK_REPLY_INT(cc, reply, 0); freeReplyObject(reply); - reply = (valkeyReply *)valkeyClusterCommand(cc, "SET key2 World"); + reply = valkeyClusterCommand(cc, "SET {key}2 World"); CHECK_REPLY_OK(cc, reply); freeReplyObject(reply); - reply = - (valkeyReply *)valkeyClusterCommand(cc, "EXISTS key1 key2 nosuchkey"); + reply = valkeyClusterCommand(cc, "EXISTS {key}1 {key}2 nosuch{key}"); CHECK_REPLY_INT(cc, reply, 2); freeReplyObject(reply); } @@ -61,39 +60,39 @@ void test_bitfield_ro(valkeyClusterContext *cc) { void test_mset(valkeyClusterContext *cc) { valkeyReply *reply; - reply = (valkeyReply *)valkeyClusterCommand( - cc, "MSET key1 mset1 key2 mset2 key3 mset3"); + reply = + valkeyClusterCommand(cc, "MSET {key}1 mset1 {key}2 mset2 {key}3 mset3"); CHECK_REPLY_OK(cc, reply); freeReplyObject(reply); - reply = (valkeyReply *)valkeyClusterCommand(cc, "GET key1"); + reply = valkeyClusterCommand(cc, "GET {key}1"); CHECK_REPLY_STR(cc, reply, "mset1"); freeReplyObject(reply); - reply = (valkeyReply *)valkeyClusterCommand(cc, "GET key2"); + reply = valkeyClusterCommand(cc, "GET {key}2"); CHECK_REPLY_STR(cc, reply, "mset2"); freeReplyObject(reply); - reply = (valkeyReply *)valkeyClusterCommand(cc, "GET key3"); + reply = valkeyClusterCommand(cc, "GET {key}3"); CHECK_REPLY_STR(cc, reply, "mset3"); freeReplyObject(reply); } void test_mget(valkeyClusterContext *cc) { valkeyReply *reply; - reply = (valkeyReply *)valkeyClusterCommand(cc, "SET key1 mget1"); + reply = valkeyClusterCommand(cc, "SET {key}1 mget1"); CHECK_REPLY_OK(cc, reply); freeReplyObject(reply); - reply = (valkeyReply *)valkeyClusterCommand(cc, "SET key2 mget2"); + reply = valkeyClusterCommand(cc, "SET {key}2 mget2"); CHECK_REPLY_OK(cc, reply); freeReplyObject(reply); - reply = (valkeyReply *)valkeyClusterCommand(cc, "SET key3 mget3"); + reply = valkeyClusterCommand(cc, "SET {key}3 mget3"); CHECK_REPLY_OK(cc, reply); freeReplyObject(reply); - reply = (valkeyReply *)valkeyClusterCommand(cc, "MGET key1 key2 key3"); + reply = valkeyClusterCommand(cc, "MGET {key}1 {key}2 {key}3"); CHECK_REPLY_ARRAY(cc, reply, 3); CHECK_REPLY_STR(cc, reply->element[0], "mget1"); CHECK_REPLY_STR(cc, reply->element[1], "mget2"); diff --git a/libvalkeycluster/tests/ct_out_of_memory_handling.c b/libvalkeycluster/tests/ct_out_of_memory_handling.c index a7c7ba68..6ca0f816 100644 --- a/libvalkeycluster/tests/ct_out_of_memory_handling.c +++ b/libvalkeycluster/tests/ct_out_of_memory_handling.c @@ -185,33 +185,14 @@ void test_alloc_failure_handling(void) { valkeyReply *reply; const char *cmd = "SET key value"; - for (int i = 0; i < 36; ++i) { + for (int i = 0; i < 35; ++i) { prepare_allocation_test(cc, i); reply = (valkeyReply *)valkeyClusterCommand(cc, cmd); assert(reply == NULL); ASSERT_STR_EQ(cc->errstr, "Out of memory"); } - prepare_allocation_test(cc, 36); - reply = (valkeyReply *)valkeyClusterCommand(cc, cmd); - CHECK_REPLY_OK(cc, reply); - freeReplyObject(reply); - } - - // Multi key command - { - valkeyReply *reply; - const char *cmd = "MSET key1 v1 key2 v2 key3 v3"; - - for (int i = 0; i < 77; ++i) { - prepare_allocation_test(cc, i); - reply = (valkeyReply *)valkeyClusterCommand(cc, cmd); - assert(reply == NULL); - ASSERT_STR_EQ(cc->errstr, "Out of memory"); - } - - // Multi-key commands - prepare_allocation_test(cc, 77); + prepare_allocation_test(cc, 35); reply = (valkeyReply *)valkeyClusterCommand(cc, cmd); CHECK_REPLY_OK(cc, reply); freeReplyObject(reply); @@ -245,7 +226,7 @@ void test_alloc_failure_handling(void) { valkeyReply *reply; const char *cmd = "SET foo one"; - for (int i = 0; i < 37; ++i) { + for (int i = 0; i < 36; ++i) { prepare_allocation_test(cc, i); result = valkeyClusterAppendCommand(cc, cmd); assert(result == VALKEY_ERR); @@ -257,7 +238,7 @@ void test_alloc_failure_handling(void) { for (int i = 0; i < 4; ++i) { // Appended command lost when receiving error from valkey // during a GetReply, needs a new append for each test loop - prepare_allocation_test(cc, 37); + prepare_allocation_test(cc, 36); result = valkeyClusterAppendCommand(cc, cmd); assert(result == VALKEY_OK); @@ -269,7 +250,7 @@ void test_alloc_failure_handling(void) { valkeyClusterReset(cc); } - prepare_allocation_test(cc, 37); + prepare_allocation_test(cc, 36); result = valkeyClusterAppendCommand(cc, cmd); assert(result == VALKEY_OK); @@ -280,44 +261,6 @@ void test_alloc_failure_handling(void) { freeReplyObject(reply); } - // Append multi-key command - { - valkeyReply *reply; - const char *cmd = "MSET key1 val1 key2 val2 key3 val3"; - - for (int i = 0; i < 90; ++i) { - prepare_allocation_test(cc, i); - result = valkeyClusterAppendCommand(cc, cmd); - assert(result == VALKEY_ERR); - ASSERT_STR_EQ(cc->errstr, "Out of memory"); - - valkeyClusterReset(cc); - } - - for (int i = 0; i < 12; ++i) { - prepare_allocation_test(cc, 90); - result = valkeyClusterAppendCommand(cc, cmd); - assert(result == VALKEY_OK); - - prepare_allocation_test(cc, i); - result = valkeyClusterGetReply(cc, (void *)&reply); - assert(result == VALKEY_ERR); - ASSERT_STR_EQ(cc->errstr, "Out of memory"); - - valkeyClusterReset(cc); - } - - prepare_allocation_test(cc, 90); - result = valkeyClusterAppendCommand(cc, cmd); - assert(result == VALKEY_OK); - - prepare_allocation_test(cc, 12); - result = valkeyClusterGetReply(cc, (void *)&reply); - assert(result == VALKEY_OK); - CHECK_REPLY_OK(cc, reply); - freeReplyObject(reply); - } - // Append command to node { valkeyReply *reply; @@ -412,7 +355,7 @@ void test_alloc_failure_handling(void) { freeReplyObject(reply); /* Test ASK reply handling with OOM */ - for (int i = 0; i < 50; ++i) { + for (int i = 0; i < 49; ++i) { prepare_allocation_test(cc, i); reply = valkeyClusterCommand(cc, "GET foo"); assert(reply == NULL); @@ -420,7 +363,7 @@ void test_alloc_failure_handling(void) { } /* Test ASK reply handling without OOM */ - prepare_allocation_test(cc, 50); + prepare_allocation_test(cc, 49); reply = valkeyClusterCommand(cc, "GET foo"); CHECK_REPLY_STR(cc, reply, "one"); freeReplyObject(reply); @@ -441,7 +384,7 @@ void test_alloc_failure_handling(void) { freeReplyObject(reply); /* Test MOVED reply handling with OOM */ - for (int i = 0; i < 34; ++i) { + for (int i = 0; i < 33; ++i) { prepare_allocation_test(cc, i); reply = valkeyClusterCommand(cc, "GET foo"); assert(reply == NULL); @@ -449,7 +392,7 @@ void test_alloc_failure_handling(void) { } /* Test MOVED reply handling without OOM */ - prepare_allocation_test(cc, 34); + prepare_allocation_test(cc, 33); reply = valkeyClusterCommand(cc, "GET foo"); CHECK_REPLY_STR(cc, reply, "one"); freeReplyObject(reply); @@ -597,18 +540,18 @@ void test_alloc_failure_handling_async(void) { { const char *cmd1 = "SET foo one"; - for (int i = 0; i < 38; ++i) { + for (int i = 0; i < 37; ++i) { prepare_allocation_test_async(acc, i); result = valkeyClusterAsyncCommand(acc, commandCallback, &r1, cmd1); assert(result == VALKEY_ERR); - if (i != 36) { + if (i != 35) { ASSERT_STR_EQ(acc->errstr, "Out of memory"); } else { ASSERT_STR_EQ(acc->errstr, "Failed to attach event adapter"); } } - prepare_allocation_test_async(acc, 38); + prepare_allocation_test_async(acc, 37); result = valkeyClusterAsyncCommand(acc, commandCallback, &r1, cmd1); ASSERT_MSG(result == VALKEY_OK, acc->errstr); } @@ -619,16 +562,16 @@ void test_alloc_failure_handling_async(void) { { const char *cmd2 = "GET foo"; - for (int i = 0; i < 15; ++i) { + for (int i = 0; i < 14; ++i) { prepare_allocation_test_async(acc, i); result = valkeyClusterAsyncCommand(acc, commandCallback, &r2, cmd2); assert(result == VALKEY_ERR); ASSERT_STR_EQ(acc->errstr, "Out of memory"); } - /* Skip iteration 15, errstr not set by libvalkey when valkeyFormatSdsCommandArgv() fails. */ + /* Skip iteration 14, errstr not set by libvalkey when valkeyFormatSdsCommandArgv() fails. */ - prepare_allocation_test_async(acc, 16); + prepare_allocation_test_async(acc, 15); result = valkeyClusterAsyncCommand(acc, commandCallback, &r2, cmd2); ASSERT_MSG(result == VALKEY_OK, acc->errstr); } diff --git a/libvalkeycluster/tests/ct_pipeline.c b/libvalkeycluster/tests/ct_pipeline.c index a38c7dd8..ca86fdf7 100644 --- a/libvalkeycluster/tests/ct_pipeline.c +++ b/libvalkeycluster/tests/ct_pipeline.c @@ -57,40 +57,6 @@ void test_pipeline(void) { valkeyClusterFree(cc); } -// Test of pipelines containing multi-node commands -void test_pipeline_with_multinode_commands(void) { - valkeyClusterContext *cc = valkeyClusterContextInit(); - assert(cc); - - int status; - status = valkeyClusterSetOptionAddNodes(cc, CLUSTER_NODE); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); - - status = valkeyClusterConnect2(cc); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); - - status = - valkeyClusterAppendCommand(cc, "MSET key1 Hello key2 World key3 !"); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); - - status = valkeyClusterAppendCommand(cc, "MGET key1 key2 key3"); - ASSERT_MSG(status == VALKEY_OK, cc->errstr); - - valkeyReply *reply; - valkeyClusterGetReply(cc, (void *)&reply); - CHECK_REPLY_OK(cc, reply); - freeReplyObject(reply); - - valkeyClusterGetReply(cc, (void *)&reply); - CHECK_REPLY_ARRAY(cc, reply, 3); - CHECK_REPLY_STR(cc, reply->element[0], "Hello"); - CHECK_REPLY_STR(cc, reply->element[1], "World"); - CHECK_REPLY_STR(cc, reply->element[2], "!"); - freeReplyObject(reply); - - valkeyClusterFree(cc); -} - //------------------------------------------------------------------------------ // Async API //------------------------------------------------------------------------------ @@ -173,10 +139,8 @@ void test_async_pipeline(void) { int main(void) { test_pipeline(); - test_pipeline_with_multinode_commands(); test_async_pipeline(); - // Asynchronous API does not support multi-key commands return 0; } diff --git a/libvalkeycluster/tests/ut_parse_cmd.c b/libvalkeycluster/tests/ut_parse_cmd.c index 8cb0f55c..8c3532d3 100644 --- a/libvalkeycluster/tests/ut_parse_cmd.c +++ b/libvalkeycluster/tests/ut_parse_cmd.c @@ -75,7 +75,7 @@ void test_valkey_parse_cmd_mset(void) { c->clen = len; valkey_parse_cmd(c); ASSERT_MSG(c->result == CMD_PARSE_OK, "Parse not OK"); - ASSERT_KEYS(c, "foo", "bar"); + ASSERT_KEYS(c, "foo"); command_destroy(c); } diff --git a/src/command.c b/src/command.c index 14fb0d11..4a861ed5 100644 --- a/src/command.c +++ b/src/command.c @@ -136,42 +136,6 @@ cmddef *valkey_lookup_cmd(const char *arg0, uint32_t arg0_len, const char *arg1, return NULL; } -/* - * Return true, if the valkey command is a vector command accepting one or - * more keys, otherwise return false - * Format: command key [ key ... ] - */ -static int valkey_argx(struct cmd *r) { - switch (r->type) { - case CMD_REQ_VALKEY_EXISTS: - case CMD_REQ_VALKEY_MGET: - case CMD_REQ_VALKEY_DEL: - return 1; - - default: - break; - } - - return 0; -} - -/* - * Return true, if the valkey command is a vector command accepting one or - * more key-value pairs, otherwise return false - * Format: command key value [ key value ... ] - */ -static int valkey_argkvx(struct cmd *r) { - switch (r->type) { - case CMD_REQ_VALKEY_MSET: - return 1; - - default: - break; - } - - return 0; -} - /* Parses a bulk string starting at 'p' and ending somewhere before 'end'. * Returns the remaining of the input after consuming the bulk string. The * pointers *str and *len are pointed to the parsed string and its length. On @@ -353,22 +317,6 @@ void valkey_parse_cmd(struct cmd *r) { if (!push_keypos(r, arg, arglen)) goto oom; - /* Special commands where we want all keys (not only the first key). */ - if (valkey_argx(r) || valkey_argkvx(r)) { - /* argx: MGET key [ key ... ] */ - /* argkvx: MSET key value [ key value ... ] */ - if (valkey_argkvx(r) && rnarg % 2 == 0) - goto error; - for (uint32_t i = 2; i < rnarg; i++) { - if ((p = valkey_parse_bulk(p, end, &arg, &arglen)) == NULL) - goto error; - if (valkey_argkvx(r) && i % 2 == 0) - continue; /* not a key */ - if (!push_keypos(r, arg, arglen)) - goto oom; - } - } - done: ASSERT(r->type > CMD_UNKNOWN && r->type < CMD_SENTINEL); r->result = CMD_PARSE_OK; @@ -424,7 +372,6 @@ struct cmd *command_get(void) { command->slot_num = -1; command->frag_seq = NULL; command->reply = NULL; - command->sub_commands = NULL; command->node_addr = NULL; command->keys = vkarray_create(1, sizeof(struct keypos)); @@ -464,10 +411,6 @@ void command_destroy(struct cmd *command) { freeReplyObject(command->reply); - if (command->sub_commands != NULL) { - listRelease(command->sub_commands); - } - if (command->node_addr != NULL) { sdsfree(command->node_addr); command->node_addr = NULL; diff --git a/src/command.h b/src/command.h index be123a7e..eafd90a3 100644 --- a/src/command.h +++ b/src/command.h @@ -90,17 +90,13 @@ struct cmd { /* Command destination */ int slot_num; /* Command should be sent to slot. * Set to -1 if command is sent to a given node, - * or if a slot can not be found or calculated, - * or if its a multi-key command cross different - * nodes (cross slot) */ + * or if a slot can not be found or calculated. */ char *node_addr; /* Command sent to this node address */ struct cmd * *frag_seq; /* sequence of fragment command, map from keys to fragments*/ valkeyReply *reply; - - hilist *sub_commands; /* just for pipeline and multi-key commands */ }; void valkey_parse_cmd(struct cmd *r); diff --git a/src/valkeycluster.c b/src/valkeycluster.c index 0977a218..af1c6046 100644 --- a/src/valkeycluster.c +++ b/src/valkeycluster.c @@ -2422,426 +2422,6 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc, return reply; } -static int command_pre_fragment(valkeyClusterContext *cc, struct cmd *command, - hilist *commands) { - - struct keypos *kp, *sub_kp; - uint32_t key_count; - uint32_t i, j; - uint32_t idx; - uint32_t key_len; - int slot_num = -1; - struct cmd *sub_command; - struct cmd **sub_commands = NULL; - char num_str[12]; - uint8_t num_str_len; - - if (command == NULL || commands == NULL) { - goto done; - } - - key_count = vkarray_n(command->keys); - - sub_commands = vk_calloc(VALKEYCLUSTER_SLOTS, sizeof(*sub_commands)); - if (sub_commands == NULL) { - goto oom; - } - - command->frag_seq = vk_calloc(key_count, sizeof(*command->frag_seq)); - if (command->frag_seq == NULL) { - goto oom; - } - - // Fill sub_command with key, slot and command length (clen, only keylength) - for (i = 0; i < key_count; i++) { - kp = vkarray_get(command->keys, i); - - slot_num = keyHashSlot(kp->start, kp->end - kp->start); - - if (slot_num < 0 || slot_num >= VALKEYCLUSTER_SLOTS) { - __valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "keyHashSlot return error"); - goto done; - } - - if (sub_commands[slot_num] == NULL) { - sub_commands[slot_num] = command_get(); - if (sub_commands[slot_num] == NULL) { - goto oom; - } - } - - command->frag_seq[i] = sub_command = sub_commands[slot_num]; - - sub_command->narg++; - - sub_kp = vkarray_push(sub_command->keys); - if (sub_kp == NULL) { - goto oom; - } - - sub_kp->start = kp->start; - sub_kp->end = kp->end; - - // Number of characters in key - key_len = (uint32_t)(kp->end - kp->start); - - sub_command->clen += key_len + uint_len(key_len); - - sub_command->slot_num = slot_num; - - if (command->type == CMD_REQ_VALKEY_MSET) { - uint32_t len = 0; - char *p; - - for (p = sub_kp->end + 1; !isdigit(*p); p++) { - } - - p = sub_kp->end + 1; - while (!isdigit(*p)) { - p++; - } - - for (; isdigit(*p); p++) { - len = len * 10 + (uint32_t)(*p - '0'); - } - - len += CRLF_LEN * 2; - len += (p - sub_kp->end); - sub_kp->remain_len = len; - sub_command->clen += len; - } - } - - /* prepend command header */ - for (i = 0; i < VALKEYCLUSTER_SLOTS; i++) { - sub_command = sub_commands[i]; - if (sub_command == NULL) { - continue; - } - - idx = 0; - if (command->type == CMD_REQ_VALKEY_MGET) { - //"*%d\r\n$4\r\nmget\r\n" - - sub_command->clen += 5 * sub_command->narg; - - sub_command->narg++; - - vk_itoa(num_str, sub_command->narg); - num_str_len = (uint8_t)(strlen(num_str)); - - sub_command->clen += 13 + num_str_len; - - sub_command->cmd = - vk_calloc(sub_command->clen, sizeof(*sub_command->cmd)); - if (sub_command->cmd == NULL) { - goto oom; - } - - sub_command->cmd[idx++] = '*'; - memcpy(sub_command->cmd + idx, num_str, num_str_len); - idx += num_str_len; - memcpy(sub_command->cmd + idx, "\r\n$4\r\nmget\r\n", 12); - idx += 12; - - for (j = 0; j < vkarray_n(sub_command->keys); j++) { - kp = vkarray_get(sub_command->keys, j); - key_len = (uint32_t)(kp->end - kp->start); - vk_itoa(num_str, key_len); - num_str_len = strlen(num_str); - - sub_command->cmd[idx++] = '$'; - memcpy(sub_command->cmd + idx, num_str, num_str_len); - idx += num_str_len; - memcpy(sub_command->cmd + idx, CRLF, CRLF_LEN); - idx += CRLF_LEN; - memcpy(sub_command->cmd + idx, kp->start, key_len); - idx += key_len; - memcpy(sub_command->cmd + idx, CRLF, CRLF_LEN); - idx += CRLF_LEN; - } - } else if (command->type == CMD_REQ_VALKEY_DEL) { - //"*%d\r\n$3\r\ndel\r\n" - - sub_command->clen += 5 * sub_command->narg; - - sub_command->narg++; - - vk_itoa(num_str, sub_command->narg); - num_str_len = (uint8_t)strlen(num_str); - - sub_command->clen += 12 + num_str_len; - - sub_command->cmd = - vk_calloc(sub_command->clen, sizeof(*sub_command->cmd)); - if (sub_command->cmd == NULL) { - goto oom; - } - - sub_command->cmd[idx++] = '*'; - memcpy(sub_command->cmd + idx, num_str, num_str_len); - idx += num_str_len; - memcpy(sub_command->cmd + idx, "\r\n$3\r\ndel\r\n", 11); - idx += 11; - - for (j = 0; j < vkarray_n(sub_command->keys); j++) { - kp = vkarray_get(sub_command->keys, j); - key_len = (uint32_t)(kp->end - kp->start); - vk_itoa(num_str, key_len); - num_str_len = strlen(num_str); - - sub_command->cmd[idx++] = '$'; - memcpy(sub_command->cmd + idx, num_str, num_str_len); - idx += num_str_len; - memcpy(sub_command->cmd + idx, CRLF, CRLF_LEN); - idx += CRLF_LEN; - memcpy(sub_command->cmd + idx, kp->start, key_len); - idx += key_len; - memcpy(sub_command->cmd + idx, CRLF, CRLF_LEN); - idx += CRLF_LEN; - } - } else if (command->type == CMD_REQ_VALKEY_EXISTS) { - //"*%d\r\n$6\r\nexists\r\n" - - sub_command->clen += 5 * sub_command->narg; - - sub_command->narg++; - - vk_itoa(num_str, sub_command->narg); - num_str_len = (uint8_t)strlen(num_str); - - sub_command->clen += 15 + num_str_len; - - sub_command->cmd = - vk_calloc(sub_command->clen, sizeof(*sub_command->cmd)); - if (sub_command->cmd == NULL) { - goto oom; - } - - sub_command->cmd[idx++] = '*'; - memcpy(sub_command->cmd + idx, num_str, num_str_len); - idx += num_str_len; - memcpy(sub_command->cmd + idx, "\r\n$6\r\nexists\r\n", 14); - idx += 14; - - for (j = 0; j < vkarray_n(sub_command->keys); j++) { - kp = vkarray_get(sub_command->keys, j); - key_len = (uint32_t)(kp->end - kp->start); - vk_itoa(num_str, key_len); - num_str_len = strlen(num_str); - - sub_command->cmd[idx++] = '$'; - memcpy(sub_command->cmd + idx, num_str, num_str_len); - idx += num_str_len; - memcpy(sub_command->cmd + idx, CRLF, CRLF_LEN); - idx += CRLF_LEN; - memcpy(sub_command->cmd + idx, kp->start, key_len); - idx += key_len; - memcpy(sub_command->cmd + idx, CRLF, CRLF_LEN); - idx += CRLF_LEN; - } - } else if (command->type == CMD_REQ_VALKEY_MSET) { - //"*%d\r\n$4\r\nmset\r\n" - - sub_command->clen += 3 * sub_command->narg; - - sub_command->narg *= 2; - - sub_command->narg++; - - vk_itoa(num_str, sub_command->narg); - num_str_len = (uint8_t)strlen(num_str); - - sub_command->clen += 13 + num_str_len; - - sub_command->cmd = - vk_calloc(sub_command->clen, sizeof(*sub_command->cmd)); - if (sub_command->cmd == NULL) { - goto oom; - } - - sub_command->cmd[idx++] = '*'; - memcpy(sub_command->cmd + idx, num_str, num_str_len); - idx += num_str_len; - memcpy(sub_command->cmd + idx, "\r\n$4\r\nmset\r\n", 12); - idx += 12; - - for (j = 0; j < vkarray_n(sub_command->keys); j++) { - kp = vkarray_get(sub_command->keys, j); - key_len = (uint32_t)(kp->end - kp->start); - vk_itoa(num_str, key_len); - num_str_len = strlen(num_str); - - sub_command->cmd[idx++] = '$'; - memcpy(sub_command->cmd + idx, num_str, num_str_len); - idx += num_str_len; - memcpy(sub_command->cmd + idx, CRLF, CRLF_LEN); - idx += CRLF_LEN; - memcpy(sub_command->cmd + idx, kp->start, - key_len + kp->remain_len); - idx += key_len + kp->remain_len; - } - } else { - NOT_REACHED(); - } - - sub_command->type = command->type; - - if (listAddNodeTail(commands, sub_command) == NULL) { - goto oom; - } - sub_commands[i] = NULL; - } - -done: - vk_free(sub_commands); - - if (slot_num >= 0 && commands != NULL && listLength(commands) == 1) { - listNode *list_node = listFirst(commands); - listDelNode(commands, list_node); - if (command->frag_seq) { - vk_free(command->frag_seq); - command->frag_seq = NULL; - } - - command->slot_num = slot_num; - } - return slot_num; - -oom: - __valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); - if (sub_commands != NULL) { - for (i = 0; i < VALKEYCLUSTER_SLOTS; i++) { - command_destroy(sub_commands[i]); - } - } - vk_free(sub_commands); - return -1; // failing slot_num -} - -static void *command_post_fragment(valkeyClusterContext *cc, - struct cmd *command, hilist *commands) { - struct cmd *sub_command; - listNode *list_node; - valkeyReply *reply = NULL, *sub_reply; - long long count = 0; - - listIter li; - listRewind(commands, &li); - - while ((list_node = listNext(&li)) != NULL) { - sub_command = list_node->value; - reply = sub_command->reply; - if (reply == NULL) { - return NULL; - } else if (reply->type == VALKEY_REPLY_ERROR) { - return reply; - } - - if (command->type == CMD_REQ_VALKEY_MGET) { - if (reply->type != VALKEY_REPLY_ARRAY) { - __valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "reply type error"); - return NULL; - } - } else if (command->type == CMD_REQ_VALKEY_DEL) { - if (reply->type != VALKEY_REPLY_INTEGER) { - __valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "reply type error"); - return NULL; - } - count += reply->integer; - } else if (command->type == CMD_REQ_VALKEY_EXISTS) { - if (reply->type != VALKEY_REPLY_INTEGER) { - __valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "reply type error"); - return NULL; - } - count += reply->integer; - } else if (command->type == CMD_REQ_VALKEY_MSET) { - if (reply->type != VALKEY_REPLY_STATUS || reply->len != 2 || - strcmp(reply->str, VALKEY_STATUS_OK) != 0) { - __valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "reply type error"); - return NULL; - } - } else { - NOT_REACHED(); - } - } - - reply = vk_calloc(1, sizeof(*reply)); - if (reply == NULL) { - goto oom; - } - - if (command->type == CMD_REQ_VALKEY_MGET) { - int i; - uint32_t key_count; - - reply->type = VALKEY_REPLY_ARRAY; - - key_count = vkarray_n(command->keys); - - reply->elements = key_count; - reply->element = vk_calloc(key_count, sizeof(*reply->element)); - if (reply->element == NULL) { - goto oom; - } - - for (i = key_count - 1; i >= 0; i--) { /* for each key */ - sub_reply = command->frag_seq[i]->reply; /* get it's reply */ - if (sub_reply == NULL) { - freeReplyObject(reply); - __valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "sub reply is null"); - return NULL; - } - - if (sub_reply->type == VALKEY_REPLY_STRING) { - reply->element[i] = sub_reply; - } else if (sub_reply->type == VALKEY_REPLY_ARRAY) { - if (sub_reply->elements == 0) { - freeReplyObject(reply); - __valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "sub reply elements error"); - return NULL; - } - - reply->element[i] = sub_reply->element[sub_reply->elements - 1]; - sub_reply->elements--; - } - } - } else if (command->type == CMD_REQ_VALKEY_DEL) { - reply->type = VALKEY_REPLY_INTEGER; - reply->integer = count; - } else if (command->type == CMD_REQ_VALKEY_EXISTS) { - reply->type = VALKEY_REPLY_INTEGER; - reply->integer = count; - } else if (command->type == CMD_REQ_VALKEY_MSET) { - reply->type = VALKEY_REPLY_STATUS; - uint32_t str_len = strlen(VALKEY_STATUS_OK); - reply->str = vk_malloc((str_len + 1) * sizeof(char)); - if (reply->str == NULL) { - goto oom; - } - - reply->len = str_len; - memcpy(reply->str, VALKEY_STATUS_OK, str_len); - reply->str[str_len] = '\0'; - } else { - NOT_REACHED(); - } - - return reply; - -oom: - freeReplyObject(reply); - __valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); - return NULL; -} - /* * Split the command into subcommands by slot * @@ -2850,14 +2430,14 @@ static void *command_post_fragment(valkeyClusterContext *cc, * error; Otherwise if the commands > 1 , slot_num is the last subcommand slot * number. */ -static int command_format_by_slot(valkeyClusterContext *cc, struct cmd *command, - hilist *commands) { +static int command_format_by_slot(valkeyClusterContext *cc, + struct cmd *command) { struct keypos *kp; int key_count; int slot_num = -1; - if (cc == NULL || commands == NULL || command == NULL || - command->cmd == NULL || command->clen <= 0) { + if (cc == NULL || command == NULL || command->cmd == NULL || + command->clen <= 0) { goto done; } @@ -2885,8 +2465,6 @@ static int command_format_by_slot(valkeyClusterContext *cc, struct cmd *command, goto done; } - slot_num = command_pre_fragment(cc, command, commands); - done: return slot_num; @@ -2928,9 +2506,7 @@ void *valkeyClusterFormattedCommand(valkeyClusterContext *cc, char *cmd, int len) { valkeyReply *reply = NULL; int slot_num; - struct cmd *command = NULL, *sub_command; - hilist *commands = NULL; - listNode *list_node; + struct cmd *command = NULL; if (cc == NULL) { return NULL; @@ -2949,15 +2525,7 @@ void *valkeyClusterFormattedCommand(valkeyClusterContext *cc, char *cmd, command->cmd = cmd; command->clen = len; - commands = listCreate(); - if (commands == NULL) { - goto oom; - } - - commands->free = listCommandFree; - - slot_num = command_format_by_slot(cc, command, commands); - + slot_num = command_format_by_slot(cc, command); if (slot_num < 0) { goto error; } else if (slot_num >= VALKEYCLUSTER_SLOTS) { @@ -2966,41 +2534,9 @@ void *valkeyClusterFormattedCommand(valkeyClusterContext *cc, char *cmd, goto error; } - // all keys belong to one slot - if (listLength(commands) == 0) { - reply = valkey_cluster_command_execute(cc, command); - goto done; - } - - ASSERT(listLength(commands) != 1); - - listIter li; - listRewind(commands, &li); - - while ((list_node = listNext(&li)) != NULL) { - sub_command = list_node->value; - - reply = valkey_cluster_command_execute(cc, sub_command); - if (reply == NULL) { - goto error; - } else if (reply->type == VALKEY_REPLY_ERROR) { - goto done; - } - - sub_command->reply = reply; - } - - reply = command_post_fragment(cc, command, commands); - -done: - + reply = valkey_cluster_command_execute(cc, command); command->cmd = NULL; command_destroy(command); - - if (commands != NULL) { - listRelease(commands); - } - cc->retry_count = 0; return reply; @@ -3013,9 +2549,6 @@ void *valkeyClusterFormattedCommand(valkeyClusterContext *cc, char *cmd, command->cmd = NULL; command_destroy(command); } - if (commands != NULL) { - listRelease(commands); - } cc->retry_count = 0; return NULL; } @@ -3137,9 +2670,7 @@ void *valkeyClusterCommandArgv(valkeyClusterContext *cc, int argc, int valkeyClusterAppendFormattedCommand(valkeyClusterContext *cc, char *cmd, int len) { int slot_num; - struct cmd *command = NULL, *sub_command; - hilist *commands = NULL; - listNode *list_node; + struct cmd *command = NULL; if (cc->requests == NULL) { cc->requests = listCreate(); @@ -3157,14 +2688,7 @@ int valkeyClusterAppendFormattedCommand(valkeyClusterContext *cc, char *cmd, command->cmd = cmd; command->clen = len; - commands = listCreate(); - if (commands == NULL) { - goto oom; - } - - commands->free = listCommandFree; - - slot_num = command_format_by_slot(cc, command, commands); + slot_num = command_format_by_slot(cc, command); if (slot_num < 0) { goto error; @@ -3174,34 +2698,10 @@ int valkeyClusterAppendFormattedCommand(valkeyClusterContext *cc, char *cmd, goto error; } - // Append command(s) - if (listLength(commands) == 0) { - // All keys belong to one slot - if (__valkeyClusterAppendCommand(cc, command) != VALKEY_OK) { - goto error; - } - } else { - // Keys belongs to different slots - ASSERT(listLength(commands) != 1); - - listIter li; - listRewind(commands, &li); - - while ((list_node = listNext(&li)) != NULL) { - sub_command = list_node->value; - - if (__valkeyClusterAppendCommand(cc, sub_command) != VALKEY_OK) { - goto error; - } - } + if (__valkeyClusterAppendCommand(cc, command) != VALKEY_OK) { + goto error; } - if (listLength(commands) > 0) { - command->sub_commands = commands; - } else { - listRelease(commands); - } - commands = NULL; command->cmd = NULL; if (listAddNodeTail(cc->requests, command) == NULL) { @@ -3218,13 +2718,6 @@ int valkeyClusterAppendFormattedCommand(valkeyClusterContext *cc, char *cmd, command->cmd = NULL; command_destroy(command); } - if (commands != NULL) { - listRelease(commands); - } - - /* Attention: mybe here we must pop the - sub_commands that had append to the nodes. - But now we do not handle it. */ return VALKEY_ERR; } @@ -3428,12 +2921,9 @@ static int valkeyClusterClearAll(valkeyClusterContext *cc) { } int valkeyClusterGetReply(valkeyClusterContext *cc, void **reply) { - - struct cmd *command, *sub_command; - hilist *commands = NULL; - listNode *list_command, *list_sub_command; + struct cmd *command; + listNode *list_command; int slot_num; - void *sub_reply; if (cc == NULL || reply == NULL) return VALKEY_ERR; @@ -3461,14 +2951,14 @@ int valkeyClusterGetReply(valkeyClusterContext *cc, void **reply) { goto error; } + /* Get reply when the command was sent via slot */ slot_num = command->slot_num; if (slot_num >= 0) { - /* Command was sent via single slot */ listDelNode(cc->requests, list_command); return __valkeyClusterGetReply(cc, slot_num, reply); - - } else if (command->node_addr) { - /* Command was sent to a single node */ + } + /* Get reply when the command was sent to a given node */ + if (command->node_addr != NULL) { dictEntry *de; de = dictFind(cc->nodes, command->node_addr); @@ -3483,48 +2973,6 @@ int valkeyClusterGetReply(valkeyClusterContext *cc, void **reply) { } } - commands = command->sub_commands; - if (commands == NULL) { - __valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "sub_commands in command is null"); - goto error; - } - - ASSERT(listLength(commands) != 1); - - listIter li; - listRewind(commands, &li); - - while ((list_sub_command = listNext(&li)) != NULL) { - sub_command = list_sub_command->value; - if (sub_command == NULL) { - __valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "sub_command is null"); - goto error; - } - - slot_num = sub_command->slot_num; - if (slot_num < 0) { - __valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "sub_command slot_num is less then zero"); - goto error; - } - - if (__valkeyClusterGetReply(cc, slot_num, &sub_reply) != VALKEY_OK) { - goto error; - } - - sub_command->reply = sub_reply; - } - - *reply = command_post_fragment(cc, command, commands); - if (*reply == NULL) { - goto error; - } - - listDelNode(cc->requests, list_command); - return VALKEY_OK; - error: listDelNode(cc->requests, list_command); @@ -4121,7 +3569,6 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc, valkeyClusterNode *node; valkeyAsyncContext *ac; struct cmd *command = NULL; - hilist *commands = NULL; cluster_async_data *cad; if (acc == NULL) { @@ -4152,14 +3599,7 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc, memcpy(command->cmd, cmd, len); command->clen = len; - commands = listCreate(); - if (commands == NULL) { - goto oom; - } - - commands->free = listCommandFree; - - slot_num = command_format_by_slot(cc, command, commands); + slot_num = command_format_by_slot(cc, command); if (slot_num < 0) { __valkeyClusterAsyncSetError(acc, cc->err, cc->errstr); @@ -4170,16 +3610,6 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc, goto error; } - // all keys not belong to one slot - if (listLength(commands) > 0) { - ASSERT(listLength(commands) != 1); - - __valkeyClusterAsyncSetError( - acc, VALKEY_ERR_OTHER, - "Asynchronous API now not support multi-key command"); - goto error; - } - node = node_get_by_table(cc, (uint32_t)slot_num); if (node == NULL) { /* Initiate a slotmap update since the slot is not served. */ @@ -4211,11 +3641,6 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc, if (status != VALKEY_OK) { goto error; } - - if (commands != NULL) { - listRelease(commands); - } - return VALKEY_OK; oom: @@ -4224,9 +3649,6 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc, error: command_destroy(command); - if (commands != NULL) { - listRelease(commands); - } return VALKEY_ERR; } From c92d39b450633ce7a62eca858feeae04623e7e1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Thu, 13 Jun 2024 10:07:21 +0200 Subject: [PATCH 2/2] Refactor function to prepare cluster commands MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename and remove unnecessary checks in internal function: command_format_by_slot(). Signed-off-by: Björn Svensson --- src/valkeycluster.c | 77 ++++++++++----------------------------------- 1 file changed, 17 insertions(+), 60 deletions(-) diff --git a/src/valkeycluster.c b/src/valkeycluster.c index af1c6046..55cc10d6 100644 --- a/src/valkeycluster.c +++ b/src/valkeycluster.c @@ -2422,52 +2422,31 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc, return reply; } -/* - * Split the command into subcommands by slot - * - * Returns slot_num - * If slot_num < 0 or slot_num >= VALKEYCLUSTER_SLOTS means this function runs - * error; Otherwise if the commands > 1 , slot_num is the last subcommand slot - * number. - */ -static int command_format_by_slot(valkeyClusterContext *cc, - struct cmd *command) { - struct keypos *kp; - int key_count; - int slot_num = -1; - - if (cc == NULL || command == NULL || command->cmd == NULL || - command->clen <= 0) { - goto done; +/* Prepare command by parsing the string to find the key and to get the slot. */ +static int prepareCommand(valkeyClusterContext *cc, struct cmd *command) { + if (command->cmd == NULL || command->clen <= 0) { + return VALKEY_ERR; } valkey_parse_cmd(command); if (command->result == CMD_PARSE_ENOMEM) { __valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); - goto done; - } else if (command->result != CMD_PARSE_OK) { + return VALKEY_ERR; + } + if (command->result != CMD_PARSE_OK) { __valkeyClusterSetError(cc, VALKEY_ERR_PROTOCOL, command->errstr); - goto done; + return VALKEY_ERR; } - - key_count = vkarray_n(command->keys); - - if (key_count <= 0) { + if (vkarray_n(command->keys) <= 0) { __valkeyClusterSetError( cc, VALKEY_ERR_OTHER, "No keys in command(must have keys for valkey cluster mode)"); - goto done; - } else if (key_count == 1) { - kp = vkarray_get(command->keys, 0); - slot_num = keyHashSlot(kp->start, kp->end - kp->start); - command->slot_num = slot_num; - - goto done; + return VALKEY_ERR; } -done: - - return slot_num; + struct keypos *kp = vkarray_get(command->keys, 0); + command->slot_num = keyHashSlot(kp->start, kp->end - kp->start); + return VALKEY_OK; } /* Deprecated function, replaced with valkeyClusterSetOptionMaxRetry() */ @@ -2505,7 +2484,6 @@ int valkeyClusterSetEventCallback(valkeyClusterContext *cc, void *valkeyClusterFormattedCommand(valkeyClusterContext *cc, char *cmd, int len) { valkeyReply *reply = NULL; - int slot_num; struct cmd *command = NULL; if (cc == NULL) { @@ -2521,16 +2499,10 @@ void *valkeyClusterFormattedCommand(valkeyClusterContext *cc, char *cmd, if (command == NULL) { goto oom; } - command->cmd = cmd; command->clen = len; - slot_num = command_format_by_slot(cc, command); - if (slot_num < 0) { - goto error; - } else if (slot_num >= VALKEYCLUSTER_SLOTS) { - __valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "slot_num is out of range"); + if (prepareCommand(cc, command) != VALKEY_OK) { goto error; } @@ -2669,7 +2641,6 @@ void *valkeyClusterCommandArgv(valkeyClusterContext *cc, int argc, int valkeyClusterAppendFormattedCommand(valkeyClusterContext *cc, char *cmd, int len) { - int slot_num; struct cmd *command = NULL; if (cc->requests == NULL) { @@ -2684,17 +2655,10 @@ int valkeyClusterAppendFormattedCommand(valkeyClusterContext *cc, char *cmd, if (command == NULL) { goto oom; } - command->cmd = cmd; command->clen = len; - slot_num = command_format_by_slot(cc, command); - - if (slot_num < 0) { - goto error; - } else if (slot_num >= VALKEYCLUSTER_SLOTS) { - __valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "slot_num is out of range"); + if (prepareCommand(cc, command) != VALKEY_OK) { goto error; } @@ -3565,7 +3529,6 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc, valkeyClusterContext *cc; int status = VALKEY_OK; - int slot_num; valkeyClusterNode *node; valkeyAsyncContext *ac; struct cmd *command = NULL; @@ -3599,18 +3562,12 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc, memcpy(command->cmd, cmd, len); command->clen = len; - slot_num = command_format_by_slot(cc, command); - - if (slot_num < 0) { + if (prepareCommand(cc, command) != VALKEY_OK) { __valkeyClusterAsyncSetError(acc, cc->err, cc->errstr); goto error; - } else if (slot_num >= VALKEYCLUSTER_SLOTS) { - __valkeyClusterAsyncSetError(acc, VALKEY_ERR_OTHER, - "slot_num is out of range"); - goto error; } - node = node_get_by_table(cc, (uint32_t)slot_num); + node = node_get_by_table(cc, (uint32_t)command->slot_num); if (node == NULL) { /* Initiate a slotmap update since the slot is not served. */ throttledUpdateSlotMapAsync(acc, NULL);