From becd728f9cb2de3efb260e5980936c3ab538290c Mon Sep 17 00:00:00 2001
From: Stephane Geneix
Date: Thu, 16 May 2024 13:02:50 -0700
Subject: [PATCH] move redshift's large-record handling logic into the CDK

---
 .../destination/s3/S3DestinationConfig.kt      |   1 +
 .../SizeBasedDataTransformer.kt                | 261 ++++++++++++++++++
 .../destination-redshift/build.gradle          |   2 +-
 .../RedshiftSuperLimitationTransformer.java    | 228 +--------------
 ...edshiftSuperLimitationTransformerTest.java  |  36 ++-
 5 files changed, 296 insertions(+), 232 deletions(-)
 create mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SizeBasedDataTransformer.kt

diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt
index 6e6a29551b03..bffe509272ca 100644
--- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt
+++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt
@@ -294,6 +294,7 @@ open class S3DestinationConfig {
     }
 
     @JvmStatic
+    @JvmOverloads
     fun getS3DestinationConfig(
         @Nonnull config: JsonNode,
         environment: Map<String, String> = System.getenv()
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SizeBasedDataTransformer.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SizeBasedDataTransformer.kt
new file mode 100644
index 000000000000..d266e014ce66
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SizeBasedDataTransformer.kt
@@ -0,0 +1,261 @@
+package io.airbyte.integrations.base.destination.typing_deduping
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ArrayNode
+import com.fasterxml.jackson.databind.node.ObjectNode
+import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer
+import io.airbyte.commons.json.Jsons
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import io.airbyte.protocol.models.v0.StreamDescriptor
+import io.github.oshai.kotlinlogging.KotlinLogging
+import org.apache.commons.lang3.tuple.ImmutablePair
+import java.nio.charset.StandardCharsets
+import java.util.*
+import java.util.ArrayDeque
+import java.util.stream.Collectors
+
+private val log = KotlinLogging.logger {}
+
+open class SizeBasedDataTransformer(
+    private val parsedCatalog: ParsedCatalog,
+    private val defaultNamespace: String,
+    private val maxFieldSize: Int = Int.MAX_VALUE,
+    private val maxRecordSize: Int = Int.MAX_VALUE
+) :
+    StreamAwareDataTransformer {
+    @JvmRecord
+    private data class ScalarNodeModification(val size: Int, val removedSize: Int, val shouldNull: Boolean)
+
+    data class TransformationInfo(val originalBytes: Int, val removedBytes: Int, val node: JsonNode?, val meta: AirbyteRecordMessageMeta)
+
+    /*
+     * This method walks the JSON tree nodes and does the following:
+     *
+     * 1. Collect the original bytes using the UTF-8 charset. This is to avoid double-walking the tree if
+     * the total size > maxRecordSize. This optimizes for the best case (see the worst case in 4 below): most
+     * of the data will be <= maxRecordSize and only a few offending varchars will be > maxFieldSize.
+     *
+     * 2. Replace all TextNodes with Null nodes if they are greater than maxFieldSize.
+     *
+     * 3. Verify whether replacing the varchars with NULLs brought the record size below maxRecordSize. This
+     * includes verifying the original bytes and transformed bytes are below the record size limit.
+     *
+     * 4. If 3 is false, this is the worst case scenario, where we try to resurrect PKs and cursors and
+     * trash the rest of the record.
+     *
+     */
+    override fun transform(
+        streamDescriptor: StreamDescriptor?,
+        data: JsonNode?,
+        meta: AirbyteRecordMessageMeta?
+    ): Pair<JsonNode?, AirbyteRecordMessageMeta?> {
+        val startTime = System.currentTimeMillis()
+        log.debug { "Traversing the record to NULL fields exceeding destination size limitations" }
+        val namespace =
+            if (streamDescriptor!!.namespace != null && streamDescriptor.namespace.isNotEmpty()) streamDescriptor.namespace else defaultNamespace
+        val streamConfig: StreamConfig =
+            parsedCatalog.getStream(namespace, streamDescriptor.name)
+        val cursorField: Optional<String> =
+            streamConfig.cursor.map(ColumnId::originalName)
+        // convert List to Set for faster lookup
+        val primaryKeys: Set<String> =
+            streamConfig.primaryKey.stream().map(ColumnId::originalName).collect(
+                Collectors.toSet()
+            )
+        val syncMode: DestinationSyncMode = streamConfig.destinationSyncMode
+        val transformationInfo = clearLargeFields(data)
+        val originalBytes = transformationInfo.originalBytes
+        val transformedBytes = transformationInfo.originalBytes - transformationInfo.removedBytes
+        // We check whether the transformed size is back under the record limit.
+        log.debug { "Traversal complete in ${System.currentTimeMillis() - startTime} ms" }
+        if (originalBytes > maxRecordSize && transformedBytes > maxRecordSize) {
+            // If we reach here, many small varchars added up to an oversized record, so nulling
+            // individual fields was not enough; fall back to a minimal record with only PKs and cursor.
+            log.warn {
+                "Record size before transformation $originalBytes bytes, after transformation $transformedBytes bytes, exceeds the record size limit"
+            }
+            val minimalNode = constructMinimalJsonWithPks(data, primaryKeys, cursorField)
+            if (minimalNode.isEmpty && syncMode == DestinationSyncMode.APPEND_DEDUP) {
+                // Fail the sync if PKs are missing in DEDUPE; there is no point sending an empty record to the destination.
+                throw RuntimeException("Record exceeds size limit, cannot transform without PrimaryKeys in DEDUPE sync")
+            }
+            // Preserve original changes
+            val changes: MutableList<AirbyteRecordMessageMetaChange> = ArrayList()
+            changes.add(
+                AirbyteRecordMessageMetaChange()
+                    .withField("all").withChange(AirbyteRecordMessageMetaChange.Change.NULLED)
+                    .withReason(AirbyteRecordMessageMetaChange.Reason.DESTINATION_RECORD_SIZE_LIMITATION)
+            )
+            if (meta != null && meta.changes != null) {
+                changes.addAll(meta.changes)
+            }
+            return Pair(minimalNode, AirbyteRecordMessageMeta().withChanges(changes))
+        }
+        if (meta != null && meta.changes != null) {
+            // The underlying list of AirbyteRecordMessageMeta is mutable
+            transformationInfo.meta.changes.addAll(meta.changes)
+        }
+        // We intentionally don't deep copy for transformation, to avoid memory bloat.
+        // The caller already has the reference to the original jsonNode, but we return it again in
+        // case we choose to deepCopy in the future for thread-safety.
+        return Pair(data, transformationInfo.meta)
+    }
+
+    private fun shouldTransformScalarNode(
+        node: JsonNode
+    ): ScalarNodeModification {
+        val bytes: Int
+        if (node.isTextual) {
+            val originalBytes = getByteSize(
+                node.asText()
+            ) + 2 // for quotes
+            if (getByteSize(node.asText()) > maxFieldSize) {
+                return ScalarNodeModification(
+                    originalBytes, // size before nulling
+                    originalBytes - 4, // account 4 bytes for the "null" string
+                    true
+                )
+            }
+            bytes = originalBytes
+        } else if (node.isNumber) {
+            // Serialize numbers exactly, to account for scientific notation being expanded to its full value.
+            // This is what we send over the wire for persistence.
+            bytes = getByteSize(Jsons.serialize(node))
+        } else if (node.isBoolean) {
+            bytes = getByteSize(node.toString())
+        } else if (node.isNull) {
+            bytes = 4 // for "null"
+        } else {
+            bytes = 0
+        }
+        return ScalarNodeModification(
+            bytes, // For all other types, just return bytes
+            0,
+            false
+        )
+    }
+
+    fun clearLargeFields(
+        rootNode: JsonNode?
+    ): TransformationInfo {
+        // Walk the tree and null out varchars that exceed the field size limit.
+        // We intentionally do not check the whole record size upfront against the record size limit,
+        // to optimize for the best case.
+
+        var originalBytes = 0
+        var removedBytes = 0
+        // We accumulate nested keys in jsonPath format for adding to airbyte changes.
+        val stack: Deque<ImmutablePair<String, JsonNode?>> = ArrayDeque()
+        val changes: MutableList<AirbyteRecordMessageMetaChange> = ArrayList()
+
+        // This was intentionally done using iterative DFS to avoid stack overflow for large records.
+        // This ensures we are allocating on the heap and not on the stack.
+        stack.push(ImmutablePair.of("$", rootNode))
+        while (!stack.isEmpty()) {
+            val jsonPathNodePair = stack.pop()
+            val currentNode = jsonPathNodePair.right
+            if (currentNode!!.isObject) {
+                originalBytes += CURLY_BRACES_BYTE_SIZE
+                val fields = currentNode.fields()
+                while (fields.hasNext()) {
+                    val field = fields.next()
+                    originalBytes += getByteSize(field.key) + OBJECT_COLON_QUOTES_COMMA_BYTE_SIZE // for quotes, colon, comma
+                    val jsonPathKey = "${jsonPathNodePair.left}.${field.key}"
+                    // TODO: Little difficult to unify this logic in Object & Array, find a way later.
+                    // Push only non-scalar nodes to the stack. For scalar nodes, we need the reference to the parent
+                    // to do an in-place update.
+                    if (field.value.isContainerNode) {
+                        stack.push(ImmutablePair.of(jsonPathKey, field.value))
+                    } else {
+                        val shouldTransform = shouldTransformScalarNode(field.value)
+                        if (shouldTransform.shouldNull) {
+                            removedBytes += shouldTransform.removedSize
+                            // DO NOT do this if this code is ever modified to run on a multithreaded call stack
+                            field.setValue(Jsons.jsonNode(null))
+                            changes.add(
+                                AirbyteRecordMessageMetaChange()
+                                    .withField(jsonPathKey)
+                                    .withChange(AirbyteRecordMessageMetaChange.Change.NULLED)
+                                    .withReason(AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION)
+                            )
+                        }
+                        originalBytes += shouldTransform.size
+                    }
+                }
+                originalBytes -= 1 // remove the extra comma from the last key-value pair
+            } else if (currentNode.isArray) {
+                originalBytes += SQUARE_BRACKETS_BYTE_SIZE
+                val arrayNode = currentNode as ArrayNode?
+                // We cannot use forEach here as we need to update the array in place.
+                for (i in 0 until arrayNode!!.size()) {
+                    val childNode = arrayNode[i]
+                    val jsonPathKey = "${jsonPathNodePair.left}[$i]"
+                    if (childNode.isContainerNode) stack.push(ImmutablePair.of(jsonPathKey, childNode))
+                    else {
+                        val shouldTransform = shouldTransformScalarNode(childNode)
+                        if (shouldTransform.shouldNull) {
+                            removedBytes += shouldTransform.removedSize
+                            // DO NOT do this if this code is ever modified to run on a multithreaded call stack
+                            arrayNode[i] = Jsons.jsonNode(null)
+                            changes.add(
+                                AirbyteRecordMessageMetaChange()
+                                    .withField(jsonPathKey)
+                                    .withChange(AirbyteRecordMessageMetaChange.Change.NULLED)
+                                    .withReason(AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION)
+                            )
+                        }
+                        originalBytes += shouldTransform.size
+                    }
+                }
+                originalBytes += if (!currentNode.isEmpty) currentNode.size() - 1 else 0 // for commas
+            } else { // A top-level scalar node is valid JSON
+                originalBytes += shouldTransformScalarNode(currentNode).size
+            }
+        }
+
+        if (removedBytes != 0) {
+            log.info {
+                "Original record size $originalBytes bytes, modified record size ${originalBytes - removedBytes} bytes"
+            }
+        }
+        return TransformationInfo(originalBytes, removedBytes, rootNode, AirbyteRecordMessageMeta().withChanges(changes))
+    }
+
+    private fun constructMinimalJsonWithPks(rootNode: JsonNode?, primaryKeys: Set<String>, cursorField: Optional<String>): JsonNode {
+        val minimalNode = Jsons.emptyObject() as ObjectNode
+        // We only iterate over top-level fields of the root object, since we only support PKs and cursor
+        // in top-level keys.
+        if (rootNode!!.isObject) {
+            val fields = rootNode.fields()
+            while (fields.hasNext()) {
+                val field = fields.next()
+                if (!field.value.isContainerNode) {
+                    if (primaryKeys.contains(field.key) || cursorField.isPresent && cursorField.get() == field.key) {
+                        // Make a deep copy of PK and cursor fields and values into minimalNode;
+                        // without deepCopy, we would re-reference the original tree's nodes.
+                        // If someone set a PK on a non-scalar field and it reached this point, we are out of luck;
+                        // only do this at the root level.
+                        minimalNode.set<JsonNode>(field.key, field.value.deepCopy())
+                    }
+                }
+            }
+        } else {
+            log.error { "Encountered ${rootNode.nodeType} as top level JSON field, this is not supported" }
+            // This should have been caught way before it reaches here. Just additional safety.
+ throw RuntimeException("Encountered ${rootNode.nodeType} as top level JSON field, this is not supported") + } + return minimalNode + } + + companion object { + private val CURLY_BRACES_BYTE_SIZE = getByteSize("{}") + private val SQUARE_BRACKETS_BYTE_SIZE = getByteSize("[]") + private val OBJECT_COLON_QUOTES_COMMA_BYTE_SIZE = getByteSize("\"\":,") + + private fun getByteSize(value: String): Int { + return value.toByteArray(StandardCharsets.UTF_8).size + } + } +} diff --git a/airbyte-integrations/connectors/destination-redshift/build.gradle b/airbyte-integrations/connectors/destination-redshift/build.gradle index a6bb21f2f0da..b3c5fb95fb64 100644 --- a/airbyte-integrations/connectors/destination-redshift/build.gradle +++ b/airbyte-integrations/connectors/destination-redshift/build.gradle @@ -6,7 +6,7 @@ plugins { airbyteJavaConnector { cdkVersionRequired = '0.35.0' features = ['db-destinations', 's3-destinations', 'typing-deduping'] - useLocalCdk = false + useLocalCdk = true } java { diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformer.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformer.java index bdb123fb41d8..801e477d2655 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformer.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformer.java @@ -12,6 +12,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; +import io.airbyte.integrations.base.destination.typing_deduping.SizeBasedDataTransformer; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta; import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange; @@ -38,235 +39,12 @@ import org.jetbrains.annotations.NotNull; @Slf4j -public class RedshiftSuperLimitationTransformer implements StreamAwareDataTransformer { - - private record ScalarNodeModification(int size, int removedSize, boolean shouldNull) {} - - public record TransformationInfo(int originalBytes, int removedBytes, JsonNode node, AirbyteRecordMessageMeta meta) {} - +public class RedshiftSuperLimitationTransformer extends SizeBasedDataTransformer { public static final int REDSHIFT_VARCHAR_MAX_BYTE_SIZE = 65535; public static final int REDSHIFT_SUPER_MAX_BYTE_SIZE = 16 * 1024 * 1024; - - static final Predicate DEFAULT_PREDICATE_VARCHAR_GREATER_THAN_64K = text -> getByteSize(text) > REDSHIFT_VARCHAR_MAX_BYTE_SIZE; - static final Predicate DEFAULT_PREDICATE_RECORD_SIZE_GT_THAN_16M = size -> size > REDSHIFT_SUPER_MAX_BYTE_SIZE; - - private static final int CURLY_BRACES_BYTE_SIZE = getByteSize("{}"); - private static final int SQUARE_BRACKETS_BYTE_SIZE = getByteSize("[]"); - private static final int OBJECT_COLON_QUOTES_COMMA_BYTE_SIZE = getByteSize("\"\":,"); - - private final ParsedCatalog parsedCatalog; - - private final String defaultNamespace; - public RedshiftSuperLimitationTransformer(final ParsedCatalog parsedCatalog, final String defaultNamespace) { - this.parsedCatalog = 
parsedCatalog; - Objects.requireNonNull(defaultNamespace); - this.defaultNamespace = defaultNamespace; - - } - - /* - * This method walks the Json tree nodes and does the following - * - * 1. Collect the original bytes using UTF-8 charset. This is to avoid double walking the tree if - * the total size > 16MB This is to optimize for best case (see worst case as 4 below) that most of - * the data will be < 16MB and only few offending varchars > 64KB. - * - * 2. Replace all TextNodes with Null nodes if they are greater than 64K. - * - * 3. Verify if replacing the varchars with NULLs brought the record size down to < 16MB. This - * includes verifying the original bytes and transformed bytes are below the record size limit. - * - * 4. If 3 is false, this is the worst case scenarios where we try to resurrect PKs and cursors and - * trash the rest of the record. - * - */ - @NotNull - @Override - public Pair transform(final StreamDescriptor streamDescriptor, - final JsonNode jsonNode, - final AirbyteRecordMessageMeta airbyteRecordMessageMeta) { - final long startTime = System.currentTimeMillis(); - log.debug("Traversing the record to NULL fields for redshift size limitations"); - final String namespace = - (streamDescriptor.getNamespace() != null && !streamDescriptor.getNamespace().isEmpty()) ? streamDescriptor.getNamespace() : defaultNamespace; - final StreamConfig streamConfig = parsedCatalog.getStream(namespace, streamDescriptor.getName()); - final Optional cursorField = streamConfig.getCursor().map(ColumnId::getOriginalName); - // convert List to Set for faster lookup - final Set primaryKeys = streamConfig.getPrimaryKey().stream().map(ColumnId::getOriginalName).collect(Collectors.toSet()); - final DestinationSyncMode syncMode = streamConfig.getDestinationSyncMode(); - final TransformationInfo transformationInfo = transformNodes(jsonNode, DEFAULT_PREDICATE_VARCHAR_GREATER_THAN_64K); - final int originalBytes = transformationInfo.originalBytes; - final int transformedBytes = transformationInfo.originalBytes - transformationInfo.removedBytes; - // We check if the transformedBytes has solved the record limit. - log.debug("Traversal complete in {} ms", System.currentTimeMillis() - startTime); - if (DEFAULT_PREDICATE_RECORD_SIZE_GT_THAN_16M.test(originalBytes) - && DEFAULT_PREDICATE_RECORD_SIZE_GT_THAN_16M.test(transformedBytes)) { - // If we have reached here with a bunch of small varchars constituted to becoming a large record, - // person using Redshift for this data should re-evaluate life choices. - log.warn("Record size before transformation {}, after transformation {} bytes exceeds 16MB limit", originalBytes, transformedBytes); - final JsonNode minimalNode = constructMinimalJsonWithPks(jsonNode, primaryKeys, cursorField); - if (minimalNode.isEmpty() && syncMode == DestinationSyncMode.APPEND_DEDUP) { - // Fail the sync if PKs are missing in DEDUPE, no point sending an empty record to destination. 
- throw new RuntimeException("Record exceeds size limit, cannot transform without PrimaryKeys in DEDUPE sync"); - } - // Preserve original changes - final List changes = new ArrayList<>(); - changes.add(new AirbyteRecordMessageMetaChange() - .withField("all").withChange(Change.NULLED) - .withReason(Reason.DESTINATION_RECORD_SIZE_LIMITATION)); - if (airbyteRecordMessageMeta != null && airbyteRecordMessageMeta.getChanges() != null) { - changes.addAll(airbyteRecordMessageMeta.getChanges()); - } - return new Pair<>(minimalNode, new AirbyteRecordMessageMeta().withChanges(changes)); - } - if (airbyteRecordMessageMeta != null && airbyteRecordMessageMeta.getChanges() != null) { - // The underlying list of AirbyteRecordMessageMeta is mutable - transformationInfo.meta.getChanges().addAll(airbyteRecordMessageMeta.getChanges()); - } - // We intentionally don't deep copy for transformation to avoid memory bloat. - // The caller already has the reference of original jsonNode but returning again in - // case we choose to deepCopy in future for thread-safety. - return new Pair<>(jsonNode, transformationInfo.meta); + super(parsedCatalog, defaultNamespace, REDSHIFT_VARCHAR_MAX_BYTE_SIZE, REDSHIFT_SUPER_MAX_BYTE_SIZE); } - private ScalarNodeModification shouldTransformScalarNode(final JsonNode node, - final Predicate textNodePredicate) { - final int bytes; - if (node.isTextual()) { - final int originalBytes = getByteSize(node.asText()) + 2; // for quotes - if (textNodePredicate.test(node.asText())) { - return new ScalarNodeModification(originalBytes, // size before nulling - originalBytes - 4, // account 4 bytes for null string - true); - } - bytes = originalBytes; - } else if (node.isNumber()) { - // Serialize exactly for numbers to account for Scientific notation converted to full value. - // This is what we send over wire for persistence. - bytes = getByteSize(Jsons.serialize(node)); - } else if (node.isBoolean()) { - bytes = getByteSize(node.toString()); - } else if (node.isNull()) { - bytes = 4; // for "null" - } else { - bytes = 0; - } - return new ScalarNodeModification(bytes, // For all other types, just return bytes - 0, - false); - } - - private static int getByteSize(final String value) { - return value.getBytes(StandardCharsets.UTF_8).length; - } - - @VisibleForTesting - TransformationInfo transformNodes(final JsonNode rootNode, - final Predicate textNodePredicate) { - - // Walk the tree and transform Varchars that exceed the limit - // We are intentionally not checking the whole size upfront to check if it exceeds 16MB limit to - // optimize for best case. - int originalBytes = 0; - int removedBytes = 0; - // We accumulate nested keys in jsonPath format for adding to airbyte changes. - final Deque> stack = new ArrayDeque<>(); - final List changes = new ArrayList<>(); - - // This was intentionally done using Iterative DFS to avoid stack overflow for large records. - // This will ensure we are allocating on heap and not on stack. 
- stack.push(ImmutablePair.of("$", rootNode)); - while (!stack.isEmpty()) { - final ImmutablePair jsonPathNodePair = stack.pop(); - final JsonNode currentNode = jsonPathNodePair.right; - if (currentNode.isObject()) { - originalBytes += CURLY_BRACES_BYTE_SIZE; - final Iterator> fields = currentNode.fields(); - while (fields.hasNext()) { - final Map.Entry field = fields.next(); - originalBytes += getByteSize(field.getKey()) + OBJECT_COLON_QUOTES_COMMA_BYTE_SIZE; // for quotes, colon, comma - final String jsonPathKey = String.format("%s.%s", jsonPathNodePair.left, field.getKey()); - // TODO: Little difficult to unify this logic in Object & Array, find a way later - // Push only non-scalar nodes to stack. For scalar nodes, we need reference of parent to do in-place - // update. - if (field.getValue().isContainerNode()) { - stack.push(ImmutablePair.of(jsonPathKey, field.getValue())); - } else { - final ScalarNodeModification shouldTransform = shouldTransformScalarNode(field.getValue(), textNodePredicate); - if (shouldTransform.shouldNull()) { - removedBytes += shouldTransform.removedSize; - // DO NOT do this if this code every modified to a multithreading call stack - field.setValue(Jsons.jsonNode(null)); - changes.add(new AirbyteRecordMessageMetaChange() - .withField(jsonPathKey) - .withChange(Change.NULLED) - .withReason(Reason.DESTINATION_FIELD_SIZE_LIMITATION)); - } - originalBytes += shouldTransform.size; - } - } - originalBytes -= 1; // remove extra comma from last key-value pair - } else if (currentNode.isArray()) { - originalBytes += SQUARE_BRACKETS_BYTE_SIZE; - final ArrayNode arrayNode = (ArrayNode) currentNode; - // We cannot use foreach here as we need to update the array in place. - for (int i = 0; i < arrayNode.size(); i++) { - final JsonNode childNode = arrayNode.get(i); - final String jsonPathKey = String.format("%s[%d]", jsonPathNodePair.left, i); - if (childNode.isContainerNode()) - stack.push(ImmutablePair.of(jsonPathKey, childNode)); - else { - final ScalarNodeModification shouldTransform = shouldTransformScalarNode(childNode, textNodePredicate); - if (shouldTransform.shouldNull()) { - removedBytes += shouldTransform.removedSize; - // DO NOT do this if this code every modified to a multithreading call stack - arrayNode.set(i, Jsons.jsonNode(null)); - changes.add(new AirbyteRecordMessageMetaChange() - .withField(jsonPathKey) - .withChange(Change.NULLED) - .withReason(Reason.DESTINATION_FIELD_SIZE_LIMITATION)); - } - originalBytes += shouldTransform.size; - } - } - originalBytes += !currentNode.isEmpty() ? currentNode.size() - 1 : 0; // for commas - } else { // Top level scalar node is a valid json - originalBytes += shouldTransformScalarNode(currentNode, textNodePredicate).size(); - } - } - - if (removedBytes != 0) { - log.info("Original record size {} bytes, Modified record size {} bytes", originalBytes, (originalBytes - removedBytes)); - } - return new TransformationInfo(originalBytes, removedBytes, rootNode, new AirbyteRecordMessageMeta().withChanges(changes)); - } - - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - private JsonNode constructMinimalJsonWithPks(JsonNode rootNode, Set primaryKeys, Optional cursorField) { - final ObjectNode minimalNode = (ObjectNode) Jsons.emptyObject(); - // We only iterate for top-level fields in the root object, since we only support PKs and cursor in - // top level keys. 
- if (rootNode.isObject()) { - final Iterator> fields = rootNode.fields(); - while (fields.hasNext()) { - final Map.Entry field = fields.next(); - if (!field.getValue().isContainerNode()) { - if (primaryKeys.contains(field.getKey()) || cursorField.isPresent() && cursorField.get().equals(field.getKey())) { - // Make a deepcopy into minimalNode of PKs and cursor fields and values, - // without deepcopy, we will re-reference the original Tree's nodes. - // god help us if someone set a PK on non-scalar field, and it reached this point, only do at root - // level - minimalNode.set(field.getKey(), field.getValue().deepCopy()); - } - } - } - } else { - log.error("Encountered {} as top level JSON field, this is not supported", rootNode.getNodeType()); - // This should have caught way before it reaches here. Just additional safety. - throw new RuntimeException("Encountered " + rootNode.getNodeType() + " as top level JSON field, this is not supported"); - } - return minimalNode; - } } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformerTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformerTest.java index 4850739ca88c..dce96b1cbff3 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformerTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSuperLimitationTransformerTest.java @@ -16,10 +16,10 @@ import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; +import io.airbyte.integrations.base.destination.typing_deduping.SizeBasedDataTransformer.TransformationInfo; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer; -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSuperLimitationTransformer.TransformationInfo; import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta; import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange; import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change; @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; import java.util.stream.IntStream; import kotlin.Pair; import org.junit.jupiter.api.BeforeEach; @@ -64,9 +65,32 @@ public void setup() { transformer = new RedshiftSuperLimitationTransformer(parsedCatalog, "test_schema"); } + private static final String EMPLOYEE_JSON_STRING = """ + { + "id": %d, + "name": "John Doe", + "age": 35, + "salary": 7.50005e4, + "performance_rating": 4.5, + "department": "Engineering", + "skills": ["Java", "Python", "C++"], + "manager": { + "id": 101, + "name": "Jane Smith" + } + }"""; + private static final String JSON_STRING = """ + { + "employees": [ + %s + ] + } + """; + @Test public void testVarcharNulling() throws IOException { - final String jsonString = MoreResources.readResource("test.json"); + int numEmployees = 
(RedshiftSuperLimitationTransformer.REDSHIFT_SUPER_MAX_BYTE_SIZE/EMPLOYEE_JSON_STRING.length()) + 1; + final String jsonString = JSON_STRING.formatted(IntStream.range(0, numEmployees).mapToObj(i->EMPLOYEE_JSON_STRING.formatted(i)).collect(Collectors.joining(",\n"))); final JsonNode jsonNode = Jsons.deserializeExact(jsonString); // Calculate the size of the json before transformation, note that the original JsonNode is altered // so @@ -74,12 +98,12 @@ public void testVarcharNulling() throws IOException { final int jacksonDeserializationSize = Jsons.serialize(jsonNode).getBytes(StandardCharsets.UTF_8).length; // Add a short length as predicate. final TransformationInfo transformationInfo = - transformer.transformNodes(jsonNode, text -> text.length() > 10); + transformer.clearLargeFields(jsonNode); // Calculate the size of the json after transformation final int jacksonDeserializeSizeAfterTransform = Jsons.serialize(jsonNode).getBytes(StandardCharsets.UTF_8).length; - assertEquals(jacksonDeserializationSize, transformationInfo.originalBytes()); - assertEquals(jacksonDeserializeSizeAfterTransform, transformationInfo.originalBytes() - transformationInfo.removedBytes()); - System.out.println(transformationInfo.meta()); + assertEquals(jacksonDeserializationSize, transformationInfo.getOriginalBytes()); + assertEquals(jacksonDeserializeSizeAfterTransform, transformationInfo.getOriginalBytes() - transformationInfo.getRemovedBytes()); + System.out.println(transformationInfo.getMeta()); System.out.println(Jsons.serialize(jsonNode)); }
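
Usage sketch (illustrative only, not part of the patch): any destination connector can now reuse the new CDK transformer by subclassing it the same way destination-redshift does above. The connector class name and the 256 KB / 1 MB limits below are hypothetical placeholders, not values taken from any real connector.

class ExampleDestinationSizeTransformer(
    parsedCatalog: ParsedCatalog,
    defaultNamespace: String
) : SizeBasedDataTransformer(
    parsedCatalog,
    defaultNamespace,
    maxFieldSize = 256 * 1024,   // null any scalar field whose UTF-8 size exceeds 256 KB
    maxRecordSize = 1024 * 1024  // above 1 MB, fall back to a PK/cursor-only record
)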