diff --git a/README.md b/README.md index 18a8faf5..11954618 100644 --- a/README.md +++ b/README.md @@ -40,27 +40,35 @@ RedisServer redisServer = RedisServer.builder() .build(); ``` +## Setting up a cluster + Our Embedded Redis has support for HA Redis clusters with Sentinels and master-slave replication -A simple redis integration test with Redis cluster setup similar to that from production would look like this: +#### Using ephemeral ports +A simple redis integration test with Redis cluster on ephemeral ports, with setup similar to that from production would look like this: ```java public class SomeIntegrationTestThatRequiresRedis { private RedisCluster cluster; - + private Set jedisSentinelHosts; + @Before public void setup() throws Exception { //creates a cluster with 3 sentinels, quorum size of 2 and 3 replication groups, each with one master and one slave - cluster = RedisCluster.builder().sentinelCount(3).quorumSize(2) + cluster = RedisCluster.builder().ephemeral().sentinelCount(3).quorumSize(2) .replicationGroup("master1", 1) .replicationGroup("master2", 1) .replicationGroup("master3", 1) .build(); cluster.start(); + + //retrieve ports on which sentinels have been started, using a simple Jedis utility class + jedisSentinelHosts = JedisUtil.sentinelHosts(cluster); } @Test public void test() throws Exception { // testing code that requires redis running + JedisSentinelPool pool = new JedisSentinelPool("master1", jedisSentinelHosts); } @After @@ -70,6 +78,34 @@ public class SomeIntegrationTestThatRequiresRedis { } ``` +#### Retrieving ports +The above example starts Redis cluster on ephemeral ports, which you can later get with ```cluster.ports()```, +which will return a list of all ports of the cluster. You can also get ports of sentinels with ```cluster.sentinelPorts()``` +or servers with ```cluster.serverPorts()```. ```JedisUtil``` class contains utility methods for use with Jedis client. + +#### Using predefined ports +You can also start Redis cluster on predefined ports and even mix both approaches: +```java +public class SomeIntegrationTestThatRequiresRedis { + private RedisCluster cluster; + + @Before + public void setup() throws Exception { + final List sentinels = Arrays.asList(26739, 26912); + final List group1 = Arrays.asList(6667, 6668); + final List group2 = Arrays.asList(6387, 6379); + //creates a cluster with 3 sentinels, quorum size of 2 and 3 replication groups, each with one master and one slave + cluster = RedisCluster.builder().sentinelPorts(sentinels).quorumSize(2) + .serverPorts(group1).replicationGroup("master1", 1) + .serverPorts(group2).replicationGroup("master2", 1) + .ephemeralServers().replicationGroup("master3", 1) + .build(); + cluster.start(); + } +//(...) +``` +The above will create and start a cluster with sentinels on ports ```26739, 26912```, first replication group on ```6667, 6668```, +second replication group on ```6387, 6379``` and third replication group on ephemeral ports. Redis version ============== diff --git a/src/main/java/redis/embedded/AbstractRedisInstance.java b/src/main/java/redis/embedded/AbstractRedisInstance.java index aabce247..113a4301 100644 --- a/src/main/java/redis/embedded/AbstractRedisInstance.java +++ b/src/main/java/redis/embedded/AbstractRedisInstance.java @@ -3,6 +3,7 @@ import redis.embedded.exceptions.EmbeddedRedisException; import java.io.*; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; @@ -15,10 +16,15 @@ abstract class AbstractRedisInstance implements Redis { protected List args = Collections.emptyList(); private volatile boolean active = false; private Process redisProcess; + private final int port; private final ExecutorService executor = Executors.newSingleThreadExecutor(); - @Override + protected AbstractRedisInstance(int port) { + this.port = port; + } + + @Override public boolean isActive() { return active; } @@ -91,4 +97,9 @@ private void tryWaitFor() { throw new EmbeddedRedisException("Failed to stop redis instance", e); } } + + @Override + public List ports() { + return Arrays.asList(port); + } } diff --git a/src/main/java/redis/embedded/PortProvider.java b/src/main/java/redis/embedded/PortProvider.java new file mode 100644 index 00000000..6d171e7a --- /dev/null +++ b/src/main/java/redis/embedded/PortProvider.java @@ -0,0 +1,8 @@ +package redis.embedded; + +/** + * Created by piotrturek on 29/01/15. + */ +public interface PortProvider { + int next(); +} diff --git a/src/main/java/redis/embedded/Redis.java b/src/main/java/redis/embedded/Redis.java index 9e77a16a..6f58801f 100644 --- a/src/main/java/redis/embedded/Redis.java +++ b/src/main/java/redis/embedded/Redis.java @@ -2,6 +2,8 @@ import redis.embedded.exceptions.EmbeddedRedisException; +import java.util.List; + /** * Created by piotrturek on 22/01/15. */ @@ -11,4 +13,6 @@ public interface Redis { void start() throws EmbeddedRedisException; void stop() throws EmbeddedRedisException; + + List ports(); } diff --git a/src/main/java/redis/embedded/RedisCluster.java b/src/main/java/redis/embedded/RedisCluster.java index 7b2516c8..3b8de358 100644 --- a/src/main/java/redis/embedded/RedisCluster.java +++ b/src/main/java/redis/embedded/RedisCluster.java @@ -1,9 +1,12 @@ package redis.embedded; +import com.google.common.collect.Lists; import redis.embedded.exceptions.EmbeddedRedisException; import java.util.LinkedList; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Created by piotrturek on 22/01/15. @@ -34,6 +37,30 @@ public void stop() throws EmbeddedRedisException { sentinels.parallelStream().forEach(Redis::stop); } + @Override + public List ports() { + return Stream.concat( + sentinels.stream().flatMap(s -> s.ports().stream()), + servers.stream().flatMap(s -> s.ports().stream()) + ).collect(Collectors.toList()); + } + + public List sentinels() { + return Lists.newLinkedList(sentinels); + } + + public List sentinelPorts() { + return sentinels.stream().flatMap(s -> s.ports().stream()).collect(Collectors.toList()); + } + + public List servers() { + return Lists.newLinkedList(servers); + } + + public List serverPorts() { + return servers.stream().flatMap(s -> s.ports().stream()).collect(Collectors.toList()); + } + public static RedisClusterBuilder builder() { return new RedisClusterBuilder(); } diff --git a/src/main/java/redis/embedded/RedisClusterBuilder.java b/src/main/java/redis/embedded/RedisClusterBuilder.java index acc8b5d5..b243f351 100644 --- a/src/main/java/redis/embedded/RedisClusterBuilder.java +++ b/src/main/java/redis/embedded/RedisClusterBuilder.java @@ -1,5 +1,10 @@ package redis.embedded; +import redis.embedded.ports.EphemeralPortProvider; +import redis.embedded.ports.PredefinedPortProvider; +import redis.embedded.ports.SequencePortProvider; + +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.stream.Collectors; @@ -13,8 +18,8 @@ public class RedisClusterBuilder { private RedisServerBuilder serverBuilder = new RedisServerBuilder(); private int sentinelCount = 1; private int quorumSize = 1; - private int currentSentinelPort = 26379; - private int currentReplicationGroupPort = 6379; + private PortProvider sentinelPortProvider = new SequencePortProvider(26379); + private PortProvider replicationGroupPortProvider = new SequencePortProvider(6379); private final List groups = new LinkedList<>(); public RedisClusterBuilder withSentinelBuilder(RedisSentinelBuilder sentinelBuilder) { @@ -27,13 +32,41 @@ public RedisClusterBuilder withServerBuilder(RedisServerBuilder serverBuilder) { return this; } + public RedisClusterBuilder sentinelPorts(Collection ports) { + this.sentinelPortProvider = new PredefinedPortProvider(ports); + this.sentinelCount = ports.size(); + return this; + } + + public RedisClusterBuilder serverPorts(Collection ports) { + this.replicationGroupPortProvider = new PredefinedPortProvider(ports); + return this; + } + + public RedisClusterBuilder ephemeralSentinels() { + this.sentinelPortProvider = new EphemeralPortProvider(); + return this; + } + + public RedisClusterBuilder ephemeralServers() { + this.replicationGroupPortProvider = new EphemeralPortProvider(); + return this; + } + + + public RedisClusterBuilder ephemeral() { + ephemeralSentinels(); + ephemeralServers(); + return this; + } + public RedisClusterBuilder sentinelCount(int sentinelCount) { this.sentinelCount = sentinelCount; return this; } public RedisClusterBuilder sentinelStartingPort(int startingPort) { - this.currentSentinelPort = startingPort; + this.sentinelPortProvider = new SequencePortProvider(startingPort); return this; } @@ -43,7 +76,7 @@ public RedisClusterBuilder quorumSize(int quorumSize) { } public RedisClusterBuilder replicationGroup(String masterName, int slaveCount) { - this.groups.add(new ReplicationGroup(masterName, slaveCount, nextReplicationGroupPort(slaveCount))); + this.groups.add(new ReplicationGroup(masterName, slaveCount, this.replicationGroupPortProvider)); return this; } @@ -99,28 +132,19 @@ private Redis buildSentinel() { } private int nextSentinelPort() { - return currentSentinelPort++; - } - - private int nextReplicationGroupPort(int slaveCount) { - final int toReturn = this.currentReplicationGroupPort; - currentReplicationGroupPort += slaveCount + 1; - return toReturn; + return sentinelPortProvider.next(); } private static class ReplicationGroup { - private final int slaveCount; private final String masterName; private final int masterPort; private final List slavePorts = new LinkedList<>(); - private ReplicationGroup(String masterName, int slaveCount, int portsStart) { + private ReplicationGroup(String masterName, int slaveCount, PortProvider portProvider) { this.masterName = masterName; - this.slaveCount = slaveCount; - masterPort = portsStart; + masterPort = portProvider.next(); while (slaveCount-- > 0) { - portsStart += 1; - slavePorts.add(portsStart); + slavePorts.add(portProvider.next()); } } } diff --git a/src/main/java/redis/embedded/RedisSentinel.java b/src/main/java/redis/embedded/RedisSentinel.java index 82b9ac47..6905100a 100644 --- a/src/main/java/redis/embedded/RedisSentinel.java +++ b/src/main/java/redis/embedded/RedisSentinel.java @@ -9,7 +9,8 @@ public class RedisSentinel extends AbstractRedisInstance { private static final String REDIS_READY_PATTERN = ".*Sentinel runid is.*"; - public RedisSentinel(List args) { + public RedisSentinel(List args, int port) { + super(port); this.args = new ArrayList<>(args); } diff --git a/src/main/java/redis/embedded/RedisSentinelBuilder.java b/src/main/java/redis/embedded/RedisSentinelBuilder.java index 41c73770..50c8e23d 100644 --- a/src/main/java/redis/embedded/RedisSentinelBuilder.java +++ b/src/main/java/redis/embedded/RedisSentinelBuilder.java @@ -105,7 +105,7 @@ public RedisSentinelBuilder setting(String configLine) { public RedisSentinel build() { tryResolveConfAndExec(); List args = buildCommandArgs(); - return new RedisSentinel(args); + return new RedisSentinel(args, port); } private void tryResolveConfAndExec() { diff --git a/src/main/java/redis/embedded/RedisServer.java b/src/main/java/redis/embedded/RedisServer.java index eb115742..c6d45a62 100644 --- a/src/main/java/redis/embedded/RedisServer.java +++ b/src/main/java/redis/embedded/RedisServer.java @@ -17,6 +17,7 @@ public RedisServer() throws IOException { } public RedisServer(Integer port) throws IOException { + super(port); File executable = JarUtil.extractExecutableFromJar(RedisRunScriptEnum.getRedisRunScript()); this.args = Arrays.asList( executable.getAbsolutePath(), @@ -25,13 +26,15 @@ public RedisServer(Integer port) throws IOException { } public RedisServer(File executable, Integer port) { + super(port); this.args = Arrays.asList( executable.getAbsolutePath(), "--port", Integer.toString(port) ); } - RedisServer(List args) { + RedisServer(List args, int port) { + super(port); this.args = new ArrayList<>(args); } diff --git a/src/main/java/redis/embedded/RedisServerBuilder.java b/src/main/java/redis/embedded/RedisServerBuilder.java index 39cb1ce5..a9c6a685 100644 --- a/src/main/java/redis/embedded/RedisServerBuilder.java +++ b/src/main/java/redis/embedded/RedisServerBuilder.java @@ -20,7 +20,7 @@ public class RedisServerBuilder { private static final String CONF_FILENAME = "embedded-redis-server"; private File executable; - private Integer port; + private Integer port = 6379; private InetSocketAddress slaveOf; private String redisConf; @@ -76,7 +76,7 @@ public RedisServerBuilder setting(String configLine) { public RedisServer build() { tryResolveConfAndExec(); List args = buildCommandArgs(); - return new RedisServer(args); + return new RedisServer(args, port); } public void reset() { diff --git a/src/main/java/redis/embedded/ports/EphemeralPortProvider.java b/src/main/java/redis/embedded/ports/EphemeralPortProvider.java new file mode 100644 index 00000000..a02d41ff --- /dev/null +++ b/src/main/java/redis/embedded/ports/EphemeralPortProvider.java @@ -0,0 +1,26 @@ +package redis.embedded.ports; + +import redis.embedded.PortProvider; +import redis.embedded.exceptions.RedisBuildingException; + +import java.io.IOException; +import java.net.ServerSocket; + +/** + * Created by piotrturek on 29/01/15. + */ +public class EphemeralPortProvider implements PortProvider { + @Override + public int next() { + try { + final ServerSocket socket = new ServerSocket(0); + socket.setReuseAddress(false); + int port = socket.getLocalPort(); + socket.close(); + return port; + } catch (IOException e) { + //should not ever happen + throw new RedisBuildingException("Could not provide ephemeral port", e); + } + } +} diff --git a/src/main/java/redis/embedded/ports/PredefinedPortProvider.java b/src/main/java/redis/embedded/ports/PredefinedPortProvider.java new file mode 100644 index 00000000..937be744 --- /dev/null +++ b/src/main/java/redis/embedded/ports/PredefinedPortProvider.java @@ -0,0 +1,30 @@ +package redis.embedded.ports; + +import redis.embedded.PortProvider; +import redis.embedded.exceptions.RedisBuildingException; + +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +/** + * Created by piotrturek on 29/01/15. + */ +public class PredefinedPortProvider implements PortProvider { + private final List ports = new LinkedList<>(); + private final Iterator current; + + public PredefinedPortProvider(Collection ports) { + this.ports.addAll(ports); + this.current = this.ports.iterator(); + } + + @Override + public synchronized int next() { + if (!current.hasNext()) { + throw new RedisBuildingException("Run out of Redis ports!"); + } + return current.next(); + } +} diff --git a/src/main/java/redis/embedded/ports/SequencePortProvider.java b/src/main/java/redis/embedded/ports/SequencePortProvider.java new file mode 100644 index 00000000..f6782560 --- /dev/null +++ b/src/main/java/redis/embedded/ports/SequencePortProvider.java @@ -0,0 +1,28 @@ +package redis.embedded.ports; + +import redis.embedded.PortProvider; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Created by piotrturek on 29/01/15. + */ +public class SequencePortProvider implements PortProvider { + private AtomicInteger currentPort = new AtomicInteger(26379); + + public SequencePortProvider() { + } + + public SequencePortProvider(int currentPort) { + this.currentPort.set(currentPort); + } + + public void setCurrentPort(int port) { + currentPort.set(port); + } + + @Override + public int next() { + return currentPort.getAndIncrement(); + } +} diff --git a/src/main/java/redis/embedded/util/JedisUtil.java b/src/main/java/redis/embedded/util/JedisUtil.java new file mode 100644 index 00000000..a0085c54 --- /dev/null +++ b/src/main/java/redis/embedded/util/JedisUtil.java @@ -0,0 +1,27 @@ +package redis.embedded.util; + +import redis.embedded.Redis; +import redis.embedded.RedisCluster; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Created by piotrturek on 01/02/15. + */ +public class JedisUtil { + public static Set jedisHosts(Redis redis) { + final List ports = redis.ports(); + return portsToJedisHosts(ports); + } + + public static Set sentinelHosts(RedisCluster cluster) { + final List ports = cluster.sentinelPorts(); + return portsToJedisHosts(ports); + } + + public static Set portsToJedisHosts(List ports) { + return ports.stream().map(p -> "localhost:" + p).collect(Collectors.toSet()); + } +} diff --git a/src/test/java/redis/embedded/RedisClusterTest.java b/src/test/java/redis/embedded/RedisClusterTest.java index 0217e420..9c42b10e 100644 --- a/src/test/java/redis/embedded/RedisClusterTest.java +++ b/src/test/java/redis/embedded/RedisClusterTest.java @@ -5,9 +5,11 @@ import org.junit.Test; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisSentinelPool; +import redis.embedded.util.JedisUtil; import java.util.Arrays; import java.util.List; +import java.util.Set; import static org.junit.Assert.assertEquals; import static org.mockito.BDDMockito.given; @@ -156,6 +158,27 @@ public void testSimpleOperationsAfterRunWithTwoSentinelsSingleMasterMultipleSlav } } + @Test + public void testSimpleOperationsAfterRunWithTwoPredefinedSentinelsSingleMasterMultipleSlaves() throws Exception { + //given + List sentinelPorts = Arrays.asList(26381, 26382); + final RedisCluster cluster = RedisCluster.builder().sentinelPorts(sentinelPorts).replicationGroup("ourmaster", 2).build(); + cluster.start(); + final Set sentinelHosts = JedisUtil.portsToJedisHosts(sentinelPorts); + + //when + JedisSentinelPool pool = null; + Jedis jedis = null; + try { + pool = new JedisSentinelPool("ourmaster", sentinelHosts); + jedis = testPool(pool); + } finally { + if (jedis != null) + pool.returnResource(jedis); + cluster.stop(); + } + } + @Test public void testSimpleOperationsAfterRunWithThreeSentinelsThreeMastersOneSlavePerMasterCluster() throws Exception { //given @@ -194,6 +217,45 @@ public void testSimpleOperationsAfterRunWithThreeSentinelsThreeMastersOneSlavePe } } + @Test + public void testSimpleOperationsAfterRunWithThreeSentinelsThreeMastersOneSlavePerMasterEphemeralCluster() throws Exception { + //given + final String master1 = "master1"; + final String master2 = "master2"; + final String master3 = "master3"; + final RedisCluster cluster = RedisCluster.builder().ephemeral().sentinelCount(3).quorumSize(2) + .replicationGroup(master1, 1) + .replicationGroup(master2, 1) + .replicationGroup(master3, 1) + .build(); + cluster.start(); + final Set sentinelHosts = JedisUtil.sentinelHosts(cluster); + + //when + JedisSentinelPool pool1 = null; + JedisSentinelPool pool2 = null; + JedisSentinelPool pool3 = null; + Jedis jedis1 = null; + Jedis jedis2 = null; + Jedis jedis3 = null; + try { + pool1 = new JedisSentinelPool(master1, sentinelHosts); + pool2 = new JedisSentinelPool(master2, sentinelHosts); + pool3 = new JedisSentinelPool(master3, sentinelHosts); + jedis1 = testPool(pool1); + jedis2 = testPool(pool2); + jedis3 = testPool(pool3); + } finally { + if (jedis1 != null) + pool1.returnResource(jedis1); + if (jedis2 != null) + pool2.returnResource(jedis2); + if (jedis3 != null) + pool3.returnResource(jedis3); + cluster.stop(); + } + } + private Jedis testPool(JedisSentinelPool pool) { Jedis jedis; jedis = pool.getResource(); diff --git a/src/test/java/redis/embedded/ports/EphemeralPortProviderTest.java b/src/test/java/redis/embedded/ports/EphemeralPortProviderTest.java new file mode 100644 index 00000000..14022432 --- /dev/null +++ b/src/test/java/redis/embedded/ports/EphemeralPortProviderTest.java @@ -0,0 +1,48 @@ +package redis.embedded.ports; + +import org.junit.Test; +import redis.embedded.exceptions.RedisBuildingException; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; + +public class EphemeralPortProviderTest { + private final ExecutorService executor = Executors.newFixedThreadPool(10); + + @Test + public void nextShouldGiveNextFreeEphemeralPortConcurrently() throws Exception { + //given + final EphemeralPortProvider provider = new EphemeralPortProvider(); + + //when + final List ports = executor.invokeAll( + Stream.generate(() -> (Callable) provider::next).limit(20).collect(Collectors.toList())) + .stream() + .map(this::getResult) + .sorted() + .distinct() + .collect(Collectors.toList()); + + //then + System.out.println(ports); + assertEquals(20, ports.size()); + } + + private int getResult(Future f) { + int ret = 0; + try { + ret = f.get(); + } catch (Exception e) { + throw new RedisBuildingException("failed to deliver", e); + } + return ret; + } + +} \ No newline at end of file diff --git a/src/test/java/redis/embedded/ports/PredefinedPortProviderTest.java b/src/test/java/redis/embedded/ports/PredefinedPortProviderTest.java new file mode 100644 index 00000000..11daa120 --- /dev/null +++ b/src/test/java/redis/embedded/ports/PredefinedPortProviderTest.java @@ -0,0 +1,67 @@ +package redis.embedded.ports; + +import org.junit.Test; +import redis.embedded.exceptions.RedisBuildingException; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; + +public class PredefinedPortProviderTest { + private final ExecutorService executor = Executors.newFixedThreadPool(10); + + @Test + public void nextShouldGiveNextPortFromAssignedListConcurrently() throws Exception { + //given + Collection ports = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + final PredefinedPortProvider provider = new PredefinedPortProvider(ports); + + //when + final List returnedPorts = executor.invokeAll( + Stream.generate(() -> (Callable) provider::next).limit(10).collect(Collectors.toList())) + .stream() + .map(this::getResult) + .sorted() + .distinct() + .collect(Collectors.toList()); + + //then + assertEquals(ports, returnedPorts); + } + + @Test(expected = RedisBuildingException.class) + public void nextShouldThrowExceptionWhenRunOutsOfPorts() throws Exception { + //given + Collection ports = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + final PredefinedPortProvider provider = new PredefinedPortProvider(ports); + + //when + executor.invokeAll( + Stream.generate(() -> (Callable) provider::next).limit(11).collect(Collectors.toList())) + .stream() + .map(this::getResult) + .sorted() + .distinct() + .collect(Collectors.toList()); + + //then exception should be thrown... + } + + private int getResult(Future f) { + int ret = 0; + try { + ret = f.get(); + } catch (Exception e) { + throw new RedisBuildingException("failed to deliver", e); + } + return ret; + } +} \ No newline at end of file diff --git a/src/test/java/redis/embedded/ports/SequencePortProviderTest.java b/src/test/java/redis/embedded/ports/SequencePortProviderTest.java new file mode 100644 index 00000000..79ed120f --- /dev/null +++ b/src/test/java/redis/embedded/ports/SequencePortProviderTest.java @@ -0,0 +1,45 @@ +package redis.embedded.ports; + +import org.junit.Test; + +import java.util.Comparator; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; + +public class SequencePortProviderTest { + private final ExecutorService executor = Executors.newFixedThreadPool(10); + + @Test + public void nextShouldIncrementPortsConcurrently() throws Exception { + //given + final int startPort = 10; + final SequencePortProvider provider = new SequencePortProvider(startPort); + + //when + final int max = executor.invokeAll( + Stream.generate(() -> (Callable) provider::next).limit(101).collect(Collectors.toList())) + .stream() + .map(this::getResult) + .max(Comparator.naturalOrder()) + .get(); + + //then + assertEquals(110, max); + } + + private int getResult(Future f) { + int ret = 0; + try { + ret = f.get(); + } catch (Exception e) { + e.printStackTrace(); + } + return ret; + } +} \ No newline at end of file