Skip to content

Commit

Permalink
refactor DebeziumEventUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
Marius Posta committed Jan 27, 2024
1 parent 61e0745 commit 11142c0
Show file tree
Hide file tree
Showing 22 changed files with 325 additions and 323 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
import io.airbyte.cdk.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.cdk.integrations.debezium.internals.ChangeEventWithMetadata;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordIterator;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordPublisher;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumShutdownProcedure;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumStateDecoratingIterator;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumEventConverter;
import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumEventConverter;
import io.airbyte.cdk.integrations.debezium.internals.mongodb.MongoDbDebeziumPropertiesManager;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -111,20 +114,20 @@ public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final Confi
: SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: SYNC_CHECKPOINT_RECORDS;
final DebeziumEventConverter eventConverter = switch (debeziumConnectorType) {
case MONGODB -> new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config);
default -> new RelationalDbDebeziumEventConverter(cdcMetadataInjector, emittedAt);
};
return AutoCloseableIterators.fromIterator(new DebeziumStateDecoratingIterator<>(
eventIterator,
cdcStateHandler,
targetPosition,
cdcMetadataInjector,
emittedAt,
eventConverter,
offsetManager,
trackSchemaHistory,
schemaHistoryManager.orElse(null),
syncCheckpointDuration,
syncCheckpointRecords,
catalog,
debeziumConnectorType,
config));
syncCheckpointRecords));
}

public static boolean isAnyStreamIncrementalSyncMode(final ConfiguredAirbyteCatalog catalog) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.integrations.debezium.CdcMetadataInjector;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.time.Instant;

public interface DebeziumEventConverter {

String CDC_LSN = "_ab_cdc_lsn";
String CDC_UPDATED_AT = "_ab_cdc_updated_at";
String CDC_DELETED_AT = "_ab_cdc_deleted_at";
String AFTER_EVENT = "after";
String BEFORE_EVENT = "before";
String OPERATION_FIELD = "op";
String SOURCE_EVENT = "source";

static AirbyteMessage buildAirbyteMessage(
final JsonNode source,
final CdcMetadataInjector cdcMetadataInjector,
final Instant emittedAt,
final JsonNode data) {
final String streamNamespace = cdcMetadataInjector.namespace(source);
final String streamName = cdcMetadataInjector.name(source);

final AirbyteRecordMessage airbyteRecordMessage = new AirbyteRecordMessage()
.withStream(streamName)
.withNamespace(streamNamespace)
.withEmittedAt(emittedAt.toEpochMilli())
.withData(data);

return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(airbyteRecordMessage);
}

static JsonNode addCdcMetadata(
final ObjectNode baseNode,
final JsonNode source,
final CdcMetadataInjector cdcMetadataInjector,
final boolean isDelete) {

final long transactionMillis = source.get("ts_ms").asLong();
final String transactionTimestamp = Instant.ofEpochMilli(transactionMillis).toString();

baseNode.put(CDC_UPDATED_AT, transactionTimestamp);
cdcMetadataInjector.addMetaData(baseNode, source);

if (isDelete) {
baseNode.put(CDC_DELETED_AT, transactionTimestamp);
} else {
baseNode.put(CDC_DELETED_AT, (String) null);
}

return baseNode;
}

AirbyteMessage toAirbyteMessage(final ChangeEventWithMetadata event);

}

This file was deleted.

Loading

0 comments on commit 11142c0

Please sign in to comment.