Skip to content

Commit

Permalink
chore: pass correct env vars to read/write containers in megapod. Org…
Browse files Browse the repository at this point in the history
…anize runtime env var code. (#13790)
  • Loading branch information
tryangul committed Sep 3, 2024
1 parent 805247b commit 6361736
Show file tree
Hide file tree
Showing 13 changed files with 731 additions and 368 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import io.airbyte.workers.CheckConnectionInputHydrator
import io.airbyte.workers.ConnectorSecretsHydrator
import io.airbyte.workers.DiscoverCatalogInputHydrator
import io.airbyte.workers.ReplicationInputHydrator
import io.airbyte.workers.helper.ConnectorApmSupportHelper
import io.airbyte.workers.helper.ResumableFullRefreshStatsHelper
import io.micrometer.core.instrument.MeterRegistry
import io.micronaut.context.annotation.Factory
Expand Down Expand Up @@ -150,4 +151,9 @@ class ApplicationBeanFactory {
listOf(Geography(geography), PlaneName(dataPlaneName))
}
}

@Singleton
fun connectorApmSupportHelper(): ConnectorApmSupportHelper {
return ConnectorApmSupportHelper()
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.airbyte.featureflag.Connection
import io.airbyte.featureflag.ContainerOrchestratorJavaOpts
import io.airbyte.featureflag.FeatureFlagClient
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workload.launcher.constants.EnvVarConstants
import io.fabric8.kubernetes.api.model.EnvVar
import io.fabric8.kubernetes.api.model.EnvVarSource
import io.micronaut.context.annotation.Value
Expand Down Expand Up @@ -80,9 +81,9 @@ class OrchestratorEnvSingleton(
) {
val injectedJavaOpts: String = featureFlagClient.stringVariation(ContainerOrchestratorJavaOpts, Connection(connectionId))
if (injectedJavaOpts.isNotEmpty()) {
envMap[EnvVarConfigBeanFactory.JAVA_OPTS_ENV_VAR] = injectedJavaOpts.trim()
envMap[EnvVarConstants.JAVA_OPTS_ENV_VAR] = injectedJavaOpts.trim()
} else {
envMap[EnvVarConfigBeanFactory.JAVA_OPTS_ENV_VAR] = containerOrchestratorJavaOpts
envMap[EnvVarConstants.JAVA_OPTS_ENV_VAR] = containerOrchestratorJavaOpts
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.airbyte.workload.launcher.constants

import io.airbyte.workers.workload.WorkloadConstants

/**
* Env var names and other string constants used to set env vars for the containers we launch.
*
* Over time, we should try to collect the env vars here. Moving this to a shared library is also likely.
*/
object EnvVarConstants {
const val METRIC_CLIENT_ENV_VAR = "METRIC_CLIENT"
const val DD_AGENT_HOST_ENV_VAR = "DD_AGENT_HOST"
const val DD_DOGSTATSD_PORT_ENV_VAR = "DD_DOGSTATSD_PORT"
const val DD_ENV_ENV_VAR = "DD_ENV"
const val DD_SERVICE_ENV_VAR = "DD_SERVICE"
const val DD_VERSION_ENV_VAR = "DD_VERSION"
const val JAVA_OPTS_ENV_VAR = "JAVA_OPTS"
const val PUBLISH_METRICS_ENV_VAR = "PUBLISH_METRICS"
const val CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR = "CONTROL_PLANE_AUTH_ENDPOINT"
const val DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR = "DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH"
const val DATA_PLANE_SERVICE_ACCOUNT_EMAIL_ENV_VAR = "DATA_PLANE_SERVICE_ACCOUNT_EMAIL"
const val AIRBYTE_API_AUTH_HEADER_NAME_ENV_VAR = "AIRBYTE_API_AUTH_HEADER_NAME"
const val AIRBYTE_API_AUTH_HEADER_VALUE_ENV_VAR = "AIRBYTE_API_AUTH_HEADER_VALUE"
const val KEYCLOAK_CLIENT_ID_ENV_VAR = "KEYCLOAK_CLIENT_ID"
const val KEYCLOAK_INTERNAL_REALM_ISSUER_ENV_VAR = "KEYCLOAK_INTERNAL_REALM_ISSUER"
const val INTERNAL_API_HOST_ENV_VAR = "INTERNAL_API_HOST"
const val ACCEPTANCE_TEST_ENABLED_VAR = "ACCEPTANCE_TEST_ENABLED"
const val DD_INTEGRATION_ENV_VAR_FORMAT = "DD_INTEGRATION_%s_ENABLED"
const val WORKER_V2_MICRONAUT_ENV = WorkloadConstants.WORKER_V2_MICRONAUT_ENV
const val WORKLOAD_API_HOST_ENV_VAR = "WORKLOAD_API_HOST"
const val WORKLOAD_API_CONNECT_TIMEOUT_SECONDS_ENV_VAR = "WORKLOAD_API_CONNECT_TIMEOUT_SECONDS"
const val WORKLOAD_API_READ_TIMEOUT_SECONDS_ENV_VAR = "WORKLOAD_API_READ_TIMEOUT_SECONDS"
const val WORKLOAD_API_RETRY_DELAY_SECONDS_ENV_VAR = "WORKLOAD_API_RETRY_DELAY_SECONDS"
const val WORKLOAD_API_MAX_RETRIES_ENV_VAR = "WORKLOAD_API_MAX_RETRIES"
const val SECRET_PERSISTENCE = "SECRET_PERSISTENCE"
const val SECRET_STORE_GCP_PROJECT_ID = "SECRET_STORE_GCP_PROJECT_ID"
const val AWS_SECRET_MANAGER_REGION = "AWS_SECRET_MANAGER_REGION"
const val AWS_KMS_KEY_ARN = "AWS_KMS_KEY_ARN"
const val AWS_SECRET_MANAGER_SECRET_TAGS = "AWS_SECRET_MANAGER_SECRET_TAGS"
const val VAULT_ADDRESS = "VAULT_ADDRESS"
const val VAULT_PREFIX = "VAULT_PREFIX"
const val CONCURRENT_SOURCE_STREAM_READ_ENV_VAR = "CONCURRENT_SOURCE_STREAM_READ"
const val USE_STREAM_CAPABLE_STATE_ENV_VAR = "USE_STREAM_CAPABLE_STATE"
const val OTEL_COLLECTOR_ENDPOINT_ENV_VAR = "OTEL_COLLECTOR_ENDPOINT"

// secrets
const val AWS_ASSUME_ROLE_ACCESS_KEY_ID_ENV_VAR = "AWS_ASSUME_ROLE_ACCESS_KEY_ID"
const val AWS_ASSUME_ROLE_SECRET_ACCESS_KEY_ENV_VAR = "AWS_ASSUME_ROLE_SECRET_ACCESS_KEY"
const val WORKLOAD_API_BEARER_TOKEN_ENV_VAR = "WORKLOAD_API_BEARER_TOKEN"
const val KEYCLOAK_CLIENT_SECRET_ENV_VAR = "KEYCLOAK_CLIENT_SECRET"
const val SECRET_STORE_GCP_CREDENTIALS = "SECRET_STORE_GCP_CREDENTIALS"
const val AWS_SECRET_MANAGER_ACCESS_KEY_ID = "AWS_SECRET_MANAGER_ACCESS_KEY_ID"
const val AWS_SECRET_MANAGER_SECRET_ACCESS_KEY = "AWS_SECRET_MANAGER_SECRET_ACCESS_KEY"
const val VAULT_AUTH_TOKEN = "VAULT_AUTH_TOKEN"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.airbyte.workload.launcher.model

import io.fabric8.kubernetes.api.model.EnvVar
import io.fabric8.kubernetes.api.model.EnvVarSource

/**
* For EnvVar that directly contain their values.
*/
private fun Map.Entry<String, String>.toEnvVar() = EnvVar(this.key, this.value, null)

/**
* For EnvVar that contain a reference to their values (EnvVarSource). Usually secrets.
*/
private fun Map.Entry<String, EnvVarSource>.toRefEnvVar() = EnvVar(this.key, null, this.value)

fun Map<String, String>.toEnvVarList() = this.map { it.toEnvVar() }.toList()

fun Map<String, EnvVarSource>.toRefEnvVarList() = this.map { it.toRefEnvVar() }.toList()
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@ import io.airbyte.commons.workers.config.WorkerConfigs
import io.airbyte.config.WorkloadPriority
import io.airbyte.config.WorkloadType
import io.airbyte.featureflag.Connection
import io.airbyte.featureflag.ConnectorApmEnabled
import io.airbyte.featureflag.ConnectorSidecarFetchesInputFromInit
import io.airbyte.featureflag.ContainerOrchestratorDevImage
import io.airbyte.featureflag.Context
import io.airbyte.featureflag.FeatureFlagClient
import io.airbyte.featureflag.InjectAwsSecretsToConnectorPods
import io.airbyte.featureflag.Multi
import io.airbyte.featureflag.Workspace
import io.airbyte.persistence.job.models.IntegrationLauncherConfig
import io.airbyte.persistence.job.models.ReplicationInput
import io.airbyte.workers.helper.ConnectorApmSupportHelper
import io.airbyte.workers.input.getAttemptId
import io.airbyte.workers.input.getDestinationResourceReqs
import io.airbyte.workers.input.getJobId
Expand All @@ -32,11 +28,11 @@ import io.airbyte.workers.pod.PodNameGenerator
import io.airbyte.workers.process.KubeContainerInfo
import io.airbyte.workers.process.KubePodInfo
import io.airbyte.workers.process.KubePodProcess
import io.airbyte.workers.process.Metadata.AWS_ASSUME_ROLE_EXTERNAL_ID
import io.airbyte.workers.serde.ObjectSerializer
import io.airbyte.workload.launcher.model.getAttemptId
import io.airbyte.workload.launcher.model.getJobId
import io.airbyte.workload.launcher.model.usesCustomConnector
import io.airbyte.workload.launcher.pods.factories.RuntimeEnvVarFactory
import io.fabric8.kubernetes.api.model.EnvVar
import io.fabric8.kubernetes.api.model.ResourceRequirements
import io.micronaut.context.annotation.Value
Expand All @@ -56,11 +52,11 @@ class PayloadKubeInputMapper(
private val podNameGenerator: PodNameGenerator,
@Value("\${airbyte.worker.job.kube.namespace}") private val namespace: String?,
@Named("orchestratorKubeContainerInfo") private val orchestratorKubeContainerInfo: KubeContainerInfo,
@Named("connectorAwsAssumedRoleSecretEnv") private val connectorAwsAssumedRoleSecretEnvList: List<EnvVar>,
@Named("replicationWorkerConfigs") private val replicationWorkerConfigs: WorkerConfigs,
@Named("checkWorkerConfigs") private val checkWorkerConfigs: WorkerConfigs,
@Named("discoverWorkerConfigs") private val discoverWorkerConfigs: WorkerConfigs,
@Named("specWorkerConfigs") private val specWorkerConfigs: WorkerConfigs,
private val runTimeEnvVarFactory: RuntimeEnvVarFactory,
private val featureFlagClient: FeatureFlagClient,
@Named("infraFlagContexts") private val contexts: List<Context>,
) {
Expand Down Expand Up @@ -128,17 +124,11 @@ class PayloadKubeInputMapper(

val sourceImage = input.sourceLauncherConfig.dockerImage
val sourceReqs = KubePodProcess.buildResourceRequirements(input.getSourceResourceReqs())
val sourceRuntimeEnvVars =
input.sourceLauncherConfig.additionalEnvironmentVariables
?.map { EnvVar(it.key, it.value, null) }
?.toList().orEmpty() + getConnectorApmEnvVars(image = sourceImage, context = buildFFContext(workspaceId = input.workspaceId))
val sourceRuntimeEnvVars = runTimeEnvVarFactory.replicationConnectorEnvVars(input.sourceLauncherConfig)

val destinationImage = input.destinationLauncherConfig.dockerImage
val destinationReqs = KubePodProcess.buildResourceRequirements(input.getDestinationResourceReqs())
val destinationRuntimeEnvVars =
input.destinationLauncherConfig.additionalEnvironmentVariables
?.map { EnvVar(it.key, it.value, null) }
?.toList().orEmpty() + getConnectorApmEnvVars(image = destinationImage, context = buildFFContext(workspaceId = input.workspaceId))
val destinationRuntimeEnvVars = runTimeEnvVarFactory.replicationConnectorEnvVars(input.destinationLauncherConfig)

val labels =
labeler.getReplicationLabels(
Expand Down Expand Up @@ -204,10 +194,7 @@ class PayloadKubeInputMapper(

val fileMap = buildCheckFileMap(workloadId, input, logPath)

val runtimeEnvVars =
resolveAwsAssumedRoleEnvVars(input.launcherConfig) +
EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.CHECK.toString(), null) +
EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null)
val runtimeEnvVars = runTimeEnvVarFactory.checkConnectorEnvVars(input.launcherConfig, workloadId)

return ConnectorKubeInput(
labeler.getCheckLabels() + sharedLabels,
Expand Down Expand Up @@ -250,10 +237,7 @@ class PayloadKubeInputMapper(

val fileMap = buildDiscoverFileMap(workloadId, input, logPath)

val runtimeEnvVars =
resolveAwsAssumedRoleEnvVars(input.launcherConfig) +
EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.DISCOVER.toString(), null) +
EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null)
val runtimeEnvVars = runTimeEnvVarFactory.discoverConnectorEnvVars(input.launcherConfig, workloadId)

return ConnectorKubeInput(
labeler.getDiscoverLabels() + sharedLabels,
Expand Down Expand Up @@ -291,11 +275,7 @@ class PayloadKubeInputMapper(

val fileMap = buildSpecFileMap(workloadId, input, logPath)

val runtimeEnvVars =
listOf(
EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.SPEC.toString(), null),
EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null),
)
val runtimeEnvVars = runTimeEnvVarFactory.specConnectorEnvVars(workloadId)

return ConnectorKubeInput(
labeler.getSpecLabels() + sharedLabels,
Expand All @@ -308,24 +288,6 @@ class PayloadKubeInputMapper(
)
}

private fun resolveAwsAssumedRoleEnvVars(launcherConfig: IntegrationLauncherConfig): List<EnvVar> {
// Only inject into connectors we own.
if (launcherConfig.isCustomConnector) {
return listOf()
}
// Only inject into enabled workspaces.
val workspaceEnabled =
launcherConfig.workspaceId != null &&
this.featureFlagClient.boolVariation(InjectAwsSecretsToConnectorPods, Workspace(launcherConfig.workspaceId))
if (!workspaceEnabled) {
return listOf()
}

val externalIdVar = EnvVar(AWS_ASSUME_ROLE_EXTERNAL_ID, launcherConfig.workspaceId.toString(), null)

return connectorAwsAssumedRoleSecretEnvList + externalIdVar
}

private fun getNodeSelectors(
usesCustomConnector: Boolean,
workerConfigs: WorkerConfigs,
Expand Down Expand Up @@ -419,22 +381,6 @@ class PayloadKubeInputMapper(
},
)
}

private fun getConnectorApmEnvVars(
image: String,
context: Context,
): List<EnvVar> {
val connectorApmEnvVars = mutableListOf<EnvVar>()
if (featureFlagClient.boolVariation(ConnectorApmEnabled, context)) {
connectorApmSupportHelper.addApmEnvVars(connectorApmEnvVars)
connectorApmSupportHelper.addServerNameAndVersionToEnvVars(image, connectorApmEnvVars)
}
return connectorApmEnvVars.toList()
}

companion object {
var connectorApmSupportHelper: ConnectorApmSupportHelper = ConnectorApmSupportHelper()
}
}

data class OrchestratorKubeInput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import io.fabric8.kubernetes.api.model.EnvVar
import io.fabric8.kubernetes.api.model.ResourceRequirements
import io.fabric8.kubernetes.api.model.VolumeMount
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
import jakarta.inject.Singleton
import java.util.UUID

@Singleton
class ReplicationContainerFactory(
private val orchestratorEnvFactory: OrchestratorEnvSingleton,
private val workloadSecurityContextProvider: WorkloadSecurityContextProvider,
@Named("readEnvVars") private val sourceEnvVars: List<EnvVar>,
@Named("writeEnvVars") private val destinationEnvVars: List<EnvVar>,
@Value("\${airbyte.worker.job.kube.main.container.image-pull-policy}") private val imagePullPolicy: String,
) {
fun createOrchestrator(
Expand Down Expand Up @@ -57,7 +60,7 @@ class ReplicationContainerFactory(
.withImage(image)
.withImagePullPolicy(imagePullPolicy)
.withCommand("sh", "-c", mainCommand)
.withEnv(runtimeEnvVars)
.withEnv(sourceEnvVars + runtimeEnvVars)
.withWorkingDir(SOURCE_DIR)
.withVolumeMounts(volumeMounts)
.withSecurityContext(workloadSecurityContextProvider.rootlessContainerSecurityContext())
Expand All @@ -78,7 +81,7 @@ class ReplicationContainerFactory(
.withImage(image)
.withImagePullPolicy(imagePullPolicy)
.withCommand("sh", "-c", mainCommand)
.withEnv(runtimeEnvVars)
.withEnv(destinationEnvVars + runtimeEnvVars)
.withWorkingDir(DEST_DIR)
.withVolumeMounts(volumeMounts)
.withSecurityContext(workloadSecurityContextProvider.rootlessContainerSecurityContext())
Expand Down
Loading

0 comments on commit 6361736

Please sign in to comment.