Skip to content

Commit

Permalink
refactor DebeziumPropertiesManager
Browse files Browse the repository at this point in the history
  • Loading branch information
Marius Posta committed Jan 27, 2024
1 parent 2d47102 commit 61e0745
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage.SchemaHistory;
import io.airbyte.cdk.integrations.debezium.internals.ChangeEventWithMetadata;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordIterator;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordPublisher;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumShutdownProcedure;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumStateDecoratingIterator;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumPropertiesManager;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.protocol.models.v0.AirbyteMessage;
Expand Down Expand Up @@ -83,16 +84,19 @@ public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final Confi
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(
cdcSavedInfoFetcher.getSavedOffset(),
addDbNameToState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty());
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager =
trackSchemaHistory ? schemaHistoryManager(
cdcSavedInfoFetcher.getSavedSchemaHistory(),
cdcStateHandler.compressSchemaHistoryForState())
: Optional.empty();
final var schemaHistoryManager = trackSchemaHistory
? Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(
cdcSavedInfoFetcher.getSavedSchemaHistory(), cdcStateHandler.compressSchemaHistoryForState()))
: Optional.<AirbyteSchemaHistoryStorage>empty();

final var publisher = new DebeziumRecordPublisher(
connectorProperties, config, catalog, offsetManager, schemaHistoryManager, debeziumConnectorType);
final var debeziumPropertiesManager = switch (debeziumConnectorType) {
case MONGODB -> new MongoDbDebeziumPropertiesManager(connectorProperties, config, catalog);
default -> new RelationalDbDebeziumPropertiesManager(connectorProperties, config, catalog);
};

final var publisher = new DebeziumRecordPublisher(debeziumPropertiesManager);
final var queue = new LinkedBlockingQueue<ChangeEvent<String, String>>(queueSize);
publisher.start(queue);
publisher.start(queue, offsetManager, schemaHistoryManager);
// handle state machine around pub/sub logic.
final AutoCloseableIterator<ChangeEventWithMetadata> eventIterator = new DebeziumRecordIterator<>(
queue,
Expand Down Expand Up @@ -123,15 +127,6 @@ public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final Confi
config));
}

private Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager(final SchemaHistory<Optional<JsonNode>> schemaHistory,
final boolean compressSchemaHistoryForState) {
if (trackSchemaHistory) {
return Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(schemaHistory, compressSchemaHistoryForState));
}

return Optional.empty();
}

public static boolean isAnyStreamIncrementalSyncMode(final ConfiguredAirbyteCatalog catalog) {
return catalog.getStreams().stream().map(ConfiguredAirbyteStream::getSyncMode)
.anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -49,10 +50,6 @@ public AirbyteFileOffsetBackingStore(final Path offsetFilePath, final Optional<S
this.dbName = dbName;
}

public Path getOffsetFilePath() {
return offsetFilePath;
}

public Map<String, String> read() {
final Map<ByteBuffer, ByteBuffer> raw = load();

Expand Down Expand Up @@ -185,4 +182,12 @@ public static AirbyteFileOffsetBackingStore initializeDummyStateForSnapshotPurpo
return new AirbyteFileOffsetBackingStore(cdcOffsetFilePath, Optional.empty());
}

public void setDebeziumProperties(Properties props) {
// debezium engine configuration
// https://debezium.io/documentation/reference/2.2/development/engine.html#engine-properties
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", offsetFilePath.toString());
props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.Properties;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -52,10 +53,6 @@ public AirbyteSchemaHistoryStorage(final Path path, final boolean compressSchema
this.compressSchemaHistoryForState = compressSchemaHistoryForState;
}

public Path getPath() {
return path;
}

public record SchemaHistory<T> (T schema, boolean isCompressed) {}

public SchemaHistory<String> read() {
Expand Down Expand Up @@ -224,4 +221,14 @@ public static AirbyteSchemaHistoryStorage initializeDBHistory(final SchemaHistor
return schemaHistoryManager;
}

public void setDebeziumProperties(Properties props) {
// https://debezium.io/documentation/reference/2.2/operations/debezium-server.html#debezium-source-database-history-class
// https://debezium.io/documentation/reference/development/engine.html#_in_the_code
// As mentioned in the documents above, debezium connector for MySQL needs to track the schema
// changes. If we don't do this, we can't fetch records for the table.
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", path.toString());
props.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,29 @@ public abstract class DebeziumPropertiesManager {
public static final String TOPIC_PREFIX_KEY = "topic.prefix";

private final JsonNode config;
private final AirbyteFileOffsetBackingStore offsetManager;
private final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager;

private final Properties properties;
private final ConfiguredAirbyteCatalog catalog;

public DebeziumPropertiesManager(final Properties properties,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final AirbyteFileOffsetBackingStore offsetManager,
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
final ConfiguredAirbyteCatalog catalog) {
this.properties = properties;
this.config = config;
this.catalog = catalog;
this.offsetManager = offsetManager;
this.schemaHistoryManager = schemaHistoryManager;
}

public Properties getDebeziumProperties() {
public Properties getDebeziumProperties(final AirbyteFileOffsetBackingStore offsetManager) {
return getDebeziumProperties(offsetManager, Optional.empty());
}

public Properties getDebeziumProperties(
final AirbyteFileOffsetBackingStore offsetManager,
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
final Properties props = new Properties();
props.putAll(properties);

// debezium engine configuration
// https://debezium.io/documentation/reference/2.2/development/engine.html#engine-properties
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
offsetManager.setDebeziumProperties(props);
// default values from debezium CommonConnectorConfig
props.setProperty("max.batch.size", "2048");
props.setProperty("max.queue.size", "8192");
Expand All @@ -57,15 +53,7 @@ public Properties getDebeziumProperties() {
props.setProperty("errors.retry.delay.initial.ms", "299");
props.setProperty("errors.retry.delay.max.ms", "300");

if (schemaHistoryManager.isPresent()) {
// https://debezium.io/documentation/reference/2.2/operations/debezium-server.html#debezium-source-database-history-class
// https://debezium.io/documentation/reference/development/engine.html#_in_the_code
// As mentioned in the documents above, debezium connector for MySQL needs to track the schema
// changes. If we don't do this, we can't fetch records for the table.
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", schemaHistoryManager.get().getPath().toString());
props.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true");
}
schemaHistoryManager.ifPresent(m -> m.setDebeziumProperties(props));

// https://debezium.io/documentation/reference/2.2/configuration/avro.html
props.setProperty("key.converter.schemas.enable", "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,11 @@

package io.airbyte.cdk.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumPropertiesManager;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.spi.OffsetCommitPolicy;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand All @@ -38,15 +34,6 @@ public class DebeziumRecordPublisher implements AutoCloseable {
private final CountDownLatch engineLatch;
private final DebeziumPropertiesManager debeziumPropertiesManager;

public DebeziumRecordPublisher(final Properties properties,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final AirbyteFileOffsetBackingStore offsetManager,
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager,
final DebeziumPropertiesManager.DebeziumConnectorType debeziumConnectorType) {
this(createDebeziumPropertiesManager(debeziumConnectorType, properties, config, catalog, offsetManager, schemaHistoryManager));
}

public DebeziumRecordPublisher(DebeziumPropertiesManager debeziumPropertiesManager) {
this.debeziumPropertiesManager = debeziumPropertiesManager;
this.hasClosed = new AtomicBoolean(false);
Expand All @@ -56,21 +43,11 @@ public DebeziumRecordPublisher(DebeziumPropertiesManager debeziumPropertiesManag
this.engineLatch = new CountDownLatch(1);
}

static private DebeziumPropertiesManager createDebeziumPropertiesManager(final DebeziumPropertiesManager.DebeziumConnectorType connectorType,
final Properties properties,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final AirbyteFileOffsetBackingStore offsetManager,
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
return switch (connectorType) {
case MONGODB -> new MongoDbDebeziumPropertiesManager(properties, config, catalog, offsetManager);
default -> new RelationalDbDebeziumPropertiesManager(properties, config, catalog, offsetManager, schemaHistoryManager);
};
}

public void start(final BlockingQueue<ChangeEvent<String, String>> queue) {
public void start(final BlockingQueue<ChangeEvent<String, String>> queue,
final AirbyteFileOffsetBackingStore offsetManager,
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
engine = DebeziumEngine.create(Json.class)
.using(debeziumPropertiesManager.getDebeziumProperties())
.using(debeziumPropertiesManager.getDebeziumProperties(offsetManager, schemaHistoryManager))
.using(new OffsetCommitPolicy.AlwaysCommitOffsetPolicy())
.notifying(e -> {
// debezium outputs a tombstone event that has a value of null. this is an artifact of how it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -21,10 +20,8 @@ public class RelationalDbDebeziumPropertiesManager extends DebeziumPropertiesMan

public RelationalDbDebeziumPropertiesManager(final Properties properties,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final AirbyteFileOffsetBackingStore offsetManager,
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
super(properties, config, catalog, offsetManager, schemaHistoryManager);
final ConfiguredAirbyteCatalog catalog) {
super(properties, config, catalog);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
import static io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumConstants.Configuration.USERNAME_CONFIGURATION_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -45,9 +43,8 @@ public class MongoDbDebeziumPropertiesManager extends DebeziumPropertiesManager

public MongoDbDebeziumPropertiesManager(final Properties properties,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final AirbyteFileOffsetBackingStore offsetManager) {
super(properties, config, catalog, offsetManager, Optional.empty());
final ConfiguredAirbyteCatalog catalog) {
super(properties, config, catalog);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,9 @@ public Optional<BsonDocument> savedOffset(final Properties baseProperties,
final JsonNode config,
final MongoClient mongoClient) {
LOGGER.debug("Initializing file offset backing store with state '{}'...", cdcState);
final DebeziumPropertiesManager debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(baseProperties,
config, catalog,
AirbyteFileOffsetBackingStore.initializeState(cdcState, Optional.empty()));
final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties();
final var offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcState, Optional.empty());
final DebeziumPropertiesManager debeziumPropertiesManager = new MongoDbDebeziumPropertiesManager(baseProperties, config, catalog);
final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
return parseSavedOffset(debeziumProperties, mongoClient);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,9 @@ public Optional<MysqlDebeziumStateAttributes> savedOffset(final Properties baseP
return Optional.empty();
}

final DebeziumPropertiesManager debeziumPropertiesManager = new RelationalDbDebeziumPropertiesManager(baseProperties, config, catalog,
AirbyteFileOffsetBackingStore.initializeState(cdcOffset, Optional.empty()),
Optional.empty());
final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties();
final var offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcOffset, Optional.empty());
final DebeziumPropertiesManager debeziumPropertiesManager = new RelationalDbDebeziumPropertiesManager(baseProperties, config, catalog);
final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
return parseSavedOffset(debeziumProperties);
}

Expand Down Expand Up @@ -244,13 +243,10 @@ public JsonNode constructInitialDebeziumState(final Properties properties,
final AirbyteSchemaHistoryStorage schemaHistoryStorage =
AirbyteSchemaHistoryStorage.initializeDBHistory(new SchemaHistory<>(Optional.empty(), false), COMPRESSION_ENABLED);
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>();
try (final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(properties,
database.getSourceConfig(),
catalog,
offsetManager,
Optional.of(schemaHistoryStorage),
DebeziumPropertiesManager.DebeziumConnectorType.RELATIONALDB)) {
publisher.start(queue);
final var debeziumPropertiesManager = new RelationalDbDebeziumPropertiesManager(properties, database.getSourceConfig(), catalog);

try (final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(debeziumPropertiesManager)) {
publisher.start(queue, offsetManager, Optional.of(schemaHistoryStorage));
final Instant engineStartTime = Instant.now();
while (!publisher.hasClosed()) {
final ChangeEvent<String, String> event = queue.poll(10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,9 @@ public OptionalLong savedOffset(final Properties baseProperties,
final ConfiguredAirbyteCatalog catalog,
final JsonNode cdcState,
final JsonNode config) {
final DebeziumPropertiesManager debeziumPropertiesManager = new RelationalDbDebeziumPropertiesManager(baseProperties, config, catalog,
AirbyteFileOffsetBackingStore.initializeState(cdcState, Optional.empty()),
Optional.empty());
final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties();
final var offsetManager = AirbyteFileOffsetBackingStore.initializeState(cdcState, Optional.empty());
final DebeziumPropertiesManager debeziumPropertiesManager = new RelationalDbDebeziumPropertiesManager(baseProperties, config, catalog);
final Properties debeziumProperties = debeziumPropertiesManager.getDebeziumProperties(offsetManager);
return parseSavedOffset(debeziumProperties);
}

Expand Down
Loading

0 comments on commit 61e0745

Please sign in to comment.