From cd528f156375bfd8ffcc2a254160352c8fa3224d Mon Sep 17 00:00:00 2001 From: Michael Siega <109092231+mfsiega-airbyte@users.noreply.github.com> Date: Thu, 29 Feb 2024 22:14:44 +0100 Subject: [PATCH] acceptance tests - consolidate updates and make sure we always sleep before syncing (#11478) --- .../test/utils/AcceptanceTestHarness.java | 47 +++++++++++-------- .../acceptance/SchemaManagementTests.java | 6 --- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AcceptanceTestHarness.java index 4c2373edcf2..e1d0099d4a8 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AcceptanceTestHarness.java @@ -653,29 +653,36 @@ public ConnectionRead getConnection(final UUID connectionId) throws Exception { public void updateConnectionSchedule( final UUID connectionId, final ConnectionScheduleType newScheduleType, - final ConnectionScheduleData newScheduleData) { - AirbyteApiClient.retryWithJitter(() -> apiClient.getConnectionApi().updateConnection( + final ConnectionScheduleData newScheduleData) + throws Exception { + updateConnection( new ConnectionUpdate() .connectionId(connectionId) .scheduleType(newScheduleType) - .scheduleData(newScheduleData)), - "update connection", JITTER_MAX_INTERVAL_SECS, FINAL_INTERVAL_SECS, MAX_TRIES); + .scheduleData(newScheduleData)); } - public void updateConnectionCatalog(final UUID connectionId, final AirbyteCatalog catalog) { - AirbyteApiClient.retryWithJitter(() -> apiClient.getConnectionApi().updateConnection( - new ConnectionUpdate() - .connectionId(connectionId) - .syncCatalog(catalog)), - "update connection catalog", JITTER_MAX_INTERVAL_SECS, FINAL_INTERVAL_SECS, MAX_TRIES); + public void updateConnectionCatalog(final UUID connectionId, final AirbyteCatalog catalog) throws Exception { + updateConnection(new ConnectionUpdate() + .connectionId(connectionId) + .syncCatalog(catalog)); } - public ConnectionRead updateConnectionSourceCatalogId(final UUID connectionId, UUID sourceCatalogId) { - return AirbyteApiClient.retryWithJitter(() -> apiClient.getConnectionApi().updateConnection( + public ConnectionRead updateConnectionSourceCatalogId(final UUID connectionId, UUID sourceCatalogId) throws Exception { + return updateConnection( new ConnectionUpdate() .connectionId(connectionId) - .sourceCatalogId(sourceCatalogId)), - "update connection source catalog id", JITTER_MAX_INTERVAL_SECS, FINAL_INTERVAL_SECS, MAX_TRIES); + .sourceCatalogId(sourceCatalogId)); + } + + private ConnectionRead updateConnection(final ConnectionUpdate request) throws Exception { + final var result = AirbyteApiClient.retryWithJitterThrows(() -> apiClient.getConnectionApi().updateConnection(request), + "update connection catalog", JITTER_MAX_INTERVAL_SECS, FINAL_INTERVAL_SECS, MAX_TRIES); + // Attempting to sync immediately after updating the connection can run into a race condition in the + // connection manager workflow hangs. This should be fixed in the backend, but for now we try to + // tolerate it. + Thread.sleep(1000 * 5); + return result; } public JobInfoRead syncConnection(final UUID connectionId) { @@ -959,10 +966,10 @@ private void clearDestinationDbData() throws SQLException { } } - private void disableConnection(final UUID connectionId) throws ApiException { + private void disableConnection(final UUID connectionId) throws Exception { final ConnectionUpdate connectionUpdate = new ConnectionUpdate().connectionId(connectionId).status(ConnectionStatus.DEPRECATED); - apiClient.getConnectionApi().updateConnection(connectionUpdate); + updateConnection(connectionUpdate); } private void deleteSource(final UUID sourceId) { @@ -1190,13 +1197,13 @@ public void deleteSourceDefinition(UUID sourceDefinitionId) throws Exception { public void updateSchemaChangePreference(final UUID connectionId, final NonBreakingChangesPreference nonBreakingChangesPreference, - final SchemaChangeBackfillPreference backfillPreference) { - AirbyteApiClient.retryWithJitter(() -> apiClient.getConnectionApi().updateConnection( + final SchemaChangeBackfillPreference backfillPreference) + throws Exception { + updateConnection( new ConnectionUpdate() .connectionId(connectionId) .nonBreakingChangesPreference(nonBreakingChangesPreference) - .backfillPreference(backfillPreference)), - "update connection non breaking change preference", JITTER_MAX_INTERVAL_SECS, FINAL_INTERVAL_SECS, MAX_TRIES); + .backfillPreference(backfillPreference)); } public WebBackendConnectionRead webBackendGetConnectionAndRefreshSchema(UUID connectionId) throws Exception { diff --git a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/SchemaManagementTests.java b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/SchemaManagementTests.java index 34ee3b28f26..a42216cdcba 100644 --- a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/SchemaManagementTests.java +++ b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/SchemaManagementTests.java @@ -223,8 +223,6 @@ void testPropagateAllChangesViaSyncRefresh() throws Exception { void testBackfillDisabled() throws Exception { testHarness.updateSchemaChangePreference(createdConnection.getConnectionId(), NonBreakingChangesPreference.PROPAGATE_FULLY, SchemaChangeBackfillPreference.DISABLED); - // Avoid a race with the connection manager. - Thread.sleep(1000 * 5); // Run a sync with the initial data. final var jobRead = testHarness.syncConnection(createdConnection.getConnectionId()).getJob(); testHarness.waitForSuccessfulSyncNoTimeout(jobRead); @@ -251,10 +249,6 @@ void testBackfillDisabled() throws Exception { void testBackfillOnNewColumn() throws Exception { testHarness.updateSchemaChangePreference(createdConnection.getConnectionId(), NonBreakingChangesPreference.PROPAGATE_FULLY, SchemaChangeBackfillPreference.ENABLED); - // We sometimes get a race where the connection manager isn't ready, and this results in - // `syncConnection` simply - // hanging. This is a bug we should fix in the backend, but we tolerate it here for now. - Thread.sleep(1000 * 5); // Run a sync with the initial data. final var jobRead = testHarness.syncConnection(createdConnection.getConnectionId()).getJob(); testHarness.waitForSuccessfulSyncNoTimeout(jobRead);