From 0ba82ae7868545e91766fe5053315294f686c54e Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Fri, 23 Feb 2024 18:22:29 -0800 Subject: [PATCH] fix 2? --- .../integrations/debezium/CdcSourceTest.java | 7 +++ .../source/mssql/MssqlCdcTargetPosition.java | 23 +++----- .../source/mssql/CdcMssqlSourceTest.java | 6 +++ .../source/mssql/MsSQLTestDatabase.java | 53 +++++++++++++------ 4 files changed, 55 insertions(+), 34 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java index 23cfed118197..7f745b0d16b5 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java @@ -36,6 +36,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.StreamDescriptor; import io.airbyte.protocol.models.v0.SyncMode; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -403,6 +404,11 @@ void testUpdate() throws Exception { assertCdcMetaData(recordMessages2.get(0).getData(), true); } + protected void waitForCdcRecords(String schemaName, String tableName, int recordCount) + throws SQLException { + + } + @SuppressWarnings({"BusyWait", "CodeBlock2Expr"}) @Test // Verify that when data is inserted into the database while a sync is happening and after the first @@ -418,6 +424,7 @@ protected void testRecordsProducedDuringAndAfterSync() throws Exception { "F-" + recordsCreated)); writeModelRecord(record); } + waitForCdcRecords(modelsSchema(), MODELS_STREAM_NAME, recordsToCreate); final AutoCloseableIterator firstBatchIterator = source() .read(config(), getConfiguredCatalog(), null); diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java index 123459f386da..af172931ca50 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java @@ -15,7 +15,6 @@ import io.debezium.connector.sqlserver.Lsn; import java.io.IOException; import java.sql.SQLException; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -25,9 +24,6 @@ public class MssqlCdcTargetPosition implements CdcTargetPosition { private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcTargetPosition.class); - - public static final Duration MAX_LSN_QUERY_DELAY = Duration.ZERO; - public static final Duration MAX_LSN_QUERY_DELAY_TEST = Duration.ofSeconds(1); public final Lsn targetLsn; public MssqlCdcTargetPosition(final Lsn targetLsn) { @@ -83,31 +79,24 @@ public int hashCode() { public static MssqlCdcTargetPosition getTargetPosition(final JdbcDatabase database, final String dbName) { try { - // We might have to wait a bit before querying the max_lsn to give the CDC capture job - // a chance to catch up. This is important in tests, where reads might occur in quick succession - // which might leave the CT tables (which Debezium consumes) in a stale state. final JsonNode sourceConfig = database.getSourceConfig(); - final Duration delay = (sourceConfig != null && sourceConfig.has("is_test") && sourceConfig.get("is_test").asBoolean()) - ? MAX_LSN_QUERY_DELAY_TEST - : MAX_LSN_QUERY_DELAY; final String maxLsnQuery = """ USE [%s]; - WAITFOR DELAY '%02d:%02d:%02d'; SELECT sys.fn_cdc_get_max_lsn() AS max_lsn; - """.formatted(dbName, delay.toHours(), delay.toMinutesPart(), delay.toSecondsPart()); + """.formatted(dbName); // Query the high-water mark. final List jsonNodes = database.bufferedResultSetQuery( connection -> connection.createStatement().executeQuery(maxLsnQuery), JdbcUtils.getDefaultSourceOperations()::rowToJson); Preconditions.checkState(jsonNodes.size() == 1); + final Lsn maxLsn; if (jsonNodes.get(0).get("max_lsn") != null) { - final Lsn maxLsn = Lsn.valueOf(jsonNodes.get(0).get("max_lsn").binaryValue()); - LOGGER.info("identified target lsn: " + maxLsn); - return new MssqlCdcTargetPosition(maxLsn); + maxLsn = Lsn.valueOf(jsonNodes.get(0).get("max_lsn").binaryValue()); } else { - throw new RuntimeException("SQL returned max LSN as null, this might be because the SQL Server Agent is not running. " + - "Please enable the Agent and try again (https://docs.microsoft.com/en-us/sql/ssms/agent/start-stop-or-pause-the-sql-server-agent-service)"); + maxLsn = Lsn.NULL; } + LOGGER.info("identified target lsn: " + maxLsn); + return new MssqlCdcTargetPosition(maxLsn); } catch (final SQLException | IOException e) { throw new RuntimeException(e); } diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java index 48236462c1ef..901a687be357 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java @@ -49,6 +49,7 @@ import io.airbyte.protocol.models.v0.AirbyteStreamState; import io.airbyte.protocol.models.v0.SyncMode; import io.debezium.connector.sqlserver.Lsn; +import java.sql.SQLException; import java.time.Duration; import java.util.Collections; import java.util.List; @@ -470,4 +471,9 @@ private void assertStateTypes(final List stateMessages, fin } } + protected void waitForCdcRecords(String schemaName, String tableName, int recordCount) + throws SQLException { + testdb.waitForCdcRecords(schemaName, tableName, recordCount); + } + } diff --git a/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java b/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java index b3e51aafc985..dd02c4ad0558 100644 --- a/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java +++ b/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java @@ -11,6 +11,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.sql.SQLException; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -183,13 +184,7 @@ public void innerRun() throws Exception { public MsSQLTestDatabase(final MSSQLServerContainer container) { super(container); - bgThreads = new AbstractMssqlTestDatabaseBackgroundThread[] { - new MssqlTestDatabaseBackgroundThreadAgentState(), - new MssqlTestDatabaseBackgroundThreadFnCdcGetMaxLsn(), - new MssqlTestDatabaseBackgroundThreadLsnTimeMapping(), - new MssqlTestDatabaseBackgroundThreadEnableInternalTable(), - new MssqlTestDatabaseBackgroundThreadQueryChangeTables(), - new MssqlTestDatabaseBackgroundThreadQueryInternalTable()}; + bgThreads = new AbstractMssqlTestDatabaseBackgroundThread[] {}; LOGGER.info("SGX creating new database. databaseId=" + this.databaseId + ", databaseName=" + getDatabaseName()); } @@ -204,13 +199,6 @@ public MsSQLTestDatabase withCdc() { LOGGER.info("enabling CDC on database {} with id {}", getDatabaseName(), databaseId); with("EXEC sys.sp_cdc_enable_db;"); LOGGER.info("CDC enabled on database {} with id {}", getDatabaseName(), databaseId); - try { - LOGGER.info("sleeping"); - Thread.sleep(300_000); - LOGGER.info("resuming"); - } catch (InterruptedException e) { - - } return this; } @@ -222,7 +210,19 @@ public synchronized MsSQLTestDatabase withCdcForTable(String schemaName, String \t@role_name = %s, \t@supports_net_changes = 0"""; String sqlRoleName = roleName == null ? "NULL" : "N'%s'".formatted(roleName); - return with(enableCdcSqlFmt.formatted(schemaName, tableName, sqlRoleName)); + Instant startTime = Instant.now(); + Instant timeout = startTime.plusSeconds(300); + while (timeout.isAfter(Instant.now())) { + try { + getDslContext().execute(enableCdcSqlFmt.formatted(schemaName, tableName, sqlRoleName)); + return this; + } catch (Exception e) { + if (!e.getMessage().contains("The error returned was 14258: 'Cannot perform this operation while SQLServerAgent is starting.")) { + throw new RuntimeException(e); + } + } + } + throw new RuntimeException("Couldn't enable CDC for table %s.%s".formatted(schemaName, tableName)); } public MsSQLTestDatabase withoutCdc() { @@ -248,8 +248,7 @@ public MsSQLTestDatabase withWaitUntilAgentStopped() { } public MsSQLTestDatabase withShortenedCapturePollingInterval() { - return with("EXEC sys.sp_cdc_change_job @job_type = 'capture', @pollinginterval = %d;", - MssqlCdcTargetPosition.MAX_LSN_QUERY_DELAY_TEST.toSeconds()); + return with("EXEC sys.sp_cdc_change_job @job_type = 'capture', @pollinginterval = 1;"); } private void waitForAgentState(final boolean running) { @@ -297,6 +296,26 @@ public MsSQLTestDatabase withWaitUntilMaxLsnAvailable() { throw new RuntimeException("Exhausted retry attempts while polling for max LSN availability"); } + public void waitForCdcRecords(String schemaName, String tableName, int recordCount) + throws SQLException { + int maxTimeoutSec = 60; + String sql = "SELECT count(*) FROM cdc.%s_%s_ct".formatted(schemaName, tableName); + int actualRecordCount; + Instant startTime = Instant.now(); + Instant maxTime = startTime.plusSeconds(maxTimeoutSec); + do { + LOGGER.info("fetching the number of CDC records for {}.{}", schemaName, tableName); + actualRecordCount = query(ctx -> ctx.fetch(sql)).get(0).get(0, Integer.class); + LOGGER.info("Found {} CDC records for {}.{}. Expecting {}. Trying again", actualRecordCount, schemaName, tableName, recordCount); + } while (actualRecordCount < recordCount && maxTime.isAfter(Instant.now())); + if (actualRecordCount >= recordCount) { + LOGGER.info("found {} records!", actualRecordCount); + } else { + throw new RuntimeException( + "failed to find %d records after %s seconds. Only found %d!".formatted(recordCount, maxTimeoutSec, actualRecordCount)); + } + } + @Override public String getPassword() { return "S00p3rS33kr3tP4ssw0rd!";