diff --git a/README.md b/README.md index 1baa0eaf..a5b6622a 100644 --- a/README.md +++ b/README.md @@ -165,12 +165,16 @@ Contributors * anthonyu ([@anthonyu](http://github.com/anthonyu)) * Artem Orobets ([@enisher](http://github.com/enisher)) * Sean Simonsen ([@SeanSimonsen](http://github.com/SeanSimonsen)) + * Rob Winch ([@rwinch](http://github.com/rwinch)) Changelog ============== -### 0.5 +### 0.6 + * Support JDK 6 + + +### 0.5 * OS detection fix * redis binary per OS/arch pair * Updated to 2.8.19 binary for Windows diff --git a/pom.xml b/pom.xml index eb975e49..51b219ac 100644 --- a/pom.xml +++ b/pom.xml @@ -100,9 +100,9 @@ maven-compiler-plugin 2.3.2 - 1.8 - 1.8 - 1.8 + 1.6 + 1.6 + 1.6 diff --git a/src/main/java/redis/embedded/AbstractRedisInstance.java b/src/main/java/redis/embedded/AbstractRedisInstance.java index 41768df7..f8d2fbf8 100644 --- a/src/main/java/redis/embedded/AbstractRedisInstance.java +++ b/src/main/java/redis/embedded/AbstractRedisInstance.java @@ -9,6 +9,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.commons.io.IOUtils; + abstract class AbstractRedisInstance implements Redis { protected List args = Collections.emptyList(); private volatile boolean active = false; @@ -43,21 +45,14 @@ public synchronized void start() throws EmbeddedRedisException { private void logErrors() { final InputStream errorStream = redisProcess.getErrorStream(); - executor.submit(() -> { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream))) { - String line; - while ((line = reader.readLine()) != null) { - System.out.println(line); - } - } catch (IOException e) { - e.printStackTrace(); - } - }); - + BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream)); + Runnable printReaderTask = new PrintReaderRunnable(reader); + executor.submit(printReaderTask); } private void awaitRedisServerReady() throws IOException { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(redisProcess.getInputStream()))) { + BufferedReader reader = new BufferedReader(new InputStreamReader(redisProcess.getInputStream())); + try { String outputLine; do { outputLine = reader.readLine(); @@ -66,6 +61,8 @@ private void awaitRedisServerReady() throws IOException { throw new RuntimeException("Can't start redis server. Check logs for details."); } } while (!outputLine.matches(redisReadyPattern())); + } finally { + IOUtils.closeQuietly(reader); } } @@ -99,4 +96,31 @@ private void tryWaitFor() { public List ports() { return Arrays.asList(port); } + + private static class PrintReaderRunnable implements Runnable { + private final BufferedReader reader; + + private PrintReaderRunnable(BufferedReader reader) { + this.reader = reader; + } + + public void run() { + try { + readLines(); + } finally { + IOUtils.closeQuietly(reader); + } + } + + public void readLines() { + try { + String line; + while ((line = reader.readLine()) != null) { + System.out.println(line); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } } diff --git a/src/main/java/redis/embedded/RedisCluster.java b/src/main/java/redis/embedded/RedisCluster.java index d50943b8..1f18bf73 100644 --- a/src/main/java/redis/embedded/RedisCluster.java +++ b/src/main/java/redis/embedded/RedisCluster.java @@ -3,14 +3,13 @@ import com.google.common.collect.Lists; import redis.embedded.exceptions.EmbeddedRedisException; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; public class RedisCluster implements Redis { - private final List sentinels = new LinkedList<>(); - private final List servers = new LinkedList<>(); + private final List sentinels = new LinkedList(); + private final List servers = new LinkedList(); RedisCluster(List sentinels, List servers) { this.servers.addAll(servers); @@ -19,27 +18,45 @@ public class RedisCluster implements Redis { @Override public boolean isActive() { - return sentinels.stream().allMatch(Redis::isActive) && servers.stream().allMatch(Redis::isActive); + for(Redis redis : sentinels) { + if(!redis.isActive()) { + return false; + } + } + for(Redis redis : servers) { + if(!redis.isActive()) { + return false; + } + } + return true; } @Override public void start() throws EmbeddedRedisException { - sentinels.parallelStream().forEach(Redis::start); - servers.parallelStream().forEach(Redis::start); + for(Redis redis : sentinels) { + redis.start(); + } + for(Redis redis : servers) { + redis.start(); + } } @Override public void stop() throws EmbeddedRedisException { - servers.parallelStream().forEach(Redis::stop); - sentinels.parallelStream().forEach(Redis::stop); + for(Redis redis : sentinels) { + redis.stop(); + } + for(Redis redis : servers) { + redis.stop(); + } } @Override public List ports() { - return Stream.concat( - sentinels.stream().flatMap(s -> s.ports().stream()), - servers.stream().flatMap(s -> s.ports().stream()) - ).collect(Collectors.toList()); + List ports = new ArrayList(); + ports.addAll(sentinelPorts()); + ports.addAll(serverPorts()); + return ports; } public List sentinels() { @@ -47,7 +64,11 @@ public List sentinels() { } public List sentinelPorts() { - return sentinels.stream().flatMap(s -> s.ports().stream()).collect(Collectors.toList()); + List ports = new ArrayList(); + for(Redis redis : sentinels) { + ports.addAll(redis.ports()); + } + return ports; } public List servers() { @@ -55,7 +76,11 @@ public List servers() { } public List serverPorts() { - return servers.stream().flatMap(s -> s.ports().stream()).collect(Collectors.toList()); + List ports = new ArrayList(); + for(Redis redis : servers) { + ports.addAll(redis.ports()); + } + return ports; } public static RedisClusterBuilder builder() { diff --git a/src/main/java/redis/embedded/RedisClusterBuilder.java b/src/main/java/redis/embedded/RedisClusterBuilder.java index 37dc9e13..9cbe362c 100644 --- a/src/main/java/redis/embedded/RedisClusterBuilder.java +++ b/src/main/java/redis/embedded/RedisClusterBuilder.java @@ -4,11 +4,10 @@ import redis.embedded.ports.PredefinedPortProvider; import redis.embedded.ports.SequencePortProvider; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; public class RedisClusterBuilder { private RedisSentinelBuilder sentinelBuilder = new RedisSentinelBuilder(); @@ -17,7 +16,7 @@ public class RedisClusterBuilder { private int quorumSize = 1; private PortProvider sentinelPortProvider = new SequencePortProvider(26379); private PortProvider replicationGroupPortProvider = new SequencePortProvider(6379); - private final List groups = new LinkedList<>(); + private final List groups = new LinkedList(); public RedisClusterBuilder withSentinelBuilder(RedisSentinelBuilder sentinelBuilder) { this.sentinelBuilder = sentinelBuilder; @@ -84,21 +83,21 @@ public RedisCluster build() { } private List buildServers() { - return groups.stream().flatMap(g -> { - final Stream.Builder builder = Stream.builder(); - builder.accept(buildMaster(g)); - buildSlaves(builder, g); - return builder.build(); - }).collect(Collectors.toList()); + List servers = new ArrayList(); + for(ReplicationGroup g : groups) { + servers.add(buildMaster(g)); + buildSlaves(servers, g); + } + return servers; } - private void buildSlaves(Stream.Builder builder, ReplicationGroup g) { + private void buildSlaves(List servers, ReplicationGroup g) { for (Integer slavePort : g.slavePorts) { serverBuilder.reset(); serverBuilder.port(slavePort); serverBuilder.slaveOf("localhost", g.masterPort); final RedisServer slave = serverBuilder.build(); - builder.accept(slave); + servers.add(slave); } } @@ -109,7 +108,7 @@ private Redis buildMaster(ReplicationGroup g) { private List buildSentinels() { int toBuild = this.sentinelCount; - final List sentinels = new LinkedList<>(); + final List sentinels = new LinkedList(); while (toBuild-- > 0) { sentinels.add(buildSentinel()); } @@ -119,12 +118,12 @@ private List buildSentinels() { private Redis buildSentinel() { sentinelBuilder.reset(); sentinelBuilder.port(nextSentinelPort()); - groups.stream().forEach(g -> { + for(ReplicationGroup g : groups) { sentinelBuilder.masterName(g.masterName); sentinelBuilder.masterPort(g.masterPort); sentinelBuilder.quorumSize(quorumSize); sentinelBuilder.addDefaultReplicationGroup(); - }); + } return sentinelBuilder.build(); } @@ -135,7 +134,7 @@ private int nextSentinelPort() { private static class ReplicationGroup { private final String masterName; private final int masterPort; - private final List slavePorts = new LinkedList<>(); + private final List slavePorts = new LinkedList(); private ReplicationGroup(String masterName, int slaveCount, PortProvider portProvider) { this.masterName = masterName; diff --git a/src/main/java/redis/embedded/RedisSentinel.java b/src/main/java/redis/embedded/RedisSentinel.java index 1ee56262..6b8c6234 100644 --- a/src/main/java/redis/embedded/RedisSentinel.java +++ b/src/main/java/redis/embedded/RedisSentinel.java @@ -8,7 +8,7 @@ public class RedisSentinel extends AbstractRedisInstance { public RedisSentinel(List args, int port) { super(port); - this.args = new ArrayList<>(args); + this.args = new ArrayList(args); } public static RedisSentinelBuilder builder() { return new RedisSentinelBuilder(); } diff --git a/src/main/java/redis/embedded/RedisSentinelBuilder.java b/src/main/java/redis/embedded/RedisSentinelBuilder.java index 99ce480b..5e0c8573 100644 --- a/src/main/java/redis/embedded/RedisSentinelBuilder.java +++ b/src/main/java/redis/embedded/RedisSentinelBuilder.java @@ -143,7 +143,7 @@ private String resolveConfigName() { private List buildCommandArgs() { Preconditions.checkNotNull(sentinelConf); - List args = new ArrayList<>(); + List args = new ArrayList(); args.add(executable.getAbsolutePath()); args.add(sentinelConf); args.add("--sentinel"); diff --git a/src/main/java/redis/embedded/RedisServer.java b/src/main/java/redis/embedded/RedisServer.java index 2393127d..e2f7a3e4 100644 --- a/src/main/java/redis/embedded/RedisServer.java +++ b/src/main/java/redis/embedded/RedisServer.java @@ -41,7 +41,7 @@ public RedisServer(RedisExecProvider redisExecProvider, Integer port) throws IOE RedisServer(List args, int port) { super(port); - this.args = new ArrayList<>(args); + this.args = new ArrayList(args); } public static RedisServerBuilder builder() { diff --git a/src/main/java/redis/embedded/RedisServerBuilder.java b/src/main/java/redis/embedded/RedisServerBuilder.java index 7c6b99ea..b050449c 100644 --- a/src/main/java/redis/embedded/RedisServerBuilder.java +++ b/src/main/java/redis/embedded/RedisServerBuilder.java @@ -106,7 +106,7 @@ private String resolveConfigName() { } private List buildCommandArgs() { - List args = new ArrayList<>(); + List args = new ArrayList(); args.add(executable.getAbsolutePath()); if (!Strings.isNullOrEmpty(redisConf)) { diff --git a/src/main/java/redis/embedded/ports/PredefinedPortProvider.java b/src/main/java/redis/embedded/ports/PredefinedPortProvider.java index 289bd0bf..ebc32576 100644 --- a/src/main/java/redis/embedded/ports/PredefinedPortProvider.java +++ b/src/main/java/redis/embedded/ports/PredefinedPortProvider.java @@ -9,7 +9,7 @@ import java.util.List; public class PredefinedPortProvider implements PortProvider { - private final List ports = new LinkedList<>(); + private final List ports = new LinkedList(); private final Iterator current; public PredefinedPortProvider(Collection ports) { diff --git a/src/main/java/redis/embedded/util/JedisUtil.java b/src/main/java/redis/embedded/util/JedisUtil.java index f635eb40..1672a9b2 100644 --- a/src/main/java/redis/embedded/util/JedisUtil.java +++ b/src/main/java/redis/embedded/util/JedisUtil.java @@ -3,9 +3,9 @@ import redis.embedded.Redis; import redis.embedded.RedisCluster; +import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; public class JedisUtil { public static Set jedisHosts(Redis redis) { @@ -19,6 +19,10 @@ public static Set sentinelHosts(RedisCluster cluster) { } public static Set portsToJedisHosts(List ports) { - return ports.stream().map(p -> "localhost:" + p).collect(Collectors.toSet()); + Set hosts = new HashSet(); + for(Integer p : ports) { + hosts.add("localhost:" + p); + } + return hosts; } } diff --git a/src/test/java/redis/embedded/RedisClusterTest.java b/src/test/java/redis/embedded/RedisClusterTest.java index 9c42b10e..41bb8d8c 100644 --- a/src/test/java/redis/embedded/RedisClusterTest.java +++ b/src/test/java/redis/embedded/RedisClusterTest.java @@ -44,8 +44,12 @@ public void stopShouldStopEntireCluster() throws Exception { instance.stop(); //then - sentinels.stream().forEach(s -> verify(s).stop()); - servers.stream().forEach(m -> verify(m).stop()); + for(Redis s : sentinels) { + verify(s).stop(); + } + for(Redis s : servers) { + verify(s).stop(); + } } @Test @@ -59,8 +63,12 @@ public void startShouldStartEntireCluster() throws Exception { instance.start(); //then - sentinels.stream().forEach(s -> verify(s).start()); - servers.stream().forEach(m -> verify(m).start()); + for(Redis s : sentinels) { + verify(s).start(); + } + for(Redis s : servers) { + verify(s).start(); + } } @Test @@ -78,8 +86,12 @@ public void isActiveShouldCheckEntireClusterIfAllActive() throws Exception { instance.isActive(); //then - sentinels.stream().forEach(s -> verify(s).isActive()); - servers.stream().forEach(m -> verify(m).isActive()); + for(Redis s : sentinels) { + verify(s).isActive(); + } + for(Redis s : servers) { + verify(s).isActive(); + } } @Test diff --git a/src/test/java/redis/embedded/ports/EphemeralPortProviderTest.java b/src/test/java/redis/embedded/ports/EphemeralPortProviderTest.java index 14022432..fece5770 100644 --- a/src/test/java/redis/embedded/ports/EphemeralPortProviderTest.java +++ b/src/test/java/redis/embedded/ports/EphemeralPortProviderTest.java @@ -1,48 +1,28 @@ package redis.embedded.ports; import org.junit.Test; -import redis.embedded.exceptions.RedisBuildingException; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.junit.Assert.assertEquals; public class EphemeralPortProviderTest { - private final ExecutorService executor = Executors.newFixedThreadPool(10); @Test - public void nextShouldGiveNextFreeEphemeralPortConcurrently() throws Exception { + public void nextShouldGiveNextFreeEphemeralPort() throws Exception { //given + final int portCount = 20; final EphemeralPortProvider provider = new EphemeralPortProvider(); //when - final List ports = executor.invokeAll( - Stream.generate(() -> (Callable) provider::next).limit(20).collect(Collectors.toList())) - .stream() - .map(this::getResult) - .sorted() - .distinct() - .collect(Collectors.toList()); + final List ports = new ArrayList(); + for(int i = 0;i < portCount; i++) { + ports.add(provider.next()); + } //then System.out.println(ports); assertEquals(20, ports.size()); } - - private int getResult(Future f) { - int ret = 0; - try { - ret = f.get(); - } catch (Exception e) { - throw new RedisBuildingException("failed to deliver", e); - } - return ret; - } - } \ No newline at end of file diff --git a/src/test/java/redis/embedded/ports/PredefinedPortProviderTest.java b/src/test/java/redis/embedded/ports/PredefinedPortProviderTest.java index 11daa120..50048894 100644 --- a/src/test/java/redis/embedded/ports/PredefinedPortProviderTest.java +++ b/src/test/java/redis/embedded/ports/PredefinedPortProviderTest.java @@ -3,35 +3,26 @@ import org.junit.Test; import redis.embedded.exceptions.RedisBuildingException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.junit.Assert.assertEquals; public class PredefinedPortProviderTest { - private final ExecutorService executor = Executors.newFixedThreadPool(10); @Test - public void nextShouldGiveNextPortFromAssignedListConcurrently() throws Exception { + public void nextShouldGiveNextPortFromAssignedList() throws Exception { //given Collection ports = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); final PredefinedPortProvider provider = new PredefinedPortProvider(ports); //when - final List returnedPorts = executor.invokeAll( - Stream.generate(() -> (Callable) provider::next).limit(10).collect(Collectors.toList())) - .stream() - .map(this::getResult) - .sorted() - .distinct() - .collect(Collectors.toList()); + final List returnedPorts = new ArrayList(); + for(int i = 0;i < ports.size(); i++) { + returnedPorts.add(provider.next()); + } //then assertEquals(ports, returnedPorts); @@ -44,24 +35,11 @@ public void nextShouldThrowExceptionWhenRunOutsOfPorts() throws Exception { final PredefinedPortProvider provider = new PredefinedPortProvider(ports); //when - executor.invokeAll( - Stream.generate(() -> (Callable) provider::next).limit(11).collect(Collectors.toList())) - .stream() - .map(this::getResult) - .sorted() - .distinct() - .collect(Collectors.toList()); + for(int i = 0;i < ports.size(); i++) { + provider.next(); + } //then exception should be thrown... - } - - private int getResult(Future f) { - int ret = 0; - try { - ret = f.get(); - } catch (Exception e) { - throw new RedisBuildingException("failed to deliver", e); - } - return ret; + provider.next(); } } \ No newline at end of file diff --git a/src/test/java/redis/embedded/ports/SequencePortProviderTest.java b/src/test/java/redis/embedded/ports/SequencePortProviderTest.java index 79ed120f..0a74b20c 100644 --- a/src/test/java/redis/embedded/ports/SequencePortProviderTest.java +++ b/src/test/java/redis/embedded/ports/SequencePortProviderTest.java @@ -2,44 +2,27 @@ import org.junit.Test; -import java.util.Comparator; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.stream.Collectors; -import java.util.stream.Stream; - import static org.junit.Assert.assertEquals; public class SequencePortProviderTest { - private final ExecutorService executor = Executors.newFixedThreadPool(10); @Test - public void nextShouldIncrementPortsConcurrently() throws Exception { + public void nextShouldIncrementPorts() throws Exception { //given final int startPort = 10; + final int portCount = 101; final SequencePortProvider provider = new SequencePortProvider(startPort); //when - final int max = executor.invokeAll( - Stream.generate(() -> (Callable) provider::next).limit(101).collect(Collectors.toList())) - .stream() - .map(this::getResult) - .max(Comparator.naturalOrder()) - .get(); + int max = 0; + for(int i = 0;i max) { + max = port; + } + } //then - assertEquals(110, max); - } - - private int getResult(Future f) { - int ret = 0; - try { - ret = f.get(); - } catch (Exception e) { - e.printStackTrace(); - } - return ret; + assertEquals(portCount + startPort - 1, max); } } \ No newline at end of file