Skip to content

Commit

Permalink
feat: handle new source config format (#14493)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Oct 29, 2024
1 parent e0225d4 commit 000a748
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class ReplicationInputHydrator {
private final BackfillHelper backfillHelper;
private final CatalogClientConverters catalogClientConverters;

static final String FILE_TRANSFER_DELIVERY_TYPE = "use_file_transfer";

public ReplicationInputHydrator(final AirbyteApiClient airbyteApiClient,
final ResumableFullRefreshStatsHelper resumableFullRefreshStatsHelper,
final SecretsRepositoryReader secretsRepositoryReader,
Expand Down Expand Up @@ -142,7 +144,10 @@ public ReplicationInput getHydratedReplicationInput(final ReplicationActivityInp
new ResolveActorDefinitionVersionRequestBody(destination.getDestinationDefinitionId(), ActorType.DESTINATION, tag));

final SourceActorConfig sourceActorConfig = Jsons.object(replicationActivityInput.getSourceConfiguration(), SourceActorConfig.class);
if (sourceActorConfig.getUseFileTransfer() && !resolvedDestinationVersion.getSupportFileTransfer()) {
final boolean useFileTransfer = sourceActorConfig.getUseFileTransfer() || (sourceActorConfig.getDeliveryMethod() != null
&& FILE_TRANSFER_DELIVERY_TYPE.equals(sourceActorConfig.getDeliveryMethod().getDeliveryType()));

if (useFileTransfer && !resolvedDestinationVersion.getSupportFileTransfer()) {
final String errorMessage = "Destination does not support file transfers, but source requires it. The destination version is: "
+ resolvedDestinationVersion.getDockerImageTag();
LOGGER.error(errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ class ReplicationInputMapper {
fun toReplicationInput(replicationActivityInput: ReplicationActivityInput): ReplicationInput {
// TODO: Remove any introspection of connector configs. Determine whether to use 'file transfer' mode another way.
val sourceConfiguration: SourceActorConfig = Jsons.`object`(replicationActivityInput.sourceConfiguration, SourceActorConfig::class.java)
val useFileTransfer =
sourceConfiguration.useFileTransfer || (
sourceConfiguration.deliveryMethod != null &&
"use_file_transfer".equals(
sourceConfiguration.deliveryMethod.deliveryType,
)
)

return ReplicationInput()
.withNamespaceDefinition(replicationActivityInput.namespaceDefinition)
.withNamespaceFormat(replicationActivityInput.namespaceFormat)
Expand All @@ -35,6 +43,6 @@ class ReplicationInputMapper {
.withSourceConfiguration(replicationActivityInput.sourceConfiguration)
.withDestinationConfiguration(replicationActivityInput.destinationConfiguration)
.withConnectionContext(replicationActivityInput.connectionContext)
.withUseFileTransfer(sourceConfiguration.useFileTransfer)
.withUseFileTransfer(useFileTransfer)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers;

import static io.airbyte.workers.ReplicationInputHydrator.FILE_TRANSFER_DELIVERY_TYPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -50,6 +51,7 @@
import io.airbyte.commons.converters.CatalogClientConverters;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectionContext;
import io.airbyte.config.DeliveryMethod;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.SourceActorConfig;
import io.airbyte.config.State;
Expand Down Expand Up @@ -336,6 +338,10 @@ void testGenerateReplicationFailsIfNonCompatibleFileTransfer() throws Exception
final var replicationActivityInput = getDefaultReplicationActivityInputForTest();
replicationActivityInput.setSourceConfiguration(Jsons.jsonNode(new SourceActorConfig().withUseFileTransfer(true)));
assertThrows(WorkerException.class, () -> replicationInputHydrator.getHydratedReplicationInput(replicationActivityInput));

replicationActivityInput.setSourceConfiguration(Jsons.jsonNode(new SourceActorConfig().withUseFileTransfer(false)
.withDeliveryMethod(new DeliveryMethod().withDeliveryType(FILE_TRANSFER_DELIVERY_TYPE))));
assertThrows(WorkerException.class, () -> replicationInputHydrator.getHydratedReplicationInput(replicationActivityInput));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package io.airbyte.workers.input

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.commons.json.Jsons
import io.airbyte.config.ConnectionContext
import io.airbyte.config.JobSyncConfig
import io.airbyte.config.SyncResourceRequirements
import io.airbyte.persistence.job.models.IntegrationLauncherConfig
import io.airbyte.persistence.job.models.JobRunConfig
import io.airbyte.workers.input.ReplicationInputMapperTest.Fixtures.replicationActivityInput
import io.airbyte.workers.input.ReplicationInputMapperTest.Fixtures.getActivityInputForSourceConfig
import io.airbyte.workers.models.ReplicationActivityInput
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
import java.util.UUID
import java.util.stream.Stream

class ReplicationInputMapperTest {
@Test
fun `map activity input to repl input`() {
@ParameterizedTest
@MethodSource("useFileTransferFormat")
fun `map activity input to repl input`(replicationActivityInput: ReplicationActivityInput) {
val mapper = ReplicationInputMapper()

val replicationInput = mapper.toReplicationInput(replicationActivityInput)
Expand All @@ -37,14 +43,35 @@ class ReplicationInputMapperTest {
assertEquals(replicationActivityInput.sourceConfiguration, replicationInput.sourceConfiguration)
assertEquals(replicationActivityInput.destinationConfiguration, replicationInput.destinationConfiguration)
assertEquals(replicationActivityInput.connectionContext, replicationInput.connectionContext)
assertEquals(true, replicationInput.useFileTransfer)
}

@Test
fun `map activity input to repl input file transfer default`() {
val replicationActivityInputSimpleFileTransfer = getActivityInputForSourceConfig(Jsons.jsonNode(mapOf("no" to "file transfer")))

val mapper = ReplicationInputMapper()

val replicationInput = mapper.toReplicationInput(replicationActivityInputSimpleFileTransfer)

assertEquals(false, replicationInput.useFileTransfer)
}

companion object {
@JvmStatic
fun useFileTransferFormat(): Stream<Arguments> =
Stream.of(
Arguments.of(Fixtures.replicationActivityInputSimpleFileTransfer),
Arguments.of(Fixtures.replicationActivityInputNewFileTransfer),
)
}

object Fixtures {
val replicationActivityInput =
ReplicationActivityInput(
fun getActivityInputForSourceConfig(sourceConfig: JsonNode): ReplicationActivityInput {
return ReplicationActivityInput(
UUID.randomUUID(),
UUID.randomUUID(),
Jsons.jsonNode(mapOf("source" to "configuration")),
sourceConfig,
Jsons.jsonNode(mapOf("destination" to "configuration")),
JobRunConfig().withJobId("123").withAttemptId(0L),
IntegrationLauncherConfig().withDockerImage("source:dockertag"),
Expand All @@ -61,5 +88,12 @@ class ReplicationInputMapperTest {
ConnectionContext().withOrganizationId(UUID.randomUUID()),
null,
)
}

val replicationActivityInputSimpleFileTransfer =
getActivityInputForSourceConfig(Jsons.jsonNode(mapOf("use_file_transfer" to true)))

val replicationActivityInputNewFileTransfer =
getActivityInputForSourceConfig(Jsons.jsonNode(mapOf("delivery_method" to mapOf("delivery_type" to "use_file_transfer"))))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ properties:
use_file_transfer:
type: boolean
default: false
delivery_method:
type: object
properties:
delivery_type:
type: string
default: "record"

0 comments on commit 000a748

Please sign in to comment.