/*
 * test_hiredisvip_async_ae.c
 * Wiki page edited by deep on Dec 6, 2016 (1 revision).
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <signal.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/socket.h>

#include <hircluster.h>
#include <adlist.h>
#include <async.h>
#include <adapters/ae.h>
/* Event loop created in main() and run by event_run(). */
static aeEventLoop *loop;

/* Asynchronous cluster context created in main(). */
static redisClusterAsyncContext *acc;

/* Per-reply-kind tallies; getCallback() prints and zeroes them periodically. */
static int null_count = 0;
static int ok_count = 0;
static int string_count = 0;
static int error_count = 0;
static int reply_null_count = 0;
static int array_count = 0;

/* Commands issued by handle_event() and callbacks received by getCallback(). */
static long long total_cmd_count;
static long long callback_count;

/* Number of statistics reports printed so far by getCallback(). */
static long long counter = 0;
/*
 * One queued "set" request: main() fills it in, handle_event() sends it, and
 * getCallback() frees it.
 */
struct data {
    long long key_num;   /* number formatted as the key of the "set" command */
    long long value_num; /* number formatted as the value of the "set" command */
};
/*
 * Reply callback for every asynchronous command issued by handle_event().
 *
 * Tallies the reply by kind, prints and resets the tallies each time the
 * callback total reaches a multiple of 50000, and frees the per-command
 * `struct data` that was passed as privdata.
 *
 * acc      - cluster context; only read for the route version in the report.
 * r        - the reply, treated as a redisReply pointer; may be NULL.
 * privdata - the struct data allocated for this command; freed here.
 */
void getCallback(redisClusterAsyncContext *acc, void *r, void *privdata) {
    redisReply *rep = r;

    callback_count++;

    if (rep == NULL) {
        reply_null_count++;
    } else if (rep->type == REDIS_REPLY_STRING) {
        string_count++;
    } else if (rep->type == REDIS_REPLY_ARRAY) {
        array_count++;
    } else if (rep->type == REDIS_REPLY_NIL) {
        null_count++;
    } else if (rep->type == REDIS_REPLY_STATUS) {
        /* Only "OK" status replies are tallied. */
        if (strcmp(rep->str, "OK") == 0) {
            ok_count++;
        }
    } else if (rep->type == REDIS_REPLY_ERROR) {
        error_count++;
    }
    /* Integer replies and any other reply type are not tallied. */

    if (callback_count % 50000 == 0) {
        printf("\n\n******%lld*******\nroute version: %d\n", counter, acc->cc->route_version);
        printf("null_count: %d\n", null_count);
        printf("ok_count: %d\n", ok_count);
        printf("array_count: %d\n", array_count);
        printf("string_count: %d\n", string_count);
        printf("error_count: %d\n", error_count);
        printf("reply_null_count: %d\n", reply_null_count);

        /* Start a fresh reporting window. */
        null_count = 0;
        ok_count = 0;
        array_count = 0;
        string_count = 0;
        error_count = 0;
        reply_null_count = 0;
        counter++;
    }

    /* free(NULL) is a no-op, so no guard is needed. */
    free(privdata);
}
/*
 * Connect callback registered on the cluster context in main().
 * Prints a notice when status is REDIS_OK; any other status is ignored.
 */
void connectCallback(const redisAsyncContext *c, int status) {
    (void)c; /* the context itself is not inspected */

    if (status == REDIS_OK) {
        printf("Connected...\n");
    }
}
/*
 * Disconnect callback registered on the cluster context in main().
 * Prints the context's error string when status is not REDIS_OK, otherwise a
 * plain disconnect notice.
 */
void disconnectCallback(const redisAsyncContext *c, int status) {
    if (status == REDIS_OK) {
        printf("\nDisconnected...\n");
    } else {
        printf("disconnect error: %s\n", c->errstr);
    }
}
/*
 * Notification pipe set up in main(): main() writes one byte to
 * sd_notice_send per queued item, and handle_event() reads one byte from
 * sd_notice_recieve per item it takes off the list.
 */
static int sd_notice_recieve;
static int sd_notice_send;
/* Queue of pending `struct data` items, filled by main() and drained by handle_event(). */
static hilist *l;
/* Locked around every access to `l` in main() and handle_event(). */
static pthread_mutex_t lmutex;
/* handle_event() leaves its drain loop once its per-call count exceeds this value. */
#define handle_one_time_count 1000
/*
 * ae file-event handler for the read end of the notification pipe.
 *
 * Takes queued `struct data` items off the shared list `l` (under lmutex) and
 * issues one asynchronous "set <key_num> <value_num>" command per item, with
 * getCallback() as the reply callback and the item as its privdata.
 *
 * The drain loop stops when:
 *   - the list is empty,
 *   - more than 10000 commands are still awaiting their callback,
 *   - the wake-up byte for the next item cannot be read, or
 *   - more than handle_one_time_count items were handled in this call.
 *
 * el, fd, privdata, mask - supplied by the ae loop; unused, the handler works
 *                          on the file-scope globals instead.
 */
void handle_event(aeEventLoop *el, int fd, void *privdata, int mask)
{
    int handled = 0;
    int rejected = 0;

    (void)el;
    (void)fd;
    (void)privdata;
    (void)mask;

    if (acc == NULL) {
        printf("error: cluster context is null\n");
        return;
    }

    pthread_mutex_lock(&lmutex);
    while (listLength(l) > 0) {
        listNode *node;
        struct data *dt;
        char token;

        /* Back-pressure: do not let too many commands be in flight at once. */
        if ((total_cmd_count - callback_count) > 10000) {
            break;
        }

        /*
         * main() writes one byte per queued item. Consume exactly one byte per
         * item so the pipe and the list stay in step; if the byte cannot be
         * read, leave the item queued for a later invocation.
         */
        if (read(sd_notice_recieve, &token, 1) != 1) {
            break;
        }

        node = listFirst(l);
        dt = (struct data *)(listNodeValue(node));
        listDelNode(l, node);

        if (redisClusterAsyncCommand(acc, getCallback, dt, "set %lld %lld",
                                     dt->key_num, dt->value_num) == REDIS_OK) {
            /* Ownership of dt passes to getCallback(), which frees it. */
            total_cmd_count++;
        } else {
            /*
             * The command was not queued, so no callback will release dt and
             * it must not count as in flight.
             * NOTE(review): relies on hiredis-vip not invoking the callback
             * for a rejected command; confirm for the library version in use.
             */
            free(dt);
            rejected++;
        }

        if (++handled > handle_one_time_count) {
            break;
        }
    }
    pthread_mutex_unlock(&lmutex);

    /* Report once per invocation rather than once per failed command. */
    if (rejected > 0) {
        printf("error: %d command(s) could not be queued: %s\n", rejected, acc->errstr);
    }
}
/*
 * Entry point of the thread started by main().
 *
 * Registers handle_event() for AE_READABLE events on the notification pipe's
 * read end, then runs the ae loop with aeMain().
 *
 * args - unused.
 * Returns a null pointer after aeMain() returns.
 */
void *event_run(void *args)
{
    (void)args;

    aeCreateFileEvent(loop, sd_notice_recieve, AE_READABLE, handle_event, acc);
    aeMain(loop);

    return NULL;
}
//gcc -o test_hiredisvip_async_ae_new -lhiredis_vip -lpthread test_hiredisvip_async_ae_new.c ../redis-2.8.13/src/ae.o ../redis-2.8.13/src/zmalloc.o -I/usr/local/include -I/usr/local/include/hiredis-vip -L/usr/local/lib -I../redis-2.8.13/src/
/*
 * Load generator: connects to the cluster address given as argv[1], starts the
 * event-loop thread, then endlessly queues "set <n> <n>" requests for it.
 *
 * The main thread is the producer: it appends `struct data` items to the
 * shared list `l` (never letting it grow past 10001 entries) and writes one
 * byte to the notification pipe per item so handle_event() wakes up.
 *
 * Returns -1 on a usage or setup error. The producer loop only ends on a fatal
 * error, so the trailing join/return is not reached in normal operation.
 */
int main(int argc, char **argv)
{
    char *addr;
    long long count = 0;
    pthread_t thread_id;
    int fds_pipe[2];
    int flags = HIRCLUSTER_FLAG_NULL;
    int rc;
    struct data *dt = NULL;

    /* A broken connection must surface as a write error, not kill the process. */
    signal(SIGPIPE, SIG_IGN);

    callback_count = 0;
    total_cmd_count = 0;
    pthread_mutex_init(&lmutex, NULL);

    if (argc < 2) {
        printf("Error: must have the first argument address\n");
        return -1;
    }
    addr = argv[1];

    acc = redisClusterAsyncConnect(addr, flags);
    if (acc == NULL) {
        printf("connect error: could not create cluster context\n");
        return -1;
    }
    if (acc->err) {
        /* The context is left to leak; the process exits right away. */
        printf("connect error: %s\n", acc->errstr);
        return -1;
    }

    loop = aeCreateEventLoop(6400000);
    if (loop == NULL) {
        printf("Error: could not create event loop\n");
        return -1;
    }
    redisClusterAeAttach(loop, acc);
    redisClusterAsyncSetConnectCallback(acc, connectCallback);
    redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);

    if (pipe(fds_pipe) != 0) {
        perror("pipe");
        return -1;
    }
    sd_notice_recieve = fds_pipe[0];
    sd_notice_send = fds_pipe[1];

    l = listCreate();
    if (l == NULL) {
        printf("Error: could not create request list\n");
        return -1;
    }

    rc = pthread_create(&thread_id, NULL, event_run, loop);
    if (rc != 0) {
        printf("Error: pthread_create failed: %s\n", strerror(rc));
        return -1;
    }

    for (;;) {
        /* Keep the pending item across retries instead of re-allocating it. */
        if (dt == NULL) {
            dt = malloc(sizeof *dt);
            if (dt == NULL) {
                perror("malloc");
                return -1;
            }
            dt->key_num = count;
            dt->value_num = count;
        }

        pthread_mutex_lock(&lmutex);
        if (listLength(l) > 10000) {
            /* Queue is full: spin until the event thread drains it. */
            pthread_mutex_unlock(&lmutex);
            continue;
        }
        if (listAddNodeTail(l, dt) == NULL) {
            pthread_mutex_unlock(&lmutex);
            printf("Error: could not queue request\n");
            free(dt);
            return -1;
        }
        pthread_mutex_unlock(&lmutex);

        /* The list now owns the item; getCallback() frees it eventually. */
        dt = NULL;
        count++;

        /* One wake-up byte per queued item; handle_event() reads one per item. */
        if (write(sd_notice_send, "c", 1) != 1) {
            perror("write");
            return -1;
        }
    }

    /* Not reached: the loop above only leaves via return. */
    pthread_join(thread_id, NULL);
    return 0;
}