Skip to content

Commit

Permalink
* added ability to create cluster (or parts of it) on ephemeral, pred…
Browse files Browse the repository at this point in the history
…efined or sequence based ports

* small utils for jedis client;
* updated readme with examples of new functionality
  • Loading branch information
turu committed Feb 1, 2015
1 parent 71a7bfd commit 1af8510
Show file tree
Hide file tree
Showing 18 changed files with 473 additions and 26 deletions.
42 changes: 39 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,35 @@ RedisServer redisServer = RedisServer.builder()
.build();
```

## Setting up a cluster

Our Embedded Redis has support for HA Redis clusters with Sentinels and master-slave replication

A simple redis integration test with Redis cluster setup similar to that from production would look like this:
#### Using ephemeral ports
A simple redis integration test with Redis cluster on ephemeral ports, with setup similar to that from production would look like this:
```java
public class SomeIntegrationTestThatRequiresRedis {
private RedisCluster cluster;

private Set<String> jedisSentinelHosts;

@Before
public void setup() throws Exception {
//creates a cluster with 3 sentinels, quorum size of 2 and 3 replication groups, each with one master and one slave
cluster = RedisCluster.builder().sentinelCount(3).quorumSize(2)
cluster = RedisCluster.builder().ephemeral().sentinelCount(3).quorumSize(2)
.replicationGroup("master1", 1)
.replicationGroup("master2", 1)
.replicationGroup("master3", 1)
.build();
cluster.start();

//retrieve ports on which sentinels have been started, using a simple Jedis utility class
jedisSentinelHosts = JedisUtil.sentinelHosts(cluster);
}

@Test
public void test() throws Exception {
// testing code that requires redis running
JedisSentinelPool pool = new JedisSentinelPool("master1", jedisSentinelHosts);
}

@After
Expand All @@ -70,6 +78,34 @@ public class SomeIntegrationTestThatRequiresRedis {
}
```

#### Retrieving ports
The above example starts Redis cluster on ephemeral ports, which you can later get with ```cluster.ports()```,
which will return a list of all ports of the cluster. You can also get ports of sentinels with ```cluster.sentinelPorts()```
or servers with ```cluster.serverPorts()```. ```JedisUtil``` class contains utility methods for use with Jedis client.

#### Using predefined ports
You can also start Redis cluster on predefined ports and even mix both approaches:
```java
public class SomeIntegrationTestThatRequiresRedis {
private RedisCluster cluster;

@Before
public void setup() throws Exception {
final List<Integer> sentinels = Arrays.asList(26739, 26912);
final List<Integer> group1 = Arrays.asList(6667, 6668);
final List<Integer> group2 = Arrays.asList(6387, 6379);
//creates a cluster with 3 sentinels, quorum size of 2 and 3 replication groups, each with one master and one slave
cluster = RedisCluster.builder().sentinelPorts(sentinels).quorumSize(2)
.serverPorts(group1).replicationGroup("master1", 1)
.serverPorts(group2).replicationGroup("master2", 1)
.ephemeralServers().replicationGroup("master3", 1)
.build();
cluster.start();
}
//(...)
```
The above will create and start a cluster with sentinels on ports ```26739, 26912```, first replication group on ```6667, 6668```,
second replication group on ```6387, 6379``` and third replication group on ephemeral ports.

Redis version
==============
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/redis/embedded/AbstractRedisInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import redis.embedded.exceptions.EmbeddedRedisException;

import java.io.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
Expand All @@ -15,10 +16,15 @@ abstract class AbstractRedisInstance implements Redis {
protected List<String> args = Collections.emptyList();
private volatile boolean active = false;
private Process redisProcess;
private final int port;

private final ExecutorService executor = Executors.newSingleThreadExecutor();

@Override
protected AbstractRedisInstance(int port) {
this.port = port;
}

@Override
public boolean isActive() {
return active;
}
Expand Down Expand Up @@ -91,4 +97,9 @@ private void tryWaitFor() {
throw new EmbeddedRedisException("Failed to stop redis instance", e);
}
}

@Override
public List<Integer> ports() {
return Arrays.asList(port);
}
}
8 changes: 8 additions & 0 deletions src/main/java/redis/embedded/PortProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package redis.embedded;

/**
* Created by piotrturek on 29/01/15.
*/
public interface PortProvider {
int next();
}
4 changes: 4 additions & 0 deletions src/main/java/redis/embedded/Redis.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import redis.embedded.exceptions.EmbeddedRedisException;

import java.util.List;

/**
* Created by piotrturek on 22/01/15.
*/
Expand All @@ -11,4 +13,6 @@ public interface Redis {
void start() throws EmbeddedRedisException;

void stop() throws EmbeddedRedisException;

List<Integer> ports();
}
27 changes: 27 additions & 0 deletions src/main/java/redis/embedded/RedisCluster.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package redis.embedded;

import com.google.common.collect.Lists;
import redis.embedded.exceptions.EmbeddedRedisException;

import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Created by piotrturek on 22/01/15.
Expand Down Expand Up @@ -34,6 +37,30 @@ public void stop() throws EmbeddedRedisException {
sentinels.parallelStream().forEach(Redis::stop);
}

@Override
public List<Integer> ports() {
return Stream.concat(
sentinels.stream().flatMap(s -> s.ports().stream()),
servers.stream().flatMap(s -> s.ports().stream())
).collect(Collectors.toList());
}

public List<Redis> sentinels() {
return Lists.newLinkedList(sentinels);
}

public List<Integer> sentinelPorts() {
return sentinels.stream().flatMap(s -> s.ports().stream()).collect(Collectors.toList());
}

public List<Redis> servers() {
return Lists.newLinkedList(servers);
}

public List<Integer> serverPorts() {
return servers.stream().flatMap(s -> s.ports().stream()).collect(Collectors.toList());
}

public static RedisClusterBuilder builder() {
return new RedisClusterBuilder();
}
Expand Down
58 changes: 41 additions & 17 deletions src/main/java/redis/embedded/RedisClusterBuilder.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package redis.embedded;

import redis.embedded.ports.EphemeralPortProvider;
import redis.embedded.ports.PredefinedPortProvider;
import redis.embedded.ports.SequencePortProvider;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -13,8 +18,8 @@ public class RedisClusterBuilder {
private RedisServerBuilder serverBuilder = new RedisServerBuilder();
private int sentinelCount = 1;
private int quorumSize = 1;
private int currentSentinelPort = 26379;
private int currentReplicationGroupPort = 6379;
private PortProvider sentinelPortProvider = new SequencePortProvider(26379);
private PortProvider replicationGroupPortProvider = new SequencePortProvider(6379);
private final List<ReplicationGroup> groups = new LinkedList<>();

public RedisClusterBuilder withSentinelBuilder(RedisSentinelBuilder sentinelBuilder) {
Expand All @@ -27,13 +32,41 @@ public RedisClusterBuilder withServerBuilder(RedisServerBuilder serverBuilder) {
return this;
}

public RedisClusterBuilder sentinelPorts(Collection<Integer> ports) {
this.sentinelPortProvider = new PredefinedPortProvider(ports);
this.sentinelCount = ports.size();
return this;
}

public RedisClusterBuilder serverPorts(Collection<Integer> ports) {
this.replicationGroupPortProvider = new PredefinedPortProvider(ports);
return this;
}

public RedisClusterBuilder ephemeralSentinels() {
this.sentinelPortProvider = new EphemeralPortProvider();
return this;
}

public RedisClusterBuilder ephemeralServers() {
this.replicationGroupPortProvider = new EphemeralPortProvider();
return this;
}


public RedisClusterBuilder ephemeral() {
ephemeralSentinels();
ephemeralServers();
return this;
}

public RedisClusterBuilder sentinelCount(int sentinelCount) {
this.sentinelCount = sentinelCount;
return this;
}

public RedisClusterBuilder sentinelStartingPort(int startingPort) {
this.currentSentinelPort = startingPort;
this.sentinelPortProvider = new SequencePortProvider(startingPort);
return this;
}

Expand All @@ -43,7 +76,7 @@ public RedisClusterBuilder quorumSize(int quorumSize) {
}

public RedisClusterBuilder replicationGroup(String masterName, int slaveCount) {
this.groups.add(new ReplicationGroup(masterName, slaveCount, nextReplicationGroupPort(slaveCount)));
this.groups.add(new ReplicationGroup(masterName, slaveCount, this.replicationGroupPortProvider));
return this;
}

Expand Down Expand Up @@ -99,28 +132,19 @@ private Redis buildSentinel() {
}

private int nextSentinelPort() {
return currentSentinelPort++;
}

private int nextReplicationGroupPort(int slaveCount) {
final int toReturn = this.currentReplicationGroupPort;
currentReplicationGroupPort += slaveCount + 1;
return toReturn;
return sentinelPortProvider.next();
}

private static class ReplicationGroup {
private final int slaveCount;
private final String masterName;
private final int masterPort;
private final List<Integer> slavePorts = new LinkedList<>();

private ReplicationGroup(String masterName, int slaveCount, int portsStart) {
private ReplicationGroup(String masterName, int slaveCount, PortProvider portProvider) {
this.masterName = masterName;
this.slaveCount = slaveCount;
masterPort = portsStart;
masterPort = portProvider.next();
while (slaveCount-- > 0) {
portsStart += 1;
slavePorts.add(portsStart);
slavePorts.add(portProvider.next());
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/redis/embedded/RedisSentinel.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
public class RedisSentinel extends AbstractRedisInstance {
private static final String REDIS_READY_PATTERN = ".*Sentinel runid is.*";

public RedisSentinel(List<String> args) {
public RedisSentinel(List<String> args, int port) {
super(port);
this.args = new ArrayList<>(args);
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/embedded/RedisSentinelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public RedisSentinelBuilder setting(String configLine) {
public RedisSentinel build() {
tryResolveConfAndExec();
List<String> args = buildCommandArgs();
return new RedisSentinel(args);
return new RedisSentinel(args, port);
}

private void tryResolveConfAndExec() {
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/redis/embedded/RedisServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public RedisServer() throws IOException {
}

public RedisServer(Integer port) throws IOException {
super(port);
File executable = JarUtil.extractExecutableFromJar(RedisRunScriptEnum.getRedisRunScript());
this.args = Arrays.asList(
executable.getAbsolutePath(),
Expand All @@ -25,13 +26,15 @@ public RedisServer(Integer port) throws IOException {
}

public RedisServer(File executable, Integer port) {
super(port);
this.args = Arrays.asList(
executable.getAbsolutePath(),
"--port", Integer.toString(port)
);
}

RedisServer(List<String> args) {
RedisServer(List<String> args, int port) {
super(port);
this.args = new ArrayList<>(args);
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/redis/embedded/RedisServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class RedisServerBuilder {
private static final String CONF_FILENAME = "embedded-redis-server";

private File executable;
private Integer port;
private Integer port = 6379;
private InetSocketAddress slaveOf;
private String redisConf;

Expand Down Expand Up @@ -76,7 +76,7 @@ public RedisServerBuilder setting(String configLine) {
public RedisServer build() {
tryResolveConfAndExec();
List<String> args = buildCommandArgs();
return new RedisServer(args);
return new RedisServer(args, port);
}

public void reset() {
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/redis/embedded/ports/EphemeralPortProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package redis.embedded.ports;

import redis.embedded.PortProvider;
import redis.embedded.exceptions.RedisBuildingException;

import java.io.IOException;
import java.net.ServerSocket;

/**
* Created by piotrturek on 29/01/15.
*/
public class EphemeralPortProvider implements PortProvider {
@Override
public int next() {
try {
final ServerSocket socket = new ServerSocket(0);
socket.setReuseAddress(false);
int port = socket.getLocalPort();
socket.close();
return port;
} catch (IOException e) {
//should not ever happen
throw new RedisBuildingException("Could not provide ephemeral port", e);
}
}
}
Loading

0 comments on commit 1af8510

Please sign in to comment.