Source MySQL\MsSql\Postgres: added RDS base performance tests #8215

Merged · 31 commits · Dec 12, 2021
Commits
cb4880c
Source-MySQL: added RDS base performance tests
etsybaev Nov 23, 2021
61e80f6
removed $MYSQL_PERFORMANCE_TEST_CREDS config from github
etsybaev Nov 25, 2021
ef172ff
added few new tests
etsybaev Nov 29, 2021
2e19a54
added few new test
etsybaev Nov 29, 2021
e8a3fcc
fixed expected value
etsybaev Nov 29, 2021
137252e
Added performance tests for postgres
etsybaev Dec 2, 2021
54a0c3f
Fixed testRegular25tables50000recordsDb for Postges
etsybaev Dec 3, 2021
a39a606
added performance tests for mssql
etsybaev Dec 5, 2021
355a16f
refactored performance tests
etsybaev Dec 5, 2021
1696b8f
Merge branch 'master' into etsybaev/source-mysql-added-base-rds-perfo…
etsybaev Dec 5, 2021
664ce98
fixed template name
etsybaev Dec 5, 2021
66d0bef
minor refactor
etsybaev Dec 5, 2021
28b9096
fixed outOfMemory error
etsybaev Dec 6, 2021
3a3d65e
fixed outOfMemory error (attempt 2)
etsybaev Dec 6, 2021
0d9945a
Merge branch 'master' into etsybaev/source-mysql-added-base-rds-perfo…
etsybaev Dec 6, 2021
4300ccb
excluded big test from run for Postgres
etsybaev Dec 6, 2021
8e0dbc1
excluded big test from run for Postgres (2)
etsybaev Dec 6, 2021
06d6dd8
kept only testSmall1000tableswith10000recordsDb for postgres
etsybaev Dec 6, 2021
43f4b3a
excluded failed tests
etsybaev Dec 6, 2021
dc9de25
fixed performance tests
etsybaev Dec 7, 2021
6cae5c4
Refactored performance tests as per comments in PR
etsybaev Dec 9, 2021
bf7ba56
Merge branch 'master' into etsybaev/source-mysql-added-base-rds-perfo…
etsybaev Dec 9, 2021
cf454d9
disabled by default Fill Data scripts
etsybaev Dec 9, 2021
8bd209f
fixed codestyle
etsybaev Dec 9, 2021
8620391
Merge branch 'master' into etsybaev/source-mysql-added-base-rds-perfo…
etsybaev Dec 10, 2021
789fd4e
updated perfomance test with cpu and memory limit
andriikorotkov Dec 10, 2021
f256138
add logs
andriikorotkov Dec 10, 2021
747b7ed
Merge branch 'etsybaev/source-mysql-added-base-rds-performance-test' …
andriikorotkov Dec 10, 2021
0c76263
Merge branch 'master' into etsybaev/source-mysql-added-base-rds-perfo…
etsybaev Dec 10, 2021
414e009
updated tables rows count
andriikorotkov Dec 11, 2021
a360666
updated code as per comments in PR
etsybaev Dec 12, 2021
@@ -22,6 +22,7 @@ dependencies {
runtimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.4.2'
implementation 'org.junit.platform:junit-platform-launcher:1.7.0'
implementation 'org.junit.jupiter:junit-jupiter-api:5.4.2'
implementation 'org.junit.jupiter:junit-jupiter-params:5.8.1'
}

def getFullPath(String className) {
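For context (not part of the diff): the junit-jupiter-params artifact added above is what provides the @ParameterizedTest, @MethodSource and Arguments APIs used by the new performance-test base classes later in this PR. A minimal, self-contained sketch of that pattern (class and method names here are illustrative):

import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class ParameterizedSketchTest {

  // Each Arguments group becomes one invocation of the test below.
  static Stream<Arguments> provideParameters() {
    return Stream.of(Arguments.of("test_0", 100), Arguments.of("test_1", 200));
  }

  @ParameterizedTest
  @MethodSource("provideParameters")
  void checkRecordCount(final String streamName, final int expectedRecords) {
    // A real test would sync streamName and assert on expectedRecords.
  }

}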
@@ -9,6 +9,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardDiscoverCatalogInput;
@@ -24,6 +25,7 @@
import io.airbyte.workers.DefaultDiscoverCatalogWorker;
import io.airbyte.workers.DefaultGetSpecWorker;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
@@ -33,18 +35,22 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This abstract class contains helpful functionality and boilerplate for testing a source
* connector.
*/
public abstract class AbstractSourceConnectorTest {

protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSourceConnectorTest.class);
private TestDestinationEnv environment;
private Path jobRoot;
protected Path localRoot;
@@ -53,6 +59,11 @@ public abstract class AbstractSourceConnectorTest {
private static final String JOB_ID = String.valueOf(0L);
private static final int JOB_ATTEMPT = 0;

private static final String CPU_REQUEST_FIELD_NAME = "cpuRequest";
private static final String CPU_LIMIT_FIELD_NAME = "cpuLimit";
private static final String MEMORY_REQUEST_FIELD_NAME = "memoryRequest";
private static final String MEMORY_LIMIT_FIELD_NAME = "memoryLimit";

/**
* Name of the docker image that the tests will run against.
*
@@ -169,7 +180,9 @@ protected Map<String, Integer> runReadVerifyNumberOfReceivedMsgs(final Configure
.withState(state == null ? null : new State().withState(state))
.withCatalog(catalog);

final AirbyteSource source = new DefaultAirbyteSource(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory));
final Map<String, String> mapOfResourceRequirementsParams = prepareResourceRequestMapBySystemProperties();
final AirbyteSource source =
prepareAirbyteSource(!mapOfResourceRequirementsParams.isEmpty() ? prepareResourceRequirements(mapOfResourceRequirementsParams) : null);
source.start(sourceConfig, jobRoot);

while (!source.isFinished()) {
@@ -186,4 +199,36 @@ protected Map<String, Integer> runReadVerifyNumberOfReceivedMsgs(final Configure
return mapOfExpectedRecordsCount;
}

protected ResourceRequirements prepareResourceRequirements(Map<String, String> mapOfResourceRequirementsParams) {
return new ResourceRequirements().withCpuRequest(mapOfResourceRequirementsParams.get(CPU_REQUEST_FIELD_NAME))
.withCpuLimit(mapOfResourceRequirementsParams.get(CPU_LIMIT_FIELD_NAME))
.withMemoryRequest(mapOfResourceRequirementsParams.get(MEMORY_REQUEST_FIELD_NAME))
.withMemoryLimit(mapOfResourceRequirementsParams.get(MEMORY_LIMIT_FIELD_NAME));
}

private AirbyteSource prepareAirbyteSource(ResourceRequirements resourceRequirements) {
var integrationLauncher = resourceRequirements == null ? new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory)
: new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, resourceRequirements);
return new DefaultAirbyteSource(integrationLauncher);
}

private static Map<String, String> prepareResourceRequestMapBySystemProperties() {
var cpuLimit = System.getProperty(CPU_LIMIT_FIELD_NAME);
var memoryLimit = System.getProperty(MEMORY_LIMIT_FIELD_NAME);
if (cpuLimit == null || cpuLimit.isBlank()) {
cpuLimit = WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS.getCpuLimit();
}
if (memoryLimit == null || memoryLimit.isBlank()) {
memoryLimit = WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS.getMemoryLimit();
}
LOGGER.info("cpu limit -->> {}", cpuLimit);
LOGGER.info("memory limit -->> {}", memoryLimit);
Map<String, String> result = new HashMap<>();
result.put(CPU_REQUEST_FIELD_NAME, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS.getCpuRequest());
result.put(CPU_LIMIT_FIELD_NAME, cpuLimit);
result.put(MEMORY_REQUEST_FIELD_NAME, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS.getMemoryRequest());
result.put(MEMORY_LIMIT_FIELD_NAME, memoryLimit);
return result;
}

}
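A usage note on the class above (not part of the diff): the limits are read from JVM system properties named after the *_FIELD_NAME constants, so a run can supply them as -DcpuLimit=... and -DmemoryLimit=..., and anything missing falls back to WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS. A minimal, hypothetical sketch of that flow, assuming only the property names shown above (all values are made up):

import io.airbyte.config.ResourceRequirements;

// Illustrative only: mirrors how the base class builds ResourceRequirements from
// system properties before handing them to AirbyteIntegrationLauncher.
class ResourceLimitSketch {

  static ResourceRequirements fromSystemProperties() {
    // e.g. the test JVM was started with -DcpuLimit=2 -DmemoryLimit=2g (hypothetical values)
    final String cpuLimit = System.getProperty("cpuLimit", "2");
    final String memoryLimit = System.getProperty("memoryLimit", "2g");
    return new ResourceRequirements()
        .withCpuLimit(cpuLimit)
        .withMemoryLimit(memoryLimit);
  }

}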
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.standardtest.source.performancetest;

import io.airbyte.integrations.standardtest.source.AbstractSourceConnectorTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;

/**
* This abstract class contains common methods shared by both the Fill DB scripts and the
* Performance tests.
*/
public abstract class AbstractSourceBasePerformanceTest extends AbstractSourceConnectorTest {

private static final String TEST_COLUMN_NAME = "test_column";
private static final String TEST_STREAM_NAME_TEMPLATE = "test_%S";

/**
* The column name that will be used for the test columns in the test tables. Override it if the
* default name is not valid for your source.
*
* @return Test column name
*/
protected String getTestColumnName() {
return TEST_COLUMN_NAME;
}

/**
* The stream name template that will be used for the test tables. Override it if the default name
* is not valid for your source.
*
* @return Test stream name template
*/
protected String getTestStreamNameTemplate() {
return TEST_STREAM_NAME_TEMPLATE;
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
// DO NOTHING. Mandatory to override. The DB will be set up as part of each test.
}

}
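By way of illustration (hypothetical, not part of the PR): a source-specific performance test would extend this base class and override the defaults only when they do not fit the source. Class name and values below are made up:

import io.airbyte.integrations.standardtest.source.performancetest.AbstractSourceBasePerformanceTest;

// Hypothetical subclass sketch; the overridden values are illustrative.
public abstract class SampleSourcePerformanceTest extends AbstractSourceBasePerformanceTest {

  @Override
  protected String getTestColumnName() {
    // Suppose the default "test_column" clashes with a reserved word for this source.
    return "test_col";
  }

  @Override
  protected String getTestStreamNameTemplate() {
    // Keep the default naming scheme; shown here only for completeness.
    return "test_%S";
  }

}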
@@ -0,0 +1,138 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.standardtest.source.performancetest;

import io.airbyte.db.Database;
import java.util.StringJoiner;
import java.util.stream.Stream;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This abstract class contains common methods for the Fill DB scripts.
*/
public abstract class AbstractSourceFillDbWithTestData extends AbstractSourceBasePerformanceTest {

private static final String CREATE_DB_TABLE_TEMPLATE = "CREATE TABLE %s.%s(id INTEGER PRIMARY KEY, %s)";
private static final String INSERT_INTO_DB_TABLE_QUERY_TEMPLATE = "INSERT INTO %s.%s (%s) VALUES %s";
private static final String TEST_DB_FIELD_TYPE = "varchar(10)";

protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSourceFillDbWithTestData.class);
private static final String TEST_VALUE_TEMPLATE_POSTGRES = "\'Value id_placeholder\'";
protected static Stream<Arguments> testArgs;

/**
* Set up the test database. All tables and data described in the registered tests will be put there.
*
* @return configured test database
* @throws Exception - might throw any exception during initialization.
*/
protected abstract Database setupDatabase(String dbName) throws Exception;

/**
* This test adds test data to a new DB. 1. Set the DB creds in the static variables above. 2. Set
* the desired number of streams, columns and records. 3. Run the test.
*/
@Disabled
@ParameterizedTest
@MethodSource("provideParameters")
public void addTestData(String dbName,
String schemaName,
int numberOfDummyRecords,
int numberOfBatches,
int numberOfColumns,
int numberOfStreams)
throws Exception {

final Database database = setupDatabase(dbName);

database.query(ctx -> {
for (int currentStreamNumber = 0; currentStreamNumber < numberOfStreams; currentStreamNumber++) {

String currentTableName = String.format(getTestStreamNameTemplate(), currentStreamNumber);

ctx.fetch(prepareCreateTableQuery(schemaName, numberOfColumns, currentTableName));
for (int i = 0; i < numberOfBatches; i++) {
String insertQueryTemplate = prepareInsertQueryTemplate(schemaName, i,
numberOfColumns,
numberOfDummyRecords);
ctx.fetch(String.format(insertQueryTemplate, currentTableName));
}

LOGGER.info("Finished processing for stream {}", currentStreamNumber);
}
return null;
});

database.close();

}

/**
* This is a data provider for the fill DB script. Each group of arguments is run as a separate
* test. Set "testArgs" in the test class for your DB in a @BeforeAll method.
*
* 1st arg - the name of the DB that will be used in the JDBC connection string. 2nd arg - the
* schema name that will be used as a namespace in the Configured Airbyte Catalog. 3rd arg - the
* number of records to insert in each batch. 4th arg - the number of insert batches per table. 5th
* arg - the number of columns in each stream\table that will be used for the Airbyte Catalog
* configuration. 6th arg - the number of streams to read in the configured Airbyte Catalog. Each
* stream\table in the DB should be named like "test_0", "test_1", ..., test_n.
*
* Stream.of( Arguments.of("your_db_name", "your_schema_name", 100, 2, 240, 1000) );
*/
private static Stream<Arguments> provideParameters() {
return testArgs;
}

protected String prepareCreateTableQuery(final String dbSchemaName,
final int numberOfColumns,
final String currentTableName) {

StringJoiner sj = new StringJoiner(",");
for (int i = 0; i < numberOfColumns; i++) {
sj.add(String.format(" %s%s %s", getTestColumnName(), i, TEST_DB_FIELD_TYPE));
}

return String.format(CREATE_DB_TABLE_TEMPLATE, dbSchemaName, currentTableName, sj.toString());
}
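// Illustrative output of the method above, assuming numberOfColumns = 2, the default column name
// and a hypothetical schema "myschema":
// CREATE TABLE myschema.test_0(id INTEGER PRIMARY KEY,  test_column0 varchar(10), test_column1 varchar(10))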

protected String prepareInsertQueryTemplate(final String dbSchemaName,
final int batchNumber,
final int numberOfColumns,
final int recordsNumber) {

StringJoiner fieldsNames = new StringJoiner(",");
fieldsNames.add("id");

StringJoiner baseInsertQuery = new StringJoiner(",");
baseInsertQuery.add("id_placeholder");

for (int i = 0; i < numberOfColumns; i++) {
fieldsNames.add(getTestColumnName() + i);
baseInsertQuery.add(TEST_VALUE_TEMPLATE_POSTGRES);
}

StringJoiner insertGroupValuesJoiner = new StringJoiner(",");

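// The per-batch id offset assumes batches of 100 records (as in the example arguments above);
// a different records-per-batch value would leave id gaps or collide on the primary key.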
int batchMessages = batchNumber * 100;

for (int currentRecordNumber = batchMessages;
currentRecordNumber < recordsNumber + batchMessages;
currentRecordNumber++) {
insertGroupValuesJoiner
.add("(" + baseInsertQuery.toString()
.replaceAll("id_placeholder", String.valueOf(currentRecordNumber)) + ")");
}

return String
.format(INSERT_INTO_DB_TABLE_QUERY_TEMPLATE, dbSchemaName, "%s", fieldsNames.toString(),
insertGroupValuesJoiner.toString());
}
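// Illustrative output of the method above, assuming numberOfColumns = 1, batchNumber = 0,
// recordsNumber = 2 and a hypothetical schema "myschema"; the remaining "%s" is replaced with the
// table name by addTestData():
// INSERT INTO myschema.%s (id,test_column0) VALUES (0,'Value 0'),(1,'Value 1')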

}
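A hypothetical concrete fill-data class (illustrative only; every name and number below is made up) would register its argument groups and supply the database connection roughly like this:

import io.airbyte.db.Database;
import io.airbyte.integrations.standardtest.source.performancetest.AbstractSourceFillDbWithTestData;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.provider.Arguments;

// Hypothetical sketch: how the data provider and the setupDatabase hook fit together.
public abstract class SampleSourceFillDbScriptTest extends AbstractSourceFillDbWithTestData {

  @BeforeAll
  static void registerTestArgs() {
    // dbName, schemaName, records per batch, batches, columns, streams
    testArgs = Stream.of(Arguments.of("your_db_name", "your_schema_name", 100, 2, 240, 1000));
  }

  @Override
  protected Database setupDatabase(final String dbName) throws Exception {
    // Open a connection to the target instance here; the helper is source-specific
    // and left out of this sketch.
    return createSourceSpecificDatabase(dbName);
  }

  // Placeholder for the source-specific JDBC setup; entirely hypothetical.
  protected abstract Database createSourceSpecificDatabase(String dbName) throws Exception;

}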