Skip to content

Commit

Permalink
add background thread to track MSSQL container status
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Feb 12, 2024
1 parent 463eec1 commit 4671059
Show file tree
Hide file tree
Showing 18 changed files with 184 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* Source operation skeleton for JDBC compatible databases.
*/
public abstract class AbstractJdbcCompatibleSourceOperations<Datatype> implements JdbcCompatibleSourceOperations<Datatype> {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcCompatibleSourceOperations.class);
/**
* A Date representing the earliest date in CE. Any date before this is in BCE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

Expand Down Expand Up @@ -66,9 +69,11 @@ GenericContainer<?> container() {

private static final ConcurrentMap<ContainerKey, ContainerOrException> SHARED_CONTAINERS = new ConcurrentHashMap<>();

private static final MdcScope.Builder TESTCONTAINER_LOG_MDC_BUILDER = new MdcScope.Builder()
.setLogPrefix("testcontainer")
.setPrefixColor(LoggingHelper.Color.RED_BACKGROUND);
private static final MdcScope.Builder getTestContainerLogMdcBuilder(DockerImageName imageName, List<String> methods, int containerId, String realContainerId) {
return new MdcScope.Builder()
.setLogPrefix("testcontainer " + containerId + " (" + imageName + "[" + StringUtils.join(methods, ",") + "]: " + realContainerId + ") ")
.setPrefixColor(LoggingHelper.Color.RED_BACKGROUND);
}

/**
* Creates a new, unshared testcontainer instance. This usually wraps the default constructor for
Expand Down Expand Up @@ -100,6 +105,8 @@ public final C exclusive(String imageName, String... methods) {
return (C) createAndStartContainer(DockerImageName.parse(imageName), Stream.of(methods).toList());
}

private static final AtomicInteger containerId = new AtomicInteger(0);

private GenericContainer<?> createAndStartContainer(DockerImageName imageName, List<String> methodNames) {
LOGGER.info("Creating new shared container based on {} with {}.", imageName, methodNames);
try {
Expand All @@ -108,8 +115,12 @@ private GenericContainer<?> createAndStartContainer(DockerImageName imageName, L
for (String methodName : methodNames) {
methods.add(getClass().getMethod(methodName, container.getClass()));
}
final var logConsumer = new Slf4jLogConsumer(LOGGER);
TESTCONTAINER_LOG_MDC_BUILDER.produceMappings(logConsumer::withMdc);
final var logConsumer = new Slf4jLogConsumer(LOGGER) {
public void accept(OutputFrame frame) {
super.accept(frame);
}
};
getTestContainerLogMdcBuilder(imageName, methodNames, containerId.getAndIncrement(), container.getContainerId()).produceMappings(logConsumer::withMdc);
container.withLogConsumer(logConsumer);
for (Method method : methods) {
LOGGER.info("Calling {} in {} on new shared container based on {}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,19 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
Expand Down Expand Up @@ -54,11 +62,36 @@ abstract public class TestDatabase<C extends JdbcDatabaseContainer<?>, T extends
private DataSource dataSource;
private DSLContext dslContext;

protected final int databaseId;
private static final AtomicInteger nextDatabaseId= new AtomicInteger(0);

protected static final Map<String, Map<Integer, Queue<String>>> logs = new ConcurrentHashMap<>();

protected TestDatabase(C container) {
this.container = container;
this.suffix = Strings.addRandomSuffix("", "_", 10);
this.databaseId = nextDatabaseId.getAndIncrement();
log ("SGX creating database " + databaseId + " on container " + container.getContainerId());
}

private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
private void log(String logLine) {
LOGGER.info(logLine);
logs.putIfAbsent(getContainer().getContainerId(), new ConcurrentHashMap<>());
logs.get(getContainer().getContainerId()).putIfAbsent(databaseId, new ConcurrentLinkedDeque<>());
logs.get(getContainer().getContainerId()).get(databaseId).add(dateFormat.format(new Date()) + " " + logLine);
}

protected enum Status {
STARTING,
INITIALIZING,
RUNNING,
STOPPING,
STOPPED
}

protected Status status = Status.STARTING;

@SuppressWarnings("unchecked")
protected T self() {
return (T) this;
Expand Down Expand Up @@ -97,6 +130,7 @@ public T with(String fmtSql, Object... fmtArgs) {
* {@link DataSource} and {@link DSLContext} owned by this object.
*/
final public T initialized() {
status = Status.INITIALIZING;
inContainerBootstrapCmd().forEach(this::execInContainer);
this.dataSource = DataSourceFactory.create(
getUserName(),
Expand All @@ -106,6 +140,7 @@ final public T initialized() {
connectionProperties,
JdbcConnector.getConnectionTimeout(connectionProperties, getDatabaseDriver().getDriverClassName()));
this.dslContext = DSLContextFactory.create(dataSource, getSqlDialect());
status = Status.RUNNING;
return self();
}

Expand Down Expand Up @@ -170,7 +205,9 @@ public Database getDatabase() {
protected void execSQL(final List<String> sqls) {
try {
for (String sql : sqls) {
log("SGX databaseId " + databaseId + " executing SQL: " + sql);
getDslContext().execute(sql);
log("SGX databaseId " + databaseId + " completed SQL: " + sql);
}
} catch (DataAccessException e) {
throw new RuntimeException(e);
Expand All @@ -182,12 +219,12 @@ protected void execInContainer(List<String> cmd) {
return;
}
try {
LOGGER.debug("executing {}", Strings.join(cmd, " "));
log(String.format("SGX databaseId " + databaseId + " executing {}", Strings.join(cmd, " ")));
final var exec = getContainer().execInContainer(cmd.toArray(new String[0]));
if (exec.getExitCode() == 0) {
LOGGER.debug("execution success\nstdout:\n{}\nstderr:\n{}", exec.getStdout(), exec.getStderr());
log(String.format("SGX databaseId " + databaseId + " execution success\nstdout:\n{}\nstderr:\n{}", exec.getStdout(), exec.getStderr()));
} else {
LOGGER.error("execution failure, code {}\nstdout:\n{}\nstderr:\n{}", exec.getExitCode(), exec.getStdout(), exec.getStderr());
LOGGER.error(String.format("SGX databaseId " + databaseId + " execution failure, code {}\nstdout:\n{}\nstderr:\n{}", exec.getExitCode(), exec.getStdout(), exec.getStderr()));
}
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down Expand Up @@ -227,8 +264,11 @@ public B integrationTestConfigBuilder() {

@Override
public void close() {
status = Status.STOPPING;
execSQL(this.cleanupSQL);
execInContainer(inContainerUndoBootstrapCmd());
LOGGER.info ("closing database " + databaseId);
status = Status.STOPPED;
}

static public class ConfigBuilder<T extends TestDatabase<?, ?, ?>, B extends ConfigBuilder<T, B>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,8 @@ void testUpdate() throws Exception {

@SuppressWarnings({"BusyWait", "CodeBlock2Expr"})
@Test
@DisplayName("Verify that when data is inserted into the database while a sync is happening and after the first sync, it all gets replicated.")
// @DisplayName("Verify that when data is inserted into the database while a sync is happening and
// after the first sync, it all gets replicated.")
protected void testRecordsProducedDuringAndAfterSync() throws Exception {

final int recordsToCreate = 20;
Expand Down Expand Up @@ -460,7 +461,8 @@ protected void assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync(f
}

@Test
@DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.")
// @DisplayName("When both incremental CDC and full refresh are configured for different streams in
// a sync, the data is replicated as expected.")
void testCdcAndFullRefreshInSameSync() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(getConfiguredCatalog());

Expand Down Expand Up @@ -533,7 +535,7 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
}

@Test
@DisplayName("When no records exist, no records are returned.")
// @DisplayName("When no records exist, no records are returned.")
void testNoData() throws Exception {

testdb.with("DELETE FROM %s.%s", modelsSchema(), MODELS_STREAM_NAME);
Expand All @@ -552,7 +554,8 @@ protected void assertExpectedStateMessagesForNoData(final List<AirbyteStateMessa
}

@Test
@DisplayName("When no changes have been made to the database since the previous sync, no records are returned.")
// @DisplayName("When no changes have been made to the database since the previous sync, no records
// are returned.")
void testNoDataOnSecondSync() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read1 = source()
.read(config(), getConfiguredCatalog(), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,15 @@ void testSpec() throws Exception {
final ConnectorSpecification expected = Jsons.deserialize(resourceString, ConnectorSpecification.class);

assertEquals(expected, actual);
throw new RuntimeException("SGX passed");
}

@Test
void testCheckSuccess() throws Exception {
final AirbyteConnectionStatus actual = source().check(config());
final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
assertEquals(expected, actual);
throw new RuntimeException("SGX passed");
}

@Test
Expand All @@ -229,6 +231,7 @@ protected void testCheckFailure() throws Exception {
((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, "fake");
final AirbyteConnectionStatus actual = source().check(config);
assertEquals(Status.FAILED, actual.getStatus());
throw new RuntimeException("SGX passed");
}

@Test
Expand All @@ -244,6 +247,7 @@ void testDiscover() throws Exception {
assertTrue(expectedStream.isPresent(), String.format("Unexpected stream %s", actualStream.getName()));
assertEquals(expectedStream.get(), actualStream);
});
throw new RuntimeException("SGX passed");
}

@Test
Expand All @@ -257,6 +261,7 @@ protected void testDiscoverWithNonCursorFields() throws Exception {
assertEquals(TABLE_NAME_WITHOUT_CURSOR_TYPE.toLowerCase(), stream.getName().toLowerCase());
assertEquals(1, stream.getSupportedSyncModes().size());
assertEquals(SyncMode.FULL_REFRESH, stream.getSupportedSyncModes().get(0));
throw new RuntimeException("SGX passed");
}

@Test
Expand All @@ -271,6 +276,7 @@ protected void testDiscoverWithNullableCursorFields() throws Exception {
assertEquals(2, stream.getSupportedSyncModes().size());
assertTrue(stream.getSupportedSyncModes().contains(SyncMode.FULL_REFRESH));
assertTrue(stream.getSupportedSyncModes().contains(SyncMode.INCREMENTAL));
throw new RuntimeException("SGX passed");
}

protected AirbyteCatalog filterOutOtherSchemas(final AirbyteCatalog catalog) {
Expand Down Expand Up @@ -322,6 +328,7 @@ protected void testDiscoverWithMultipleSchemas() throws Exception {
expected.getStreams().sort(schemaTableCompare);
actual.getStreams().sort(schemaTableCompare);
assertEquals(expected, filterOutOtherSchemas(actual));
throw new RuntimeException("SGX passed");
}

@Test
Expand All @@ -334,6 +341,7 @@ void testReadSuccess() throws Exception {
final List<AirbyteMessage> expectedMessages = getTestMessages();
assertThat(expectedMessages, Matchers.containsInAnyOrder(actualMessages.toArray()));
assertThat(actualMessages, Matchers.containsInAnyOrder(expectedMessages.toArray()));
throw new RuntimeException("SGX passed");
}

@Test
Expand All @@ -349,6 +357,7 @@ protected void testReadOneColumn() throws Exception {
assertEquals(expectedMessages.size(), actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
throw new RuntimeException("SGX passed");
}

protected List<AirbyteMessage> getAirbyteMessagesReadOneColumn() {
Expand Down Expand Up @@ -394,6 +403,7 @@ protected void testReadMultipleTables() throws Exception {
assertEquals(expectedMessages.size(), actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
throw new RuntimeException("SGX passed");
}

protected List<AirbyteMessage> getAirbyteMessagesSecondSync(final String streamName) {
Expand Down Expand Up @@ -430,6 +440,7 @@ protected void testTablesWithQuoting() throws Exception {
assertEquals(expectedMessages.size(), actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
throw new RuntimeException("SGX passed");
}

protected List<AirbyteMessage> getAirbyteMessagesForTablesWithQuoting(final ConfiguredAirbyteStream streamForTableWithSpaces) {
Expand Down Expand Up @@ -457,6 +468,7 @@ void testReadFailure() {
doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getStream();

assertThrows(RuntimeException.class, () -> source().read(config(), catalog, null));
throw new RuntimeException("SGX passed");
}

@Test
Expand All @@ -466,6 +478,7 @@ void testIncrementalNoPreviousState() throws Exception {
null,
"3",
getTestMessages());
throw new RuntimeException("SGX passed");
}

@Test
Expand All @@ -475,6 +488,7 @@ void testIncrementalIntCheckCursor() throws Exception {
"2",
"3",
List.of(getTestMessages().get(2)));
throw new RuntimeException("SGX passed");
}

@Test
Expand All @@ -484,6 +498,7 @@ void testIncrementalStringCheckCursor() throws Exception {
"patent",
"vash",
List.of(getTestMessages().get(0), getTestMessages().get(2)));
throw new RuntimeException("SGX passed");
}

@Test
Expand All @@ -498,6 +513,7 @@ void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception {
"vash",
expectedRecordMessages,
streamWithSpaces);
throw new RuntimeException("SGX passed");
}

protected List<AirbyteMessage> getAirbyteMessagesCheckCursorSpaceInColumnName(final ConfiguredAirbyteStream streamWithSpaces) {
Expand All @@ -519,6 +535,7 @@ protected List<AirbyteMessage> getAirbyteMessagesCheckCursorSpaceInColumnName(fi
@Test
void testIncrementalDateCheckCursor() throws Exception {
incrementalDateCheck();
throw new RuntimeException("SGX passed");
}

protected void incrementalDateCheck() throws Exception {
Expand All @@ -540,6 +557,7 @@ void testIncrementalCursorChanges() throws Exception {
"data",
"vash",
getTestMessages());
throw new RuntimeException("SGX passed");
}

@Test
Expand Down Expand Up @@ -574,6 +592,7 @@ protected void testReadOneTableIncrementallyTwice() throws Exception {
assertEquals(expectedMessages.size(), actualMessagesSecondSync.size());
assertTrue(expectedMessages.containsAll(actualMessagesSecondSync));
assertTrue(actualMessagesSecondSync.containsAll(expectedMessages));
throw new RuntimeException("SGX passed");
}

protected void executeStatementReadIncrementallyTwice() {
Expand Down Expand Up @@ -681,6 +700,7 @@ protected void testReadMultipleTablesIncrementally() throws Exception {
assertEquals(expectedMessagesFirstSync.size(), actualMessagesFirstSync.size());
assertTrue(expectedMessagesFirstSync.containsAll(actualMessagesFirstSync));
assertTrue(actualMessagesFirstSync.containsAll(expectedMessagesFirstSync));
throw new RuntimeException("SGX passed");
}

protected List<AirbyteMessage> getAirbyteMessagesSecondStreamWithNamespace(final String streamName2) {
Expand Down Expand Up @@ -807,6 +827,7 @@ public void testIncrementalWithConcurrentInsertion() throws Exception {
} else {
assertEquals(List.of("c", "d", "e", "f"), thirdSyncExpectedNames);
}
throw new RuntimeException("SGX passed");
}

protected JsonNode getStateData(final AirbyteMessage airbyteMessage, final String streamName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,10 @@ public UnexpectedRecord(String streamName, String unexpectedValue) {
"The stream '" + entry.streamName + "' checking type '" + testByName.get(entry.streamName).getSourceType() + "' initialized at "
+ testByName.get(entry.streamName).getDeclarationLocation() + " is missing values: " + entry.missedValues)
.collect(Collectors.joining("\n")) +
unexpectedValues.stream().map((entry) -> // stream each entry, map it to string value
"The stream '" + entry.streamName + "' checking type '" + testByName.get(entry.streamName).getSourceType() + "' initialized at "
+ testByName.get(entry.streamName).getDeclarationLocation() + " got unexpected values: " + entry.unexpectedValue)
.collect(Collectors.joining("\n"))); // and join them
unexpectedValues.stream().map((entry) -> // stream each entry, map it to string value
"The stream '" + entry.streamName + "' checking type '" + testByName.get(entry.streamName).getSourceType() + "' initialized at "
+ testByName.get(entry.streamName).getDeclarationLocation() + " got unexpected values: " + entry.unexpectedValue)
.collect(Collectors.joining("\n"))); // and join them
}

protected String getValueFromJsonNode(final JsonNode jsonNode) throws IOException {
Expand Down
Loading

0 comments on commit 4671059

Please sign in to comment.