diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 121c583..2439716 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -143,6 +143,8 @@ add_executable(clusterclient_all_nodes clusterclient_all_nodes.c) target_link_libraries(clusterclient_all_nodes hiredis_cluster hiredis ${SSL_LIBRARY}) add_executable(clusterclient_async clusterclient_async.c) target_link_libraries(clusterclient_async hiredis_cluster hiredis ${SSL_LIBRARY} ${LIBEVENT_LIBRARY}) +add_executable(clusterclient_reconnect_async clusterclient_reconnect_async.c) +target_link_libraries(clusterclient_reconnect_async hiredis_cluster hiredis ${SSL_LIBRARY} ${LIBEVENT_LIBRARY}) add_test(NAME set-get-test COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/set-get-test.sh" "$" @@ -173,3 +175,7 @@ add_test(NAME dbsize-to-all-nodes-test COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/dbsize-to-all-nodes-test.sh" "$" WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") +add_test(NAME reconnect-test + COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/reconnect-test.sh" + "$" + WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") diff --git a/tests/clusterclient_reconnect_async.c b/tests/clusterclient_reconnect_async.c new file mode 100644 index 0000000..c0a0b01 --- /dev/null +++ b/tests/clusterclient_reconnect_async.c @@ -0,0 +1,111 @@ +/* + * This program connects to a Redis node and then reads commands from stdin, such + * as "SET foo bar", one per line and prints the results to stdout. + * + * The behaviour is similar to that of clusterclient_async.c, but it sends the + * next command after receiving a reply from the previous command. It also works + * for standalone Redis nodes (without cluster mode), and uses the + * redisClusterAsyncCommandToNode function to send the command to the first node. + * If it receives any I/O error, the program performs a reconnect. + */ + +#include "adapters/libevent.h" +#include "hircluster.h" +#include "test_utils.h" +#include +#include +#include +#include + +/* Unfortunately there is no error code for this error to match */ +#define REDIS_ENOCLUSTER "ERR This instance has cluster support disabled" + +void sendNextCommand(int, short, void *); + +void connectToRedis(redisClusterAsyncContext *acc) { + /* reset Redis context in case of reconnect */ + redisClusterAsyncDisconnect(acc); + + int status = redisClusterConnect2(acc->cc); + if (status == REDIS_OK) { + // cluster mode + } else if (acc->cc->err && strcmp(acc->cc->errstr, REDIS_ENOCLUSTER) == 0) { + printf("[no cluster]\n"); + acc->cc->err = 0; + memset(acc->cc->errstr, '\0', strlen(acc->cc->errstr)); + } else { + printf("Connect error: %s\n", acc->cc->errstr); + exit(-1); + } +} + +void replyCallback(redisClusterAsyncContext *acc, void *r, void *privdata) { + UNUSED(privdata); + redisReply *reply = (redisReply *)r; + + if (reply == NULL && + (acc->err == REDIS_ERR_IO || acc->err == REDIS_ERR_EOF)) { + printf("[reconnect]\n"); + connectToRedis(acc); + } else { + printf("%s\n", reply->str); + } + + // schedule reading from stdin and sending next command + event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, NULL); +} + +void sendNextCommand(int fd, short kind, void *arg) { + UNUSED(fd); + UNUSED(kind); + redisClusterAsyncContext *acc = arg; + + char command[256]; + if (fgets(command, 256, stdin)) { + size_t len = strlen(command); + if (command[len - 1] == '\n') // Chop trailing line break + command[len - 1] = '\0'; + + dictIterator di; + dictInitIterator(&di, acc->cc->nodes); + + dictEntry *de = dictNext(&di); + assert(de); + cluster_node *node = dictGetEntryVal(de); + assert(node); + + int status = redisClusterAsyncCommandToNode(acc, node, replyCallback, + NULL, command); + ASSERT_MSG(status == REDIS_OK, acc->errstr); + } else { + // disconnect if nothing is left to read from stdin + redisClusterAsyncDisconnect(acc); + } +} + +int main(int argc, char **argv) { + if (argc <= 1) { + fprintf(stderr, "Usage: %s HOST:PORT\n", argv[0]); + exit(1); + } + const char *initnode = argv[1]; + + redisClusterAsyncContext *acc = redisClusterAsyncContextInit(); + assert(acc); + redisClusterSetOptionAddNodes(acc->cc, initnode); + redisClusterSetOptionRouteUseSlots(acc->cc); + + struct event_base *base = event_base_new(); + int status = redisClusterLibeventAttach(acc, base); + assert(status == REDIS_OK); + + connectToRedis(acc); + // schedule reading from stdin and sending next command + event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, NULL); + + event_base_dispatch(base); + + redisClusterAsyncFree(acc); + event_base_free(base); + return 0; +} diff --git a/tests/scripts/reconnect-test.sh b/tests/scripts/reconnect-test.sh new file mode 100755 index 0000000..04de6a3 --- /dev/null +++ b/tests/scripts/reconnect-test.sh @@ -0,0 +1,64 @@ +#!/bin/sh + +# Usage: $0 /path/to/clusterclient-binary + +clientprog=${1:-./clusterclient_reconnect_async} +testname=reconnect-test + +# Sync process just waiting for server to be ready to accept connection. +perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' & +syncpid=$! + +# Start simulated server +timeout 5s ./simulated-redis.pl -p 7400 -d --sigcont $syncpid <<'EOF' & +EXPECT CONNECT +EXPECT ["CLUSTER", "SLOTS"] +SEND -ERR This instance has cluster support disabled +EXPECT CLOSE +EXPECT CONNECT +EXPECT ["SET", "foo", "bar"] +SEND +OK +EXPECT ["GET", "foo"] +SEND "bar" +CLOSE +EXPECT CONNECT +EXPECT ["CLUSTER", "SLOTS"] +SEND -ERR This instance has cluster support disabled +EXPECT CLOSE +EXPECT CONNECT +EXPECT ["GET", "foo"] +SEND "bar" +EXPECT CLOSE +EOF +server=$! + +# Wait until server is ready to accept client connection +wait $syncpid; + +# Run client +timeout 3s "$clientprog" 127.0.0.1:7400 > "$testname.out" <<'EOF' +SET foo bar +GET foo +GET foo +GET foo +EOF +clientexit=$? + +# Wait for server to exit +wait $server; serverexit=$? + +# Check exit statuses +if [ $serverexit -ne 0 ]; then + echo "Simulated server exited with status $serverexit" + exit $serverexit +fi +if [ $clientexit -ne 0 ]; then + echo "$clientprog exited with status $clientexit" + exit $clientexit +fi + +# Check the output from clusterclient +printf '[no cluster]\nOK\nbar\n[reconnect]\n[no cluster]\nbar\n' | cmp "$testname.out" - || exit 99 + +# Clean up +rm "$testname.out"