Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor debezium harness in CDK, push connector-specific logic down to connectors #34573

Merged
merged 12 commits into from
Jan 27, 2024
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.16.0 | 2024-01-26 | [\#34573](https://github.com/airbytehq/airbyte/pull/34573) | Untangle Debezium harness dependencies. |
| 0.15.2 | 2024-01-25 | [\#34441](https://github.com/airbytehq/airbyte/pull/34441) | Improve airbyte-api build performance. |
| 0.15.1 | 2024-01-25 | [\#34451](https://github.com/airbytehq/airbyte/pull/34451) | Async destinations: Better logging when we fail to parse an AirbyteMessage |
| 0.15.0 | 2024-01-23 | [\#34441](https://github.com/airbytehq/airbyte/pull/34441) | Removed connector registry and micronaut dependencies. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.15.2
version=0.16.0
7 changes: 0 additions & 7 deletions airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,11 @@ dependencies {
implementation libs.hikaricp
implementation libs.debezium.api
implementation libs.debezium.embedded
implementation libs.debezium.mysql
implementation libs.debezium.postgres
implementation libs.debezium.mongodb

implementation libs.bundles.datadog
// implementation 'com.datadoghq:dd-trace-api'
implementation 'org.apache.sshd:sshd-mina:2.8.0'

implementation libs.testcontainers
implementation libs.testcontainers.mysql
implementation libs.testcontainers.jdbc
implementation libs.testcontainers.postgresql
testImplementation libs.testcontainers.jdbc
testImplementation libs.testcontainers.mysql
testImplementation libs.testcontainers.postgresql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,23 @@
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage.SchemaHistory;
import io.airbyte.cdk.integrations.debezium.internals.ChangeEventWithMetadata;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumEventUtils;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordIterator;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordPublisher;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumShutdownProcedure;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumStateDecoratingIterator;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -51,88 +46,47 @@ public class AirbyteDebeziumHandler<T> {
* {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_BATCH_SIZE} is 2048
* {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_QUEUE_SIZE} is 8192
*/
private static final int QUEUE_CAPACITY = 10000;
public static final int QUEUE_CAPACITY = 10_000;

private final JsonNode config;
private final CdcTargetPosition<T> targetPosition;
private final boolean trackSchemaHistory;
private final Duration firstRecordWaitTime, subsequentRecordWaitTime;
private final OptionalInt queueSize;
private final int queueSize;
private final boolean addDbNameToOffsetState;

public AirbyteDebeziumHandler(final JsonNode config,
final CdcTargetPosition<T> targetPosition,
final boolean trackSchemaHistory,
final Duration firstRecordWaitTime,
final Duration subsequentRecordWaitTime,
final OptionalInt queueSize) {
final int queueSize,
final boolean addDbNameToOffsetState) {
this.config = config;
this.targetPosition = targetPosition;
this.trackSchemaHistory = trackSchemaHistory;
this.firstRecordWaitTime = firstRecordWaitTime;
this.subsequentRecordWaitTime = subsequentRecordWaitTime;
this.queueSize = queueSize;
this.addDbNameToOffsetState = addDbNameToOffsetState;
}

public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(
final ConfiguredAirbyteCatalog catalogContainingStreamsToSnapshot,
final CdcMetadataInjector cdcMetadataInjector,
final Properties snapshotProperties,
final CdcStateHandler cdcStateHandler,
final DebeziumPropertiesManager.DebeziumConnectorType debeziumConnectorType,
final Instant emittedAt) {

LOGGER.info("Running snapshot for " + catalogContainingStreamsToSnapshot.getStreams().size() + " new tables");
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(queueSize.orElse(QUEUE_CAPACITY));

final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeDummyStateForSnapshotPurpose();
final DebeziumRecordPublisher tableSnapshotPublisher = new DebeziumRecordPublisher(snapshotProperties,
config,
catalogContainingStreamsToSnapshot,
offsetManager,
schemaHistoryManager(new SchemaHistory<>(Optional.empty(), false), cdcStateHandler.compressSchemaHistoryForState()),
debeziumConnectorType);
tableSnapshotPublisher.start(queue);

final AutoCloseableIterator<ChangeEventWithMetadata> eventIterator = new DebeziumRecordIterator<>(
queue,
targetPosition,
tableSnapshotPublisher::hasClosed,
new DebeziumShutdownProcedure<>(queue, tableSnapshotPublisher::close, tableSnapshotPublisher::hasClosed),
firstRecordWaitTime,
subsequentRecordWaitTime);

return AutoCloseableIterators.concatWithEagerClose(AutoCloseableIterators
.transform(
eventIterator,
(event) -> DebeziumEventUtils.toAirbyteMessage(event, cdcMetadataInjector, catalogContainingStreamsToSnapshot, emittedAt,
debeziumConnectorType, config)),
AutoCloseableIterators
.fromIterator(MoreIterators.singletonIteratorFromSupplier(cdcStateHandler::saveStateAfterCompletionOfSnapshotOfNewStreams)));
}

public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final ConfiguredAirbyteCatalog catalog,
public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final DebeziumPropertiesManager debeziumPropertiesManager,
final DebeziumEventConverter eventConverter,
final CdcSavedInfoFetcher cdcSavedInfoFetcher,
final CdcStateHandler cdcStateHandler,
final CdcMetadataInjector cdcMetadataInjector,
final Properties connectorProperties,
final DebeziumPropertiesManager.DebeziumConnectorType debeziumConnectorType,
final Instant emittedAt,
final boolean addDbNameToState) {
final CdcStateHandler cdcStateHandler) {
LOGGER.info("Using CDC: {}", true);
LOGGER.info("Using DBZ version: {}", DebeziumEngine.class.getPackage().getImplementationVersion());
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(
cdcSavedInfoFetcher.getSavedOffset(),
addDbNameToState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty());
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager =
trackSchemaHistory ? schemaHistoryManager(
cdcSavedInfoFetcher.getSavedSchemaHistory(),
cdcStateHandler.compressSchemaHistoryForState())
: Optional.empty();

final var publisher = new DebeziumRecordPublisher(
connectorProperties, config, catalog, offsetManager, schemaHistoryManager, debeziumConnectorType);
final var queue = new LinkedBlockingQueue<ChangeEvent<String, String>>(queueSize.orElse(QUEUE_CAPACITY));
publisher.start(queue);
addDbNameToOffsetState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty());
final var schemaHistoryManager = trackSchemaHistory
? Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(
cdcSavedInfoFetcher.getSavedSchemaHistory(), cdcStateHandler.compressSchemaHistoryForState()))
: Optional.<AirbyteSchemaHistoryStorage>empty();
final var publisher = new DebeziumRecordPublisher(debeziumPropertiesManager);
final var queue = new LinkedBlockingQueue<ChangeEvent<String, String>>(queueSize);
publisher.start(queue, offsetManager, schemaHistoryManager);
// handle state machine around pub/sub logic.
final AutoCloseableIterator<ChangeEventWithMetadata> eventIterator = new DebeziumRecordIterator<>(
queue,
Expand All @@ -142,34 +96,22 @@ public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final Confi
firstRecordWaitTime,
subsequentRecordWaitTime);

final Duration syncCheckpointDuration =
config.get(SYNC_CHECKPOINT_DURATION_PROPERTY) != null ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
final Duration syncCheckpointDuration = config.has(SYNC_CHECKPOINT_DURATION_PROPERTY)
? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.has(SYNC_CHECKPOINT_RECORDS_PROPERTY)
? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: SYNC_CHECKPOINT_RECORDS;
return AutoCloseableIterators.fromIterator(new DebeziumStateDecoratingIterator<>(
eventIterator,
cdcStateHandler,
targetPosition,
cdcMetadataInjector,
emittedAt,
eventConverter,
offsetManager,
trackSchemaHistory,
schemaHistoryManager.orElse(null),
syncCheckpointDuration,
syncCheckpointRecords,
catalog,
debeziumConnectorType,
config));
}

private Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager(final SchemaHistory<Optional<JsonNode>> schemaHistory,
final boolean compressSchemaHistoryForState) {
if (trackSchemaHistory) {
return Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(schemaHistory, compressSchemaHistoryForState));
}

return Optional.empty();
syncCheckpointRecords));
}

public static boolean isAnyStreamIncrementalSyncMode(final ConfiguredAirbyteCatalog catalog) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -49,10 +50,6 @@ public AirbyteFileOffsetBackingStore(final Path offsetFilePath, final Optional<S
this.dbName = dbName;
}

public Path getOffsetFilePath() {
return offsetFilePath;
}

public Map<String, String> read() {
final Map<ByteBuffer, ByteBuffer> raw = load();

Expand Down Expand Up @@ -185,4 +182,12 @@ public static AirbyteFileOffsetBackingStore initializeDummyStateForSnapshotPurpo
return new AirbyteFileOffsetBackingStore(cdcOffsetFilePath, Optional.empty());
}

/**
 * Writes the Debezium engine properties that wire offset persistence to this file-backed store.
 * https://debezium.io/documentation/reference/2.2/development/engine.html#engine-properties
 *
 * @param props mutable engine configuration that the settings are added to
 */
public void setDebeziumProperties(Properties props) {
  final String offsetFile = offsetFilePath.toString();
  props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
  props.setProperty("offset.storage.file.filename", offsetFile);
  // Offsets are flushed once per second. todo: make this longer
  props.setProperty("offset.flush.interval.ms", "1000");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.Properties;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -52,10 +53,6 @@ public AirbyteSchemaHistoryStorage(final Path path, final boolean compressSchema
this.compressSchemaHistoryForState = compressSchemaHistoryForState;
}

public Path getPath() {
return path;
}

public record SchemaHistory<T> (T schema, boolean isCompressed) {}

public SchemaHistory<String> read() {
Expand Down Expand Up @@ -224,4 +221,14 @@ public static AirbyteSchemaHistoryStorage initializeDBHistory(final SchemaHistor
return schemaHistoryManager;
}

/**
 * Writes the Debezium engine properties that point schema-history tracking at this file-backed
 * store. Per the references below, some connectors (e.g. MySQL) must track schema changes;
 * without a schema history we can't fetch records for the table.
 * https://debezium.io/documentation/reference/2.2/operations/debezium-server.html#debezium-source-database-history-class
 * https://debezium.io/documentation/reference/development/engine.html#_in_the_code
 *
 * @param props mutable engine configuration that the settings are added to
 */
public void setDebeziumProperties(Properties props) {
  final String historyFile = path.toString();
  props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
  props.setProperty("schema.history.internal.file.filename", historyFile);
  props.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.integrations.debezium.CdcMetadataInjector;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.time.Instant;

/**
 * Converts raw Debezium change events into Airbyte protocol messages. Each connector supplies its
 * own mapping via {@link #toAirbyteMessage(ChangeEventWithMetadata)}; the static helpers below
 * cover the connector-independent parts (record wrapping and CDC bookkeeping columns).
 */
public interface DebeziumEventConverter {

  String CDC_LSN = "_ab_cdc_lsn";
  String CDC_UPDATED_AT = "_ab_cdc_updated_at";
  String CDC_DELETED_AT = "_ab_cdc_deleted_at";
  String AFTER_EVENT = "after";
  String BEFORE_EVENT = "before";
  String OPERATION_FIELD = "op";
  String SOURCE_EVENT = "source";

  /**
   * Wraps an already-converted record payload in an Airbyte RECORD message, resolving the target
   * stream name and namespace from the event's source metadata via the injector.
   */
  static AirbyteMessage buildAirbyteMessage(
      final JsonNode source,
      final CdcMetadataInjector cdcMetadataInjector,
      final Instant emittedAt,
      final JsonNode data) {
    final String namespace = cdcMetadataInjector.namespace(source);
    final String name = cdcMetadataInjector.name(source);
    return new AirbyteMessage()
        .withType(AirbyteMessage.Type.RECORD)
        .withRecord(new AirbyteRecordMessage()
            .withStream(name)
            .withNamespace(namespace)
            .withEmittedAt(emittedAt.toEpochMilli())
            .withData(data));
  }

  /**
   * Stamps CDC bookkeeping columns onto the record payload: the transaction timestamp, any
   * connector-specific metadata, and the deletion marker (null unless the event is a delete).
   * Mutates and returns {@code baseNode}.
   */
  static JsonNode addCdcMetadata(
      final ObjectNode baseNode,
      final JsonNode source,
      final CdcMetadataInjector cdcMetadataInjector,
      final boolean isDelete) {
    // "ts_ms" is the source transaction time in epoch millis — see Debezium event envelope docs.
    final String transactionTimestamp = Instant.ofEpochMilli(source.get("ts_ms").asLong()).toString();
    baseNode.put(CDC_UPDATED_AT, transactionTimestamp);
    cdcMetadataInjector.addMetaData(baseNode, source);
    // A null deletion marker is written explicitly for non-delete events.
    baseNode.put(CDC_DELETED_AT, isDelete ? transactionTimestamp : null);
    return baseNode;
  }

  AirbyteMessage toAirbyteMessage(final ChangeEventWithMetadata event);

}
Loading
Loading