Skip to content

Commit

Permalink
[DB Sources] : Add analytics message to understand when WASS occurs (…
Browse files Browse the repository at this point in the history
…not just when we timeout) (#42068)
  • Loading branch information
akashkulk authored Jul 17, 2024
1 parent 939bb2e commit 8e53180
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 4 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ corresponds to that version.
=======
| Version | Date | Pull Request | Subject |
|:------------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.41.8 | 2024-07-18 | [\#42068](https://github.com/airbytehq/airbyte/pull/42068) | Add analytics message for WASS occurrence. |
| 0.41.7 | 2024-07-17 | [\#42055](https://github.com/airbytehq/airbyte/pull/42055) | Add debezium heartbeat timeout back to shutdown debezium. |
| 0.41.6 | 2024-07-17 | [\#41996](https://github.com/airbytehq/airbyte/pull/41996) | Fix java interop compilation issue in Config/TransientErrorException. |
| 0.41.5 | 2024-07-16 | [\#42011] (https://github.com/airbytehq/airbyte/pull/42011) | Async consumer accepts null default namespace |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ object DbAnalyticsUtils {
const val DATA_TYPES_SERIALIZATION_ERROR_KEY = "db-sources-data-serialization-error"
const val CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY = "db-sources-snapshot-force-shutdown"
const val DEBEZIUM_CLOSE_REASON_KEY = "db-sources-debezium-close-reason"
const val WASS_OCCURRENCE_KEY = "db-sources-wass-occurrence"

@JvmStatic
fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage {
Expand All @@ -39,4 +40,9 @@ object DbAnalyticsUtils {
fun debeziumCloseReasonMessage(reason: String): AirbyteAnalyticsTraceMessage {
return AirbyteAnalyticsTraceMessage().withType(DEBEZIUM_CLOSE_REASON_KEY).withValue(reason)
}

@JvmStatic
fun wassOccurrenceMessage(): AirbyteAnalyticsTraceMessage {
return AirbyteAnalyticsTraceMessage().withType(WASS_OCCURRENCE_KEY).withValue("1")
}
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.41.7
version=0.41.8
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.41.7'
cdkVersionRequired = '0.41.8'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.5.1
dockerImageTag: 3.5.2
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.source.postgres.cdc;

import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
import static io.airbyte.cdk.db.DbAnalyticsUtils.wassOccurrenceMessage;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.FAIL_SYNC_OPTION;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
Expand Down Expand Up @@ -332,6 +333,7 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
* sync to clear the WAL. We cannot simply add the same cdc iterators as their target end position
* is fixed to the tip of the WAL at the start of the sync.
*/
AirbyteTraceMessageUtility.emitAnalyticsTrace(wassOccurrenceMessage());
final var propertiesManager = new RelationalDbDebeziumPropertiesManager(
PostgresCdcProperties.getDebeziumDefaultProperties(database), sourceConfig, catalog, startedCdcStreamList);
final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = getCdcIncrementalIteratorsSupplier(handler,
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.5.1 | 2024-07-17 | [42055](https://github.com/airbytehq/airbyte/pull/42055) | Add debezium heartbeat timeout back to shutdown debezium. |
| 3.5.2 | 2024-07-17 | [42068](https://github.com/airbytehq/airbyte/pull/42068) | Add analytics for WASS case occurrence. |
| 3.5.1 | 2024-07-17 | [42055](https://github.com/airbytehq/airbyte/pull/42055) | Add debezium heartbeat timeout back to shutdown debezium. |
| 3.5.0 | 2024-07-17 | [41651](https://github.com/airbytehq/airbyte/pull/41651) | Implement WASS algo - large initial snapshots shouldn't block CDC. |
| 3.4.26 | 2024-07-15 | [41654](https://github.com/airbytehq/airbyte/pull/41654) | Allow null value for array typed columns in CDC. |
| 3.4.25 | 2024-07-12 | [41651](https://github.com/airbytehq/airbyte/pull/41651) | Throw transient error if tables of interest and undergoing full vacuum. |
Expand Down

0 comments on commit 8e53180

Please sign in to comment.