diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/ReservationManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/ReservationManager.kt
index 3cc82c78f310..354dfadd123d 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/ReservationManager.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/ReservationManager.kt
@@ -49,6 +49,8 @@ class ReservationManager(val totalCapacityBytes: Long) {

     val remainingCapacityBytes: Long
         get() = totalCapacityBytes - usedBytes.get()
+    val totalBytesReserved: Long
+        get() = usedBytes.get()

     /* Attempt to reserve memory. If enough memory is not available, waits until it is, then reserves. */
     suspend fun <T> reserve(bytes: Long, reservedFor: T): Reserved<T> {
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt
index 295258b05525..c8ec16edbc3a 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt
@@ -76,7 +76,6 @@ class DefaultSpillToDiskTask(
                     // reserve enough room for the record
                     diskManager.reserve(wrapped.sizeBytes)

-                    // calculate whether we should flush
                     val rangeProcessed = range.withNextAdjacentValue(wrapped.index)
                     val bytesProcessed = sizeBytes + wrapped.sizeBytes

diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt
index 64da1c4f3130..313bd1602bc5 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageClient.kt
@@ -32,4 +32,12 @@ interface ObjectStorageClient<T : RemoteObject<*>> {
         streamProcessor: StreamProcessor<U>? = null,
         block: suspend (OutputStream) -> Unit
     ): T
+
+    /** Experimental sane replacement interface */
+    suspend fun startStreamingUpload(key: String, metadata: Map<String, String>): StreamingUpload<T>
+}
+
+interface StreamingUpload<T : RemoteObject<*>> {
+    suspend fun uploadPart(part: ByteArray)
+    suspend fun complete(): T
 }
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt
index d23e80ac55df..e2637dc5181f 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt
@@ -8,6 +8,7 @@ import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.command.object_storage.AvroFormatConfiguration
 import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
 import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfigurationProvider
 import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider
 import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration
 import io.airbyte.cdk.load.data.ObjectType
@@ -19,6 +20,7 @@ import io.airbyte.cdk.load.data.dataWithAirbyteMeta
 import io.airbyte.cdk.load.data.json.toJson
 import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
 import io.airbyte.cdk.load.data.withAirbyteMeta
+import io.airbyte.cdk.load.file.StreamProcessor
 import io.airbyte.cdk.load.file.avro.toAvroWriter
 import io.airbyte.cdk.load.file.csv.toCsvPrinterWithHeader
 import io.airbyte.cdk.load.file.parquet.ParquetWriter
@@ -29,6 +31,7 @@ import io.airbyte.cdk.load.util.write
 import io.github.oshai.kotlinlogging.KotlinLogging
 import io.micronaut.context.annotation.Secondary
 import jakarta.inject.Singleton
+import java.io.ByteArrayOutputStream
 import java.io.Closeable
 import java.io.OutputStream
 import org.apache.avro.Schema
@@ -75,6 +78,7 @@ class JsonFormattingWriter(
     private val outputStream: OutputStream,
     private val rootLevelFlattening: Boolean,
 ) : ObjectStorageFormattingWriter {
+
     override fun accept(record: DestinationRecord) {
         val data =
             record.dataWithAirbyteMeta(stream, rootLevelFlattening).toJson().serializeToString()
@@ -92,6 +96,7 @@ class CSVFormattingWriter(
     outputStream: OutputStream,
     private val rootLevelFlattening: Boolean
 ) : ObjectStorageFormattingWriter {
+
     private val finalSchema = stream.schema.withAirbyteMeta(rootLevelFlattening)
     private val printer = finalSchema.toCsvPrinterWithHeader(outputStream)
     override fun accept(record: DestinationRecord) {
@@ -124,11 +129,9 @@ class AvroFormattingWriter(
     }

     override fun accept(record: DestinationRecord) {
-        val dataMapped =
-            pipeline
-                .map(record.data, record.meta?.changes)
-                .withAirbyteMeta(stream, record.emittedAtMs, rootLevelFlattening)
-        writer.write(dataMapped.toAvroRecord(mappedSchema, avroSchema))
+        val dataMapped = pipeline.map(record.data, record.meta?.changes)
+        val withMeta = dataMapped.withAirbyteMeta(stream, record.emittedAtMs, rootLevelFlattening)
+        writer.write(withMeta.toAvroRecord(mappedSchema, avroSchema))
     }

     override fun close() {
@@ -155,11 +158,60 @@ class ParquetFormattingWriter(
     }

     override fun accept(record: DestinationRecord) {
-        val dataMapped =
-            pipeline
-                .map(record.data, record.meta?.changes)
-                .withAirbyteMeta(stream, record.emittedAtMs, rootLevelFlattening)
-        writer.write(dataMapped.toAvroRecord(mappedSchema, avroSchema))
+        val dataMapped = pipeline.map(record.data, record.meta?.changes)
+        val withMeta = dataMapped.withAirbyteMeta(stream, record.emittedAtMs, rootLevelFlattening)
+        writer.write(withMeta.toAvroRecord(mappedSchema, avroSchema))
+    }
+
+    override fun close() {
+        writer.close()
+    }
+}
+
+@Singleton
+@Secondary
+class BufferedFormattingWriterFactory<T : OutputStream>(
+    private val writerFactory: ObjectStorageFormattingWriterFactory,
+    private val compressionConfigurationProvider: ObjectStorageCompressionConfigurationProvider<T>,
+) {
+    fun create(stream: DestinationStream): BufferedFormattingWriter<T> {
+        val outputStream = ByteArrayOutputStream()
+        val processor =
+            compressionConfigurationProvider.objectStorageCompressionConfiguration.compressor
+        val wrappingBuffer = processor.wrapper.invoke(outputStream)
+        val writer = writerFactory.create(stream, wrappingBuffer)
+        return BufferedFormattingWriter(writer, outputStream, processor, wrappingBuffer)
+    }
+}
+
+class BufferedFormattingWriter<T : OutputStream>(
+    private val writer: ObjectStorageFormattingWriter,
+    private val buffer: ByteArrayOutputStream,
+    private val streamProcessor: StreamProcessor<T>,
+    private val wrappingBuffer: T
+) : ObjectStorageFormattingWriter {
+    val bufferSize: Int
+        get() = buffer.size()
+
+    override fun accept(record: DestinationRecord) {
+        writer.accept(record)
+    }
+
+    fun takeBytes(): ByteArray {
+        wrappingBuffer.flush()
+        val bytes = buffer.toByteArray()
+        buffer.reset()
+        return bytes
+    }
+
+    fun finish(): ByteArray? {
+        writer.close()
+        streamProcessor.partFinisher.invoke(wrappingBuffer)
+        return if (buffer.size() > 0) {
+            buffer.toByteArray()
+        } else {
+            null
+        }
     }

     override fun close() {
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt
index d7f130bda395..1933dcc25cda 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt
@@ -8,10 +8,10 @@ import com.google.common.annotations.VisibleForTesting
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfigurationProvider
-import io.airbyte.cdk.load.file.NoopProcessor
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfigurationProvider
 import io.airbyte.cdk.load.file.StreamProcessor
+import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriterFactory
 import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
-import io.airbyte.cdk.load.file.object_storage.ObjectStorageFormattingWriterFactory
 import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
 import io.airbyte.cdk.load.file.object_storage.RemoteObject
 import io.airbyte.cdk.load.message.Batch
@@ -31,33 +31,41 @@ import java.util.concurrent.atomic.AtomicLong

 @Singleton
 @Secondary
-class ObjectStorageStreamLoaderFactory<T : RemoteObject<*>>(
+class ObjectStorageStreamLoaderFactory<T : RemoteObject<*>, U : OutputStream>(
     private val client: ObjectStorageClient<T>,
-    private val compressionConfig: ObjectStorageCompressionConfigurationProvider<*>? = null,
     private val pathFactory: ObjectStoragePathFactory,
-    private val writerFactory: ObjectStorageFormattingWriterFactory,
+    private val bufferedWriterFactory: BufferedFormattingWriterFactory<U>,
+    private val compressionConfigurationProvider:
+        ObjectStorageCompressionConfigurationProvider<U>? =
+        null,
     private val destinationStateManager: DestinationStateManager<ObjectStorageDestinationState>,
+    private val uploadConfigurationProvider: ObjectStorageUploadConfigurationProvider,
 ) {
     fun create(stream: DestinationStream): StreamLoader {
         return ObjectStorageStreamLoader(
             stream,
             client,
-            compressionConfig?.objectStorageCompressionConfiguration?.compressor ?: NoopProcessor,
+            compressionConfigurationProvider?.objectStorageCompressionConfiguration?.compressor,
             pathFactory,
-            writerFactory,
-            destinationStateManager
+            bufferedWriterFactory,
+            destinationStateManager,
+            uploadConfigurationProvider.objectStorageUploadConfiguration.streamingUploadPartSize,
         )
     }
 }

-@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
+@SuppressFBWarnings(
+    value = ["NP_NONNULL_PARAM_VIOLATION", "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"],
+    justification = "Kotlin async continuation"
+)
 class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
     override val stream: DestinationStream,
     private val client: ObjectStorageClient<T>,
-    private val compressor: StreamProcessor<U>,
+    private val compressor: StreamProcessor<U>?,
     private val pathFactory: ObjectStoragePathFactory,
-    private val writerFactory: ObjectStorageFormattingWriterFactory,
+    private val bufferedWriterFactory: BufferedFormattingWriterFactory<U>,
     private val destinationStateManager: DestinationStateManager<ObjectStorageDestinationState>,
+    private val partSize: Long,
 ) : StreamLoader {

     private val log = KotlinLogging.logger {}
@@ -95,12 +103,18 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
             )

         val metadata = ObjectStorageDestinationState.metadataFor(stream)
-        val obj =
-            client.streamingUpload(key, metadata, streamProcessor = compressor) { outputStream ->
-                writerFactory.create(stream, outputStream).use { writer ->
-                    records.forEach { writer.accept(it) }
+        val upload = client.startStreamingUpload(key, metadata)
+        bufferedWriterFactory.create(stream).use { writer ->
+            records.forEach {
+                writer.accept(it)
+                if (writer.bufferSize >= partSize) {
+                    upload.uploadPart(writer.takeBytes())
                 }
             }
+            writer.finish()?.let { upload.uploadPart(it) }
+        }
+        val obj = upload.complete()
+
         log.info { "Finished writing records to $key, persisting state" }
         destinationStateManager.persistState(stream)
         return RemoteObject(remoteObject = obj, partNumber = partNumber)
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt
index 09af64898baa..ab135d35c358 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderTest.kt
@@ -6,8 +6,8 @@ package io.airbyte.cdk.load.write.object_storage

 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.file.StreamProcessor
+import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriterFactory
 import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
-import io.airbyte.cdk.load.file.object_storage.ObjectStorageFormattingWriterFactory
 import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
 import io.airbyte.cdk.load.file.object_storage.RemoteObject
 import io.airbyte.cdk.load.message.DestinationFile
@@ -31,9 +31,11 @@ class ObjectStorageStreamLoaderTest {
     private val client: ObjectStorageClient<RemoteObject<*>> = mockk(relaxed = true)
     private val compressor: StreamProcessor<ByteArrayOutputStream> = mockk(relaxed = true)
     private val pathFactory: ObjectStoragePathFactory = mockk(relaxed = true)
-    private val writerFactory: ObjectStorageFormattingWriterFactory = mockk(relaxed = true)
+    private val writerFactory: BufferedFormattingWriterFactory<ByteArrayOutputStream> =
+        mockk(relaxed = true)
     private val destinationStateManager: DestinationStateManager<ObjectStorageDestinationState> =
         mockk(relaxed = true)
+    private val partSize: Long = 1

     private val objectStorageStreamLoader =
         spyk(
@@ -43,7 +45,8 @@ class ObjectStorageStreamLoaderTest {
                 compressor,
                 pathFactory,
                 writerFactory,
-                destinationStateManager
+                destinationStateManager,
+                partSize
             )
         )

diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockObjectStorageClient.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockObjectStorageClient.kt
index e9dbd12d47aa..c261efcae958 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockObjectStorageClient.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockObjectStorageClient.kt
@@ -8,6 +8,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
 import io.airbyte.cdk.load.file.StreamProcessor
 import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
 import io.airbyte.cdk.load.file.object_storage.RemoteObject
+import io.airbyte.cdk.load.file.object_storage.StreamingUpload
 import io.micronaut.context.annotation.Requires
 import jakarta.inject.Singleton
 import java.io.ByteArrayOutputStream
@@ -81,4 +82,11 @@ class MockObjectStorageClient : ObjectStorageClient<MockRemoteObject> {
     override suspend fun delete(remoteObject: MockRemoteObject) {
         objects.remove(remoteObject.key)
     }
+
+    override suspend fun startStreamingUpload(
+        key: String,
+        metadata: Map<String, String>
+    ): StreamingUpload<MockRemoteObject> {
+        TODO("Not yet implemented")
+    }
 }
diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt
index ab9d284dd4f8..25ba0171de06 100644
--- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt
+++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt
@@ -30,6 +30,7 @@ import io.airbyte.cdk.load.file.NoopProcessor
 import io.airbyte.cdk.load.file.StreamProcessor
 import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
 import io.airbyte.cdk.load.file.object_storage.RemoteObject
+import io.airbyte.cdk.load.file.object_storage.StreamingUpload
 import io.github.oshai.kotlinlogging.KotlinLogging
 import io.micronaut.context.annotation.Factory
 import io.micronaut.context.annotation.Secondary
@@ -176,6 +177,33 @@ class S3Client(
         upload.runUsing(block)
         return S3Object(key, bucketConfig)
     }
+
+    override suspend fun startStreamingUpload(
+        key: String,
+        metadata: Map<String, String>
+    ): StreamingUpload<S3Object> {
+        // TODO: Remove permit handling once we control concurrency with # of accumulators
+        if (uploadPermits != null) {
+            log.info {
+                "Attempting to acquire upload permit for $key (${uploadPermits.availablePermits} available)"
+            }
+            uploadPermits.acquire()
+            log.info {
+                "Acquired upload permit for $key (${uploadPermits.availablePermits} available)"
+            }
+        }
+
+        val request = CreateMultipartUploadRequest {
+            this.bucket = bucketConfig.s3BucketName
+            this.key = key
+            this.metadata = metadata
+        }
+        val response = client.createMultipartUpload(request)
+
+        log.info { "Starting multipart upload for $key (uploadId=${response.uploadId})" }
+
+        return S3StreamingUpload(client, bucketConfig, response, uploadPermits)
+    }
 }
diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt
index 0455e362d82a..4f6e3a05ecb7 100644
--- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt
+++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt
@@ -10,17 +10,22 @@ import aws.sdk.kotlin.services.s3.model.CompletedPart
 import aws.sdk.kotlin.services.s3.model.CreateMultipartUploadResponse
 import aws.sdk.kotlin.services.s3.model.UploadPartRequest
 import aws.smithy.kotlin.runtime.content.ByteStream
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
 import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration
+import io.airbyte.cdk.load.command.s3.S3BucketConfiguration
 import io.airbyte.cdk.load.file.StreamProcessor
+import io.airbyte.cdk.load.file.object_storage.StreamingUpload
 import io.airbyte.cdk.load.util.setOnce
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.io.ByteArrayOutputStream
 import java.io.OutputStream
+import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicBoolean
 import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.coroutineScope
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.sync.Semaphore

 /**
  * An S3MultipartUpload that provides an [OutputStream] abstraction for writing data. This should
@@ -145,3 +150,48 @@ class S3MultipartUpload<T : OutputStream>(
         client.completeMultipartUpload(request)
     }
 }
+
+@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
+class S3StreamingUpload(
+    private val client: aws.sdk.kotlin.services.s3.S3Client,
+    private val bucketConfig: S3BucketConfiguration,
+    private val response: CreateMultipartUploadResponse,
+    private val uploadPermits: Semaphore?,
+) : StreamingUpload<S3Object> {
+    private val log = KotlinLogging.logger {}
+    private val uploadedParts = ConcurrentLinkedQueue<CompletedPart>()
+
+    override suspend fun uploadPart(part: ByteArray) {
+        val partNumber = uploadedParts.size + 1
+        val request = UploadPartRequest {
+            uploadId = response.uploadId
+            bucket = response.bucket
+            key = response.key
+            body = ByteStream.fromBytes(part)
+            this.partNumber = partNumber
+        }
+        val uploadResponse = client.uploadPart(request)
+        uploadedParts.add(
+            CompletedPart {
+                this.partNumber = partNumber
+                this.eTag = uploadResponse.eTag
+            }
+        )
+    }
+
+    override suspend fun complete(): S3Object {
+        log.info { "Completing multipart upload to ${response.key} (uploadId=${response.uploadId})" }
+
+        val request = CompleteMultipartUploadRequest {
+            uploadId = response.uploadId
+            bucket = response.bucket
+            key = response.key
+            this.multipartUpload = CompletedMultipartUpload { parts = uploadedParts.toList() }
+        }
+        client.completeMultipartUpload(request)
+        // TODO: Remove permit handling once concurrency is managed by controlling # of concurrent
+        // uploads
+        uploadPermits?.release()
+        return S3Object(response.key!!, bucketConfig)
+    }
+}
diff --git a/airbyte-integrations/connectors/destination-s3-v2/gradle.properties b/airbyte-integrations/connectors/destination-s3-v2/gradle.properties
index 86fdc5a55ef9..258cbecf4207 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/gradle.properties
+++ b/airbyte-integrations/connectors/destination-s3-v2/gradle.properties
@@ -1,2 +1,2 @@
 testExecutionConcurrency=-1
-JunitMethodExecutionTimeout=20 m
+JunitMethodExecutionTimeout=25 m
diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
index c8586eb4dc7b..4890a17cb2a6 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
-  dockerImageTag: 0.2.11
+  dockerImageTag: 0.2.12
   dockerRepository: airbyte/destination-s3-v2
   githubIssueLabel: destination-s3-v2
   icon: s3.svg
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt
index 706eb23f399b..3b9e60024d34 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt
@@ -13,7 +13,7 @@ import jakarta.inject.Singleton

 @Singleton
 class S3V2Writer(
-    private val streamLoaderFactory: ObjectStorageStreamLoaderFactory<S3Object>,
+    private val streamLoaderFactory: ObjectStorageStreamLoaderFactory<S3Object, *>,
 ) : DestinationWriter {
     override fun createStreamLoader(stream: DestinationStream): StreamLoader {
         return streamLoaderFactory.create(stream)
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt
index 6e26cd5ac317..4f9a24d35814 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt
@@ -15,7 +15,7 @@ import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Timeout

-@Timeout(20, unit = TimeUnit.MINUTES)
+@Timeout(25, unit = TimeUnit.MINUTES)
 abstract class S3V2WriteTest(
     path: String,
     stringifySchemalessObjects: Boolean,
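Note: the snippet below is an illustrative usage sketch of the startStreamingUpload/StreamingUpload API added in this diff, not part of the change itself. It mirrors the loop ObjectStorageStreamLoader uses above: buffer formatted records, upload a part whenever the buffer reaches the configured part size, flush the remainder, then complete the multipart upload. The function writeRecordsAsMultipart and its parameters are hypothetical, and metadata is simplified to an empty map (the loader derives it from ObjectStorageDestinationState.metadataFor).

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriterFactory
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.RemoteObject
import io.airbyte.cdk.load.message.DestinationRecord
import java.io.OutputStream

suspend fun <T : RemoteObject<*>, U : OutputStream> writeRecordsAsMultipart(
    client: ObjectStorageClient<T>,
    bufferedWriterFactory: BufferedFormattingWriterFactory<U>,
    stream: DestinationStream,
    records: Iterable<DestinationRecord>,
    key: String,
    partSize: Long,
): T {
    // One multipart upload per object; parts are fed to it as the local buffer fills.
    val upload = client.startStreamingUpload(key, metadata = emptyMap())
    bufferedWriterFactory.create(stream).use { writer ->
        records.forEach { record ->
            writer.accept(record)
            if (writer.bufferSize >= partSize) {
                // takeBytes() drains the buffer so the next part starts empty.
                upload.uploadPart(writer.takeBytes())
            }
        }
        // finish() closes the formatter (and any compressor trailer) and returns leftover bytes.
        writer.finish()?.let { upload.uploadPart(it) }
    }
    // complete() stitches the uploaded parts into the final remote object.
    return upload.complete()
}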