Skip to content

Commit

Permalink
acceptance tests - consolidate updates and make sure we always sleep …
Browse files Browse the repository at this point in the history
…before syncing (#11478)
  • Loading branch information
mfsiega-airbyte committed Feb 29, 2024
1 parent e81128a commit cd528f1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -653,29 +653,36 @@ public ConnectionRead getConnection(final UUID connectionId) throws Exception {
public void updateConnectionSchedule(
final UUID connectionId,
final ConnectionScheduleType newScheduleType,
final ConnectionScheduleData newScheduleData) {
AirbyteApiClient.retryWithJitter(() -> apiClient.getConnectionApi().updateConnection(
final ConnectionScheduleData newScheduleData)
throws Exception {
updateConnection(
new ConnectionUpdate()
.connectionId(connectionId)
.scheduleType(newScheduleType)
.scheduleData(newScheduleData)),
"update connection", JITTER_MAX_INTERVAL_SECS, FINAL_INTERVAL_SECS, MAX_TRIES);
.scheduleData(newScheduleData));
}

public void updateConnectionCatalog(final UUID connectionId, final AirbyteCatalog catalog) {
AirbyteApiClient.retryWithJitter(() -> apiClient.getConnectionApi().updateConnection(
new ConnectionUpdate()
.connectionId(connectionId)
.syncCatalog(catalog)),
"update connection catalog", JITTER_MAX_INTERVAL_SECS, FINAL_INTERVAL_SECS, MAX_TRIES);
public void updateConnectionCatalog(final UUID connectionId, final AirbyteCatalog catalog) throws Exception {
updateConnection(new ConnectionUpdate()
.connectionId(connectionId)
.syncCatalog(catalog));
}

public ConnectionRead updateConnectionSourceCatalogId(final UUID connectionId, UUID sourceCatalogId) {
return AirbyteApiClient.retryWithJitter(() -> apiClient.getConnectionApi().updateConnection(
public ConnectionRead updateConnectionSourceCatalogId(final UUID connectionId, UUID sourceCatalogId) throws Exception {
return updateConnection(
new ConnectionUpdate()
.connectionId(connectionId)
.sourceCatalogId(sourceCatalogId)),
"update connection source catalog id", JITTER_MAX_INTERVAL_SECS, FINAL_INTERVAL_SECS, MAX_TRIES);
.sourceCatalogId(sourceCatalogId));
}

private ConnectionRead updateConnection(final ConnectionUpdate request) throws Exception {
final var result = AirbyteApiClient.retryWithJitterThrows(() -> apiClient.getConnectionApi().updateConnection(request),
"update connection catalog", JITTER_MAX_INTERVAL_SECS, FINAL_INTERVAL_SECS, MAX_TRIES);
// Attempting to sync immediately after updating the connection can run into a race condition in the
// connection manager workflow hangs. This should be fixed in the backend, but for now we try to
// tolerate it.
Thread.sleep(1000 * 5);
return result;
}

public JobInfoRead syncConnection(final UUID connectionId) {
Expand Down Expand Up @@ -959,10 +966,10 @@ private void clearDestinationDbData() throws SQLException {
}
}

private void disableConnection(final UUID connectionId) throws ApiException {
private void disableConnection(final UUID connectionId) throws Exception {
final ConnectionUpdate connectionUpdate =
new ConnectionUpdate().connectionId(connectionId).status(ConnectionStatus.DEPRECATED);
apiClient.getConnectionApi().updateConnection(connectionUpdate);
updateConnection(connectionUpdate);
}

private void deleteSource(final UUID sourceId) {
Expand Down Expand Up @@ -1190,13 +1197,13 @@ public void deleteSourceDefinition(UUID sourceDefinitionId) throws Exception {

public void updateSchemaChangePreference(final UUID connectionId,
final NonBreakingChangesPreference nonBreakingChangesPreference,
final SchemaChangeBackfillPreference backfillPreference) {
AirbyteApiClient.retryWithJitter(() -> apiClient.getConnectionApi().updateConnection(
final SchemaChangeBackfillPreference backfillPreference)
throws Exception {
updateConnection(
new ConnectionUpdate()
.connectionId(connectionId)
.nonBreakingChangesPreference(nonBreakingChangesPreference)
.backfillPreference(backfillPreference)),
"update connection non breaking change preference", JITTER_MAX_INTERVAL_SECS, FINAL_INTERVAL_SECS, MAX_TRIES);
.backfillPreference(backfillPreference));
}

public WebBackendConnectionRead webBackendGetConnectionAndRefreshSchema(UUID connectionId) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,6 @@ void testPropagateAllChangesViaSyncRefresh() throws Exception {
void testBackfillDisabled() throws Exception {
testHarness.updateSchemaChangePreference(createdConnection.getConnectionId(), NonBreakingChangesPreference.PROPAGATE_FULLY,
SchemaChangeBackfillPreference.DISABLED);
// Avoid a race with the connection manager.
Thread.sleep(1000 * 5);
// Run a sync with the initial data.
final var jobRead = testHarness.syncConnection(createdConnection.getConnectionId()).getJob();
testHarness.waitForSuccessfulSyncNoTimeout(jobRead);
Expand All @@ -251,10 +249,6 @@ void testBackfillDisabled() throws Exception {
void testBackfillOnNewColumn() throws Exception {
testHarness.updateSchemaChangePreference(createdConnection.getConnectionId(), NonBreakingChangesPreference.PROPAGATE_FULLY,
SchemaChangeBackfillPreference.ENABLED);
// We sometimes get a race where the connection manager isn't ready, and this results in
// `syncConnection` simply
// hanging. This is a bug we should fix in the backend, but we tolerate it here for now.
Thread.sleep(1000 * 5);
// Run a sync with the initial data.
final var jobRead = testHarness.syncConnection(createdConnection.getConnectionId()).getJob();
testHarness.waitForSuccessfulSyncNoTimeout(jobRead);
Expand Down

0 comments on commit cd528f1

Please sign in to comment.