Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add balancing test with discovery and connections #387

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
package io.tarantool.driver.integration;

import java.time.Duration;
import java.util.HashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.TarantoolCartridgeContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;

import java.time.Duration;
import java.util.HashMap;

abstract class CartridgeMixedInstancesContainer {

private static final Logger logger = LoggerFactory.getLogger(CartridgeMixedInstancesContainer.class);

protected static final TarantoolCartridgeContainer container;

static {
final HashMap<String, String> buildArgs = new HashMap<>();
buildArgs.put("TARANTOOL_INSTANCES_FILE", "./instances_mixed.yml");
container = new TarantoolCartridgeContainer(
"cartridge/instances_mixed.yml",
"cartridge/topology_mixed.lua", buildArgs)
.withDirectoryBinding("cartridge")
.withLogConsumer(new Slf4jLogConsumer(logger))
.waitingFor(Wait.forLogMessage(".*Listening HTTP on.*", 3))
.withStartupTimeout(Duration.ofMinutes(2));
final HashMap<String, String> env = new HashMap<>();
env.put("TARANTOOL_INSTANCES_FILE", "./instances_mixed.yml");
container = new TarantoolCartridgeContainer("cartridge/instances_mixed.yml",
"cartridge/topology_mixed.lua")
.withDirectoryBinding("cartridge")
.withLogConsumer(new Slf4jLogConsumer(logger))
.waitingFor(Wait.forLogMessage(".*Listening HTTP on.*", 3))
.withStartupTimeout(Duration.ofMinutes(2))
.withEnv(env);
}

protected static void startCluster() {
Expand Down
187 changes: 167 additions & 20 deletions src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java
Original file line number Diff line number Diff line change
@@ -1,34 +1,46 @@
package io.tarantool.driver.integration;

import io.tarantool.driver.api.TarantoolClientConfig;
import io.tarantool.driver.api.TarantoolClusterAddressProvider;
import io.tarantool.driver.api.TarantoolServerAddress;
import io.tarantool.driver.api.connection.TarantoolConnection;
import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies;
import io.tarantool.driver.auth.SimpleTarantoolCredentials;
import io.tarantool.driver.core.ClusterTarantoolTupleClient;
import io.tarantool.driver.core.ProxyTarantoolTupleClient;
import io.tarantool.driver.core.RetryingTarantoolTupleClient;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.containers.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.containers.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.tarantool.driver.api.TarantoolClient;
import io.tarantool.driver.api.TarantoolClientConfig;
import io.tarantool.driver.api.TarantoolClientFactory;
import io.tarantool.driver.api.TarantoolClusterAddressProvider;
import io.tarantool.driver.api.TarantoolResult;
import io.tarantool.driver.api.TarantoolServerAddress;
import io.tarantool.driver.api.connection.TarantoolConnection;
import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies;
import io.tarantool.driver.api.tuple.TarantoolTuple;
import io.tarantool.driver.auth.SimpleTarantoolCredentials;
import io.tarantool.driver.auth.TarantoolCredentials;
import io.tarantool.driver.cluster.BinaryClusterDiscoveryEndpoint;
import io.tarantool.driver.cluster.BinaryDiscoveryClusterAddressProvider;
import io.tarantool.driver.cluster.TarantoolClusterDiscoveryConfig;
import io.tarantool.driver.cluster.TestWrappedClusterAddressProvider;
import io.tarantool.driver.core.ClusterTarantoolTupleClient;
import io.tarantool.driver.core.ProxyTarantoolTupleClient;
import io.tarantool.driver.core.RetryingTarantoolTupleClient;

/**
* @author Alexey Kuzin
* @author Artyom Dubinin
Expand All @@ -49,17 +61,18 @@ public static void setUp() throws TimeoutException {

private TarantoolClientConfig.Builder prepareConfig() {
return TarantoolClientConfig.builder()
.withCredentials(new SimpleTarantoolCredentials(USER_NAME, PASSWORD))
.withConnectTimeout(1000)
.withReadTimeout(1000);
.withCredentials(new SimpleTarantoolCredentials(USER_NAME, PASSWORD))
.withConnectTimeout(1000)
.withReadTimeout(1000);
}

private RetryingTarantoolTupleClient setupRouterClient(int port, int retries, long delay) {
ClusterTarantoolTupleClient clusterClient = new ClusterTarantoolTupleClient(
prepareConfig().build(), container.getRouterHost(), container.getMappedPort(port));

return new RetryingTarantoolTupleClient(new ProxyTarantoolTupleClient(clusterClient),
TarantoolRequestRetryPolicies.byNumberOfAttempts(retries).withDelay(delay).build());
TarantoolRequestRetryPolicies.byNumberOfAttempts(retries)
.withDelay(delay).build());
}

private RetryingTarantoolTupleClient setupClusterClient(
Expand All @@ -70,7 +83,141 @@ private RetryingTarantoolTupleClient setupClusterClient(

ProxyTarantoolTupleClient client = new ProxyTarantoolTupleClient(clusterClient);
return new RetryingTarantoolTupleClient(client,
TarantoolRequestRetryPolicies.byNumberOfAttempts(retries, e -> true).withDelay(delay).build());
TarantoolRequestRetryPolicies.byNumberOfAttempts(retries, e -> true)
.withDelay(delay).build());
}

@Test
void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections()
throws ExecutionException, InterruptedException, IOException {

TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> clusterClient =
getTarantoolClusterClientWithDiscovery(2, 5_000);

TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient1 = getSimpleClient(3301);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient2 = getSimpleClient(3302);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient3 = getSimpleClient(3303);
// 3306 isn't in cluster topology yet
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient4 = getSimpleClient(3306);

int callCounter = 15;
for (int i = 0; i < callCounter; i++) {
clusterClient.callForSingleResult(
"simple_long_running_function", Arrays.asList(0, true), Boolean.class).get();
}

String getAllConnectionCalls =
"return box.space.request_counters.index.count:select(0, {iterator = box.index.GT})";
// 15 calls on 3 routers on 2 connection == 15 / 3 == 5 / 2 == 2 or 3 calls per connect
for (TarantoolClient router : Arrays.asList(routerClient1, routerClient2, routerClient3)) {
assertEquals(Arrays.asList(2, 3), getCallCountersPerConnection(getAllConnectionCalls, router));
}

// add new router
// put 3306 in topology as router
routerClient1.eval("cartridge = require('cartridge') " +
"replicasets = { " +
" { " +
" alias = 'app-router-fourth', " +
" roles = { 'vshard-router', 'app.roles.custom', 'app.roles.api_router' }, " +
" join_servers = { { uri = 'localhost:3306' } } " +
" }} " +
"cartridge.admin_edit_topology({ replicasets = replicasets }) ").join();

// wait until discovery get topology
Thread.sleep(5_000);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems highly unstable on different machines. Please use polling (until the cluster gets the proper state) over sleeping.


callCounter = 16;
// 16 / 4 / 2 = 2 requests per connect
for (int i = 0; i < callCounter; i++) {
clusterClient.callForSingleResult(
"simple_long_running_function", Arrays.asList(0, true), Boolean.class).get();
}

for (TarantoolClient router :
Arrays.asList(routerClient1, routerClient2, routerClient3)) {
assertEquals(Arrays.asList(4, 5), getCallCountersPerConnection(getAllConnectionCalls, router));
}

Object routerCallCounterPerConnection = getCallCountersPerConnection(getAllConnectionCalls, routerClient4);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: Why is it a raw Object?

assertEquals(Arrays.asList(2, 2), routerCallCounterPerConnection);

stopInstance("fourth-router");
// wait until discovery get topology
Thread.sleep(5_000);

callCounter = 12;
// 12 / 3 / 2 = 2 requests per connect
for (int i = 0; i < callCounter; i++) {
clusterClient.callForSingleResult(
"simple_long_running_function", Arrays.asList(0, true), Boolean.class).get();
}
Thread.sleep(5_000);
for (TarantoolClient router :
Arrays.asList(routerClient1, routerClient2, routerClient3)) {
assertEquals(Arrays.asList(6, 7), getCallCountersPerConnection(getAllConnectionCalls, router));
}

startCartridge();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this call do?

}

private static TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> getSimpleClient(Integer port) {
return TarantoolClientFactory.createClient()
.withAddress(container.getRouterHost(), container.getMappedPort(port))
.withCredentials(USER_NAME, PASSWORD)
.build();
}

private static TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>>
getTarantoolClusterClientWithDiscovery(
int connections, int delay) {
String host = container.getRouterHost();
int port = container.getRouterPort();

TarantoolCredentials credentials = new SimpleTarantoolCredentials(
USER_NAME,
PASSWORD
);
TarantoolClientConfig config = TarantoolClientConfig.builder()
.withCredentials(credentials)
.build();

BinaryClusterDiscoveryEndpoint endpoint = new BinaryClusterDiscoveryEndpoint.Builder()
.withClientConfig(config)
.withEntryFunction("get_routers")
.withEndpointProvider(() -> Collections.singletonList(
new TarantoolServerAddress(
host, port
)))
.build();

TarantoolClusterDiscoveryConfig clusterDiscoveryConfig = new TarantoolClusterDiscoveryConfig.Builder()
.withDelay(delay)
.withEndpoint(endpoint)
.build();

BinaryDiscoveryClusterAddressProvider discoveryProvider = new BinaryDiscoveryClusterAddressProvider(
clusterDiscoveryConfig);

TarantoolClusterAddressProvider wrapperDiscoveryProvider
= new TestWrappedClusterAddressProvider(discoveryProvider, container); // because we use docker ports

return TarantoolClientFactory.createClient()
.withAddressProvider(wrapperDiscoveryProvider)
.withCredentials(USER_NAME, PASSWORD)
.withConnections(connections)
.build();
}

@NotNull
private static Object getCallCountersPerConnection(String getAllConnectionCalls, TarantoolClient router) {
List<?> luaResponse = router.eval(getAllConnectionCalls).join();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't it a call or better callForSingleResult/callForMultiResult?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are showing the readers/users a bad code that they will copy to their projects

ArrayList tuples = (ArrayList) luaResponse.get(0); // because lua has multivalue response

Object routerCallCounterPerConnection = tuples.stream()
.map(item -> ((ArrayList) item).get(1))
.collect(Collectors.toList());
return routerCallCounterPerConnection;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void httpClusterDiscovererTest() throws TarantoolClientException {
HTTPDiscoveryClusterAddressProvider addressProvider = getHttpProvider();
Collection<TarantoolServerAddress> nodes = addressProvider.getAddresses();

assertEquals(nodes.size(), 3);
assertEquals(nodes.size(), 4);
Set<TarantoolServerAddress> nodeSet = new HashSet<>(nodes);
assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER1_URI)));
assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER2_URI)));
Expand All @@ -70,7 +70,7 @@ public void binaryClusterDiscovererTest() {
TarantoolClusterAddressProvider addressProvider = getBinaryProvider();

Collection<TarantoolServerAddress> nodes = addressProvider.getAddresses();
assertEquals(nodes.size(), 3);
assertEquals(nodes.size(), 4);
Set<TarantoolServerAddress> nodeSet = new HashSet<>(nodes);
assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER1_URI)));
assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER2_URI)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -245,6 +246,8 @@ void test_should_closeConnections_ifAddressProviderReturnsNewAddresses() throws
// restart routers for resetting connections
stopInstances(Arrays.asList("router", "second-router"));
startCartridge();
String status = container.execInContainer("cartridge", "status", "--run-dir=/tmp/run").getStderr();
assertEquals(6, StringUtils.countMatches(status, "RUNNING"));

final TarantoolServerAddress firstAddress =
new TarantoolServerAddress(container.getRouterHost(), container.getMappedPort(3301));
Expand Down Expand Up @@ -298,7 +301,7 @@ public void setRefreshCallback(Runnable runnable) {
numberOfSwitching.incrementAndGet();
runnable.run();
}
}, 500, 100, TimeUnit.MILLISECONDS);
}, 0, 100, TimeUnit.MILLISECONDS);
}
}).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ protected static void startInstance(String instanceName) throws IOException, Int
}

protected static void stopInstance(String instanceName) throws IOException, InterruptedException {
container.execInContainer("cartridge", "stop", "--run-dir=/tmp/run", "--data-dir=/tmp/data", instanceName);
container.execInContainer("cartridge", "stop", "--run-dir=/tmp/run", instanceName);
}
}
Loading