Skip to content

Commit

Permalink
fix 2?
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Feb 24, 2024
1 parent 6235d5a commit 0ba82ae
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import io.airbyte.protocol.models.v0.SyncMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -403,6 +404,11 @@ void testUpdate() throws Exception {
assertCdcMetaData(recordMessages2.get(0).getData(), true);
}

protected void waitForCdcRecords(String schemaName, String tableName, int recordCount)
throws SQLException {

}

@SuppressWarnings({"BusyWait", "CodeBlock2Expr"})
@Test
// Verify that when data is inserted into the database while a sync is happening and after the first
Expand All @@ -418,6 +424,7 @@ protected void testRecordsProducedDuringAndAfterSync() throws Exception {
"F-" + recordsCreated));
writeModelRecord(record);
}
waitForCdcRecords(modelsSchema(), MODELS_STREAM_NAME, recordsToCreate);

final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = source()
.read(config(), getConfiguredCatalog(), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.debezium.connector.sqlserver.Lsn;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -25,9 +24,6 @@
public class MssqlCdcTargetPosition implements CdcTargetPosition<Lsn> {

private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcTargetPosition.class);

public static final Duration MAX_LSN_QUERY_DELAY = Duration.ZERO;
public static final Duration MAX_LSN_QUERY_DELAY_TEST = Duration.ofSeconds(1);
public final Lsn targetLsn;

public MssqlCdcTargetPosition(final Lsn targetLsn) {
Expand Down Expand Up @@ -83,31 +79,24 @@ public int hashCode() {

public static MssqlCdcTargetPosition getTargetPosition(final JdbcDatabase database, final String dbName) {
try {
// We might have to wait a bit before querying the max_lsn to give the CDC capture job
// a chance to catch up. This is important in tests, where reads might occur in quick succession
// which might leave the CT tables (which Debezium consumes) in a stale state.
final JsonNode sourceConfig = database.getSourceConfig();
final Duration delay = (sourceConfig != null && sourceConfig.has("is_test") && sourceConfig.get("is_test").asBoolean())
? MAX_LSN_QUERY_DELAY_TEST
: MAX_LSN_QUERY_DELAY;
final String maxLsnQuery = """
USE [%s];
WAITFOR DELAY '%02d:%02d:%02d';
SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;
""".formatted(dbName, delay.toHours(), delay.toMinutesPart(), delay.toSecondsPart());
""".formatted(dbName);
// Query the high-water mark.
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(
connection -> connection.createStatement().executeQuery(maxLsnQuery),
JdbcUtils.getDefaultSourceOperations()::rowToJson);
Preconditions.checkState(jsonNodes.size() == 1);
final Lsn maxLsn;
if (jsonNodes.get(0).get("max_lsn") != null) {
final Lsn maxLsn = Lsn.valueOf(jsonNodes.get(0).get("max_lsn").binaryValue());
LOGGER.info("identified target lsn: " + maxLsn);
return new MssqlCdcTargetPosition(maxLsn);
maxLsn = Lsn.valueOf(jsonNodes.get(0).get("max_lsn").binaryValue());
} else {
throw new RuntimeException("SQL returned max LSN as null, this might be because the SQL Server Agent is not running. " +
"Please enable the Agent and try again (https://docs.microsoft.com/en-us/sql/ssms/agent/start-stop-or-pause-the-sql-server-agent-service)");
maxLsn = Lsn.NULL;
}
LOGGER.info("identified target lsn: " + maxLsn);
return new MssqlCdcTargetPosition(maxLsn);
} catch (final SQLException | IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.airbyte.protocol.models.v0.AirbyteStreamState;
import io.airbyte.protocol.models.v0.SyncMode;
import io.debezium.connector.sqlserver.Lsn;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -470,4 +471,9 @@ private void assertStateTypes(final List<AirbyteStateMessage> stateMessages, fin
}
}

protected void waitForCdcRecords(String schemaName, String tableName, int recordCount)
throws SQLException {
testdb.waitForCdcRecords(schemaName, tableName, recordCount);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -183,13 +184,7 @@ public void innerRun() throws Exception {

public MsSQLTestDatabase(final MSSQLServerContainer<?> container) {
super(container);
bgThreads = new AbstractMssqlTestDatabaseBackgroundThread[] {
new MssqlTestDatabaseBackgroundThreadAgentState(),
new MssqlTestDatabaseBackgroundThreadFnCdcGetMaxLsn(),
new MssqlTestDatabaseBackgroundThreadLsnTimeMapping(),
new MssqlTestDatabaseBackgroundThreadEnableInternalTable(),
new MssqlTestDatabaseBackgroundThreadQueryChangeTables(),
new MssqlTestDatabaseBackgroundThreadQueryInternalTable()};
bgThreads = new AbstractMssqlTestDatabaseBackgroundThread[] {};
LOGGER.info("SGX creating new database. databaseId=" + this.databaseId + ", databaseName=" + getDatabaseName());
}

Expand All @@ -204,13 +199,6 @@ public MsSQLTestDatabase withCdc() {
LOGGER.info("enabling CDC on database {} with id {}", getDatabaseName(), databaseId);
with("EXEC sys.sp_cdc_enable_db;");
LOGGER.info("CDC enabled on database {} with id {}", getDatabaseName(), databaseId);
try {
LOGGER.info("sleeping");
Thread.sleep(300_000);
LOGGER.info("resuming");
} catch (InterruptedException e) {

}
return this;
}

Expand All @@ -222,7 +210,19 @@ public synchronized MsSQLTestDatabase withCdcForTable(String schemaName, String
\t@role_name = %s,
\t@supports_net_changes = 0""";
String sqlRoleName = roleName == null ? "NULL" : "N'%s'".formatted(roleName);
return with(enableCdcSqlFmt.formatted(schemaName, tableName, sqlRoleName));
Instant startTime = Instant.now();
Instant timeout = startTime.plusSeconds(300);
while (timeout.isAfter(Instant.now())) {
try {
getDslContext().execute(enableCdcSqlFmt.formatted(schemaName, tableName, sqlRoleName));
return this;
} catch (Exception e) {
if (!e.getMessage().contains("The error returned was 14258: 'Cannot perform this operation while SQLServerAgent is starting.")) {
throw new RuntimeException(e);
}
}
}
throw new RuntimeException("Couldn't enable CDC for table %s.%s".formatted(schemaName, tableName));
}

public MsSQLTestDatabase withoutCdc() {
Expand All @@ -248,8 +248,7 @@ public MsSQLTestDatabase withWaitUntilAgentStopped() {
}

public MsSQLTestDatabase withShortenedCapturePollingInterval() {
return with("EXEC sys.sp_cdc_change_job @job_type = 'capture', @pollinginterval = %d;",
MssqlCdcTargetPosition.MAX_LSN_QUERY_DELAY_TEST.toSeconds());
return with("EXEC sys.sp_cdc_change_job @job_type = 'capture', @pollinginterval = 1;");
}

private void waitForAgentState(final boolean running) {
Expand Down Expand Up @@ -297,6 +296,26 @@ public MsSQLTestDatabase withWaitUntilMaxLsnAvailable() {
throw new RuntimeException("Exhausted retry attempts while polling for max LSN availability");
}

public void waitForCdcRecords(String schemaName, String tableName, int recordCount)
throws SQLException {
int maxTimeoutSec = 60;
String sql = "SELECT count(*) FROM cdc.%s_%s_ct".formatted(schemaName, tableName);
int actualRecordCount;
Instant startTime = Instant.now();
Instant maxTime = startTime.plusSeconds(maxTimeoutSec);
do {
LOGGER.info("fetching the number of CDC records for {}.{}", schemaName, tableName);
actualRecordCount = query(ctx -> ctx.fetch(sql)).get(0).get(0, Integer.class);
LOGGER.info("Found {} CDC records for {}.{}. Expecting {}. Trying again", actualRecordCount, schemaName, tableName, recordCount);
} while (actualRecordCount < recordCount && maxTime.isAfter(Instant.now()));
if (actualRecordCount >= recordCount) {
LOGGER.info("found {} records!", actualRecordCount);
} else {
throw new RuntimeException(
"failed to find %d records after %s seconds. Only found %d!".formatted(recordCount, maxTimeoutSec, actualRecordCount));
}
}

@Override
public String getPassword() {
return "S00p3rS33kr3tP4ssw0rd!";
Expand Down

0 comments on commit 0ba82ae

Please sign in to comment.