From 33d12df6e1cb31050fc3326fde760f03729894f1 Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Wed, 28 Aug 2024 15:40:12 -0400 Subject: [PATCH] fix: actually pass connector apm env vars to connector (#13744) --- .../workers/process/KubePodProcess.java | 4 +- .../kotlin/pods/PayloadKubeInputMapper.kt | 24 ++- .../kotlin/pods/PayloadKubeInputMapperTest.kt | 139 ++++++++++++++++++ 3 files changed, 161 insertions(+), 6 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 73ac5c549f2..7104b966932 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -275,8 +275,8 @@ private static Container getMain(final FeatureFlagClient featureFlagClient, List allEnvVars = Lists.concat(envVars, secretEnvVars); if (featureFlagClient.boolVariation(ConnectorApmEnabled.INSTANCE, featureFlagContext)) { - CONNECTOR_DATADOG_SUPPORT_HELPER.addApmEnvVars(envVars); - CONNECTOR_DATADOG_SUPPORT_HELPER.addServerNameAndVersionToEnvVars(image, envVars); + CONNECTOR_DATADOG_SUPPORT_HELPER.addApmEnvVars(allEnvVars); + CONNECTOR_DATADOG_SUPPORT_HELPER.addServerNameAndVersionToEnvVars(image, allEnvVars); } final ContainerBuilder containerBuilder = new ContainerBuilder() diff --git a/airbyte-workload-launcher/src/main/kotlin/pods/PayloadKubeInputMapper.kt b/airbyte-workload-launcher/src/main/kotlin/pods/PayloadKubeInputMapper.kt index fb9bdf298a5..6be1a84c856 100644 --- a/airbyte-workload-launcher/src/main/kotlin/pods/PayloadKubeInputMapper.kt +++ b/airbyte-workload-launcher/src/main/kotlin/pods/PayloadKubeInputMapper.kt @@ -4,6 +4,7 @@ import io.airbyte.commons.workers.config.WorkerConfigs import io.airbyte.config.WorkloadPriority import io.airbyte.config.WorkloadType import io.airbyte.featureflag.Connection +import io.airbyte.featureflag.ConnectorApmEnabled import io.airbyte.featureflag.ConnectorSidecarFetchesInputFromInit import io.airbyte.featureflag.ContainerOrchestratorDevImage import io.airbyte.featureflag.Context @@ -14,6 +15,7 @@ import io.airbyte.featureflag.OrchestratorFetchesInputFromInit import io.airbyte.featureflag.Workspace import io.airbyte.persistence.job.models.IntegrationLauncherConfig import io.airbyte.persistence.job.models.ReplicationInput +import io.airbyte.workers.helper.ConnectorApmSupportHelper import io.airbyte.workers.input.getAttemptId import io.airbyte.workers.input.getDestinationResourceReqs import io.airbyte.workers.input.getJobId @@ -133,16 +135,14 @@ class PayloadKubeInputMapper( val sourceRuntimeEnvVars = input.sourceLauncherConfig.additionalEnvironmentVariables ?.map { EnvVar(it.key, it.value, null) } - ?.toList() - .orEmpty() + ?.toList().orEmpty() + getConnectorApmEnvVars(image = sourceImage, context = buildFFContext(workspaceId = input.workspaceId)) val destinationImage = input.destinationLauncherConfig.dockerImage val destinationReqs = KubePodProcess.buildResourceRequirements(input.getDestinationResourceReqs()) val destinationRuntimeEnvVars = input.destinationLauncherConfig.additionalEnvironmentVariables ?.map { EnvVar(it.key, it.value, null) } - ?.toList() - .orEmpty() + ?.toList().orEmpty() + getConnectorApmEnvVars(image = destinationImage, context = buildFFContext(workspaceId = input.workspaceId)) val labels = labeler.getReplicationLabels( @@ -433,6 +433,22 @@ class PayloadKubeInputMapper( }, ) } + + private fun getConnectorApmEnvVars( + image: String, + context: Context, + ): List { + val connectorApmEnvVars = mutableListOf() + if (featureFlagClient.boolVariation(ConnectorApmEnabled, context)) { + connectorApmSupportHelper.addApmEnvVars(connectorApmEnvVars) + connectorApmSupportHelper.addServerNameAndVersionToEnvVars(image, connectorApmEnvVars) + } + return connectorApmEnvVars.toList() + } + + companion object { + var connectorApmSupportHelper: ConnectorApmSupportHelper = ConnectorApmSupportHelper() + } } data class OrchestratorKubeInput( diff --git a/airbyte-workload-launcher/src/test/kotlin/pods/PayloadKubeInputMapperTest.kt b/airbyte-workload-launcher/src/test/kotlin/pods/PayloadKubeInputMapperTest.kt index 6dcc3205cf5..aef0b1d95e5 100644 --- a/airbyte-workload-launcher/src/test/kotlin/pods/PayloadKubeInputMapperTest.kt +++ b/airbyte-workload-launcher/src/test/kotlin/pods/PayloadKubeInputMapperTest.kt @@ -1,6 +1,7 @@ package io.airbyte.workload.launcher.pods import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.commons.constants.WorkerConstants import io.airbyte.commons.workers.config.WorkerConfigs import io.airbyte.config.ActorType import io.airbyte.config.ResourceRequirements @@ -8,6 +9,7 @@ import io.airbyte.config.StandardCheckConnectionInput import io.airbyte.config.StandardDiscoverCatalogInput import io.airbyte.config.WorkloadPriority import io.airbyte.config.WorkloadType +import io.airbyte.featureflag.ConnectorApmEnabled import io.airbyte.featureflag.ConnectorSidecarFetchesInputFromInit import io.airbyte.featureflag.ContainerOrchestratorDevImage import io.airbyte.featureflag.InjectAwsSecretsToConnectorPods @@ -170,6 +172,7 @@ class PayloadKubeInputMapperTest { every { replConfigs.workerKubeAnnotations } returns annotations val ffClient: TestClient = mockk() every { ffClient.stringVariation(ContainerOrchestratorDevImage, any()) } returns "" + every { ffClient.boolVariation(ConnectorApmEnabled, any()) } returns false val mapper = PayloadKubeInputMapper( @@ -273,6 +276,142 @@ class PayloadKubeInputMapperTest { assertEquals(expectedDestRuntimeEnvVars, result.destinationRuntimeEnvVars) } + @ParameterizedTest + @ValueSource(booleans = [true, false]) + fun `builds a kube input from a replication payload with apm support (mono-pod)`(customConnector: Boolean) { + val imageVersion = "1.2.3" + val destinationImageName = "dest-docker-img" + val sourceImageName = "src-docker-img" + val serializer: ObjectSerializer = mockk() + val labeler: PodLabeler = mockk() + val namespace = "test-namespace" + val podName = "a-repl-pod" + val podNameGenerator: PodNameGenerator = mockk() + every { podNameGenerator.getReplicationPodName(any(), any()) } returns podName + val containerInfo = KubeContainerInfo("img-name", "pull-policy") + val awsAssumedRoleEnv: List = listOf() + val replSelectors = mapOf("test-selector" to "normal-repl") + val replCustomSelectors = mapOf("test-selector" to "custom-repl") + val checkConfigs: WorkerConfigs = mockk() + val discoverConfigs: WorkerConfigs = mockk() + val specConfigs: WorkerConfigs = mockk() + val replConfigs: WorkerConfigs = mockk() + every { replConfigs.getworkerKubeNodeSelectors() } returns replSelectors + every { replConfigs.workerIsolatedKubeNodeSelectors } returns Optional.of(replCustomSelectors) + val annotations = mapOf("annotation" to "value2") + every { replConfigs.workerKubeAnnotations } returns annotations + val ffClient: TestClient = mockk() + every { ffClient.stringVariation(ContainerOrchestratorDevImage, any()) } returns "" + every { ffClient.boolVariation(ConnectorApmEnabled, any()) } returns true + + val mapper = + PayloadKubeInputMapper( + serializer, + labeler, + podNameGenerator, + namespace, + containerInfo, + awsAssumedRoleEnv, + replConfigs, + checkConfigs, + discoverConfigs, + specConfigs, + ffClient, + listOf(), + ) + val input: ReplicationInput = mockk() + + mockkStatic("io.airbyte.workers.input.ReplicationInputExtensionsKt") + val jobId = "415" + val attemptId = 7654L + val resourceReqs1 = + ResourceRequirements() + .withCpuLimit("1") + .withMemoryRequest("7Mi") + val resourceReqs2 = + ResourceRequirements() + .withCpuLimit("2") + .withMemoryRequest("3Mi") + val resourceReqs3 = + ResourceRequirements() + .withCpuLimit("2") + .withMemoryRequest("300Mi") + val srcLauncherConfig = + IntegrationLauncherConfig() + .withAdditionalEnvironmentVariables(mapOf("env-1" to "val-1", "env-2" to "val-2")) + .withDockerImage("$sourceImageName:$imageVersion") + val destLauncherConfig = + IntegrationLauncherConfig() + .withDockerImage("$destinationImageName:$imageVersion") + .withAdditionalEnvironmentVariables(mapOf("env-3" to "val-3")) + + every { input.getJobId() } returns jobId + every { input.getAttemptId() } returns attemptId + every { input.getOrchestratorResourceReqs() } returns resourceReqs1 + every { input.getSourceResourceReqs() } returns resourceReqs2 + every { input.getDestinationResourceReqs() } returns resourceReqs3 + every { input.usesCustomConnector() } returns customConnector + every { input.jobRunConfig } returns mockk() + every { input.sourceLauncherConfig } returns srcLauncherConfig + every { input.destinationLauncherConfig } returns destLauncherConfig + every { input.connectionId } returns mockk() + every { input.workspaceId } returns mockk() + + val mockSerializedOutput = "Serialized Obj." + every { serializer.serialize(any()) } returns mockSerializedOutput + + val replLabels = mapOf("orchestrator" to "labels") + val sharedLabels = mapOf("pass through" to "labels") + every { + labeler.getReplicationLabels( + containerInfo.image, + srcLauncherConfig.dockerImage, + destLauncherConfig.dockerImage, + ) + } returns replLabels + val workloadId = UUID.randomUUID().toString() + val result = mapper.toReplicationKubeInput(workloadId, input, sharedLabels) + + assertEquals(podName, result.podName) + assertEquals(replLabels + sharedLabels, result.labels) + assertEquals(if (customConnector) replCustomSelectors else replSelectors, result.nodeSelectors) + assertEquals(annotations, result.annotations) + assertEquals(containerInfo.image, result.orchestratorImage) + assertEquals(srcLauncherConfig.dockerImage, result.sourceImage) + assertEquals(destLauncherConfig.dockerImage, result.destinationImage) + assertEquals(KubePodProcess.buildResourceRequirements(resourceReqs1), result.orchestratorReqs) + assertEquals(KubePodProcess.buildResourceRequirements(resourceReqs2), result.sourceReqs) + assertEquals(KubePodProcess.buildResourceRequirements(resourceReqs3), result.destinationReqs) + + val expectedOrchestratorRuntimeEnvVars = + listOf( + EnvVar(AirbyteEnvVar.MONO_POD.toString(), true.toString(), null), + EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.SYNC.toString(), null), + EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null), + EnvVar(AirbyteEnvVar.JOB_ID.toString(), jobId, null), + EnvVar(AirbyteEnvVar.ATTEMPT_ID.toString(), attemptId.toString(), null), + ) + val expectedSrcRuntimeEnvVars = + listOf( + EnvVar("env-1", "val-1", null), + EnvVar("env-2", "val-2", null), + EnvVar(io.airbyte.commons.envvar.EnvVar.JAVA_OPTS.name, WorkerConstants.DD_ENV_VAR, null), + EnvVar(io.airbyte.commons.envvar.EnvVar.DD_SERVICE.name, sourceImageName, null), + EnvVar(io.airbyte.commons.envvar.EnvVar.DD_VERSION.name, imageVersion, null), + ).sortedBy { it.name } + val expectedDestRuntimeEnvVars = + listOf( + EnvVar("env-3", "val-3", null), + EnvVar(io.airbyte.commons.envvar.EnvVar.JAVA_OPTS.name, WorkerConstants.DD_ENV_VAR, null), + EnvVar(io.airbyte.commons.envvar.EnvVar.DD_SERVICE.name, destinationImageName, null), + EnvVar(io.airbyte.commons.envvar.EnvVar.DD_VERSION.name, imageVersion, null), + ).sortedBy { it.name } + + assertEquals(expectedOrchestratorRuntimeEnvVars, result.orchestratorRuntimeEnvVars) + assertEquals(expectedSrcRuntimeEnvVars, result.sourceRuntimeEnvVars.sortedBy { it.name }) + assertEquals(expectedDestRuntimeEnvVars, result.destinationRuntimeEnvVars.sortedBy { it.name }) + } + @ParameterizedTest @MethodSource("connectorInputMatrix") fun `builds a kube input from a check payload`(