diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/ReplicationInputHydrator.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/ReplicationInputHydrator.java index be327ac8bb..deb9917b5e 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/ReplicationInputHydrator.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/ReplicationInputHydrator.java @@ -75,6 +75,8 @@ public class ReplicationInputHydrator { private final BackfillHelper backfillHelper; private final CatalogClientConverters catalogClientConverters; + static final String FILE_TRANSFER_DELIVERY_TYPE = "use_file_transfer"; + public ReplicationInputHydrator(final AirbyteApiClient airbyteApiClient, final ResumableFullRefreshStatsHelper resumableFullRefreshStatsHelper, final SecretsRepositoryReader secretsRepositoryReader, @@ -142,7 +144,10 @@ public ReplicationInput getHydratedReplicationInput(final ReplicationActivityInp new ResolveActorDefinitionVersionRequestBody(destination.getDestinationDefinitionId(), ActorType.DESTINATION, tag)); final SourceActorConfig sourceActorConfig = Jsons.object(replicationActivityInput.getSourceConfiguration(), SourceActorConfig.class); - if (sourceActorConfig.getUseFileTransfer() && !resolvedDestinationVersion.getSupportFileTransfer()) { + final boolean useFileTransfer = sourceActorConfig.getUseFileTransfer() || (sourceActorConfig.getDeliveryMethod() != null + && FILE_TRANSFER_DELIVERY_TYPE.equals(sourceActorConfig.getDeliveryMethod().getDeliveryType())); + + if (useFileTransfer && !resolvedDestinationVersion.getSupportFileTransfer()) { final String errorMessage = "Destination does not support file transfers, but source requires it. The destination version is: " + resolvedDestinationVersion.getDockerImageTag(); LOGGER.error(errorMessage); diff --git a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/input/ReplicationInputMapper.kt b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/input/ReplicationInputMapper.kt index 9bedd57ea1..8b1232642a 100644 --- a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/input/ReplicationInputMapper.kt +++ b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/input/ReplicationInputMapper.kt @@ -18,6 +18,14 @@ class ReplicationInputMapper { fun toReplicationInput(replicationActivityInput: ReplicationActivityInput): ReplicationInput { // TODO: Remove any introspection of connector configs. Determine whether to use 'file transfer' mode another way. val sourceConfiguration: SourceActorConfig = Jsons.`object`(replicationActivityInput.sourceConfiguration, SourceActorConfig::class.java) + val useFileTransfer = + sourceConfiguration.useFileTransfer || ( + sourceConfiguration.deliveryMethod != null && + "use_file_transfer".equals( + sourceConfiguration.deliveryMethod.deliveryType, + ) + ) + return ReplicationInput() .withNamespaceDefinition(replicationActivityInput.namespaceDefinition) .withNamespaceFormat(replicationActivityInput.namespaceFormat) @@ -35,6 +43,6 @@ class ReplicationInputMapper { .withSourceConfiguration(replicationActivityInput.sourceConfiguration) .withDestinationConfiguration(replicationActivityInput.destinationConfiguration) .withConnectionContext(replicationActivityInput.connectionContext) - .withUseFileTransfer(sourceConfiguration.useFileTransfer) + .withUseFileTransfer(useFileTransfer) } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/ReplicationInputHydratorTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/ReplicationInputHydratorTest.java index 61e179c59c..096ad9a4bf 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/ReplicationInputHydratorTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/ReplicationInputHydratorTest.java @@ -4,6 +4,7 @@ package io.airbyte.workers; +import static io.airbyte.workers.ReplicationInputHydrator.FILE_TRANSFER_DELIVERY_TYPE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -50,6 +51,7 @@ import io.airbyte.commons.converters.CatalogClientConverters; import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConnectionContext; +import io.airbyte.config.DeliveryMethod; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.SourceActorConfig; import io.airbyte.config.State; @@ -336,6 +338,10 @@ void testGenerateReplicationFailsIfNonCompatibleFileTransfer() throws Exception final var replicationActivityInput = getDefaultReplicationActivityInputForTest(); replicationActivityInput.setSourceConfiguration(Jsons.jsonNode(new SourceActorConfig().withUseFileTransfer(true))); assertThrows(WorkerException.class, () -> replicationInputHydrator.getHydratedReplicationInput(replicationActivityInput)); + + replicationActivityInput.setSourceConfiguration(Jsons.jsonNode(new SourceActorConfig().withUseFileTransfer(false) + .withDeliveryMethod(new DeliveryMethod().withDeliveryType(FILE_TRANSFER_DELIVERY_TYPE)))); + assertThrows(WorkerException.class, () -> replicationInputHydrator.getHydratedReplicationInput(replicationActivityInput)); } @Test diff --git a/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/input/ReplicationInputMapperTest.kt b/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/input/ReplicationInputMapperTest.kt index 64d2156339..6963c27c8c 100644 --- a/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/input/ReplicationInputMapperTest.kt +++ b/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/input/ReplicationInputMapperTest.kt @@ -1,20 +1,26 @@ package io.airbyte.workers.input +import com.fasterxml.jackson.databind.JsonNode import io.airbyte.commons.json.Jsons import io.airbyte.config.ConnectionContext import io.airbyte.config.JobSyncConfig import io.airbyte.config.SyncResourceRequirements import io.airbyte.persistence.job.models.IntegrationLauncherConfig import io.airbyte.persistence.job.models.JobRunConfig -import io.airbyte.workers.input.ReplicationInputMapperTest.Fixtures.replicationActivityInput +import io.airbyte.workers.input.ReplicationInputMapperTest.Fixtures.getActivityInputForSourceConfig import io.airbyte.workers.models.ReplicationActivityInput import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.MethodSource import java.util.UUID +import java.util.stream.Stream class ReplicationInputMapperTest { - @Test - fun `map activity input to repl input`() { + @ParameterizedTest + @MethodSource("useFileTransferFormat") + fun `map activity input to repl input`(replicationActivityInput: ReplicationActivityInput) { val mapper = ReplicationInputMapper() val replicationInput = mapper.toReplicationInput(replicationActivityInput) @@ -37,14 +43,35 @@ class ReplicationInputMapperTest { assertEquals(replicationActivityInput.sourceConfiguration, replicationInput.sourceConfiguration) assertEquals(replicationActivityInput.destinationConfiguration, replicationInput.destinationConfiguration) assertEquals(replicationActivityInput.connectionContext, replicationInput.connectionContext) + assertEquals(true, replicationInput.useFileTransfer) + } + + @Test + fun `map activity input to repl input file transfer default`() { + val replicationActivityInputSimpleFileTransfer = getActivityInputForSourceConfig(Jsons.jsonNode(mapOf("no" to "file transfer"))) + + val mapper = ReplicationInputMapper() + + val replicationInput = mapper.toReplicationInput(replicationActivityInputSimpleFileTransfer) + + assertEquals(false, replicationInput.useFileTransfer) + } + + companion object { + @JvmStatic + fun useFileTransferFormat(): Stream = + Stream.of( + Arguments.of(Fixtures.replicationActivityInputSimpleFileTransfer), + Arguments.of(Fixtures.replicationActivityInputNewFileTransfer), + ) } object Fixtures { - val replicationActivityInput = - ReplicationActivityInput( + fun getActivityInputForSourceConfig(sourceConfig: JsonNode): ReplicationActivityInput { + return ReplicationActivityInput( UUID.randomUUID(), UUID.randomUUID(), - Jsons.jsonNode(mapOf("source" to "configuration")), + sourceConfig, Jsons.jsonNode(mapOf("destination" to "configuration")), JobRunConfig().withJobId("123").withAttemptId(0L), IntegrationLauncherConfig().withDockerImage("source:dockertag"), @@ -61,5 +88,12 @@ class ReplicationInputMapperTest { ConnectionContext().withOrganizationId(UUID.randomUUID()), null, ) + } + + val replicationActivityInputSimpleFileTransfer = + getActivityInputForSourceConfig(Jsons.jsonNode(mapOf("use_file_transfer" to true))) + + val replicationActivityInputNewFileTransfer = + getActivityInputForSourceConfig(Jsons.jsonNode(mapOf("delivery_method" to mapOf("delivery_type" to "use_file_transfer")))) } } diff --git a/airbyte-config/config-models/src/main/resources/types/SourceActorConfig.yaml b/airbyte-config/config-models/src/main/resources/types/SourceActorConfig.yaml index 17647bd289..d788e30a65 100644 --- a/airbyte-config/config-models/src/main/resources/types/SourceActorConfig.yaml +++ b/airbyte-config/config-models/src/main/resources/types/SourceActorConfig.yaml @@ -9,3 +9,9 @@ properties: use_file_transfer: type: boolean default: false + delivery_method: + type: object + properties: + delivery_type: + type: string + default: "record"