Skip to content

Commit

Permalink
remove DebeziumConnectorType enum
Browse files Browse the repository at this point in the history
  • Loading branch information
Marius Posta committed Jan 27, 2024
1 parent 11142c0 commit 7fcef6a
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordPublisher;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumShutdownProcedure;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumStateDecoratingIterator;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumEventConverter;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumEventConverter;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumPropertiesManager;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand All @@ -33,9 +29,7 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -59,44 +53,37 @@ public class AirbyteDebeziumHandler<T> {
private final boolean trackSchemaHistory;
private final Duration firstRecordWaitTime, subsequentRecordWaitTime;
private final int queueSize;
private final boolean addDbNameToOffsetState;

public AirbyteDebeziumHandler(final JsonNode config,
final CdcTargetPosition<T> targetPosition,
final boolean trackSchemaHistory,
final Duration firstRecordWaitTime,
final Duration subsequentRecordWaitTime,
final int queueSize) {
final int queueSize,
final boolean addDbNameToOffsetState) {
this.config = config;
this.targetPosition = targetPosition;
this.trackSchemaHistory = trackSchemaHistory;
this.firstRecordWaitTime = firstRecordWaitTime;
this.subsequentRecordWaitTime = subsequentRecordWaitTime;
this.queueSize = queueSize;
this.addDbNameToOffsetState = addDbNameToOffsetState;
}

public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final ConfiguredAirbyteCatalog catalog,
public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final DebeziumPropertiesManager debeziumPropertiesManager,
final DebeziumEventConverter eventConverter,
final CdcSavedInfoFetcher cdcSavedInfoFetcher,
final CdcStateHandler cdcStateHandler,
final CdcMetadataInjector cdcMetadataInjector,
final Properties connectorProperties,
final DebeziumPropertiesManager.DebeziumConnectorType debeziumConnectorType,
final Instant emittedAt,
final boolean addDbNameToState) {
final CdcStateHandler cdcStateHandler) {
LOGGER.info("Using CDC: {}", true);
LOGGER.info("Using DBZ version: {}", DebeziumEngine.class.getPackage().getImplementationVersion());
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(
cdcSavedInfoFetcher.getSavedOffset(),
addDbNameToState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty());
addDbNameToOffsetState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty());
final var schemaHistoryManager = trackSchemaHistory
? Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(
cdcSavedInfoFetcher.getSavedSchemaHistory(), cdcStateHandler.compressSchemaHistoryForState()))
: Optional.<AirbyteSchemaHistoryStorage>empty();

final var debeziumPropertiesManager = switch (debeziumConnectorType) {
case MONGODB -> new MongoDbDebeziumPropertiesManager(connectorProperties, config, catalog);
default -> new RelationalDbDebeziumPropertiesManager(connectorProperties, config, catalog);
};

final var publisher = new DebeziumRecordPublisher(debeziumPropertiesManager);
final var queue = new LinkedBlockingQueue<ChangeEvent<String, String>>(queueSize);
publisher.start(queue, offsetManager, schemaHistoryManager);
Expand All @@ -109,15 +96,12 @@ public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final Confi
firstRecordWaitTime,
subsequentRecordWaitTime);

final Duration syncCheckpointDuration =
config.get(SYNC_CHECKPOINT_DURATION_PROPERTY) != null ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
final Duration syncCheckpointDuration = config.has(SYNC_CHECKPOINT_DURATION_PROPERTY)
? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.has(SYNC_CHECKPOINT_RECORDS_PROPERTY)
? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: SYNC_CHECKPOINT_RECORDS;
final DebeziumEventConverter eventConverter = switch (debeziumConnectorType) {
case MONGODB -> new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config);
default -> new RelationalDbDebeziumEventConverter(cdcMetadataInjector, emittedAt);
};
return AutoCloseableIterators.fromIterator(new DebeziumStateDecoratingIterator<>(
eventIterator,
cdcStateHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,4 @@ public Properties getDebeziumProperties(

protected abstract Properties getIncludeConfiguration(final ConfiguredAirbyteCatalog catalog, final JsonNode config);

/**
 * Identifies which kind of Debezium connector a caller is setting up, so that the matching
 * properties manager and event converter implementations can be chosen.
 */
public enum DebeziumConnectorType {
// Relational database sources; passed by the MSSQL, MySQL and Postgres call sites.
RELATIONALDB,
// MongoDB source; selects the MongoDB-specific properties manager and event converter.
MONGODB;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbCdcTargetPosition;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumEventConverter;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumStateUtil;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbResumeTokenHelper;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -136,18 +137,14 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
emittedAt, config.getCheckpointInterval(), isEnforceSchema);

final AirbyteDebeziumHandler<BsonTimestamp> handler = new AirbyteDebeziumHandler<>(config.rawConfig(),
new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize);
new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize, false);
final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager);
final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed);
final var propertiesManager = new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.rawConfig(), catalog);
final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config.rawConfig());

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog,
cdcSavedInfoFetcher,
mongoDbCdcStateHandler,
cdcMetadataInjector,
defaultDebeziumProperties,
DebeziumPropertiesManager.DebeziumConnectorType.MONGODB,
emittedAt,
false);
final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(
propertiesManager, eventConverter, cdcSavedInfoFetcher, mongoDbCdcStateHandler);

// We can close the client after the initial snapshot is complete, incremental
// iterator does not make use of the client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -37,7 +36,6 @@
import io.airbyte.cdk.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.cdk.integrations.debezium.internals.ChangeEventWithMetadata;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordIterator;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordPublisher;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumShutdownProcedure;
Expand Down Expand Up @@ -84,6 +82,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -485,22 +484,22 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
true,
firstRecordWaitTime,
subsequentRecordWaitTime,
AirbyteDebeziumHandler.QUEUE_CAPACITY);
AirbyteDebeziumHandler.QUEUE_CAPACITY,
true);
final MssqlCdcConnectorMetadataInjector mssqlCdcConnectorMetadataInjector = MssqlCdcConnectorMetadataInjector.getInstance(emittedAt);

// Determine if new stream(s) have been added to the catalog after initial sync of existing streams
final List<ConfiguredAirbyteStream> streamsToSnapshot = identifyStreamsToSnapshot(catalog, stateManager);
final ConfiguredAirbyteCatalog streamsToSnapshotCatalog = new ConfiguredAirbyteCatalog().withStreams(streamsToSnapshot);
final var propertiesManager = new RelationalDbDebeziumPropertiesManager(
MssqlCdcHelper.getDebeziumProperties(database, catalog, false), sourceConfig, catalog);
final var eventConverter = new RelationalDbDebeziumEventConverter(mssqlCdcConnectorMetadataInjector, emittedAt);

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorsSupplier = () -> handler.getIncrementalIterators(
catalog,
propertiesManager,
eventConverter,
new MssqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
new MssqlCdcStateHandler(stateManager),
mssqlCdcConnectorMetadataInjector,
MssqlCdcHelper.getDebeziumProperties(database, catalog, false),
DebeziumPropertiesManager.DebeziumConnectorType.RELATIONALDB,
emittedAt,
true);
new MssqlCdcStateHandler(stateManager));

/*
* If the CDC state is null or there are no streams to snapshot, that means no stream has gone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumEventConverter;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.mysql.MySqlCdcPosition;
import io.airbyte.cdk.integrations.debezium.internals.mysql.MySqlCdcTargetPosition;
import io.airbyte.cdk.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil;
Expand Down Expand Up @@ -155,16 +156,14 @@ public static List<AutoCloseableIterator<AirbyteMessage>> getCdcReadIterators(fi
true,
firstRecordWaitTime,
subsequentRecordWaitTime,
AirbyteDebeziumHandler.QUEUE_CAPACITY);

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(catalog,
new MySqlCdcSavedInfoFetcher(stateToBeUsed),
new MySqlCdcStateHandler(stateManager),
metadataInjector,
MySqlCdcProperties.getDebeziumProperties(database),
DebeziumPropertiesManager.DebeziumConnectorType.RELATIONALDB,
emittedAt,
AirbyteDebeziumHandler.QUEUE_CAPACITY,
false);
final var propertiesManager = new RelationalDbDebeziumPropertiesManager(
MySqlCdcProperties.getDebeziumProperties(database), sourceConfig, catalog);
final var eventConverter = new RelationalDbDebeziumEventConverter(metadataInjector, emittedAt);

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(
propertiesManager, eventConverter, new MySqlCdcSavedInfoFetcher(stateToBeUsed), new MySqlCdcStateHandler(stateManager));

// This starts processing the binlogs as soon as initial sync is complete; this is a bit different
// from the current cdc syncs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumEventConverter;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.postgres.PostgresCdcTargetPosition;
import io.airbyte.cdk.integrations.debezium.internals.postgres.PostgresDebeziumStateUtil;
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
Expand Down Expand Up @@ -176,18 +177,14 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
// receive that is after the target LSN.
PostgresUtils.advanceLsn(database);
final AirbyteDebeziumHandler<Long> handler = new AirbyteDebeziumHandler<>(sourceConfig,
targetPosition, false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize);
targetPosition, false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize, false);
final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager);
final var propertiesManager = new RelationalDbDebeziumPropertiesManager(
PostgresCdcProperties.getDebeziumDefaultProperties(database), sourceConfig, catalog);
final var eventConverter = new RelationalDbDebeziumEventConverter(new PostgresCdcConnectorMetadataInjector(), emittedAt);

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(
catalog,
new PostgresCdcSavedInfoFetcher(stateToBeUsed),
postgresCdcStateHandler,
new PostgresCdcConnectorMetadataInjector(),
PostgresCdcProperties.getDebeziumDefaultProperties(database),
DebeziumPropertiesManager.DebeziumConnectorType.RELATIONALDB,
emittedAt,
false);
propertiesManager, eventConverter, new PostgresCdcSavedInfoFetcher(stateToBeUsed), postgresCdcStateHandler);

if (initialSyncCtidIterators.isEmpty()) {
return Collections.singletonList(incrementalIteratorSupplier.get());
Expand Down

0 comments on commit 7fcef6a

Please sign in to comment.