diff --git a/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java b/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java index 2fb5e9dde..4617924ab 100644 --- a/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java +++ b/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java @@ -1,34 +1,46 @@ package io.tarantool.driver.integration; -import io.tarantool.driver.api.TarantoolClientConfig; -import io.tarantool.driver.api.TarantoolClusterAddressProvider; -import io.tarantool.driver.api.TarantoolServerAddress; -import io.tarantool.driver.api.connection.TarantoolConnection; -import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies; -import io.tarantool.driver.auth.SimpleTarantoolCredentials; -import io.tarantool.driver.core.ClusterTarantoolTupleClient; -import io.tarantool.driver.core.ProxyTarantoolTupleClient; -import io.tarantool.driver.core.RetryingTarantoolTupleClient; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.rnorth.ducttape.unreliables.Unreliables; -import org.testcontainers.containers.Container; -import org.testcontainers.junit.jupiter.Testcontainers; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.containers.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import io.tarantool.driver.api.TarantoolClient; +import io.tarantool.driver.api.TarantoolClientConfig; +import io.tarantool.driver.api.TarantoolClientFactory; +import io.tarantool.driver.api.TarantoolClusterAddressProvider; +import io.tarantool.driver.api.TarantoolResult; +import io.tarantool.driver.api.TarantoolServerAddress; +import io.tarantool.driver.api.connection.TarantoolConnection; +import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies; +import io.tarantool.driver.api.tuple.TarantoolTuple; +import io.tarantool.driver.auth.SimpleTarantoolCredentials; +import io.tarantool.driver.auth.TarantoolCredentials; +import io.tarantool.driver.cluster.BinaryClusterDiscoveryEndpoint; +import io.tarantool.driver.cluster.BinaryDiscoveryClusterAddressProvider; +import io.tarantool.driver.cluster.TarantoolClusterDiscoveryConfig; +import io.tarantool.driver.cluster.TestWrappedClusterAddressProvider; +import io.tarantool.driver.core.ClusterTarantoolTupleClient; +import io.tarantool.driver.core.ProxyTarantoolTupleClient; +import io.tarantool.driver.core.RetryingTarantoolTupleClient; + /** * @author Alexey Kuzin * @author Artyom Dubinin @@ -49,9 +61,9 @@ public static void setUp() throws TimeoutException { private TarantoolClientConfig.Builder prepareConfig() { return TarantoolClientConfig.builder() - .withCredentials(new SimpleTarantoolCredentials(USER_NAME, PASSWORD)) - .withConnectTimeout(1000) - .withReadTimeout(1000); + .withCredentials(new SimpleTarantoolCredentials(USER_NAME, PASSWORD)) + .withConnectTimeout(1000) + .withReadTimeout(1000); } private RetryingTarantoolTupleClient setupRouterClient(int port, int retries, long delay) { @@ -59,7 +71,8 @@ private RetryingTarantoolTupleClient setupRouterClient(int port, int retries, lo prepareConfig().build(), container.getRouterHost(), container.getMappedPort(port)); return new RetryingTarantoolTupleClient(new ProxyTarantoolTupleClient(clusterClient), - TarantoolRequestRetryPolicies.byNumberOfAttempts(retries).withDelay(delay).build()); + TarantoolRequestRetryPolicies.byNumberOfAttempts(retries) + .withDelay(delay).build()); } private RetryingTarantoolTupleClient setupClusterClient( @@ -70,7 +83,143 @@ private RetryingTarantoolTupleClient setupClusterClient( ProxyTarantoolTupleClient client = new ProxyTarantoolTupleClient(clusterClient); return new RetryingTarantoolTupleClient(client, - TarantoolRequestRetryPolicies.byNumberOfAttempts(retries, e -> true).withDelay(delay).build()); + TarantoolRequestRetryPolicies.byNumberOfAttempts(retries, e -> true) + .withDelay(delay).build()); + } + + @Test + void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections() + throws ExecutionException, InterruptedException, IOException { + + TarantoolClient> clusterClient = + getTarantoolClusterClientWithDiscovery(2, 5_000); + + TarantoolClient> routerClient1 = getSimpleClient(3301); + TarantoolClient> routerClient2 = getSimpleClient(3302); + TarantoolClient> routerClient3 = getSimpleClient(3303); + // 3306 isn't in cluster topology yet + TarantoolClient> routerClient4 = getSimpleClient(3306); + + int callCounter = 15; + for (int i = 0; i < callCounter; i++) { + clusterClient.callForSingleResult( + "simple_long_running_function", Arrays.asList(0, true), Boolean.class).get(); + } + + String getAllConnectionCalls = + "return box.space.request_counters.index.count:select(0, {iterator = box.index.GT})"; + // 15 calls on 3 routers on 2 connection == 15 / 3 == 5 / 2 == 2 or 3 calls per connect + for (TarantoolClient router : Arrays.asList(routerClient1, routerClient2, routerClient3)) { + assertEquals(Arrays.asList(2, 3), getCallCountersPerConnection(getAllConnectionCalls, router)); + } + + // add new router + // put 3306 in topology as router + routerClient1.eval("cartridge = require('cartridge') " + + "replicasets = { " + + " { " + + " alias = 'app-router-fourth', " + + " roles = { 'vshard-router', 'app.roles.custom', 'app.roles.api_router' }, " + + " join_servers = { { uri = 'localhost:3306' } } " + + " }} " + + "cartridge.admin_edit_topology({ replicasets = replicasets }) ").join(); + + // wait until discovery get topology + Thread.sleep(5_000); + + callCounter = 16; + // 16 / 4 / 2 = 2 requests per connect + for (int i = 0; i < callCounter; i++) { + clusterClient.callForSingleResult( + "simple_long_running_function", Arrays.asList(0, true), Boolean.class).get(); + } + + for (TarantoolClient router : + Arrays.asList(routerClient1, routerClient2, routerClient3)) { + assertEquals(Arrays.asList(4, 5), getCallCountersPerConnection(getAllConnectionCalls, router)); + } + + Object routerCallCounterPerConnection = getCallCountersPerConnection(getAllConnectionCalls, routerClient4); + assertEquals(Arrays.asList(2, 2), routerCallCounterPerConnection); + + String pid = container.execInContainer("pgrep", "-f", "testapp@second-router") + .getStdout().replace("\n", ""); + container.execInContainer("kill", "-9", pid); + // wait until discovery get topology + Thread.sleep(5_000); + + callCounter = 12; + // 12 / 3 / 2 = 2 requests per connect + for (int i = 0; i < callCounter; i++) { + clusterClient.callForSingleResult( + "simple_long_running_function", Arrays.asList(0, true), Boolean.class).get(); + } + Thread.sleep(5_000); + for (TarantoolClient router : + Arrays.asList(routerClient1, routerClient3)) { + assertEquals(Arrays.asList(6, 7), getCallCountersPerConnection(getAllConnectionCalls, router)); + } + routerCallCounterPerConnection = getCallCountersPerConnection(getAllConnectionCalls, routerClient4); + assertEquals(Arrays.asList(4, 4), routerCallCounterPerConnection); + } + + private static TarantoolClient> getSimpleClient(Integer port) { + return TarantoolClientFactory.createClient() + .withAddress(container.getRouterHost(), container.getMappedPort(port)) + .withCredentials(USER_NAME, PASSWORD) + .build(); + } + + private static TarantoolClient> + getTarantoolClusterClientWithDiscovery( + int connections, int delay) { + String host = container.getRouterHost(); + int port = container.getRouterPort(); + + TarantoolCredentials credentials = new SimpleTarantoolCredentials( + USER_NAME, + PASSWORD + ); + TarantoolClientConfig config = TarantoolClientConfig.builder() + .withCredentials(credentials) + .build(); + + BinaryClusterDiscoveryEndpoint endpoint = new BinaryClusterDiscoveryEndpoint.Builder() + .withClientConfig(config) + .withEntryFunction("get_routers") + .withEndpointProvider(() -> Collections.singletonList( + new TarantoolServerAddress( + host, port + ))) + .build(); + + TarantoolClusterDiscoveryConfig clusterDiscoveryConfig = new TarantoolClusterDiscoveryConfig.Builder() + .withDelay(delay) + .withEndpoint(endpoint) + .build(); + + BinaryDiscoveryClusterAddressProvider discoveryProvider = new BinaryDiscoveryClusterAddressProvider( + clusterDiscoveryConfig); + + TarantoolClusterAddressProvider wrapperDiscoveryProvider + = new TestWrappedClusterAddressProvider(discoveryProvider, container); // because we use docker ports + + return TarantoolClientFactory.createClient() + .withAddressProvider(wrapperDiscoveryProvider) + .withCredentials(USER_NAME, PASSWORD) + .withConnections(connections) + .build(); + } + + @NotNull + private static Object getCallCountersPerConnection(String getAllConnectionCalls, TarantoolClient router) { + List luaResponse = router.eval(getAllConnectionCalls).join(); + ArrayList tuples = (ArrayList) luaResponse.get(0); // because lua has multivalue response + + Object routerCallCounterPerConnection = tuples.stream() + .map(item -> ((ArrayList) item).get(1)) + .collect(Collectors.toList()); + return routerCallCounterPerConnection; } @Test diff --git a/src/test/resources/cartridge/app/roles/api_router.lua b/src/test/resources/cartridge/app/roles/api_router.lua index 25f785214..7963a371a 100644 --- a/src/test/resources/cartridge/app/roles/api_router.lua +++ b/src/test/resources/cartridge/app/roles/api_router.lua @@ -1,11 +1,12 @@ local vshard = require('vshard') local cartridge_rpc = require('cartridge.rpc') -local fiber = require('fiber') local crud = require('crud') local uuid = require('uuid') local log = require('log') +local metadata_utils = require('utils.metadata') local crud_utils = require('utils.crud') +local counter = require('modules.counter') local function get_schema() for _, instance_uri in pairs(cartridge_rpc.get_candidates('app.roles.api_storage', { leader_only = true })) do @@ -100,51 +101,6 @@ local function raising_error() error("Test error: raising_error() called") end -local function reset_request_counters() - box.space.request_counters:replace({ 1, 0 }) -end - -local function get_router_name() - return string.sub(box.cfg.custom_proc_title, 9) -end - -local function simple_long_running_function(seconds_to_sleep) - fiber.sleep(seconds_to_sleep) - return true -end - -local function long_running_function(values) - local seconds_to_sleep = 0 - local disabled_router_name = "" - if values ~= nil then - if type(values) == "table" then - values = values or {} - seconds_to_sleep = values[1] - disabled_router_name = values[2] - else - seconds_to_sleep = values - end - end - - -- need using number instead field name as string in update function for compatibility with tarantool 1.10 - box.space.request_counters:update(1, { { '+', 2, 1 } }) - log.info('Executing long-running function ' .. - tostring(box.space.request_counters:get(1)[2]) .. - "(name: " .. disabled_router_name .. - "; sleep: " .. seconds_to_sleep .. ")") - if get_router_name() == disabled_router_name then - return nil, "Disabled by client; router_name = " .. disabled_router_name - end - if seconds_to_sleep then - fiber.sleep(seconds_to_sleep) - end - return true -end - -local function get_request_count() - return box.space.request_counters:get(1)[2] -end - -- it's like vshard error throwing local function box_error_unpack_no_connection() return nil, box.error.new(box.error.NO_CONNECTION):unpack() @@ -202,12 +158,6 @@ local function select_router_space() end local function init_router_spaces() - local request_counters = box.schema.space.create('request_counters', { - format = { { 'id', 'unsigned' }, { 'count', 'unsigned' } }, - if_not_exists = true - }) - request_counters:create_index('id', { parts = { 'id' }, if_not_exists = true }) - local router_space = box.schema.space.create('router_space', { format = { { 'id', 'unsigned' } }, if_not_exists = true @@ -220,6 +170,7 @@ end local function init(opts) if opts.is_master then init_router_spaces() + counter.init_counter_space() end patch_crud_methods_for_tests() @@ -235,11 +186,13 @@ local function init(opts) rawset(_G, 'retrying_function', retrying_function) rawset(_G, 'raising_error', raising_error) - rawset(_G, 'reset_request_counters', reset_request_counters) - rawset(_G, 'get_router_name', get_router_name) - rawset(_G, 'long_running_function', long_running_function) - rawset(_G, 'simple_long_running_function', simple_long_running_function) - rawset(_G, 'get_request_count', get_request_count) + rawset(_G, 'get_router_name', metadata_utils.get_router_name) + + rawset(_G, 'reset_request_counters', counter.reset_request_counters) + rawset(_G, 'simple_long_running_function', counter.simple_long_running_function) + rawset(_G, 'long_running_function', counter.long_running_function) + rawset(_G, 'get_request_count', counter.get_request_count) + rawset(_G, 'box_error_unpack_no_connection', box_error_unpack_no_connection) rawset(_G, 'box_error_unpack_timeout', box_error_unpack_timeout) rawset(_G, 'box_error_timeout', box_error_timeout) diff --git a/src/test/resources/cartridge/app/roles/custom.lua b/src/test/resources/cartridge/app/roles/custom.lua index 680692747..cfd04e6cd 100644 --- a/src/test/resources/cartridge/app/roles/custom.lua +++ b/src/test/resources/cartridge/app/roles/custom.lua @@ -1,5 +1,7 @@ local cartridge = require('cartridge') +local counter = require('modules.counter') + function get_routers() local function table_contains(table, element) for _, value in pairs(table) do @@ -44,10 +46,16 @@ local function init(opts) -- luacheck: no unused args if opts.is_master then box.schema.user.grant('guest', 'read,write', 'universe', nil, { if_not_exists = true }) + counter.init_counter_space() end init_httpd() + rawset(_G, 'reset_request_counters', counter.reset_request_counters) + rawset(_G, 'simple_long_running_function', counter.simple_long_running_function) + rawset(_G, 'long_running_function', counter.long_running_function) + rawset(_G, 'get_request_count', counter.get_request_count) + return true end diff --git a/src/test/resources/cartridge/instances.yml b/src/test/resources/cartridge/instances.yml index 2a802a6dd..42d94ecaf 100644 --- a/src/test/resources/cartridge/instances.yml +++ b/src/test/resources/cartridge/instances.yml @@ -13,6 +13,11 @@ testapp.third-router: advertise_uri: localhost:3303 http_port: 8083 +testapp.fourth-router: + workdir: ./tmp/db_dev/3306 + advertise_uri: localhost:3306 + http_port: 8086 + testapp.s1-storage: workdir: ./tmp/db_dev/3304 advertise_uri: localhost:3304 diff --git a/src/test/resources/cartridge/modules/counter.lua b/src/test/resources/cartridge/modules/counter.lua new file mode 100644 index 000000000..5bcefeccd --- /dev/null +++ b/src/test/resources/cartridge/modules/counter.lua @@ -0,0 +1,78 @@ +local fiber = require('fiber') +local log = require('log') +local metadata_utils = require('utils.metadata') + +local function reset_request_counters() + box.space.request_counters:replace({ 1, 0 }) +end + +local function update_request_counters(with_session_id) + with_session_id = with_session_id or false + -- need using number instead field name as string in update function for compatibility with tarantool 1.10 + if with_session_id then + box.space.request_counters:update(box.session.id(), { { '+', 2, 1 } }) + else + box.space.request_counters:update(1, { { '+', 2, 1 } }) + end +end + +local function get_request_count() + return box.space.request_counters:get(1)[2] +end + +local function simple_long_running_function(seconds_to_sleep, with_session_id) + update_request_counters(with_session_id) + fiber.sleep(seconds_to_sleep) + return true +end + +local function long_running_function(values, with_session_id) + local seconds_to_sleep = 0 + local disabled_router_name = "" + if values ~= nil then + if type(values) == "table" then + values = values or {} + seconds_to_sleep = values[1] + disabled_router_name = values[2] + else + seconds_to_sleep = values + end + end + + update_request_counters(with_session_id) + log.info('Executing long-running function ' .. + tostring(box.space.request_counters:get(1)[2]) .. + "(name: " .. disabled_router_name .. + "; sleep: " .. seconds_to_sleep .. ")") + if metadata_utils.get_router_name() == disabled_router_name then + return nil, "Disabled by client; router_name = " .. disabled_router_name + end + if seconds_to_sleep then + fiber.sleep(seconds_to_sleep) + end + return true +end + +local function reset_request_counters_on_connect() + box.space.request_counters:replace({ box.session.id(), 0 }) +end + +local function init_counter_space() + local request_counters = box.schema.space.create('request_counters', { + format = { { 'id', 'unsigned' }, { 'count', 'unsigned' } }, + if_not_exists = true + }) + request_counters:create_index('id', { parts = { 'id' }, if_not_exists = true }) + request_counters:create_index('count', { parts = { 'count' }, if_not_exists = true, unique = false }) + + + box.session.on_connect(reset_request_counters_on_connect) +end + +return { + reset_request_counters = reset_request_counters, + simple_long_running_function = simple_long_running_function, + long_running_function = long_running_function, + get_request_count = get_request_count, + init_counter_space = init_counter_space, +} diff --git a/src/test/resources/cartridge/utils/metadata.lua b/src/test/resources/cartridge/utils/metadata.lua new file mode 100644 index 000000000..a159bab29 --- /dev/null +++ b/src/test/resources/cartridge/utils/metadata.lua @@ -0,0 +1,7 @@ +local function get_router_name() + return string.sub(box.cfg.custom_proc_title, 9) +end + +return { + get_router_name = get_router_name +}