diff --git a/README.md b/README.md index 8a5b449..8c71885 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # redis-migrate-tool -**redis-migrate-tool** is a convenient and useful tool for migrating data between [redis](https://github.com/antirez/redis). +**redis-migrate-tool** is a convenient and useful tool for migrating data between [redis](https://github.com/antirez/redis). it has supported redis 5.0 rdb format v9. ## [中文介绍](http://www.oschina.net/p/redis-migrate-tool) diff --git a/src/rmt.c b/src/rmt.c index a67e284..ae500bd 100644 --- a/src/rmt.c +++ b/src/rmt.c @@ -497,7 +497,9 @@ int main(int argc,char *argv[]) if (rmt_ctx == NULL) { return RMT_ERROR; } - + + set_rate_limiting(rmti.rate_limiting); + core_core(rmt_ctx); destroy_context(rmt_ctx); diff --git a/src/rmt_core.c b/src/rmt_core.c index 940d178..9a09be6 100644 --- a/src/rmt_core.c +++ b/src/rmt_core.c @@ -415,7 +415,7 @@ static int writeThreadCron(struct aeEventLoop *eventLoop, long long id, void *cl if (trgroup->password) { sds reply; - reply = rmt_send_sync_cmd_read_line(tc->sd, "auth", trgroup->password, NULL); + reply = rmt_send_sync_auth(tc->sd, trgroup->password); if (sdslen(reply) == 0 || reply[0] == '-') { log_error("ERROR: password to %s is wrong", trnode->addr); sdsfree(reply); @@ -1597,11 +1597,10 @@ int prepare_send_msg(redis_node *srnode, struct msg *msg, redis_node *trnode) if (trgroup->password) { sds reply; - reply = rmt_send_sync_cmd_read_line(tc->sd, "auth", trgroup->password, NULL); + reply = rmt_send_sync_auth(tc->sd, trgroup->password); if (sdslen(reply) == 0 || reply[0] == '-') { log_error("ERROR: password to %s is wrong", trnode->addr); sdsfree(reply); - return RMT_ERROR; } sdsfree(reply); } diff --git a/src/rmt_core.h b/src/rmt_core.h index 4f516b8..f35b01c 100644 --- a/src/rmt_core.h +++ b/src/rmt_core.h @@ -172,6 +172,7 @@ struct instance { char *listen; int max_clients; + int rate_limiting; }; typedef struct rmtContext { @@ -291,6 +292,8 @@ void source_group_destroy(redis_group *srgroup); redis_group *target_group_create(rmtContext *ctx); void target_group_destroy(redis_group *trgroup); +void set_rate_limiting(int rl); + void redis_migrate(rmtContext *ctx, int type); void redis_check_data(rmtContext *ctx, int type); void redis_testinsert_data(rmtContext *ctx, int type); diff --git a/src/rmt_option.c b/src/rmt_option.c index 283a241..de9ffbc 100644 --- a/src/rmt_option.c +++ b/src/rmt_option.c @@ -32,6 +32,8 @@ #define RMT_OPTION_LISTEN_DEFAULT "127.0.0.1:8888" #define RMT_OPTION_MAX_CLIENTS_DEFAULT 100 +#define RMT_OPTION_RATE_LIMITING_DEFAULT 0 + static struct option long_options[] = { { "help", no_argument, NULL, 'h' }, { "version", no_argument, NULL, 'V' }, @@ -52,10 +54,11 @@ static struct option long_options[] = { { "from", required_argument, NULL, 'f' }, { "to", required_argument, NULL, 't' }, { "step", required_argument, NULL, 's' }, + { "rate-limiting", required_argument, NULL, 'l' }, { NULL, 0, NULL, 0 } }; -static char short_options[] = "hVdnIo:v:c:p:m:C:r:R:T:b:S:f:t:s:"; +static char short_options[] = "hVdnIo:v:c:p:m:C:r:R:T:b:S:f:t:s:l:"; void rmt_show_usage(void) @@ -88,6 +91,7 @@ rmt_show_usage(void) " -f, --from=S : set source redis address (default: %s)" CRLF " -t, --to=S : set target redis group address (default: %s)" CRLF " -s, --step=N : set step (default: %d)" CRLF + " -l, --rate-limiting : rate limit of payload to backend server (default %d)" CRLF "", RMT_LOG_DEFAULT, RMT_LOG_MIN, RMT_LOG_MAX, RMT_LOG_PATH != NULL ? RMT_LOG_PATH : "stderr", @@ -101,7 +105,8 @@ rmt_show_usage(void) RMT_OPTION_BUFFER_DEFAULT, RMT_SOURCE_ADDR, RMT_TARGET_ADDR, - RMT_OPTION_STEP_DEFAULT); + RMT_OPTION_STEP_DEFAULT, + RMT_OPTION_RATE_LIMITING_DEFAULT); rmt_show_command_usage(); } @@ -139,6 +144,7 @@ rmt_set_default_options(struct instance *nci) nci->listen = RMT_OPTION_LISTEN_DEFAULT; nci->max_clients = RMT_OPTION_MAX_CLIENTS_DEFAULT; + nci->rate_limiting = RMT_OPTION_RATE_LIMITING_DEFAULT; } r_status @@ -307,7 +313,14 @@ rmt_get_options(int argc, char **argv, struct instance *nci) nci->step = value; break; - + case 'l': + value = rmt_atoi(optarg, rmt_strlen(optarg)); + if (value < 0) { + log_stderr("redis-migrate-tool: option -s requires a number >=0"); + return RMT_ERROR; + } + nci->rate_limiting = value; + break; case '?': switch (optopt) { case 'o': diff --git a/src/rmt_redis.c b/src/rmt_redis.c index dfc58e6..58850d0 100644 --- a/src/rmt_redis.c +++ b/src/rmt_redis.c @@ -30,7 +30,7 @@ /* The current RDB version. When the format changes in a way that is no longer * backward compatible this number gets incremented. */ -#define REDIS_RDB_VERSION 7 +#define REDIS_RDB_VERSION 9 /* Defines related to the dump file format. To store 32 bits lengths for short * keys requires a lot of space, so we check the most significant 2 bits of @@ -47,7 +47,8 @@ * values, will fit inside. */ #define REDIS_RDB_6BITLEN 0 #define REDIS_RDB_14BITLEN 1 -#define REDIS_RDB_32BITLEN 2 +#define REDIS_RDB_32BITLEN 0x80 +#define REDIS_RDB_64BITLEN 0x81 #define REDIS_RDB_ENCVAL 3 #define REDIS_RDB_LENERR UINT_MAX @@ -66,6 +67,7 @@ #define REDIS_RDB_TYPE_SET 2 #define REDIS_RDB_TYPE_ZSET 3 #define REDIS_RDB_TYPE_HASH 4 +#define REDIS_RDB_TYPE_ZSET_2 5 /* Object types for encoded objects. */ #define REDIS_RDB_TYPE_HASH_ZIPMAP 9 @@ -79,6 +81,9 @@ #define rdbIsObjectType(t) ((t >= 0 && t <= 4) || (t >= 9 && t <= 13)) /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ +#define REDIS_RDB_OPCODE_MODULE_AUX 247 +#define REDIS_RDB_OPCODE_IDLE 248 +#define REDIS_RDB_OPCODE_FREQ 249 #define REDIS_RDB_OPCODE_AUX 250 #define REDIS_RDB_OPCODE_RESIZEDB 251 #define REDIS_RDB_OPCODE_EXPIRETIME_MS 252 @@ -162,8 +167,16 @@ static dictType groupNodesDictType = { dictGroupNodeDestructor /* val destructor */ }; +static int rateLimiting = 0; +static long long rateLimitingLastMsec = 0; +static int rateLimitingRequests = 0; + static int rmtRedisSlaveAgainOnline(redis_node *srnode); +void set_rate_limiting(int rl) { + rateLimiting = rl; +} + int redis_replication_init(redis_repl *rr) { if (rr == NULL) { @@ -845,6 +858,30 @@ char *rmt_send_sync_cmd_read_line(int fd, ...) { return sdsnew(buf); } +/* send auth in resp */ +char *rmt_send_sync_auth(int fd, sds passwd) { + sds cmd = sdsempty(); + char *arg, buf[256] = {'\0'}; + + cmd = sdscatprintf(cmd, "*2\r\n$4\r\nauth\r\n$%d\r\n%s\r\n", sdslen(passwd), passwd); + + /* Transfer command to the server. */ + if (rmt_sync_write(fd,cmd,(ssize_t)sdslen(cmd),1000) == -1) { + sdsfree(cmd); + return sdscatprintf(sdsempty(),"-Writing to redis: %s", + strerror(errno)); + } + sdsfree(cmd); + + /* Read the reply from the server. */ + if (rmt_sync_readline(fd,buf,sizeof(buf),250) == -1) + { + return sdscatprintf(sdsempty(),"-Reading from redis: %s", + strerror(errno)); + } + return sdsnew(buf); +} + /* ========================== Redis Replication ============================ */ /* Send a short redis command to master. @@ -5745,10 +5782,9 @@ static int redis_rdb_file_read(redis_rdb *rdb, void *buf, size_t len) return RMT_OK; } -static uint32_t redis_rdb_file_load_len(redis_rdb *rdb, int *isencoded) +static uint64_t redis_rdb_file_load_len(redis_rdb *rdb, int *isencoded) { unsigned char buf[2]; - uint32_t len; int type; if(rdb->fp == NULL) @@ -5779,14 +5815,24 @@ static uint32_t redis_rdb_file_load_len(redis_rdb *rdb, int *isencoded) } return (uint32_t)(((buf[0]&0x3F)<<8)|buf[1]); - } else { + } else if (buf[0] == REDIS_RDB_32BITLEN) { /* Read a 32 bit len. */ + uint32_t len; if (redis_rdb_file_read(rdb, &len, 4) != RMT_OK){ return REDIS_RDB_LENERR; } return ntohl(len); + } else if (buf[0] == REDIS_RDB_64BITLEN) { + /* Read a 64 bit len. */ + uint64_t len; + if (redis_rdb_file_read(rdb, &len, 8) != RMT_OK){ + return REDIS_RDB_LENERR; + } + return ntohu64(len); } + /* Never reached */ + return REDIS_RDB_LENERR; } /* Loads an integer-encoded object with the specified encoding type "enctype". @@ -5832,8 +5878,21 @@ static sds redis_rdb_file_load_double_str(redis_rdb *rdb) { } } +static sds redis_rdb_file_load_binary_double_str(redis_rdb *rdb) { + char buf[256]; + unsigned char len; + double score; + if (redis_rdb_file_read(rdb, &score, sizeof(score)) != RMT_OK) return NULL; + memrev64ifbe(&score); + + // convert double to str + sprintf(buf, "%lf", score); + + return sdsnewlen(buf, strlen(buf)); +} + static sds redis_rdb_file_load_lzf_str(redis_rdb *rdb) { - unsigned int len, clen; + uint64_t len, clen; unsigned char *c = NULL; sds val = NULL; @@ -5854,7 +5913,7 @@ static sds redis_rdb_file_load_lzf_str(redis_rdb *rdb) { static sds redis_rdb_file_load_str(redis_rdb *rdb) { int isencoded; - uint32_t len; + uint64_t len; sds str; if((len = redis_rdb_file_load_len(rdb, &isencoded)) @@ -5908,8 +5967,6 @@ static struct array *redis_rdb_file_load_value(redis_rdb *rdb, int rdbtype) value = NULL; elems = NULL; - log_debug(LOG_DEBUG, "rdbtype: %d", rdbtype); - if (rdbtype == REDIS_RDB_TYPE_STRING) { value = redis_value_create(1); if(value == NULL) @@ -5939,7 +5996,7 @@ static struct array *redis_rdb_file_load_value(redis_rdb *rdb, int rdbtype) str = array_push(value); if ((*str = redis_rdb_file_load_str(rdb)) == NULL) goto error; } - }else if (rdbtype == REDIS_RDB_TYPE_ZSET) { + }else if (rdbtype == REDIS_RDB_TYPE_ZSET || rdbtype == REDIS_RDB_TYPE_ZSET_2) { if ((len = redis_rdb_file_load_len(rdb,NULL)) == REDIS_RDB_LENERR) goto error; value = redis_value_create((uint32_t)(2*len)); @@ -5950,14 +6007,22 @@ static struct array *redis_rdb_file_load_value(redis_rdb *rdb, int rdbtype) while(len--) { if ((elem1 = redis_rdb_file_load_str(rdb)) == NULL) goto error; - if ((elem2 = redis_rdb_file_load_double_str(rdb)) == NULL) { - sdsfree(elem1); - goto error; + if (rdbtype == REDIS_RDB_TYPE_ZSET_2) { + if ((elem2 = redis_rdb_file_load_binary_double_str(rdb)) == NULL) { + sdsfree(elem1); + log_error("ERROR: redis_rdb_file_load_binary_double_str failed"); + goto error; + } + } else { + if ((elem2 = redis_rdb_file_load_double_str(rdb)) == NULL) { + sdsfree(elem1); + log_error("ERROR: redis_rdb_file_load_double_str failed"); + goto error; + } } str = array_push(value); *str = elem2; - ASSERT(sdsIsNum(*str) == 1); str = array_push(value); *str = elem1; } @@ -5987,7 +6052,7 @@ static struct array *redis_rdb_file_load_value(redis_rdb *rdb, int rdbtype) } while (len--) { - unsigned char *zl; + sds zl; unsigned int count; unsigned char *eptr, *sptr; unsigned char *vstr; @@ -6011,6 +6076,8 @@ static struct array *redis_rdb_file_load_value(redis_rdb *rdb, int rdbtype) eptr = ziplistNext(zl,eptr); } + + sdsfree(zl); } } else if (rdbtype == REDIS_RDB_TYPE_HASH_ZIPMAP || rdbtype == REDIS_RDB_TYPE_LIST_ZIPLIST || @@ -6208,6 +6275,7 @@ static int redis_object_type_get_by_rdbtype(int dbtype) return REDIS_SET; break; case REDIS_RDB_TYPE_ZSET: + case REDIS_RDB_TYPE_ZSET_2: case REDIS_RDB_TYPE_ZSET_ZIPLIST: return REDIS_ZSET; @@ -6283,6 +6351,24 @@ int redis_key_value_send(redis_node *srnode, sds key, struct msg *msg = NULL; uint32_t i; int mbuf_count = 0; + long long now_usec = rmt_usec_now(); + + + if (rateLimiting > 0) { + if (rateLimitingLastMsec == 0) { + rateLimitingRequests = 0; + } else if (now_usec - rateLimitingLastMsec <= 1 * 1000) { + rateLimitingRequests++; + /* check if trigger rate limiter */ + if (rateLimitingRequests * 1000 > rateLimiting) { + /* Just sleep wait until 1ms reached */ + usleep(now_usec - rateLimitingLastMsec); + } + } else { + rateLimitingRequests = 0; + } + rateLimitingLastMsec = now_usec; + } if (expiretime_type == RMT_TIME_SECOND) { if(expiretime * 1000 < now){ @@ -6403,6 +6489,7 @@ int redis_parse_rdb_file(redis_node *srnode, int mbuf_count_one_time) struct array *value; int data_type; int mbuf_count, mbuf_count_max; + long long lfu_freq, lru_idle; ASSERT(rdb->type == REDIS_RDB_TYPE_FILE); @@ -6442,7 +6529,7 @@ int redis_parse_rdb_file(redis_node *srnode, int mbuf_count_one_time) rdb->rdbver = rmt_atoi(buf+len, 4); if (rdb->rdbver < 1 || rdb->rdbver > REDIS_RDB_VERSION) { - log_error("ERROR: Can't handle RDB format version %d", + log_error("ERROR: Can't handle RDB format filename:%s version %d", rdb->fname, rdb->rdbver); goto error; } @@ -6492,7 +6579,25 @@ int redis_parse_rdb_file(redis_node *srnode, int mbuf_count_one_time) } expiretime_type = RMT_TIME_MILLISECOND; - } else if (type == REDIS_RDB_OPCODE_EOF) { + } else if (type == REDIS_RDB_OPCODE_FREQ) { + uint8_t byte = 0; + if (redis_rdb_file_read(rdb, &byte, 1) != RMT_OK) { + log_error("ERROR: redis rdb file:%s read freq error", rdb->fname); + goto eoferr; + } + + lfu_freq = byte; + continue; + } else if (type == REDIS_RDB_OPCODE_IDLE) { + uint64_t qword; + if ((qword = redis_rdb_file_load_len(rdb, NULL)) == REDIS_RDB_LENERR) { + log_error("ERROR: redis rdb file:%s read idle error", rdb->fname); + goto eoferr; + } + + lru_idle = qword; + continue; + } else if (type == REDIS_RDB_OPCODE_EOF) { break; } else if (type == REDIS_RDB_OPCODE_SELECTDB) { if ((dbid = redis_rdb_file_load_len(rdb, NULL)) @@ -6529,7 +6634,10 @@ int redis_parse_rdb_file(redis_node *srnode, int mbuf_count_one_time) sdsfree(auxkey); sdsfree(auxval); continue; - } + } else if (type == REDIS_RDB_OPCODE_MODULE_AUX) { + //redis has only checkmode now + continue; + } if ((key = redis_rdb_file_load_str(rdb)) == NULL) { log_error("ERROR: redis rdb file %s read key error", @@ -6549,9 +6657,6 @@ int redis_parse_rdb_file(redis_node *srnode, int mbuf_count_one_time) goto error; } - log_debug(LOG_DEBUG, "key: %s, value array length: %u", - key, array_n(value)); - if (rdb->handler != NULL && (srgroup->kind == GROUP_TYPE_SINGLE || srgroup->get_backend_node == NULL || srgroup->get_backend_node(srgroup, key, sdslen(key)) == srnode) && diff --git a/src/rmt_redis.h b/src/rmt_redis.h index a69bc43..8164c4b 100644 --- a/src/rmt_redis.h +++ b/src/rmt_redis.h @@ -189,7 +189,7 @@ int redis_rdb_init(redis_rdb *rdb, const char *addr, int type); void redis_rdb_deinit(redis_rdb *rdb); char *rmt_send_sync_cmd_read_line(int fd, ...); - +char *rmt_send_sync_auth(int fd, sds passwd); int rmtConnectRedisMaster(redis_node *srnode); void rmtRedisSlaveOffline(redis_node *srnode); diff --git a/src/rmt_util.h b/src/rmt_util.h index 7be11d8..e06bc0d 100644 --- a/src/rmt_util.h +++ b/src/rmt_util.h @@ -427,6 +427,14 @@ uint64_t intrev64(uint64_t v); #define intrev64ifbe(v) intrev64(v) #endif +#if (BYTE_ORDER == BIG_ENDIAN) +#define htonu64(v) (v) +#define ntohu64(v) (v) +#else +#define htonu64(v) intrev64(v) +#define ntohu64(v) intrev64(v) +#endif + sds getAbsolutePath(char *filename); int stringmatchlen(const char *pattern, int patternLen, const char *string, int stringLen, int nocase);