From 2aaeecd223488ce389728ae1ad35c86dfa11c3b1 Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Fri, 9 Feb 2024 23:53:36 -0800 Subject: [PATCH] add background thread to track MSSQL container status --- ...bstractJdbcCompatibleSourceOperations.java | 1 + .../cdk/testutils/ContainerFactory.java | 21 ++++-- .../airbyte/cdk/testutils/TestDatabase.java | 48 ++++++++++++- .../integrations/debezium/CdcSourceTest.java | 11 +-- .../jdbc/test/JdbcSourceAcceptanceTest.java | 2 +- .../AbstractSourceDatabaseTypeTest.java | 8 +-- .../standardtest/source/TestRunner.java | 3 +- airbyte-ci/connectors/pipelines/README.md | 1 + .../airbyte_ci/connectors/reports.py | 5 +- .../connectors/pipelines/pyproject.toml | 2 +- .../source-mssql/logs/airbyte_default.log | 0 .../source-mssql/logs/airbyte_simple.log | 0 .../source/mssql/MssqlCdcStateHandler.java | 2 +- .../source/mssql/MssqlCdcTargetPosition.java | 2 +- .../source/mssql/MssqlSourceOperations.java | 2 +- .../mssql/cdc/MssqlDebeziumStateUtil.java | 2 +- .../MssqlInitialSyncStateIterator.java | 3 +- .../AbstractMssqlSourceDatatypeTest.java | 14 ++-- .../source/mssql/CdcMssqlSourceTest.java | 7 +- .../source/mssql/CdcStateCompressionTest.java | 2 +- .../source/mssql/MsSQLTestDatabase.java | 67 ++++++++++++++++--- 21 files changed, 155 insertions(+), 48 deletions(-) create mode 100644 airbyte-integrations/connectors/source-mssql/logs/airbyte_default.log create mode 100644 airbyte-integrations/connectors/source-mssql/logs/airbyte_simple.log diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.java index 62f40b25adb9..c7d1b075f913 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.java @@ -37,6 +37,7 @@ * Source operation skeleton for JDBC compatible databases. */ public abstract class AbstractJdbcCompatibleSourceOperations implements JdbcCompatibleSourceOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcCompatibleSourceOperations.class); /** * A Date representing the earliest date in CE. Any date before this is in BCE. diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java index 932a7dda3ac0..94ca3fcf51e2 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java @@ -12,12 +12,15 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.output.OutputFrame; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerImageName; @@ -66,9 +69,11 @@ GenericContainer container() { private static final ConcurrentMap SHARED_CONTAINERS = new ConcurrentHashMap<>(); - private static final MdcScope.Builder TESTCONTAINER_LOG_MDC_BUILDER = new MdcScope.Builder() - .setLogPrefix("testcontainer") - .setPrefixColor(LoggingHelper.Color.RED_BACKGROUND); + private static final MdcScope.Builder getTestContainerLogMdcBuilder(DockerImageName imageName, List methods, int containerId, String realContainerId) { + return new MdcScope.Builder() + .setLogPrefix("testcontainer " + containerId + " (" + imageName + "[" + StringUtils.join(methods, ",") + "]: " + realContainerId + ") ") + .setPrefixColor(LoggingHelper.Color.RED_BACKGROUND); + } /** * Creates a new, unshared testcontainer instance. This usually wraps the default constructor for @@ -100,6 +105,8 @@ public final C exclusive(String imageName, String... methods) { return (C) createAndStartContainer(DockerImageName.parse(imageName), Stream.of(methods).toList()); } + private static final AtomicInteger containerId = new AtomicInteger(0); + private GenericContainer createAndStartContainer(DockerImageName imageName, List methodNames) { LOGGER.info("Creating new shared container based on {} with {}.", imageName, methodNames); try { @@ -108,8 +115,12 @@ private GenericContainer createAndStartContainer(DockerImageName imageName, L for (String methodName : methodNames) { methods.add(getClass().getMethod(methodName, container.getClass())); } - final var logConsumer = new Slf4jLogConsumer(LOGGER); - TESTCONTAINER_LOG_MDC_BUILDER.produceMappings(logConsumer::withMdc); + final var logConsumer = new Slf4jLogConsumer(LOGGER) { + public void accept(OutputFrame frame) { + super.accept(frame); + } + }; + getTestContainerLogMdcBuilder(imageName, methodNames, containerId.getAndIncrement(), container.getContainerId()).produceMappings(logConsumer::withMdc); container.withLogConsumer(logConsumer); for (Method method : methods) { LOGGER.info("Calling {} in {} on new shared container based on {}.", diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java index dd1eb2a10578..3dc7658a9dd9 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java @@ -19,11 +19,19 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.sql.SQLException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.time.Duration; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; import javax.sql.DataSource; import org.jooq.DSLContext; import org.jooq.SQLDialect; @@ -54,11 +62,38 @@ abstract public class TestDatabase, T extends private DataSource dataSource; private DSLContext dslContext; + protected final int databaseId; + private static final AtomicInteger nextDatabaseId= new AtomicInteger(0); + + protected static final Map>> logs = new ConcurrentHashMap<>(); + + @SuppressWarnings("this-escape") protected TestDatabase(C container) { this.container = container; this.suffix = Strings.addRandomSuffix("", "_", 10); + this.databaseId = nextDatabaseId.getAndIncrement(); + log ("SGX creating database " + getDatabaseName() + " with databaseId=" + databaseId + " on container " + container.getContainerId()); + + } + + private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); + private void log(String logLine) { + LOGGER.info(logLine); + logs.putIfAbsent(getContainer().getContainerId(), new ConcurrentHashMap<>()); + logs.get(getContainer().getContainerId()).putIfAbsent(databaseId, new ConcurrentLinkedDeque<>()); + logs.get(getContainer().getContainerId()).get(databaseId).add(dateFormat.format(new Date()) + " " + logLine); + } + + protected enum Status { + STARTING, + INITIALIZING, + RUNNING, + STOPPING, + STOPPED } + protected Status status = Status.STARTING; + @SuppressWarnings("unchecked") protected T self() { return (T) this; @@ -97,6 +132,7 @@ public T with(String fmtSql, Object... fmtArgs) { * {@link DataSource} and {@link DSLContext} owned by this object. */ final public T initialized() { + status = Status.INITIALIZING; inContainerBootstrapCmd().forEach(this::execInContainer); this.dataSource = DataSourceFactory.create( getUserName(), @@ -106,6 +142,7 @@ final public T initialized() { connectionProperties, JdbcConnector.getConnectionTimeout(connectionProperties, getDatabaseDriver().getDriverClassName())); this.dslContext = DSLContextFactory.create(dataSource, getSqlDialect()); + status = Status.RUNNING; return self(); } @@ -170,7 +207,9 @@ public Database getDatabase() { protected void execSQL(final List sqls) { try { for (String sql : sqls) { + log("SGX databaseId=" + databaseId + " executing SQL: " + sql); getDslContext().execute(sql); + log("SGX databaseId=" + databaseId + " completed SQL: " + sql); } } catch (DataAccessException e) { throw new RuntimeException(e); @@ -182,12 +221,12 @@ protected void execInContainer(List cmd) { return; } try { - LOGGER.debug("executing {}", Strings.join(cmd, " ")); + log(String.format("SGX databaseId=" + databaseId + " executing command %s", Strings.join(cmd, " "))); final var exec = getContainer().execInContainer(cmd.toArray(new String[0])); if (exec.getExitCode() == 0) { - LOGGER.debug("execution success\nstdout:\n{}\nstderr:\n{}", exec.getStdout(), exec.getStderr()); + log(String.format("SGX databaseId=" + databaseId + " execution success\nstdout:\n%s\nstderr:\n%s", exec.getStdout(), exec.getStderr())); } else { - LOGGER.error("execution failure, code {}\nstdout:\n{}\nstderr:\n{}", exec.getExitCode(), exec.getStdout(), exec.getStderr()); + LOGGER.error(String.format("SGX databaseId=" + databaseId + " execution failure, code %s\nstdout:\n%s\nstderr:\n%s", exec.getExitCode(), exec.getStdout(), exec.getStderr())); } } catch (IOException e) { throw new UncheckedIOException(e); @@ -227,8 +266,11 @@ public B integrationTestConfigBuilder() { @Override public void close() { + status = Status.STOPPING; execSQL(this.cleanupSQL); execInContainer(inContainerUndoBootstrapCmd()); + LOGGER.info ("closing database databaseId=" + databaseId); + status = Status.STOPPED; } static public class ConfigBuilder, B extends ConfigBuilder> { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java index 26544fe47fbd..84650dd94c9e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java @@ -406,7 +406,8 @@ void testUpdate() throws Exception { @SuppressWarnings({"BusyWait", "CodeBlock2Expr"}) @Test - @DisplayName("Verify that when data is inserted into the database while a sync is happening and after the first sync, it all gets replicated.") + // @DisplayName("Verify that when data is inserted into the database while a sync is happening and + // after the first sync, it all gets replicated.") protected void testRecordsProducedDuringAndAfterSync() throws Exception { final int recordsToCreate = 20; @@ -472,7 +473,8 @@ protected void assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync(f } @Test - @DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.") + // @DisplayName("When both incremental CDC and full refresh are configured for different streams in + // a sync, the data is replicated as expected.") void testCdcAndFullRefreshInSameSync() throws Exception { final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(getConfiguredCatalog()); @@ -545,7 +547,7 @@ void testCdcAndFullRefreshInSameSync() throws Exception { } @Test - @DisplayName("When no records exist, no records are returned.") + // @DisplayName("When no records exist, no records are returned.") void testNoData() throws Exception { deleteCommand(MODELS_STREAM_NAME); @@ -563,7 +565,8 @@ protected void assertExpectedStateMessagesForNoData(final List read1 = source() .read(config(), getConfiguredCatalog(), null); diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index aac25c5d87b0..6587672bb7a9 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -213,7 +213,7 @@ void testSpec() throws Exception { final ConnectorSpecification expected = Jsons.deserialize(resourceString, ConnectorSpecification.class); assertEquals(expected, actual); - } + } @Test void testCheckSuccess() throws Exception { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java index fb0ca6ffcc50..12220a6ad9ef 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java @@ -197,10 +197,10 @@ public UnexpectedRecord(String streamName, String unexpectedValue) { "The stream '" + entry.streamName + "' checking type '" + testByName.get(entry.streamName).getSourceType() + "' initialized at " + testByName.get(entry.streamName).getDeclarationLocation() + " is missing values: " + entry.missedValues) .collect(Collectors.joining("\n")) + - unexpectedValues.stream().map((entry) -> // stream each entry, map it to string value - "The stream '" + entry.streamName + "' checking type '" + testByName.get(entry.streamName).getSourceType() + "' initialized at " - + testByName.get(entry.streamName).getDeclarationLocation() + " got unexpected values: " + entry.unexpectedValue) - .collect(Collectors.joining("\n"))); // and join them + unexpectedValues.stream().map((entry) -> // stream each entry, map it to string value + "The stream '" + entry.streamName + "' checking type '" + testByName.get(entry.streamName).getSourceType() + "' initialized at " + + testByName.get(entry.streamName).getDeclarationLocation() + " got unexpected values: " + entry.unexpectedValue) + .collect(Collectors.joining("\n"))); // and join them } protected String getValueFromJsonNode(final JsonNode jsonNode) throws IOException { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/source/TestRunner.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/source/TestRunner.java index 1f27307421fc..c216484817b9 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/source/TestRunner.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/source/TestRunner.java @@ -18,6 +18,7 @@ public class TestRunner { public static void runTestClass(final Class testClass) { + throw new RuntimeException("SGX");/* final LauncherDiscoveryRequest request = LauncherDiscoveryRequestBuilder.request() .selectors(selectClass(testClass)) .build(); @@ -38,7 +39,7 @@ public static void runTestClass(final Class testClass) { "There are failing tests. See https://docs.airbyte.io/contributing-to-airbyte/building-new-connector/standard-source-tests " + "for more information about the standard source test suite."); System.exit(1); - } + }*/ } } diff --git a/airbyte-ci/connectors/pipelines/README.md b/airbyte-ci/connectors/pipelines/README.md index 44f862c15427..2d96a0a23d71 100644 --- a/airbyte-ci/connectors/pipelines/README.md +++ b/airbyte-ci/connectors/pipelines/README.md @@ -640,6 +640,7 @@ E.G.: running Poe tasks on the modified internal packages of the current branch: | Version | PR | Description | | ------- | ---------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- | +| 4.3.0 | [#35317](https://github.com/airbytehq/airbyte/pull/35317) | Augment java connector reports to include full logs and junit test results | | 4.2.2 | [#35364](https://github.com/airbytehq/airbyte/pull/35364) | Fix connector tests following gradle changes in #35307. | | 4.2.1 | [#35204](https://github.com/airbytehq/airbyte/pull/35204) | Run `poetry check` before `poetry install` on poetry package install. | | 4.2.0 | [#35103](https://github.com/airbytehq/airbyte/pull/35103) | Java 21 support. | diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/reports.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/reports.py index 0832a103a270..e85e306f50e8 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/reports.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/reports.py @@ -103,8 +103,7 @@ async def to_html(self) -> str: local_icon_path = Path(f"{self.pipeline_context.connector.code_directory}/icon.svg").resolve() step_result_to_artifact_link = {} for step_result in self.steps_results: - test_artifacts_link = await self.get_path_as_link(step_result.test_artifacts_path) - if test_artifacts_link: + if test_artifacts_link := await self.upload_path(step_result.test_artifacts_path): step_result_to_artifact_link[step_result.step.title] = test_artifacts_link template_context = { "connector_name": self.pipeline_context.connector.technical_name, @@ -171,7 +170,7 @@ def print(self) -> None: main_panel = Panel(Group(*to_render), title=main_panel_title, subtitle=duration_subtitle) console.print(main_panel) - async def get_path_as_link(self, path: Optional[Path]) -> Optional[str]: + async def upload_path(self, path: Optional[Path]) -> Optional[str]: if not path or not path.exists(): return None if self.pipeline_context.is_local: diff --git a/airbyte-ci/connectors/pipelines/pyproject.toml b/airbyte-ci/connectors/pipelines/pyproject.toml index 36c99f486ee4..1879f84dcfd2 100644 --- a/airbyte-ci/connectors/pipelines/pyproject.toml +++ b/airbyte-ci/connectors/pipelines/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pipelines" -version = "4.2.2" +version = "4.3.0" description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines" authors = ["Airbyte "] diff --git a/airbyte-integrations/connectors/source-mssql/logs/airbyte_default.log b/airbyte-integrations/connectors/source-mssql/logs/airbyte_default.log new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-integrations/connectors/source-mssql/logs/airbyte_simple.log b/airbyte-integrations/connectors/source-mssql/logs/airbyte_simple.log new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcStateHandler.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcStateHandler.java index 709c1bc12690..c666c50f2394 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcStateHandler.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcStateHandler.java @@ -46,7 +46,7 @@ public AirbyteMessage saveState(final Map offset, final SchemaHi final JsonNode asJson = Jsons.jsonNode(state); - LOGGER.info("debezium state: {}", asJson); + LOGGER.debug("debezium state: {}", asJson); final CdcState cdcState = new CdcState().withState(asJson); stateManager.getCdcStateManager().setCdcState(cdcState); diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java index 123459f386da..09824680d7a6 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java @@ -105,7 +105,7 @@ public static MssqlCdcTargetPosition getTargetPosition(final JdbcDatabase databa LOGGER.info("identified target lsn: " + maxLsn); return new MssqlCdcTargetPosition(maxLsn); } else { - throw new RuntimeException("SQL returned max LSN as null, this might be because the SQL Server Agent is not running. " + + throw new RuntimeException("SQL returned max LSN as null on database " + dbName + ", this might be because the SQL Server Agent is not running. " + "Please enable the Agent and try again (https://docs.microsoft.com/en-us/sql/ssms/agent/start-stop-or-pause-the-sql-server-agent-service)"); } } catch (final SQLException | IOException e) { diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java index 59050b7d2e8d..2ea4fcabc4cf 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java @@ -91,7 +91,7 @@ private void putValue(final JDBCType columnType, @Override public JDBCType getDatabaseFieldType(final JsonNode field) { - //throw new RuntimeException("SGX"); + // throw new RuntimeException("SGX"); try { final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText(); if (typeName.equalsIgnoreCase("geography") diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/cdc/MssqlDebeziumStateUtil.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/cdc/MssqlDebeziumStateUtil.java index 076353362545..df61751da184 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/cdc/MssqlDebeziumStateUtil.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/cdc/MssqlDebeziumStateUtil.java @@ -125,7 +125,7 @@ public JsonNode constructInitialDebeziumState(final Properties properties, assert Objects.nonNull(schemaHistory.schema()); final JsonNode asJson = serialize(offset, schemaHistory); - //LOGGER.info("Initial Debezium state constructed: {}", asJson); + // LOGGER.info("Initial Debezium state constructed: {}", asJson); if (asJson.get(MssqlCdcStateConstants.MSSQL_DB_HISTORY).asText().isBlank()) { throw new RuntimeException("Schema history snapshot returned empty history."); diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialSyncStateIterator.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialSyncStateIterator.java index 0fe6a872f75b..b42a06e257d0 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialSyncStateIterator.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialSyncStateIterator.java @@ -92,7 +92,8 @@ protected AirbyteMessage computeNext() { } else if (!hasEmittedFinalState) { hasEmittedFinalState = true; final AirbyteStateMessage finalStateMessage = stateManager.createFinalStateMessage(pair, streamStateForIncrementalRun); - LOGGER.info("Finished initial sync of stream {}, Emitting final state, state is {}", pair, finalStateMessage); + LOGGER.info("Finished initial sync of stream {}, Emitting final state", pair); + LOGGER.debug("state is {}", finalStateMessage); return new AirbyteMessage() .withType(Type.STATE) .withState(finalStateMessage); diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/AbstractMssqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/AbstractMssqlSourceDatatypeTest.java index 29025a88e9bd..9ab34a56e9b7 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/AbstractMssqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/AbstractMssqlSourceDatatypeTest.java @@ -123,14 +123,12 @@ protected void initTests() { .createTablePatternSql(CREATE_TABLE_SQL) .build()); - /*addDataTypeTestData( - TestDataHolder.builder() - .sourceType("real") - .airbyteType(JsonSchemaType.NUMBER) - .addInsertValues("'123'", "'1234567890.1234567'", "null") - .addExpectedValues("123.0", "1.23456794E9", null) - .createTablePatternSql(CREATE_TABLE_SQL) - .build());*/ + /* + * addDataTypeTestData( TestDataHolder.builder() .sourceType("real") + * .airbyteType(JsonSchemaType.NUMBER) .addInsertValues("'123'", "'1234567890.1234567'", "null") + * .addExpectedValues("123.0", "1.23456794E9", null) .createTablePatternSql(CREATE_TABLE_SQL) + * .build()); + */ addDataTypeTestData( TestDataHolder.builder() diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java index ad63ac5b8558..5bed62d4c664 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java @@ -299,7 +299,7 @@ void testAssertCdcSchemaQueryable() { () -> source().assertCdcSchemaQueryable(config(), testDatabase())); } - @Test + /*@Test void testAssertSqlServerAgentRunning() { testdb.withAgentStopped().withWaitUntilAgentStopped(); // assert expected failure if sql server agent stopped @@ -307,7 +307,7 @@ void testAssertSqlServerAgentRunning() { // assert success if sql server agent running testdb.withAgentStarted().withWaitUntilAgentRunning(); assertDoesNotThrow(() -> source().assertSqlServerAgentRunning(testDatabase())); - } + }*/ // Ensure the CDC check operations are included when CDC is enabled // todo: make this better by checking the returned checkOperations from source.getCheckOperations @@ -325,13 +325,14 @@ void testCdcCheckOperations() throws Exception { testdb.with("GRANT SELECT ON SCHEMA :: [cdc] TO %s", testUserName()); // assertSqlServerAgentRunning - +/* testdb.withAgentStopped().withWaitUntilAgentStopped(); status = source().check(config()); assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); testdb.withAgentStarted().withWaitUntilAgentRunning(); status = source().check(config()); assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); + */ } @Test diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcStateCompressionTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcStateCompressionTest.java index 293189b6683a..09f4fb211d76 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcStateCompressionTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcStateCompressionTest.java @@ -201,7 +201,7 @@ public void testCompressedSchemaHistory() throws Exception { final var lastSharedStateFromSecondBatch = lastStateMessageFromSecondBatch.getGlobal().getSharedState().get("state"); assertNotNull(lastSharedStateFromSecondBatch); assertNotNull(lastSharedStateFromSecondBatch.get(MSSQL_DB_HISTORY)); - assertEquals(lastSharedStateFromFirstBatch.get(MSSQL_DB_HISTORY), lastSharedStateFromSecondBatch.get(MSSQL_DB_HISTORY)); + assertTrue(lastSharedStateFromFirstBatch.get(MSSQL_DB_HISTORY).equals(lastSharedStateFromSecondBatch.get(MSSQL_DB_HISTORY))); assertNotNull(lastSharedStateFromSecondBatch.get(MSSQL_CDC_OFFSET)); assertNotNull(lastSharedStateFromSecondBatch.get(IS_COMPRESSED)); assertTrue(lastSharedStateFromSecondBatch.get(IS_COMPRESSED).asBoolean()); diff --git a/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java b/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java index d99d2ba89446..ed1dfe077c65 100644 --- a/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java +++ b/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java @@ -16,6 +16,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.jooq.SQLDialect; @@ -68,8 +70,51 @@ static public MsSQLTestDatabase in(final BaseImage imageName, final ContainerMod .initialized(); } + private class MssqlTestDatabaseBackgroundThread extends Thread { + + public void run() { + LOGGER.info("SGX databaseId=" + databaseId + ". Started new database " + getDatabaseName()); + while (status == Status.INITIALIZING) { + // Loop until running. This is a busy loop because I want to execute the query below ASAP + } + Status myStatus = status; + boolean wasRunning = false; + try { + do { + LOGGER.info("SGX databaseId=" + databaseId + " status=" + myStatus); + if (myStatus == Status.RUNNING || myStatus == Status.INITIALIZING) { + try { + final var r = query(ctx -> ctx.fetch( + "EXEC master.dbo.xp_servicecontrol 'QueryState', N'SQLServerAGENT';").get(0)); + LOGGER.info("SGX databaseId=" + databaseId + " agentState=" + r.getValue(0)); + if ("Running.".equals(r.getValue(0))) { + wasRunning = true; + } else if (wasRunning && !"Running.".equals(r.getValue(0))) { + Map> containerLogs = logs.get(getContainer().getContainerId()); + for (int dbId : containerLogs.keySet()) { + for (String log : containerLogs.get(dbId)) { + LOGGER.info("SGX previous log (dbId=" + dbId + "): " +log); + } + } + } + } catch (final SQLException e) { + LOGGER.debug("SGX databaseId=" + databaseId + " got exception " + e); + } + } + myStatus = status; + Thread.sleep(100l); + } while (myStatus != Status.STOPPED); + + } catch (InterruptedException e) { + LOGGER.info("SGX databaseId=" + databaseId + " interrupted. Last status=" + myStatus); + } + } + + } + public MsSQLTestDatabase(final MSSQLServerContainer container) { super(container); + new MssqlTestDatabaseBackgroundThread().start(); } public MsSQLTestDatabase withCdc() { @@ -105,17 +150,17 @@ public MsSQLTestDatabase withShortenedCapturePollingInterval() { private void waitForAgentState(final boolean running) { final String expectedValue = running ? "Running." : "Stopped."; - LOGGER.debug("Waiting for SQLServerAgent state to change to '{}'.", expectedValue); + LOGGER.info("SGX databaseId = " + databaseId +" Waiting for SQLServerAgent state to change to '{}'.", expectedValue); for (int i = 0; i < MAX_RETRIES; i++) { try { final var r = query(ctx -> ctx.fetch("EXEC master.dbo.xp_servicecontrol 'QueryState', N'SQLServerAGENT';").get(0)); if (expectedValue.equalsIgnoreCase(r.getValue(0).toString())) { - LOGGER.debug("SQLServerAgent state is '{}', as expected.", expectedValue); + LOGGER.info("SGX databaseId = " + databaseId + " SQLServerAgent state is '{}', as expected.", expectedValue); return; } - LOGGER.debug("Retrying, SQLServerAgent state {} does not match expected '{}'.", r, expectedValue); + LOGGER.info("SGX databaseId = " + databaseId + "Retrying, SQLServerAgent state {} does not match expected '{}'.", r, expectedValue); } catch (final SQLException e) { - LOGGER.debug("Retrying agent state query after catching exception {}.", e.getMessage()); + LOGGER.info("SGX databaseId = " + databaseId + " Retrying agent state query after catching exception {}.", e.getMessage()); } try { Thread.sleep(1_000); // Wait one second between retries. @@ -123,21 +168,21 @@ private void waitForAgentState(final boolean running) { throw new RuntimeException(e); } } - throw new RuntimeException("Exhausted retry attempts while polling for agent state"); + throw new RuntimeException("SGX databaseId = " + databaseId + " Exhausted retry attempts while polling for agent state"); } public MsSQLTestDatabase withWaitUntilMaxLsnAvailable() { - LOGGER.debug("Waiting for max LSN to become available for database {}.", getDatabaseName()); + LOGGER.info("SGX databaseId = " + databaseId + " Waiting for max LSN to become available for database {}.", getDatabaseName()); for (int i = 0; i < MAX_RETRIES; i++) { try { final var maxLSN = query(ctx -> ctx.fetch("SELECT sys.fn_cdc_get_max_lsn();").get(0).get(0, byte[].class)); if (maxLSN != null) { - LOGGER.debug("Max LSN available for database {}: {}", getDatabaseName(), Lsn.valueOf(maxLSN)); + LOGGER.info("SGX databaseId = " + databaseId + " Max LSN available for database {}: {}", getDatabaseName(), Lsn.valueOf(maxLSN)); return self(); } - LOGGER.debug("Retrying, max LSN still not available for database {}.", getDatabaseName()); + LOGGER.info("SGX databaseId = " + databaseId + " Retrying, max LSN still not available for database {}.", getDatabaseName()); } catch (final SQLException e) { - LOGGER.warn("Retrying max LSN query after catching exception {}", e.getMessage()); + LOGGER.warn("SGX databaseId = " + databaseId + " Retrying max LSN query after catching exception {}", e.getMessage()); } try { Thread.sleep(1_000); // Wait one second between retries. @@ -243,6 +288,10 @@ public synchronized String getCertificate(final CertificateKey certificateKey) { return cachedCerts.get(certificateKey); } + public void close() { + super.close(); + } + @Override public MsSQLConfigBuilder configBuilder() { return new MsSQLConfigBuilder(this);