Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiplexed Commands Del, MSet, SInter and SUnion #33

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions src/main/java/com/mmmthatsgoodcode/redis/Protocol.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,20 @@
package com.mmmthatsgoodcode.redis;

import java.nio.charset.Charset;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

import javax.naming.OperationNotSupportedException;

import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableMap;
import com.mmmthatsgoodcode.redis.client.Transaction;
import com.mmmthatsgoodcode.redis.client.UnrecognizedReplyException;
import com.mmmthatsgoodcode.redis.protocol.Command;
import com.mmmthatsgoodcode.redis.protocol.Reply;
import com.mmmthatsgoodcode.redis.protocol.command.*;
import com.mmmthatsgoodcode.redis.protocol.reply.*;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufProcessor;
import io.netty.buffer.PooledByteBufAllocator;

public interface Protocol {

public enum ReplyType { BULK, ERROR, INTEGER, MULTI_BULK, STATUS, UNKNOWN }
public enum CommandType { GET, SET, SETEX, EXEC, EXISTS, MSET, MULTI, PING, SETNX, WATCH }
public enum CommandType { GET, SET, SETEX, EXEC, EXISTS, MSET, MULTI, PING, SADD, SETNX, SINTER, SUNION, WATCH }

public ByteBufAllocator getByteBufAllocator();

Expand All @@ -33,9 +26,12 @@ public interface Encoder {
public void encode(MSet command, ByteBuf out);
public void encode(Multi command, ByteBuf out);
public void encode(Ping command, ByteBuf out);
public void encode(SAdd command, ByteBuf out);
public void encode(Set command, ByteBuf out);
public void encode(Setex command, ByteBuf out);
public void encode(Setnx command, ByteBuf out);
public void encode(SInter command, ByteBuf out);
public void encode(SUnion command, ByteBuf out);
public void encode(Watch command, ByteBuf out);
public void encode(Command command, ByteBuf out) throws OperationNotSupportedException;
public void encode(Transaction command, ByteBuf out) throws OperationNotSupportedException;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/mmmthatsgoodcode/redis/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public <C extends SplittableCommand, T extends Reply> PendingReply<T> send( Spli
Map<Host, C> commandsForHost = new HashMap<Host, C>();
LOG.debug("Starting splitting");
for(Entry<Host, List<String>> keysForHost:hashedKeys.entrySet()) {
LOG.debug("fragmenting the MSet");
LOG.debug("fragmenting the command");
commandsForHost.put(keysForHost.getKey(), command.split(keysForHost.getValue()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,7 @@
import com.mmmthatsgoodcode.redis.client.Transaction;
import com.mmmthatsgoodcode.redis.client.UnrecognizedReplyException;
import com.mmmthatsgoodcode.redis.protocol.Command;
import com.mmmthatsgoodcode.redis.protocol.command.Exec;
import com.mmmthatsgoodcode.redis.protocol.command.Exists;
import com.mmmthatsgoodcode.redis.protocol.command.Get;
import com.mmmthatsgoodcode.redis.protocol.command.MSet;
import com.mmmthatsgoodcode.redis.protocol.command.Multi;
import com.mmmthatsgoodcode.redis.protocol.command.Ping;
import com.mmmthatsgoodcode.redis.protocol.command.Set;
import com.mmmthatsgoodcode.redis.protocol.command.Setex;
import com.mmmthatsgoodcode.redis.protocol.command.Setnx;
import com.mmmthatsgoodcode.redis.protocol.command.Watch;
import com.mmmthatsgoodcode.redis.protocol.model.AbstractReply;
import com.mmmthatsgoodcode.redis.protocol.command.*;
import com.mmmthatsgoodcode.redis.protocol.model.AbstractReply.ReplyHintBytes;
import com.mmmthatsgoodcode.redis.protocol.reply.BulkReply;
import com.mmmthatsgoodcode.redis.protocol.reply.ErrorReply;
Expand Down Expand Up @@ -128,6 +118,18 @@ public void encode(Multi command, ByteBuf out) {
public void encode(Ping command, ByteBuf out) {
encodeNoArgCommand(out, commandNames.get(CommandType.PING));
}

@Override
public void encode(SAdd command, ByteBuf out) {
EncodeHelper helper = new EncodeHelper(out);
helper.addArgc(2+command.getMembers().size());
helper.addArg(commandNames.get(CommandType.SADD));
helper.addArg(command.getKey().getBytes(ENCODING));

for(byte[] member : command.getMembers()){
helper.addArg(member);
}
}

@Override
public void encode(Set command, ByteBuf out) {
Expand All @@ -141,7 +143,33 @@ public void encode(Set command, ByteBuf out) {

@Override
public void encode(Setnx command, ByteBuf out) {
encode((Set) command, out);
EncodeHelper helper = new EncodeHelper(out);
helper.addArgc(3);
helper.addArg(commandNames.get(CommandType.SETNX));
helper.addArg(command.getKey().getBytes(ENCODING));
helper.addArg(command.getValue());
}

@Override
public void encode(SInter command, ByteBuf out) {
EncodeHelper helper = new EncodeHelper(out);
helper.addArgc(1+command.getKeys().size());
helper.addArg(commandNames.get(CommandType.SINTER));

for(String key : command.getKeys()){
helper.addArg(key.getBytes(ENCODING));
}
}

@Override
public void encode(SUnion command, ByteBuf out) {
EncodeHelper helper = new EncodeHelper(out);
helper.addArgc(1+command.getKeys().size());
helper.addArg(commandNames.get(CommandType.SUNION));

for(String key : command.getKeys()){
helper.addArg(key.getBytes(ENCODING));
}
}

@Override
Expand Down Expand Up @@ -173,30 +201,29 @@ public void encode(Setex command, ByteBuf out) {
}

public void encode(Command command, ByteBuf out) throws OperationNotSupportedException {
if (command instanceof Get) encode((Get) command, out);
if (command instanceof Exec) encode((Exec) command, out);
else if (command instanceof Exists) encode((Exists) command, out);
else if (command instanceof Get) encode((Get) command, out);
else if (command instanceof MSet) encode((MSet) command, out);
else if (command instanceof Multi) encode((Multi) command, out);
else if (command instanceof Ping) encode((Ping) command, out);
else if (command instanceof SAdd) encode((SAdd) command, out);
else if (command instanceof Set) encode((Set) command, out);
else if (command instanceof Setex) encode((Setex) command, out);
else if (command instanceof Exists) encode((Exists) command, out);
else if (command instanceof Exec) encode((Exec) command, out);
else if (command instanceof Setnx) encode((Setnx) command, out);
else if (command instanceof SInter) encode((SInter) command, out);
else if (command instanceof SUnion) encode((SUnion) command, out);
else if (command instanceof Watch) encode((Watch) command, out);
else if (command instanceof Ping) encode((Ping) command, out);
else if (command instanceof Multi) encode((Multi) command, out);
else if (command instanceof MSet) encode((MSet) command, out);
else throw new OperationNotSupportedException();

}

@Override
public void encode(Transaction transaction, ByteBuf out) throws OperationNotSupportedException {

for (Command command:transaction) {
encode(command, out);

}

}


}

public class Decoder implements Protocol.Decoder {
Expand Down Expand Up @@ -454,15 +481,17 @@ public boolean process(byte value) throws Exception {


private static final Map<CommandType, byte[]> commandNames = new ImmutableMap.Builder<CommandType, byte[]>()
.put(CommandType.GET, "GET".getBytes(ENCODING))
.put(CommandType.EXEC, "EXEC".getBytes(ENCODING))
.put(CommandType.EXISTS, "EXISTS".getBytes(ENCODING))
.put(CommandType.GET, "GET".getBytes(ENCODING))
.put(CommandType.MSET, "MSET".getBytes(ENCODING))
.put(CommandType.MULTI, "MULTI".getBytes(ENCODING))
.put(CommandType.PING, "PING".getBytes(ENCODING))
.put(CommandType.SADD, "SADD".getBytes(ENCODING))
.put(CommandType.SET, "SET".getBytes(ENCODING))
.put(CommandType.SETEX, "SETEX".getBytes(ENCODING))
.put(CommandType.SETNX, "SETNX".getBytes(ENCODING))
.put(CommandType.SINTER, "SINTER".getBytes(ENCODING))
.put(CommandType.SUNION, "SUNION".getBytes(ENCODING))
.put(CommandType.WATCH, "WATCH".getBytes(ENCODING))
.build();

Expand Down
70 changes: 70 additions & 0 deletions src/main/java/com/mmmthatsgoodcode/redis/protocol/command/Del.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.mmmthatsgoodcode.redis.protocol.command;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mmmthatsgoodcode.redis.protocol.model.PendingReply;
import com.mmmthatsgoodcode.redis.protocol.model.SplittableCommand;
import com.mmmthatsgoodcode.redis.protocol.reply.IntegerReply;

public class Del extends SplittableCommand<Del, IntegerReply>{

protected static final Logger LOG = LoggerFactory.getLogger(Del.class);
private int i=0;

public Del(List<String> keys) {
super(keys);
}

public Del(String key){
super(key);
}

@Override
protected Del fragment(List<String> keys) {
this.i++;

final Del parent = this;

LOG.debug("Creating new Del");
return new Del(keys){
final int nb = i;

public String toString(){
return "Child n."+nb+" "+super.toString();
}

private final PendingReply<IntegerReply> childReply = new PendingReply<IntegerReply>(this){

public void finalize(IntegerReply partialReply){
LOG.debug("Del.fragment(...).new Del() {...}.childReply.new PendingReply() {...}.finalize()");
super.finalize(partialReply);
parent.getReply().finalize(partialReply);
LOG.debug("back from super.finalize()");
}
};

public PendingReply<IntegerReply> getReply(){
LOG.debug("getReply() invoked");
return childReply;
}
};
}

@Override
public IntegerReply combine(List<IntegerReply> partialReplies) {
LOG.debug("Del.combine()");
int deletedKeys=0;

for(IntegerReply reply : partialReplies){
deletedKeys +=reply.value();
}
return new IntegerReply(deletedKeys);
}

public String toString(){
return "Del "+getKeys().toString();
}
}
23 changes: 12 additions & 11 deletions src/main/java/com/mmmthatsgoodcode/redis/protocol/command/MSet.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.mmmthatsgoodcode.redis.protocol.command;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
import com.mmmthatsgoodcode.redis.protocol.model.PendingReply;
import com.mmmthatsgoodcode.redis.protocol.model.SplittableCommand;
import com.mmmthatsgoodcode.redis.protocol.reply.StatusReply;
Expand All @@ -17,43 +18,43 @@ public class MSet extends SplittableCommand<MSet, StatusReply>{
protected static final Logger LOG = LoggerFactory.getLogger(MSet.class);
private int i = 0;

public MSet(Map<String, byte[]> keyValues) {
super(keyValues);
public MSet(LinkedHashMap<String, byte[]> keyValues) {
super(ImmutableList.copyOf(keyValues.keySet()));
this.keysValues = keyValues;
}

@Override
protected MSet fragment(List<String> keys) {
this.i++;
// get the values from this MSet instanced for "keys"
Map<String, byte[]> temp = new HashMap<String, byte[]>();
LinkedHashMap<String, byte[]> temp = new LinkedHashMap<String, byte[]>();
for(String key : keys){
temp.put(key, this.keysValues.get(key));
}

final MSet parent = this;

// create a new MSet only with these keys&values
LOG.debug("new MSet created");
LOG.debug("Creating new MSet");
return new MSet(temp) {
final int nb = i;

public String toString(){
return "Child n."+nb;
return "Child n"+nb+super.toString();
}
private final PendingReply<StatusReply> childReply = new PendingReply<StatusReply>(parent) {
private final PendingReply<StatusReply> childReply = new PendingReply<StatusReply>(this) {

public void finalize(StatusReply partialReply) {
LOG.debug("MSet.split(...).new MSet() {...}.getReply().new PendingReply() {...}.finalize()");
parent.getReply().finalize(partialReply);
super.finalize(partialReply);
LOG.debug("back from super.finalize");
parent.getReply().finalize(partialReply);
LOG.debug("back from super.finalize()");
}

};

public PendingReply<StatusReply> getReply() {
LOG.debug("getReply invoqued");
LOG.debug("getReply() invoked");
return childReply;
}
};
Expand All @@ -73,6 +74,6 @@ public StatusReply combine(List<StatusReply> partialReplies) {
}

public String toString(){
return "parent";
return "MSet "+keysValues.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.mmmthatsgoodcode.redis.protocol.command;

import java.util.ArrayList;
import java.util.List;

import com.mmmthatsgoodcode.redis.protocol.model.KeyedCommand;
import com.mmmthatsgoodcode.redis.protocol.reply.IntegerReply;

public class SAdd extends KeyedCommand<IntegerReply> {

private final List<byte[]> members;

public SAdd(String key, byte[] member) {
super(key);
this.members = new ArrayList<byte[]>();
this.members.add(member);
}

public SAdd(String key, List<byte[]> members) {
super(key);
this.members = members;
}

public List<byte[]> getMembers(){
return members;
}

public String toString(){
return "SAdd"+getKey().toString()+" "+getMembers().toString();
}
}
Loading