Skip to content

Commit

Permalink
Add balancing test with discovery and connections
Browse files Browse the repository at this point in the history
  • Loading branch information
ArtDu committed Apr 26, 2023
1 parent 27df177 commit 7f9bdf4
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 77 deletions.
189 changes: 169 additions & 20 deletions src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java
Original file line number Diff line number Diff line change
@@ -1,34 +1,46 @@
package io.tarantool.driver.integration;

import io.tarantool.driver.api.TarantoolClientConfig;
import io.tarantool.driver.api.TarantoolClusterAddressProvider;
import io.tarantool.driver.api.TarantoolServerAddress;
import io.tarantool.driver.api.connection.TarantoolConnection;
import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies;
import io.tarantool.driver.auth.SimpleTarantoolCredentials;
import io.tarantool.driver.core.ClusterTarantoolTupleClient;
import io.tarantool.driver.core.ProxyTarantoolTupleClient;
import io.tarantool.driver.core.RetryingTarantoolTupleClient;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.containers.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.containers.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.tarantool.driver.api.TarantoolClient;
import io.tarantool.driver.api.TarantoolClientConfig;
import io.tarantool.driver.api.TarantoolClientFactory;
import io.tarantool.driver.api.TarantoolClusterAddressProvider;
import io.tarantool.driver.api.TarantoolResult;
import io.tarantool.driver.api.TarantoolServerAddress;
import io.tarantool.driver.api.connection.TarantoolConnection;
import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies;
import io.tarantool.driver.api.tuple.TarantoolTuple;
import io.tarantool.driver.auth.SimpleTarantoolCredentials;
import io.tarantool.driver.auth.TarantoolCredentials;
import io.tarantool.driver.cluster.BinaryClusterDiscoveryEndpoint;
import io.tarantool.driver.cluster.BinaryDiscoveryClusterAddressProvider;
import io.tarantool.driver.cluster.TarantoolClusterDiscoveryConfig;
import io.tarantool.driver.cluster.TestWrappedClusterAddressProvider;
import io.tarantool.driver.core.ClusterTarantoolTupleClient;
import io.tarantool.driver.core.ProxyTarantoolTupleClient;
import io.tarantool.driver.core.RetryingTarantoolTupleClient;

/**
* @author Alexey Kuzin
* @author Artyom Dubinin
Expand All @@ -49,17 +61,18 @@ public static void setUp() throws TimeoutException {

private TarantoolClientConfig.Builder prepareConfig() {
return TarantoolClientConfig.builder()
.withCredentials(new SimpleTarantoolCredentials(USER_NAME, PASSWORD))
.withConnectTimeout(1000)
.withReadTimeout(1000);
.withCredentials(new SimpleTarantoolCredentials(USER_NAME, PASSWORD))
.withConnectTimeout(1000)
.withReadTimeout(1000);
}

private RetryingTarantoolTupleClient setupRouterClient(int port, int retries, long delay) {
ClusterTarantoolTupleClient clusterClient = new ClusterTarantoolTupleClient(
prepareConfig().build(), container.getRouterHost(), container.getMappedPort(port));

return new RetryingTarantoolTupleClient(new ProxyTarantoolTupleClient(clusterClient),
TarantoolRequestRetryPolicies.byNumberOfAttempts(retries).withDelay(delay).build());
TarantoolRequestRetryPolicies.byNumberOfAttempts(retries)
.withDelay(delay).build());
}

private RetryingTarantoolTupleClient setupClusterClient(
Expand All @@ -70,7 +83,143 @@ private RetryingTarantoolTupleClient setupClusterClient(

ProxyTarantoolTupleClient client = new ProxyTarantoolTupleClient(clusterClient);
return new RetryingTarantoolTupleClient(client,
TarantoolRequestRetryPolicies.byNumberOfAttempts(retries, e -> true).withDelay(delay).build());
TarantoolRequestRetryPolicies.byNumberOfAttempts(retries, e -> true)
.withDelay(delay).build());
}

@Test
void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections()
throws ExecutionException, InterruptedException, IOException {

TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> clusterClient =
getTarantoolClusterClientWithDiscovery(2, 5_000);

TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient1 = getSimpleClient(3301);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient2 = getSimpleClient(3302);
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient3 = getSimpleClient(3303);
// 3306 isn't in cluster topology yet
TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> routerClient4 = getSimpleClient(3306);

int callCounter = 15;
for (int i = 0; i < callCounter; i++) {
clusterClient.callForSingleResult(
"simple_long_running_function", Arrays.asList(0, true), Boolean.class).get();
}

String getAllConnectionCalls =
"return box.space.request_counters.index.count:select(0, {iterator = box.index.GT})";
// 15 calls on 3 routers on 2 connection == 15 / 3 == 5 / 2 == 2 or 3 calls per connect
for (TarantoolClient router : Arrays.asList(routerClient1, routerClient2, routerClient3)) {
assertEquals(Arrays.asList(2, 3), getCallCountersPerConnection(getAllConnectionCalls, router));
}

// add new router
// put 3306 in topology as router
routerClient1.eval("cartridge = require('cartridge') " +
"replicasets = { " +
" { " +
" alias = 'app-router-fourth', " +
" roles = { 'vshard-router', 'app.roles.custom', 'app.roles.api_router' }, " +
" join_servers = { { uri = 'localhost:3306' } } " +
" }} " +
"cartridge.admin_edit_topology({ replicasets = replicasets }) ").join();

// wait until discovery get topology
Thread.sleep(5_000);

callCounter = 16;
// 16 / 4 / 2 = 2 requests per connect
for (int i = 0; i < callCounter; i++) {
clusterClient.callForSingleResult(
"simple_long_running_function", Arrays.asList(0, true), Boolean.class).get();
}

for (TarantoolClient router :
Arrays.asList(routerClient1, routerClient2, routerClient3)) {
assertEquals(Arrays.asList(4, 5), getCallCountersPerConnection(getAllConnectionCalls, router));
}

Object routerCallCounterPerConnection = getCallCountersPerConnection(getAllConnectionCalls, routerClient4);
assertEquals(Arrays.asList(2, 2), routerCallCounterPerConnection);

String pid = container.execInContainer("pgrep", "-f", "testapp@second-router")
.getStdout().replace("\n", "");
container.execInContainer("kill", "-9", pid);
// wait until discovery get topology
Thread.sleep(5_000);

callCounter = 12;
// 12 / 3 / 2 = 2 requests per connect
for (int i = 0; i < callCounter; i++) {
clusterClient.callForSingleResult(
"simple_long_running_function", Arrays.asList(0, true), Boolean.class).get();
}
Thread.sleep(5_000);
for (TarantoolClient router :
Arrays.asList(routerClient1, routerClient3)) {
assertEquals(Arrays.asList(6, 7), getCallCountersPerConnection(getAllConnectionCalls, router));
}
routerCallCounterPerConnection = getCallCountersPerConnection(getAllConnectionCalls, routerClient4);
assertEquals(Arrays.asList(4, 4), routerCallCounterPerConnection);
}

private static TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>> getSimpleClient(Integer port) {
return TarantoolClientFactory.createClient()
.withAddress(container.getRouterHost(), container.getMappedPort(port))
.withCredentials(USER_NAME, PASSWORD)
.build();
}

private static TarantoolClient<TarantoolTuple, TarantoolResult<TarantoolTuple>>
getTarantoolClusterClientWithDiscovery(
int connections, int delay) {
String host = container.getRouterHost();
int port = container.getRouterPort();

TarantoolCredentials credentials = new SimpleTarantoolCredentials(
USER_NAME,
PASSWORD
);
TarantoolClientConfig config = TarantoolClientConfig.builder()
.withCredentials(credentials)
.build();

BinaryClusterDiscoveryEndpoint endpoint = new BinaryClusterDiscoveryEndpoint.Builder()
.withClientConfig(config)
.withEntryFunction("get_routers")
.withEndpointProvider(() -> Collections.singletonList(
new TarantoolServerAddress(
host, port
)))
.build();

TarantoolClusterDiscoveryConfig clusterDiscoveryConfig = new TarantoolClusterDiscoveryConfig.Builder()
.withDelay(delay)
.withEndpoint(endpoint)
.build();

BinaryDiscoveryClusterAddressProvider discoveryProvider = new BinaryDiscoveryClusterAddressProvider(
clusterDiscoveryConfig);

TarantoolClusterAddressProvider wrapperDiscoveryProvider
= new TestWrappedClusterAddressProvider(discoveryProvider, container); // because we use docker ports

return TarantoolClientFactory.createClient()
.withAddressProvider(wrapperDiscoveryProvider)
.withCredentials(USER_NAME, PASSWORD)
.withConnections(connections)
.build();
}

@NotNull
private static Object getCallCountersPerConnection(String getAllConnectionCalls, TarantoolClient router) {
List<?> luaResponse = router.eval(getAllConnectionCalls).join();
ArrayList tuples = (ArrayList) luaResponse.get(0); // because lua has multivalue response

Object routerCallCounterPerConnection = tuples.stream()
.map(item -> ((ArrayList) item).get(1))
.collect(Collectors.toList());
return routerCallCounterPerConnection;
}

@Test
Expand Down
67 changes: 10 additions & 57 deletions src/test/resources/cartridge/app/roles/api_router.lua
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
local vshard = require('vshard')
local cartridge_rpc = require('cartridge.rpc')
local fiber = require('fiber')
local crud = require('crud')
local uuid = require('uuid')
local log = require('log')

local metadata_utils = require('utils.metadata')
local crud_utils = require('utils.crud')
local counter = require('modules.counter')

local function get_schema()
for _, instance_uri in pairs(cartridge_rpc.get_candidates('app.roles.api_storage', { leader_only = true })) do
Expand Down Expand Up @@ -100,51 +101,6 @@ local function raising_error()
error("Test error: raising_error() called")
end

local function reset_request_counters()
box.space.request_counters:replace({ 1, 0 })
end

local function get_router_name()
return string.sub(box.cfg.custom_proc_title, 9)
end

local function simple_long_running_function(seconds_to_sleep)
fiber.sleep(seconds_to_sleep)
return true
end

local function long_running_function(values)
local seconds_to_sleep = 0
local disabled_router_name = ""
if values ~= nil then
if type(values) == "table" then
values = values or {}
seconds_to_sleep = values[1]
disabled_router_name = values[2]
else
seconds_to_sleep = values
end
end

-- need using number instead field name as string in update function for compatibility with tarantool 1.10
box.space.request_counters:update(1, { { '+', 2, 1 } })
log.info('Executing long-running function ' ..
tostring(box.space.request_counters:get(1)[2]) ..
"(name: " .. disabled_router_name ..
"; sleep: " .. seconds_to_sleep .. ")")
if get_router_name() == disabled_router_name then
return nil, "Disabled by client; router_name = " .. disabled_router_name
end
if seconds_to_sleep then
fiber.sleep(seconds_to_sleep)
end
return true
end

local function get_request_count()
return box.space.request_counters:get(1)[2]
end

-- it's like vshard error throwing
local function box_error_unpack_no_connection()
return nil, box.error.new(box.error.NO_CONNECTION):unpack()
Expand Down Expand Up @@ -202,12 +158,6 @@ local function select_router_space()
end

local function init_router_spaces()
local request_counters = box.schema.space.create('request_counters', {
format = { { 'id', 'unsigned' }, { 'count', 'unsigned' } },
if_not_exists = true
})
request_counters:create_index('id', { parts = { 'id' }, if_not_exists = true })

local router_space = box.schema.space.create('router_space', {
format = { { 'id', 'unsigned' } },
if_not_exists = true
Expand All @@ -220,6 +170,7 @@ end
local function init(opts)
if opts.is_master then
init_router_spaces()
counter.init_counter_space()
end
patch_crud_methods_for_tests()

Expand All @@ -235,11 +186,13 @@ local function init(opts)
rawset(_G, 'retrying_function', retrying_function)
rawset(_G, 'raising_error', raising_error)

rawset(_G, 'reset_request_counters', reset_request_counters)
rawset(_G, 'get_router_name', get_router_name)
rawset(_G, 'long_running_function', long_running_function)
rawset(_G, 'simple_long_running_function', simple_long_running_function)
rawset(_G, 'get_request_count', get_request_count)
rawset(_G, 'get_router_name', metadata_utils.get_router_name)

rawset(_G, 'reset_request_counters', counter.reset_request_counters)
rawset(_G, 'simple_long_running_function', counter.simple_long_running_function)
rawset(_G, 'long_running_function', counter.long_running_function)
rawset(_G, 'get_request_count', counter.get_request_count)

rawset(_G, 'box_error_unpack_no_connection', box_error_unpack_no_connection)
rawset(_G, 'box_error_unpack_timeout', box_error_unpack_timeout)
rawset(_G, 'box_error_timeout', box_error_timeout)
Expand Down
8 changes: 8 additions & 0 deletions src/test/resources/cartridge/app/roles/custom.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
local cartridge = require('cartridge')

local counter = require('modules.counter')

function get_routers()
local function table_contains(table, element)
for _, value in pairs(table) do
Expand Down Expand Up @@ -44,10 +46,16 @@ local function init(opts)
-- luacheck: no unused args
if opts.is_master then
box.schema.user.grant('guest', 'read,write', 'universe', nil, { if_not_exists = true })
counter.init_counter_space()
end

init_httpd()

rawset(_G, 'reset_request_counters', counter.reset_request_counters)
rawset(_G, 'simple_long_running_function', counter.simple_long_running_function)
rawset(_G, 'long_running_function', counter.long_running_function)
rawset(_G, 'get_request_count', counter.get_request_count)

return true
end

Expand Down
5 changes: 5 additions & 0 deletions src/test/resources/cartridge/instances.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ testapp.third-router:
advertise_uri: localhost:3303
http_port: 8083

testapp.fourth-router:
workdir: ./tmp/db_dev/3306
advertise_uri: localhost:3306
http_port: 8086

testapp.s1-storage:
workdir: ./tmp/db_dev/3304
advertise_uri: localhost:3304
Expand Down
Loading

0 comments on commit 7f9bdf4

Please sign in to comment.