Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rohan broadcast support #129

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions command.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ redis_arg0(struct cmd *r)

case CMD_REQ_REDIS_DECR:
case CMD_REQ_REDIS_GET:
case CMD_REQ_REDIS_KEYS:
case CMD_REQ_REDIS_INCR:
case CMD_REQ_REDIS_STRLEN:

Expand Down Expand Up @@ -485,6 +486,11 @@ redis_parse_cmd(struct cmd *r)
break;

case 4:
if (str4icmp(m, 'k', 'e', 'y', 's')) {
r->type = CMD_REQ_REDIS_KEYS;
break;
}

if (str4icmp(m, 'p', 't', 't', 'l')) {
r->type = CMD_REQ_REDIS_PTTL;
break;
Expand Down
2 changes: 1 addition & 1 deletion command.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ typedef enum cmd_parse_result {
ACTION( RSP_REDIS_BULK ) \
ACTION( RSP_REDIS_MULTIBULK ) \
ACTION( SENTINEL ) \

ACTION( REQ_REDIS_KEYS )

#define DEFINE_ACTION(_name) CMD_##_name,
typedef enum cmd_type {
Expand Down
149 changes: 149 additions & 0 deletions hircluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -3769,6 +3769,33 @@ void *redisClusterCommand(redisClusterContext *cc, const char *format, ...) {
return reply;
}

void *redisClusterBroadcastCommand(redisClusterContext *cc, const char *format, ...){
va_list ap;
redisReply *reply = NULL;
char *cmd;
int len;

if(cc == NULL)
{
return NULL;
}
va_start(ap,format);
len = redisvFormatCommand(&cmd,format,ap);

if (len == -1) {
__redisClusterSetError(cc,REDIS_ERR_OOM,"Out of memory");
return NULL;
} else if (len == -2) {
__redisClusterSetError(cc,REDIS_ERR_OTHER,"Invalid format string");
return NULL;
}

reply=redisCLusterCommandSendAll(cc,cmd,len);
va_end(ap);
free(cmd);
return reply;
}

void *redisClusterCommandArgv(redisClusterContext *cc, int argc, const char **argv, const size_t *argvlen) {
redisReply *reply = NULL;
char *cmd;
Expand Down Expand Up @@ -4030,6 +4057,113 @@ static int redisCLusterSendAll(redisClusterContext *cc)
return REDIS_OK;
}

redisReply *redisCLusterCommandSendAll(redisClusterContext *cc,char *cmd,size_t len)
{
dictIterator *di;
dictEntry *de;
struct cluster_node *node;
redisContext *c = NULL;
void *aux = NULL;
int elements = 0;
if(cc == NULL || cc->nodes == NULL)
{
return NULL;
}

redisReply *reply=NULL;
reply = hi_calloc(1,sizeof(*reply));
if (reply == NULL)
{
__redisClusterSetError(cc,REDIS_ERR_OOM,"Out of memory");
return NULL;
}
reply->type = REDIS_REPLY_ARRAY;
di = dictGetIterator(cc->nodes);
while((de = dictNext(di)) != NULL)
{
int wdone = 0;
node = dictGetEntryVal(de);
if(node == NULL)
{
continue;
}

c = ctx_get_by_node(cc, node);
if(c == NULL)
{
continue;
}
if (__redisAppendCommand(c,cmd,len) != REDIS_OK)
{
continue;
}

/* Try to read pending replies */
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR){
free(reply);
return NULL;
}

if (c->flags & REDIS_BLOCK) {
/* Write until done */
do {
if (redisBufferWrite(c,&wdone) == REDIS_ERR)
{
dictReleaseIterator(di);
free(reply);
return NULL;
}
} while (!wdone);

/* Read until there is a reply */
do {
if (redisBufferRead(c) == REDIS_ERR){
free(reply);
return NULL;
}
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR){
free(reply);
return NULL;
}
} while (aux == NULL);
}
redisReply *local_reply = (redisReply*)aux;
//Memory Allocation
if(reply->element==NULL){
size_t members=1;
if(local_reply->type==REDIS_REPLY_ARRAY){
members=local_reply->elements;
}
reply->element = hi_alloc(members*sizeof(*reply));
}
else{
if(local_reply->type==REDIS_REPLY_ARRAY){
reply->element = hi_realloc(reply->element,(elements+local_reply->elements)* sizeof(*reply));
}
else{
reply->element = hi_realloc(reply->element,(elements+1)* sizeof(*reply));
}

}
//Adding element within array
if(local_reply->type==REDIS_REPLY_ARRAY){
for(int i=0;i<local_reply->elements;i++){
reply->element[elements] = local_reply->element[i];
elements++;
}
local_reply->elements=NULL;
freeReplyObject(local_reply);
}
else{
reply->element[elements] = local_reply;
elements++;
}
}
reply->elements=elements;
dictReleaseIterator(di);
return reply;
}

static int redisCLusterClearAll(redisClusterContext *cc)
{
dictIterator *di;
Expand Down Expand Up @@ -4962,3 +5096,18 @@ void redisClusterAsyncFree(redisClusterAsyncContext *acc)
hi_free(acc);
}

void print_reply(redisReply *reply){
switch(reply->type){
case REDIS_REPLY_STRING:printf("REDIS_REPLY_STRING, Reply:%s\n",reply->str);break;
case REDIS_REPLY_ARRAY:printf("REDIS_REPLY_ARRAY with %d elements\n",reply->elements);
for(int i=0;i<(reply->elements);i++){
print_reply((reply->element)[i]);
}
break;
case REDIS_REPLY_INTEGER:printf("REDIS_REPLY_INTEGER, Reply:%lld\n",reply->integer);break;
case REDIS_REPLY_NIL:printf("REDIS_REPLY_NIL\n");break;
case REDIS_REPLY_STATUS:printf("REDIS_REPLY_STATUS, Reply:%s\n",reply->str);break;
case REDIS_REPLY_ERROR:printf("REDIS_REPLY_ERROR, Reply:%s\n",reply->str);break;
default: printf("Unknown Reply Type");
}
}
4 changes: 3 additions & 1 deletion hircluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ int test_cluster_update_route(redisClusterContext *cc);
struct dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, int flags);
struct dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, int flags);


void *redisClusterBroadcastCommand(redisClusterContext *cc, const char *format, ...);
redisReply *redisCLusterCommandSendAll(redisClusterContext *cc,char *cmd,size_t len);
void print_reply(redisReply *reply);
/*############redis cluster async############*/

struct redisClusterAsyncContext;
Expand Down