From f0e74472dd6adf3a5a51dc151ffe0e55a940b524 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Tue, 20 Feb 2024 14:09:05 -0800 Subject: [PATCH] [Source-mysql] : Add config to throw an error on invalid CDC position (#35338) --- .../connectors/source-mysql/metadata.yaml | 2 +- .../source/mysql/MySqlSpecConstants.java | 15 +++++++++++++++ .../mysql/initialsync/MySqlInitialReadUtil.java | 8 ++++++++ .../source-mysql/src/main/resources/spec.json | 9 +++++++++ .../resources/expected_cloud_spec.json | 9 +++++++++ .../resources/expected_oss_spec.json | 9 +++++++++ .../source/mysql/CdcMysqlSourceTest.java | 9 +++++++++ .../src/test/resources/expected_cloud_spec.json | 9 +++++++++ .../src/test/resources/expected_oss_spec.json | 9 +++++++++ .../source/mysql/MySQLTestDatabase.java | 8 ++++++++ docs/integrations/sources/mysql.md | 1 + .../sources/mysql/mysql-troubleshooting.md | 12 ++++++++++++ 12 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSpecConstants.java diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 68a3b86807fe..f63c1375e894 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.3.7 + dockerImageTag: 3.3.8 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSpecConstants.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSpecConstants.java new file mode 100644 index 000000000000..7735470482da --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSpecConstants.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mysql; + +// Constants defined in +// airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json. +public class MySqlSpecConstants { + + public static final String INVALID_CDC_CURSOR_POSITION_PROPERTY = "invalid_cdc_cursor_position_behavior"; + public static final String FAIL_SYNC_OPTION = "Fail sync"; + public static final String RESYNC_DATA_OPTION = "Re-sync data"; + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java index ff0b7a477e5c..47aa83ee09fc 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java @@ -7,6 +7,8 @@ import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage; import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams; import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.prettyPrintConfiguredAirbyteStreamList; +import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.FAIL_SYNC_OPTION; +import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY; import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_CDC_OFFSET; import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadGlobalStateManager.STATE_TYPE_KEY; import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.PRIMARY_KEY_STATE_TYPE; @@ -25,6 +27,7 @@ import io.airbyte.cdk.integrations.source.relationaldb.TableInfo; import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState; import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; @@ -111,6 +114,11 @@ public static List> getCdcReadIterators(fi if (!savedOffsetStillPresentOnServer) { AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage()); + if (!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get( + INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) { + throw new ConfigErrorException( + "Saved offset no longer present on the server. Please reset the connection, and then increase binlog retention or reduce sync frequency. See https://docs.airbyte.com/integrations/sources/mysql/mysql-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details."); + } LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch"); } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json index 841fa1f3bdba..78450b13aabd 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json @@ -211,6 +211,15 @@ "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", "order": 2, "always_show": true + }, + "invalid_cdc_cursor_position_behavior": { + "type": "string", + "title": "Invalid CDC position behavior (Advanced)", + "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", + "enum": ["Fail sync", "Re-sync data"], + "default": "Fail sync", + "order": 3, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json index 50d717a95886..871b7c0c38bb 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json @@ -189,6 +189,15 @@ "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", "order": 2, "always_show": true + }, + "invalid_cdc_cursor_position_behavior": { + "type": "string", + "title": "Invalid CDC position behavior (Advanced)", + "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", + "enum": ["Fail sync", "Re-sync data"], + "default": "Fail sync", + "order": 3, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json index 1a884d8de813..7ffbbad5f718 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json @@ -211,6 +211,15 @@ "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", "order": 2, "always_show": true + }, + "invalid_cdc_cursor_position_behavior": { + "type": "string", + "title": "Invalid CDC position behavior (Advanced)", + "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", + "enum": ["Fail sync", "Re-sync data"], + "default": "Fail sync", + "order": 3, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index 5825ae0848df..b88b5baa6420 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -10,6 +10,7 @@ import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_DEFAULT_CURSOR; import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE; import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS; +import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.FAIL_SYNC_OPTION; import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.IS_COMPRESSED; import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_CDC_OFFSET; import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_DB_HISTORY; @@ -20,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; @@ -33,6 +35,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.debezium.CdcSourceTest; import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; @@ -277,6 +280,12 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception { dataFromSecondBatch); assertEquals((recordsToCreate * 2) + recordsCreatedBeforeTestCount, recordsFromSecondBatch.size(), "Expected 46 records to be replicated in the second sync."); + + JsonNode failSyncConfig = testdb.testConfigBuilder() + .withCdcReplication(FAIL_SYNC_OPTION) + .with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1) + .build(); + assertThrows(ConfigErrorException.class, () -> source().read(failSyncConfig, getConfiguredCatalog(), state)); } /** diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json index 52441e124b17..66f0b3bdf647 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json @@ -205,6 +205,15 @@ "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", "order": 2, "always_show": true + }, + "invalid_cdc_cursor_position_behavior": { + "type": "string", + "title": "Invalid CDC position behavior (Advanced)", + "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", + "enum": ["Fail sync", "Re-sync data"], + "default": "Fail sync", + "order": 3, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json index 841fa1f3bdba..78450b13aabd 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json @@ -211,6 +211,15 @@ "description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.", "order": 2, "always_show": true + }, + "invalid_cdc_cursor_position_behavior": { + "type": "string", + "title": "Invalid CDC position behavior (Advanced)", + "description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.", + "enum": ["Fail sync", "Re-sync data"], + "default": "Fail sync", + "order": 3, + "always_show": true } } }, diff --git a/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java b/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java index fc031ed541c7..219d5e90f479 100644 --- a/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java +++ b/airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java @@ -4,6 +4,9 @@ package io.airbyte.integrations.source.mysql; +import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY; +import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.RESYNC_DATA_OPTION; + import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.db.factory.DatabaseDriver; import io.airbyte.cdk.testutils.TestDatabase; @@ -128,12 +131,17 @@ public MySQLConfigBuilder withStandardReplication() { } public MySQLConfigBuilder withCdcReplication() { + return withCdcReplication(RESYNC_DATA_OPTION); + } + + public MySQLConfigBuilder withCdcReplication(String cdcCursorFailBehaviour) { return this .with("is_test", true) .with("replication_method", ImmutableMap.builder() .put("method", "CDC") .put("initial_waiting_seconds", 5) .put("server_time_zone", "America/Los_Angeles") + .put(INVALID_CDC_CURSOR_POSITION_PROPERTY, cdcCursorFailBehaviour) .build()); } diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 09117fd7e280..18e59658342d 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -223,6 +223,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.8 | 2024-02-20 | [35338](https://github.com/airbytehq/airbyte/pull/35338) | Add config to throw an error on invalid CDC position. | | 3.3.7 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. | | 3.3.6 | 2024-02-13 | [34869](https://github.com/airbytehq/airbyte/pull/34573) | Don't emit state in SourceStateIterator when there is an underlying stream failure. | | 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name | diff --git a/docs/integrations/sources/mysql/mysql-troubleshooting.md b/docs/integrations/sources/mysql/mysql-troubleshooting.md index aee512157839..7e6265d0b867 100644 --- a/docs/integrations/sources/mysql/mysql-troubleshooting.md +++ b/docs/integrations/sources/mysql/mysql-troubleshooting.md @@ -18,6 +18,18 @@ * Amazon RDS MySQL or MariaDB connection issues: If you see the following `Cannot create a PoolableConnectionFactory` error, please add `enabledTLSProtocols=TLSv1.2` in the JDBC parameters. * Amazon RDS MySQL connection issues: If you see `Error: HikariPool-1 - Connection is not available, request timed out after 30001ms.`, many times this due to your VPC not allowing public traffic. We recommend going through [this AWS troubleshooting checklist](https://aws.amazon.com/premiumsupport/knowledge-center/rds-cannot-connect/) to ensure the correct permissions/settings have been granted to allow Airbyte to connect to your database. +### Under CDC incremental mode, there are still full refresh syncs + +Normally under the CDC mode, the MySQL source will first run a full refresh sync to read the snapshot of all the existing data, and all subsequent runs will only be incremental syncs reading from the binlogs. However, occasionally, you may see full refresh syncs after the initial run. When this happens, you will see the following log: + +> Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch + +The root causes is that the binglogs needed for the incremental sync have been removed by MySQL. This can occur under the following scenarios: + +- When there are lots of database updates resulting in more WAL files than allowed in the `pg_wal` directory, Postgres will purge or archive the WAL files. This scenario is preventable. Possible solutions include: + - Sync the data source more frequently. + - Set a higher `binlog_expire_logs_seconds`. It's recommended to set this value to a time period of 7 days. See detailed documentation [here](https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_binlog_expire_logs_seconds). The downside of this approach is that more disk space will be needed. + ### EventDataDeserializationException errors during initial snapshot When a sync runs for the first time using CDC, Airbyte performs an initial consistent snapshot of your database. Airbyte doesn't acquire any table locks \(for tables defined with MyISAM engine, the tables would still be locked\) while creating the snapshot to allow writes by other database clients. But in order for the sync to work without any error/unexpected behaviour, it is assumed that no schema changes are happening while the snapshot is running.