Skip to content

Commit

Permalink
Change tests logic and split them
Browse files Browse the repository at this point in the history
  • Loading branch information
ArtDu committed May 4, 2023
1 parent 4ea0205 commit 9c4d441
Showing 1 changed file with 142 additions and 50 deletions.
192 changes: 142 additions & 50 deletions src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.tarantool.driver.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -9,25 +10,32 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.TarantoolCartridgeContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Testcontainers;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.tarantool.driver.TarantoolUtils;
import io.tarantool.driver.api.TarantoolClient;
import io.tarantool.driver.api.TarantoolClientConfig;
import io.tarantool.driver.api.TarantoolClientFactory;
import io.tarantool.driver.api.TarantoolClusterAddressProvider;
import io.tarantool.driver.api.TarantoolResult;
import io.tarantool.driver.api.TarantoolServerAddress;
import io.tarantool.driver.api.conditions.Conditions;
import io.tarantool.driver.api.connection.TarantoolConnection;
import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies;
import io.tarantool.driver.api.tuple.TarantoolTuple;
Expand Down Expand Up @@ -87,31 +95,52 @@ private RetryingTarantoolTupleClient setupClusterClient(
.withDelay(delay).build());
}




// TODO: Add parallel threads
// TODO: A lot of routers
// TODO: Parallel round robin and default as test parameter
@Test
void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections()
throws ExecutionException, InterruptedException, IOException {
throws ExecutionException, InterruptedException {


TarantoolCartridgeContainer testContainer = runIndependentContainer();

TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> clusterClient =
getTarantoolClusterClientWithDiscovery(2, 5_000);
getTarantoolClusterClientWithDiscovery(testContainer,2, 2_000);

TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient1 = getSimpleClient(3301);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient2 = getSimpleClient(3302);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient3 = getSimpleClient(3303);
// 3306 isn't in cluster topology yet
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient4 = getSimpleClient(3306);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient1 = getSimpleClient(testContainer, 3301);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient2 = getSimpleClient(testContainer, 3302);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient3 = getSimpleClient(testContainer, 3303);

int callCounter = 15;
for (int i = 0; i < callCounter; i++) {
int batch = 1500;
int batchPerConnect = batch / 3 / 2; // 1_500 calls on 3 routers on 2 connection == 1_500 / 6 == 250 calls per connect
for (int i = 0; i < batch; i++) {
clusterClient.callForSingleResult(
"simple_long_running_function", Arrays.asList(0, true), Boolean.class).get();
}

String getAllConnectionCalls =
"return box.space.request_counters.index.count:select(0, {iterator = box.index.GT})";
// 15 calls on 3 routers on 2 connection == 15 / 3 == 5 / 2 == 2 or 3 calls per connect
for (TarantoolClient router : Arrays.asList(routerClient1, routerClient2, routerClient3)) {
assertEquals(Arrays.asList(2, 3), getCallCountersPerConnection(getAllConnectionCalls, router));
for (TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> router : Arrays.asList(routerClient1, routerClient2, routerClient3)) {
assertEquals(Arrays.asList(batchPerConnect, batchPerConnect), getCallCountersPerConnection(router));
}
}

@Test
void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections_andRouterJoining()
throws ExecutionException, InterruptedException {

TarantoolCartridgeContainer testContainer = runIndependentContainer();

TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> clusterClient =
getTarantoolClusterClientWithDiscovery(testContainer,2, 2_000);

TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient1 = getSimpleClient(testContainer, 3301);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient2 = getSimpleClient(testContainer, 3302);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient3 = getSimpleClient(testContainer, 3303);
// 3306 is not in cluster topology yet
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient4 = getSimpleClient(testContainer, 3306);

// add new router
// put 3306 in topology as router
Expand All @@ -124,59 +153,120 @@ void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections()
" }} " +
"cartridge.admin_edit_topology({ replicasets = replicasets }) ").join();

// wait until discovery get topology
Thread.sleep(5_000);

callCounter = 16;
// 16 / 4 / 2 = 2 requests per connect
for (int i = 0; i < callCounter; i++) {
int batch = 2000;
int batchPerConnect = batch / 4 / 2; // 2_000 calls on 4 routers on 2 connection == 2_000 / 8 == 250 calls per connect
for (int i = 0; i < batch; i++) {
clusterClient.callForSingleResult(
"simple_long_running_function", Arrays.asList(0, true), Boolean.class).get();
}

for (TarantoolClient router :
AtomicInteger sum = new AtomicInteger();
for (TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> router :
Arrays.asList(routerClient1, routerClient2, routerClient3)) {
assertEquals(Arrays.asList(4, 5), getCallCountersPerConnection(getAllConnectionCalls, router));
assertEquals(2, getCallCountersPerConnection(router).stream().filter(cnt -> {
sum.addAndGet(cnt);
return cnt > batchPerConnect; // because forth router was in starting stage some time
}).count());
}
assertEquals(2, getCallCountersPerConnection(routerClient4).stream().filter(cnt -> {
sum.addAndGet(cnt);
return 0 < cnt && cnt < batchPerConnect;
}).count());
assertEquals(batch, sum.get());
}

@Test
void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections_andRouterDying()
throws ExecutionException, InterruptedException, IOException {

Object routerCallCounterPerConnection = getCallCountersPerConnection(getAllConnectionCalls, routerClient4);
assertEquals(Arrays.asList(2, 2), routerCallCounterPerConnection);
TarantoolCartridgeContainer testContainer = runIndependentContainer();

TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> clusterClient =
getTarantoolClusterClientWithDiscovery(testContainer,2, 2_000);

TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient1 = getSimpleClient(testContainer, 3301);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient2 = getSimpleClient(testContainer, 3302);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient3 = getSimpleClient(testContainer, 3303);
// 3306 is not in cluster topology yet
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient4 = getSimpleClient(testContainer, 3306);

// add new router
// put 3306 in topology as router
routerClient1.eval("cartridge = require('cartridge') " +
"replicasets = { " +
" { " +
" alias = 'app-router-fourth', " +
" roles = { 'vshard-router', 'app.roles.custom', 'app.roles.api_router' }, " +
" join_servers = { { uri = 'localhost:3306' } } " +
" }} " +
"cartridge.admin_edit_topology({ replicasets = replicasets }) ").join();

String healthyCmd = "return cartridge.is_healthy()";

TarantoolUtils.retry(() -> {
try {
assertEquals(true, testContainer.executeCommand(healthyCmd).get().get(0));
} catch (Exception e) {
throw new RuntimeException(e);
}
});

stopInstance("fourth-router");
// wait until discovery get topology
Thread.sleep(5_000);

callCounter = 12;
// 12 / 3 / 2 = 2 requests per connect
for (int i = 0; i < callCounter; i++) {
int batch = 2000;
int batchPerConnect = batch / 4 / 2; // 2_000 calls on 4 routers on 2 connection == 2_000 / 8 == 250 calls per connect
for (int i = 0; i < batch; i++) {
clusterClient.callForSingleResult(
"simple_long_running_function", Arrays.asList(0, true), Boolean.class).get();
}
Thread.sleep(5_000);
for (TarantoolClient router :

AtomicInteger sum = new AtomicInteger();
for (TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> router :
Arrays.asList(routerClient1, routerClient2, routerClient3)) {
assertEquals(Arrays.asList(6, 7), getCallCountersPerConnection(getAllConnectionCalls, router));
assertEquals(2, getCallCountersPerConnection(router).stream().filter(cnt -> {
sum.addAndGet(cnt);
return cnt > batchPerConnect;
}).count());
}
assertEquals(2, getCallCountersPerConnection(routerClient4).stream().filter(cnt -> {
sum.addAndGet(cnt);
return 0 < cnt && cnt < batchPerConnect;
}).count());
assertEquals(batch, sum.get());
}

startCartridge();
private static TarantoolCartridgeContainer runIndependentContainer() {
TarantoolCartridgeContainer container =
new TarantoolCartridgeContainer(
"cartridge/instances.yml",
"cartridge/topology.lua")
.withDirectoryBinding("cartridge")
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger(ClusterConnectionIT.class)))
.waitingFor(Wait.forLogMessage(".*Listening HTTP on.*", 4))
.withStartupTimeout(Duration.ofMinutes(2));
container.start();
return container;
}

private static TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> getSimpleClient(Integer port) {
private static TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> getSimpleClient(TarantoolCartridgeContainer testContainer, Integer port) {
return TarantoolClientFactory.createClient()
.withAddress(container.getRouterHost(), container.getMappedPort(port))
.withCredentials(USER_NAME, PASSWORD)
.withAddress(testContainer.getRouterHost(), testContainer.getMappedPort(port))
.withCredentials(testContainer.getUsername(), testContainer.getPassword())
.build();
}

private static TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>>
getTarantoolClusterClientWithDiscovery(
getTarantoolClusterClientWithDiscovery(TarantoolCartridgeContainer testContainer,
int connections, int delay) {
String host = container.getRouterHost();
int port = container.getRouterPort();
String host = testContainer.getRouterHost();
int port = testContainer.getRouterPort();
String username = testContainer.getUsername();
String password = testContainer.getPassword();

TarantoolCredentials credentials = new SimpleTarantoolCredentials(
USER_NAME,
PASSWORD
username,
password
);
TarantoolClientConfig config = TarantoolClientConfig.builder()
.withCredentials(credentials)
Expand All @@ -200,24 +290,26 @@ private static TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>>
clusterDiscoveryConfig);

TarantoolClusterAddressProvider wrapperDiscoveryProvider
= new TestWrappedClusterAddressProvider(discoveryProvider, container); // because we use docker ports
= new TestWrappedClusterAddressProvider(discoveryProvider, testContainer); // because we use docker ports

return TarantoolClientFactory.createClient()
.withAddressProvider(wrapperDiscoveryProvider)
.withCredentials(USER_NAME, PASSWORD)
.withCredentials(username, password)
.withConnections(connections)
.build();
}

@NotNull
private static Object getCallCountersPerConnection(String getAllConnectionCalls, TarantoolClient router) {
List<?> luaResponse = router.eval(getAllConnectionCalls).join();
ArrayList tuples = (ArrayList) luaResponse.get(0); // because lua has multivalue response

Object routerCallCounterPerConnection = tuples.stream()
.map(item -> ((ArrayList) item).get(1))
.collect(Collectors.toList());
return routerCallCounterPerConnection;
private static List<Integer> getCallCountersPerConnection(TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> router)
throws ExecutionException, InterruptedException {
TarantoolResult<TarantoolTuple> tuples = router.space("request_counters")
.select(
Conditions.indexGreaterThan(
"count",
Collections.singletonList(0))).get();
return tuples.stream()
.map(tuple -> tuple.getInteger("count"))
.collect(Collectors.toList());
}

@Test
Expand Down

0 comments on commit 9c4d441

Please sign in to comment.