Skip to content

Commit

Permalink
Destinations CDK: Plumbing related to airbyte_meta from protocol to r…
Browse files Browse the repository at this point in the history
…aw table (#35944)

Co-authored-by: Edward Gao <[email protected]>
  • Loading branch information
gisripa and edgao authored Mar 13, 2024
1 parent 58b6b80 commit 48faee8
Show file tree
Hide file tree
Showing 30 changed files with 291 additions and 98 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.24.0 | 2024-03-13 | [\#35944](https://github.com/airbytehq/airbyte/pull/35944) | Add `_airbyte_meta` in raw table and test fixture updates |
| 0.23.20 | 2024-03-12 | [\#36011](https://github.com/airbytehq/airbyte/pull/36011) | Debezium configuration for conversion of null value on a column with default value. |
| 0.23.19 | 2024-03-11 | [\#35904](https://github.com/airbytehq/airbyte/pull/35904) | Add retries to the debezium engine. |
| 0.23.18 | 2024-03-07 | [\#35899](https://github.com/airbytehq/airbyte/pull/35899) | Null check when retrieving destination state |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.integrations.base;

import java.util.List;
import java.util.Set;

public final class JavaBaseConstants {

Expand All @@ -31,11 +32,20 @@ private JavaBaseConstants() {}
public static final String COLUMN_NAME_AB_LOADED_AT = "_airbyte_loaded_at";
public static final String COLUMN_NAME_AB_EXTRACTED_AT = "_airbyte_extracted_at";
public static final String COLUMN_NAME_AB_META = "_airbyte_meta";
public static final List<String> V2_RAW_TABLE_COLUMN_NAMES = List.of(

// Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2
// use this column list.
public static final Set<String> V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META = Set.of(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA);
public static final List<String> V2_RAW_TABLE_COLUMN_NAMES = List.of(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA,
COLUMN_NAME_AB_META);
public static final List<String> V2_FINAL_TABLE_METADATA_COLUMNS = List.of(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.cdk.integrations.destination.record_buffer;

import com.google.common.io.CountingOutputStream;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -66,12 +65,11 @@ protected BaseSerializedBuffer(final BufferStorage bufferStorage) throws Excepti
* AirbyteRecord
*
* @param recordString serialized record
* @param airbyteMetaString
* @param emittedAt timestamp of the record in milliseconds
* @throws IOException
*/
protected void writeRecord(final String recordString, final long emittedAt) throws IOException {
writeRecord(Jsons.deserialize(recordString, AirbyteRecordMessage.class).withEmittedAt(emittedAt));
}
protected abstract void writeRecord(final String recordString, String airbyteMetaString, final long emittedAt) throws IOException;

/**
* Stops the writer from receiving new data and prepares it for being finalized and converted into
Expand Down Expand Up @@ -111,7 +109,7 @@ public long accept(final AirbyteRecordMessage record) throws Exception {
}

@Override
public long accept(final String recordString, final long emittedAt) throws Exception {
public long accept(final String recordString, final String airbyteMetaString, final long emittedAt) throws Exception {
if (!isStarted) {
if (useCompression) {
compressedBuffer = new GzipCompressorOutputStream(byteCounter);
Expand All @@ -123,7 +121,7 @@ public long accept(final String recordString, final long emittedAt) throws Excep
}
if (inputStream == null && !isClosed) {
final long startCount = byteCounter.getCount();
writeRecord(recordString, emittedAt);
writeRecord(recordString, airbyteMetaString, emittedAt);
return byteCounter.getCount() - startCount;
} else {
throw new IllegalCallerException("Buffer is already closed, it cannot accept more messages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ public interface SerializableBuffer extends AutoCloseable {
* the entire AirbyteRecordMessage
*
* @param recordString serialized record
* @param airbyteMetaString The serialized airbyte_meta entry
* @param emittedAt timestamp of the record in milliseconds
* @return number of bytes written to the buffer
* @throws Exception
*/
long accept(String recordString, long emittedAt) throws Exception;
long accept(String recordString, String airbyteMetaString, long emittedAt) throws Exception;

/**
* Flush a buffer implementation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.integrations.destination_async;

import static io.airbyte.cdk.integrations.destination_async.deser.DeserializationUtil.*;
import static java.util.stream.Collectors.toMap;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -14,6 +15,8 @@
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
import io.airbyte.cdk.integrations.destination_async.buffers.BufferEnqueue;
import io.airbyte.cdk.integrations.destination_async.buffers.BufferManager;
import io.airbyte.cdk.integrations.destination_async.deser.IdentityDataTransformer;
import io.airbyte.cdk.integrations.destination_async.deser.StreamAwareDataTransformer;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination_async.state.FlushFailure;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -56,6 +59,8 @@ public class AsyncStreamConsumer implements SerializedAirbyteMessageConsumer {
private final Set<StreamDescriptor> streamNames;
private final FlushFailure flushFailure;
private final String defaultNamespace;

private final StreamAwareDataTransformer dataTransformer;
// Note that this map will only be populated for streams with nonzero records.
private final ConcurrentMap<StreamDescriptor, AtomicLong> recordCounts;

Expand All @@ -69,6 +74,9 @@ public class AsyncStreamConsumer implements SerializedAirbyteMessageConsumer {
// a 64 bit JVM.
final int PARTIAL_DESERIALIZE_REF_BYTES = 10 * 8;

// TODO: What the.. combinatorics of the constructors are getting out of hand. We should consider
// refactoring this to use a builder pattern with enforced defaults.

public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final OnCloseFunction onClose,
Expand All @@ -79,6 +87,18 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure(), defaultNamespace);
}

public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final OnCloseFunction onClose,
final DestinationFlushFunction flusher,
final ConfiguredAirbyteCatalog catalog,
final BufferManager bufferManager,
final String defaultNamespace,
final StreamAwareDataTransformer dataTransformer) {
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure(), defaultNamespace,
Executors.newFixedThreadPool(5), dataTransformer);
}

public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final OnCloseFunction onClose,
Expand All @@ -87,7 +107,21 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final BufferManager bufferManager,
final String defaultNamespace,
final ExecutorService workerPool) {
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure(), defaultNamespace, workerPool);
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, new FlushFailure(), defaultNamespace, workerPool,
new IdentityDataTransformer());
}

@VisibleForTesting
public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final OnCloseFunction onClose,
final DestinationFlushFunction flusher,
final ConfiguredAirbyteCatalog catalog,
final BufferManager bufferManager,
final FlushFailure flushFailure,
final String defaultNamespace) {
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, flushFailure, defaultNamespace, Executors.newFixedThreadPool(5),
new IdentityDataTransformer());
}

@VisibleForTesting
Expand All @@ -99,7 +133,8 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final BufferManager bufferManager,
final FlushFailure flushFailure,
final String defaultNamespace,
final ExecutorService workerPool) {
final ExecutorService workerPool,
final StreamAwareDataTransformer dataTransformer) {
this.defaultNamespace = defaultNamespace;
hasStarted = false;
hasClosed = false;
Expand All @@ -114,18 +149,7 @@ public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
new FlushWorkers(bufferManager.getBufferDequeue(), flusher, outputRecordCollector, flushFailure, bufferManager.getStateManager(), workerPool);
streamNames = StreamDescriptorUtils.fromConfiguredCatalog(catalog);
this.recordCounts = new ConcurrentHashMap<>();
}

@VisibleForTesting
public AsyncStreamConsumer(final Consumer<AirbyteMessage> outputRecordCollector,
final OnStartFunction onStart,
final OnCloseFunction onClose,
final DestinationFlushFunction flusher,
final ConfiguredAirbyteCatalog catalog,
final BufferManager bufferManager,
final FlushFailure flushFailure,
final String defaultNamespace) {
this(outputRecordCollector, onStart, onClose, flusher, catalog, bufferManager, flushFailure, defaultNamespace, Executors.newFixedThreadPool(5));
this.dataTransformer = dataTransformer;
}

@Override
Expand All @@ -148,7 +172,7 @@ public void accept(final String messageString, final Integer sizeInBytes) throws
* to try to use a thread pool to partially deserialize to get record type and stream name, we can
* do it without touching buffer manager.
*/
final var message = deserializeAirbyteMessage(messageString);
final var message = deserializeAirbyteMessage(messageString, this.dataTransformer);
if (Type.RECORD.equals(message.getType())) {
if (Strings.isNullOrEmpty(message.getRecord().getNamespace())) {
message.getRecord().setNamespace(defaultNamespace);
Expand All @@ -160,41 +184,6 @@ public void accept(final String messageString, final Integer sizeInBytes) throws
bufferEnqueue.addRecord(message, sizeInBytes + PARTIAL_DESERIALIZE_REF_BYTES, defaultNamespace);
}

/**
* Deserializes to a {@link PartialAirbyteMessage} which can represent both a Record or a State
* Message
*
* PartialAirbyteMessage holds either:
* <li>entire serialized message string when message is a valid State Message
* <li>serialized AirbyteRecordMessage when message is a valid Record Message</li>
*
* @param messageString the string to deserialize
* @return PartialAirbyteMessage if the message is valid, empty otherwise
*/
@VisibleForTesting
public static PartialAirbyteMessage deserializeAirbyteMessage(final String messageString) {
// TODO: (ryankfu) plumb in the serialized AirbyteStateMessage to match AirbyteRecordMessage code
// parity. https://github.com/airbytehq/airbyte/issues/27530 for additional context
final var partial = Jsons.tryDeserializeExact(messageString, PartialAirbyteMessage.class)
.orElseThrow(() -> new RuntimeException("Unable to deserialize PartialAirbyteMessage."));

final var msgType = partial.getType();
if (Type.RECORD.equals(msgType) && partial.getRecord().getData() != null) {
// store serialized json
partial.withSerialized(partial.getRecord().getData().toString());
// The connector doesn't need to be able to access to the record value. We can serialize it here and
// drop the json
// object. Having this data stored as a string is slightly more optimal for the memory usage.
partial.getRecord().setData(null);
} else if (Type.STATE.equals(msgType)) {
partial.withSerialized(messageString);
} else {
throw new RuntimeException(String.format("Unsupported message type: %s", msgType));
}

return partial;
}

@Override
public void close() throws Exception {
Preconditions.checkState(hasStarted, "Cannot close; has not started.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.cdk.integrations.destination_async.deser

import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage

object DeserializationUtil {
/**
* Deserializes to a [PartialAirbyteMessage] which can represent both a Record or a State
* Message
*
* PartialAirbyteMessage holds either:
* * entire serialized message string when message is a valid State Message
* * serialized AirbyteRecordMessage when message is a valid Record Message
*
* @param messageString the string to deserialize
* @return PartialAirbyteMessage if the message is valid, empty otherwise
*/
@JvmStatic
@VisibleForTesting
fun deserializeAirbyteMessage(
messageString: String?,
dataTransformer: StreamAwareDataTransformer
): PartialAirbyteMessage {
// TODO: This is doing some sketchy assumptions by deserializing either the whole or the
// partial based on type.
// Use JsonSubTypes and extend StdDeserializer to properly handle this.
// Make immutability a first class citizen in the PartialAirbyteMessage class.
val partial =
Jsons.tryDeserializeExact(messageString, PartialAirbyteMessage::class.java)
.orElseThrow { RuntimeException("Unable to deserialize PartialAirbyteMessage.") }

val msgType = partial.type
if (AirbyteMessage.Type.RECORD == msgType && partial.record.data != null) {
// Transform data provided by destination.
val transformedData =
dataTransformer.transform(
partial.record.streamDescriptor,
partial.record.data,
partial.record.meta
)
// store serialized json & meta
partial.withSerialized(Jsons.serialize(transformedData.getLeft()))
partial.record.meta = transformedData.getRight()
// The connector doesn't need to be able to access to the record value. We can serialize
// it here and
// drop the json
// object. Having this data stored as a string is slightly more optimal for the memory
// usage.
partial.record.data = null
} else if (AirbyteMessage.Type.STATE == msgType) {
partial.withSerialized(messageString)
} else {
throw RuntimeException(String.format("Unsupported message type: %s", msgType))
}

return partial
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination_async.deser;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import org.apache.commons.lang3.tuple.ImmutablePair;

/**
* Identity transformer which echoes back the original data and meta.
*/
public class IdentityDataTransformer implements StreamAwareDataTransformer {

@Override
public ImmutablePair<JsonNode, AirbyteRecordMessageMeta> transform(StreamDescriptor streamDescriptor,
JsonNode data,
AirbyteRecordMessageMeta meta) {
return ImmutablePair.of(data, meta);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination_async.deser;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import org.apache.commons.lang3.tuple.ImmutablePair;

public interface StreamAwareDataTransformer {

/**
* Transforms the input data by applying destination limitations and populating
* {@link AirbyteRecordMessageMeta}. The returned pair contains the transformed data and the merged
* meta information from upstream.
*
* @param streamDescriptor
* @param data
* @param meta
* @return
*/
ImmutablePair<JsonNode, AirbyteRecordMessageMeta> transform(StreamDescriptor streamDescriptor, JsonNode data, AirbyteRecordMessageMeta meta);

}
Loading

0 comments on commit 48faee8

Please sign in to comment.