Skip to content

Commit

Permalink
refactor debezium harness in CDK, push connector-specific logic down …
Browse files Browse the repository at this point in the history
…to connectors (#34573)
  • Loading branch information
postamar authored Jan 27, 2024
1 parent 02c0ec7 commit 6f0fd7a
Show file tree
Hide file tree
Showing 109 changed files with 666 additions and 739 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.16.0 | 2024-01-26 | [\#34573](https://github.com/airbytehq/airbyte/pull/34573) | Untangle Debezium harness dependencies. |
| 0.15.2 | 2024-01-25 | [\#34441](https://github.com/airbytehq/airbyte/pull/34441) | Improve airbyte-api build performance. |
| 0.15.1 | 2024-01-25 | [\#34451](https://github.com/airbytehq/airbyte/pull/34451) | Async destinations: Better logging when we fail to parse an AirbyteMessage |
| 0.15.0 | 2024-01-23 | [\#34441](https://github.com/airbytehq/airbyte/pull/34441) | Removed connector registry and micronaut dependencies. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.15.2
version=0.16.0
7 changes: 0 additions & 7 deletions airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,11 @@ dependencies {
implementation libs.hikaricp
implementation libs.debezium.api
implementation libs.debezium.embedded
implementation libs.debezium.mysql
implementation libs.debezium.postgres
implementation libs.debezium.mongodb

implementation libs.bundles.datadog
// implementation 'com.datadoghq:dd-trace-api'
implementation 'org.apache.sshd:sshd-mina:2.8.0'

implementation libs.testcontainers
implementation libs.testcontainers.mysql
implementation libs.testcontainers.jdbc
implementation libs.testcontainers.postgresql
testImplementation libs.testcontainers.jdbc
testImplementation libs.testcontainers.mysql
testImplementation libs.testcontainers.postgresql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,23 @@
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage.SchemaHistory;
import io.airbyte.cdk.integrations.debezium.internals.ChangeEventWithMetadata;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumEventUtils;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordIterator;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordPublisher;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumShutdownProcedure;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumStateDecoratingIterator;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -51,88 +46,47 @@ public class AirbyteDebeziumHandler<T> {
* {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_BATCH_SIZE}is 2048
* {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_QUEUE_SIZE} is 8192
*/
private static final int QUEUE_CAPACITY = 10000;
public static final int QUEUE_CAPACITY = 10_000;

private final JsonNode config;
private final CdcTargetPosition<T> targetPosition;
private final boolean trackSchemaHistory;
private final Duration firstRecordWaitTime, subsequentRecordWaitTime;
private final OptionalInt queueSize;
private final int queueSize;
private final boolean addDbNameToOffsetState;

public AirbyteDebeziumHandler(final JsonNode config,
final CdcTargetPosition<T> targetPosition,
final boolean trackSchemaHistory,
final Duration firstRecordWaitTime,
final Duration subsequentRecordWaitTime,
final OptionalInt queueSize) {
final int queueSize,
final boolean addDbNameToOffsetState) {
this.config = config;
this.targetPosition = targetPosition;
this.trackSchemaHistory = trackSchemaHistory;
this.firstRecordWaitTime = firstRecordWaitTime;
this.subsequentRecordWaitTime = subsequentRecordWaitTime;
this.queueSize = queueSize;
this.addDbNameToOffsetState = addDbNameToOffsetState;
}

public AutoCloseableIterator<AirbyteMessage> getSnapshotIterators(
final ConfiguredAirbyteCatalog catalogContainingStreamsToSnapshot,
final CdcMetadataInjector cdcMetadataInjector,
final Properties snapshotProperties,
final CdcStateHandler cdcStateHandler,
final DebeziumPropertiesManager.DebeziumConnectorType debeziumConnectorType,
final Instant emittedAt) {

LOGGER.info("Running snapshot for " + catalogContainingStreamsToSnapshot.getStreams().size() + " new tables");
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(queueSize.orElse(QUEUE_CAPACITY));

final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeDummyStateForSnapshotPurpose();
final DebeziumRecordPublisher tableSnapshotPublisher = new DebeziumRecordPublisher(snapshotProperties,
config,
catalogContainingStreamsToSnapshot,
offsetManager,
schemaHistoryManager(new SchemaHistory<>(Optional.empty(), false), cdcStateHandler.compressSchemaHistoryForState()),
debeziumConnectorType);
tableSnapshotPublisher.start(queue);

final AutoCloseableIterator<ChangeEventWithMetadata> eventIterator = new DebeziumRecordIterator<>(
queue,
targetPosition,
tableSnapshotPublisher::hasClosed,
new DebeziumShutdownProcedure<>(queue, tableSnapshotPublisher::close, tableSnapshotPublisher::hasClosed),
firstRecordWaitTime,
subsequentRecordWaitTime);

return AutoCloseableIterators.concatWithEagerClose(AutoCloseableIterators
.transform(
eventIterator,
(event) -> DebeziumEventUtils.toAirbyteMessage(event, cdcMetadataInjector, catalogContainingStreamsToSnapshot, emittedAt,
debeziumConnectorType, config)),
AutoCloseableIterators
.fromIterator(MoreIterators.singletonIteratorFromSupplier(cdcStateHandler::saveStateAfterCompletionOfSnapshotOfNewStreams)));
}

public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final ConfiguredAirbyteCatalog catalog,
public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final DebeziumPropertiesManager debeziumPropertiesManager,
final DebeziumEventConverter eventConverter,
final CdcSavedInfoFetcher cdcSavedInfoFetcher,
final CdcStateHandler cdcStateHandler,
final CdcMetadataInjector cdcMetadataInjector,
final Properties connectorProperties,
final DebeziumPropertiesManager.DebeziumConnectorType debeziumConnectorType,
final Instant emittedAt,
final boolean addDbNameToState) {
final CdcStateHandler cdcStateHandler) {
LOGGER.info("Using CDC: {}", true);
LOGGER.info("Using DBZ version: {}", DebeziumEngine.class.getPackage().getImplementationVersion());
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(
cdcSavedInfoFetcher.getSavedOffset(),
addDbNameToState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty());
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager =
trackSchemaHistory ? schemaHistoryManager(
cdcSavedInfoFetcher.getSavedSchemaHistory(),
cdcStateHandler.compressSchemaHistoryForState())
: Optional.empty();

final var publisher = new DebeziumRecordPublisher(
connectorProperties, config, catalog, offsetManager, schemaHistoryManager, debeziumConnectorType);
final var queue = new LinkedBlockingQueue<ChangeEvent<String, String>>(queueSize.orElse(QUEUE_CAPACITY));
publisher.start(queue);
addDbNameToOffsetState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty());
final var schemaHistoryManager = trackSchemaHistory
? Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(
cdcSavedInfoFetcher.getSavedSchemaHistory(), cdcStateHandler.compressSchemaHistoryForState()))
: Optional.<AirbyteSchemaHistoryStorage>empty();
final var publisher = new DebeziumRecordPublisher(debeziumPropertiesManager);
final var queue = new LinkedBlockingQueue<ChangeEvent<String, String>>(queueSize);
publisher.start(queue, offsetManager, schemaHistoryManager);
// handle state machine around pub/sub logic.
final AutoCloseableIterator<ChangeEventWithMetadata> eventIterator = new DebeziumRecordIterator<>(
queue,
Expand All @@ -142,34 +96,22 @@ public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final Confi
firstRecordWaitTime,
subsequentRecordWaitTime);

final Duration syncCheckpointDuration =
config.get(SYNC_CHECKPOINT_DURATION_PROPERTY) != null ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
final Duration syncCheckpointDuration = config.has(SYNC_CHECKPOINT_DURATION_PROPERTY)
? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.has(SYNC_CHECKPOINT_RECORDS_PROPERTY)
? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: SYNC_CHECKPOINT_RECORDS;
return AutoCloseableIterators.fromIterator(new DebeziumStateDecoratingIterator<>(
eventIterator,
cdcStateHandler,
targetPosition,
cdcMetadataInjector,
emittedAt,
eventConverter,
offsetManager,
trackSchemaHistory,
schemaHistoryManager.orElse(null),
syncCheckpointDuration,
syncCheckpointRecords,
catalog,
debeziumConnectorType,
config));
}

private Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager(final SchemaHistory<Optional<JsonNode>> schemaHistory,
final boolean compressSchemaHistoryForState) {
if (trackSchemaHistory) {
return Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(schemaHistory, compressSchemaHistoryForState));
}

return Optional.empty();
syncCheckpointRecords));
}

public static boolean isAnyStreamIncrementalSyncMode(final ConfiguredAirbyteCatalog catalog) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -49,10 +50,6 @@ public AirbyteFileOffsetBackingStore(final Path offsetFilePath, final Optional<S
this.dbName = dbName;
}

public Path getOffsetFilePath() {
return offsetFilePath;
}

public Map<String, String> read() {
final Map<ByteBuffer, ByteBuffer> raw = load();

Expand Down Expand Up @@ -185,4 +182,12 @@ public static AirbyteFileOffsetBackingStore initializeDummyStateForSnapshotPurpo
return new AirbyteFileOffsetBackingStore(cdcOffsetFilePath, Optional.empty());
}

public void setDebeziumProperties(Properties props) {
// debezium engine configuration
// https://debezium.io/documentation/reference/2.2/development/engine.html#engine-properties
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", offsetFilePath.toString());
props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.Properties;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -52,10 +53,6 @@ public AirbyteSchemaHistoryStorage(final Path path, final boolean compressSchema
this.compressSchemaHistoryForState = compressSchemaHistoryForState;
}

public Path getPath() {
return path;
}

public record SchemaHistory<T> (T schema, boolean isCompressed) {}

public SchemaHistory<String> read() {
Expand Down Expand Up @@ -224,4 +221,14 @@ public static AirbyteSchemaHistoryStorage initializeDBHistory(final SchemaHistor
return schemaHistoryManager;
}

public void setDebeziumProperties(Properties props) {
// https://debezium.io/documentation/reference/2.2/operations/debezium-server.html#debezium-source-database-history-class
// https://debezium.io/documentation/reference/development/engine.html#_in_the_code
// As mentioned in the documents above, debezium connector for MySQL needs to track the schema
// changes. If we don't do this, we can't fetch records for the table.
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", path.toString());
props.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.integrations.debezium.CdcMetadataInjector;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.time.Instant;

public interface DebeziumEventConverter {

String CDC_LSN = "_ab_cdc_lsn";
String CDC_UPDATED_AT = "_ab_cdc_updated_at";
String CDC_DELETED_AT = "_ab_cdc_deleted_at";
String AFTER_EVENT = "after";
String BEFORE_EVENT = "before";
String OPERATION_FIELD = "op";
String SOURCE_EVENT = "source";

static AirbyteMessage buildAirbyteMessage(
final JsonNode source,
final CdcMetadataInjector cdcMetadataInjector,
final Instant emittedAt,
final JsonNode data) {
final String streamNamespace = cdcMetadataInjector.namespace(source);
final String streamName = cdcMetadataInjector.name(source);

final AirbyteRecordMessage airbyteRecordMessage = new AirbyteRecordMessage()
.withStream(streamName)
.withNamespace(streamNamespace)
.withEmittedAt(emittedAt.toEpochMilli())
.withData(data);

return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(airbyteRecordMessage);
}

static JsonNode addCdcMetadata(
final ObjectNode baseNode,
final JsonNode source,
final CdcMetadataInjector cdcMetadataInjector,
final boolean isDelete) {

final long transactionMillis = source.get("ts_ms").asLong();
final String transactionTimestamp = Instant.ofEpochMilli(transactionMillis).toString();

baseNode.put(CDC_UPDATED_AT, transactionTimestamp);
cdcMetadataInjector.addMetaData(baseNode, source);

if (isDelete) {
baseNode.put(CDC_DELETED_AT, transactionTimestamp);
} else {
baseNode.put(CDC_DELETED_AT, (String) null);
}

return baseNode;
}

AirbyteMessage toAirbyteMessage(final ChangeEventWithMetadata event);

}
Loading

0 comments on commit 6f0fd7a

Please sign in to comment.