Skip to content

Commit

Permalink
fix: actually pass connector apm env vars to connector (#13744)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Aug 28, 2024
1 parent 986f00b commit 33d12df
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ private static Container getMain(final FeatureFlagClient featureFlagClient,
List<EnvVar> allEnvVars = Lists.concat(envVars, secretEnvVars);

if (featureFlagClient.boolVariation(ConnectorApmEnabled.INSTANCE, featureFlagContext)) {
CONNECTOR_DATADOG_SUPPORT_HELPER.addApmEnvVars(envVars);
CONNECTOR_DATADOG_SUPPORT_HELPER.addServerNameAndVersionToEnvVars(image, envVars);
CONNECTOR_DATADOG_SUPPORT_HELPER.addApmEnvVars(allEnvVars);
CONNECTOR_DATADOG_SUPPORT_HELPER.addServerNameAndVersionToEnvVars(image, allEnvVars);
}

final ContainerBuilder containerBuilder = new ContainerBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.airbyte.commons.workers.config.WorkerConfigs
import io.airbyte.config.WorkloadPriority
import io.airbyte.config.WorkloadType
import io.airbyte.featureflag.Connection
import io.airbyte.featureflag.ConnectorApmEnabled
import io.airbyte.featureflag.ConnectorSidecarFetchesInputFromInit
import io.airbyte.featureflag.ContainerOrchestratorDevImage
import io.airbyte.featureflag.Context
Expand All @@ -14,6 +15,7 @@ import io.airbyte.featureflag.OrchestratorFetchesInputFromInit
import io.airbyte.featureflag.Workspace
import io.airbyte.persistence.job.models.IntegrationLauncherConfig
import io.airbyte.persistence.job.models.ReplicationInput
import io.airbyte.workers.helper.ConnectorApmSupportHelper
import io.airbyte.workers.input.getAttemptId
import io.airbyte.workers.input.getDestinationResourceReqs
import io.airbyte.workers.input.getJobId
Expand Down Expand Up @@ -133,16 +135,14 @@ class PayloadKubeInputMapper(
val sourceRuntimeEnvVars =
input.sourceLauncherConfig.additionalEnvironmentVariables
?.map { EnvVar(it.key, it.value, null) }
?.toList()
.orEmpty()
?.toList().orEmpty() + getConnectorApmEnvVars(image = sourceImage, context = buildFFContext(workspaceId = input.workspaceId))

val destinationImage = input.destinationLauncherConfig.dockerImage
val destinationReqs = KubePodProcess.buildResourceRequirements(input.getDestinationResourceReqs())
val destinationRuntimeEnvVars =
input.destinationLauncherConfig.additionalEnvironmentVariables
?.map { EnvVar(it.key, it.value, null) }
?.toList()
.orEmpty()
?.toList().orEmpty() + getConnectorApmEnvVars(image = destinationImage, context = buildFFContext(workspaceId = input.workspaceId))

val labels =
labeler.getReplicationLabels(
Expand Down Expand Up @@ -433,6 +433,22 @@ class PayloadKubeInputMapper(
},
)
}

private fun getConnectorApmEnvVars(
image: String,
context: Context,
): List<EnvVar> {
val connectorApmEnvVars = mutableListOf<EnvVar>()
if (featureFlagClient.boolVariation(ConnectorApmEnabled, context)) {
connectorApmSupportHelper.addApmEnvVars(connectorApmEnvVars)
connectorApmSupportHelper.addServerNameAndVersionToEnvVars(image, connectorApmEnvVars)
}
return connectorApmEnvVars.toList()
}

companion object {
var connectorApmSupportHelper: ConnectorApmSupportHelper = ConnectorApmSupportHelper()
}
}

data class OrchestratorKubeInput(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package io.airbyte.workload.launcher.pods

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.commons.constants.WorkerConstants
import io.airbyte.commons.workers.config.WorkerConfigs
import io.airbyte.config.ActorType
import io.airbyte.config.ResourceRequirements
import io.airbyte.config.StandardCheckConnectionInput
import io.airbyte.config.StandardDiscoverCatalogInput
import io.airbyte.config.WorkloadPriority
import io.airbyte.config.WorkloadType
import io.airbyte.featureflag.ConnectorApmEnabled
import io.airbyte.featureflag.ConnectorSidecarFetchesInputFromInit
import io.airbyte.featureflag.ContainerOrchestratorDevImage
import io.airbyte.featureflag.InjectAwsSecretsToConnectorPods
Expand Down Expand Up @@ -170,6 +172,7 @@ class PayloadKubeInputMapperTest {
every { replConfigs.workerKubeAnnotations } returns annotations
val ffClient: TestClient = mockk()
every { ffClient.stringVariation(ContainerOrchestratorDevImage, any()) } returns ""
every { ffClient.boolVariation(ConnectorApmEnabled, any()) } returns false

val mapper =
PayloadKubeInputMapper(
Expand Down Expand Up @@ -273,6 +276,142 @@ class PayloadKubeInputMapperTest {
assertEquals(expectedDestRuntimeEnvVars, result.destinationRuntimeEnvVars)
}

@ParameterizedTest
@ValueSource(booleans = [true, false])
fun `builds a kube input from a replication payload with apm support (mono-pod)`(customConnector: Boolean) {
val imageVersion = "1.2.3"
val destinationImageName = "dest-docker-img"
val sourceImageName = "src-docker-img"
val serializer: ObjectSerializer = mockk()
val labeler: PodLabeler = mockk()
val namespace = "test-namespace"
val podName = "a-repl-pod"
val podNameGenerator: PodNameGenerator = mockk()
every { podNameGenerator.getReplicationPodName(any(), any()) } returns podName
val containerInfo = KubeContainerInfo("img-name", "pull-policy")
val awsAssumedRoleEnv: List<EnvVar> = listOf()
val replSelectors = mapOf("test-selector" to "normal-repl")
val replCustomSelectors = mapOf("test-selector" to "custom-repl")
val checkConfigs: WorkerConfigs = mockk()
val discoverConfigs: WorkerConfigs = mockk()
val specConfigs: WorkerConfigs = mockk()
val replConfigs: WorkerConfigs = mockk()
every { replConfigs.getworkerKubeNodeSelectors() } returns replSelectors
every { replConfigs.workerIsolatedKubeNodeSelectors } returns Optional.of(replCustomSelectors)
val annotations = mapOf("annotation" to "value2")
every { replConfigs.workerKubeAnnotations } returns annotations
val ffClient: TestClient = mockk()
every { ffClient.stringVariation(ContainerOrchestratorDevImage, any()) } returns ""
every { ffClient.boolVariation(ConnectorApmEnabled, any()) } returns true

val mapper =
PayloadKubeInputMapper(
serializer,
labeler,
podNameGenerator,
namespace,
containerInfo,
awsAssumedRoleEnv,
replConfigs,
checkConfigs,
discoverConfigs,
specConfigs,
ffClient,
listOf(),
)
val input: ReplicationInput = mockk()

mockkStatic("io.airbyte.workers.input.ReplicationInputExtensionsKt")
val jobId = "415"
val attemptId = 7654L
val resourceReqs1 =
ResourceRequirements()
.withCpuLimit("1")
.withMemoryRequest("7Mi")
val resourceReqs2 =
ResourceRequirements()
.withCpuLimit("2")
.withMemoryRequest("3Mi")
val resourceReqs3 =
ResourceRequirements()
.withCpuLimit("2")
.withMemoryRequest("300Mi")
val srcLauncherConfig =
IntegrationLauncherConfig()
.withAdditionalEnvironmentVariables(mapOf("env-1" to "val-1", "env-2" to "val-2"))
.withDockerImage("$sourceImageName:$imageVersion")
val destLauncherConfig =
IntegrationLauncherConfig()
.withDockerImage("$destinationImageName:$imageVersion")
.withAdditionalEnvironmentVariables(mapOf("env-3" to "val-3"))

every { input.getJobId() } returns jobId
every { input.getAttemptId() } returns attemptId
every { input.getOrchestratorResourceReqs() } returns resourceReqs1
every { input.getSourceResourceReqs() } returns resourceReqs2
every { input.getDestinationResourceReqs() } returns resourceReqs3
every { input.usesCustomConnector() } returns customConnector
every { input.jobRunConfig } returns mockk<JobRunConfig>()
every { input.sourceLauncherConfig } returns srcLauncherConfig
every { input.destinationLauncherConfig } returns destLauncherConfig
every { input.connectionId } returns mockk<UUID>()
every { input.workspaceId } returns mockk<UUID>()

val mockSerializedOutput = "Serialized Obj."
every { serializer.serialize<Any>(any()) } returns mockSerializedOutput

val replLabels = mapOf("orchestrator" to "labels")
val sharedLabels = mapOf("pass through" to "labels")
every {
labeler.getReplicationLabels(
containerInfo.image,
srcLauncherConfig.dockerImage,
destLauncherConfig.dockerImage,
)
} returns replLabels
val workloadId = UUID.randomUUID().toString()
val result = mapper.toReplicationKubeInput(workloadId, input, sharedLabels)

assertEquals(podName, result.podName)
assertEquals(replLabels + sharedLabels, result.labels)
assertEquals(if (customConnector) replCustomSelectors else replSelectors, result.nodeSelectors)
assertEquals(annotations, result.annotations)
assertEquals(containerInfo.image, result.orchestratorImage)
assertEquals(srcLauncherConfig.dockerImage, result.sourceImage)
assertEquals(destLauncherConfig.dockerImage, result.destinationImage)
assertEquals(KubePodProcess.buildResourceRequirements(resourceReqs1), result.orchestratorReqs)
assertEquals(KubePodProcess.buildResourceRequirements(resourceReqs2), result.sourceReqs)
assertEquals(KubePodProcess.buildResourceRequirements(resourceReqs3), result.destinationReqs)

val expectedOrchestratorRuntimeEnvVars =
listOf(
EnvVar(AirbyteEnvVar.MONO_POD.toString(), true.toString(), null),
EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.SYNC.toString(), null),
EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null),
EnvVar(AirbyteEnvVar.JOB_ID.toString(), jobId, null),
EnvVar(AirbyteEnvVar.ATTEMPT_ID.toString(), attemptId.toString(), null),
)
val expectedSrcRuntimeEnvVars =
listOf(
EnvVar("env-1", "val-1", null),
EnvVar("env-2", "val-2", null),
EnvVar(io.airbyte.commons.envvar.EnvVar.JAVA_OPTS.name, WorkerConstants.DD_ENV_VAR, null),
EnvVar(io.airbyte.commons.envvar.EnvVar.DD_SERVICE.name, sourceImageName, null),
EnvVar(io.airbyte.commons.envvar.EnvVar.DD_VERSION.name, imageVersion, null),
).sortedBy { it.name }
val expectedDestRuntimeEnvVars =
listOf(
EnvVar("env-3", "val-3", null),
EnvVar(io.airbyte.commons.envvar.EnvVar.JAVA_OPTS.name, WorkerConstants.DD_ENV_VAR, null),
EnvVar(io.airbyte.commons.envvar.EnvVar.DD_SERVICE.name, destinationImageName, null),
EnvVar(io.airbyte.commons.envvar.EnvVar.DD_VERSION.name, imageVersion, null),
).sortedBy { it.name }

assertEquals(expectedOrchestratorRuntimeEnvVars, result.orchestratorRuntimeEnvVars)
assertEquals(expectedSrcRuntimeEnvVars, result.sourceRuntimeEnvVars.sortedBy { it.name })
assertEquals(expectedDestRuntimeEnvVars, result.destinationRuntimeEnvVars.sortedBy { it.name })
}

@ParameterizedTest
@MethodSource("connectorInputMatrix")
fun `builds a kube input from a check payload`(
Expand Down

0 comments on commit 33d12df

Please sign in to comment.