diff --git a/airbyte-workload-launcher/src/main/kotlin/config/ApplicationBeanFactory.kt b/airbyte-workload-launcher/src/main/kotlin/config/ApplicationBeanFactory.kt index 29555fde2f8..42cd5e7bf5c 100644 --- a/airbyte-workload-launcher/src/main/kotlin/config/ApplicationBeanFactory.kt +++ b/airbyte-workload-launcher/src/main/kotlin/config/ApplicationBeanFactory.kt @@ -18,6 +18,7 @@ import io.airbyte.workers.CheckConnectionInputHydrator import io.airbyte.workers.ConnectorSecretsHydrator import io.airbyte.workers.DiscoverCatalogInputHydrator import io.airbyte.workers.ReplicationInputHydrator +import io.airbyte.workers.helper.ConnectorApmSupportHelper import io.airbyte.workers.helper.ResumableFullRefreshStatsHelper import io.micrometer.core.instrument.MeterRegistry import io.micronaut.context.annotation.Factory @@ -150,4 +151,9 @@ class ApplicationBeanFactory { listOf(Geography(geography), PlaneName(dataPlaneName)) } } + + @Singleton + fun connectorApmSupportHelper(): ConnectorApmSupportHelper { + return ConnectorApmSupportHelper() + } } diff --git a/airbyte-workload-launcher/src/main/kotlin/config/EnvVarConfigBeanFactory.kt b/airbyte-workload-launcher/src/main/kotlin/config/EnvVarConfigBeanFactory.kt index b1aa45142a7..82340ed4d73 100644 --- a/airbyte-workload-launcher/src/main/kotlin/config/EnvVarConfigBeanFactory.kt +++ b/airbyte-workload-launcher/src/main/kotlin/config/EnvVarConfigBeanFactory.kt @@ -12,7 +12,9 @@ import io.airbyte.commons.workers.config.WorkerConfigs import io.airbyte.config.Configs import io.airbyte.workers.process.Metadata.AWS_ACCESS_KEY_ID import io.airbyte.workers.process.Metadata.AWS_SECRET_ACCESS_KEY -import io.airbyte.workers.workload.WorkloadConstants +import io.airbyte.workload.launcher.constants.EnvVarConstants +import io.airbyte.workload.launcher.model.toEnvVarList +import io.airbyte.workload.launcher.model.toRefEnvVarList import io.fabric8.kubernetes.api.model.EnvVar import io.fabric8.kubernetes.api.model.EnvVarSource import 
io.fabric8.kubernetes.api.model.SecretKeySelector @@ -26,7 +28,8 @@ import java.util.function.Consumer import io.airbyte.commons.envvar.EnvVar as AirbyteEnvVar /** - * Provides and configures the environment variables for the containers we launch. + * Provides and configures the static environment variables for the containers we launch. + * For dynamic configuration see RuntimeEnvVarFactory. */ @Factory class EnvVarConfigBeanFactory { @@ -66,15 +69,11 @@ class EnvVarConfigBeanFactory { // Add db env vars for local deployments if applicable envMap.putAll(dbEnvMap) - val envVars = - envMap - .map { EnvVar(it.key, it.value, null) } - .toList() + val envVars = envMap.toEnvVarList() val secretEnvVars = (secretsEnvMap + secretPersistenceSecretsEnvMap + awsAssumedRoleSecretEnv) - .map { EnvVar(it.key, null, it.value) } - .toList() + .toRefEnvVarList() return envVars + secretEnvVars } @@ -108,15 +107,9 @@ class EnvVarConfigBeanFactory { // Micronaut environment (secretly necessary for configuring API client auth) envMap.putAll(micronautEnvMap) - val envVars = - envMap - .map { EnvVar(it.key, it.value, null) } - .toList() + val envVars = envMap.toEnvVarList() - val secretEnvVars = - secretsEnvMap - .map { EnvVar(it.key, null, it.value) } - .toList() + val secretEnvVars = secretsEnvMap.toRefEnvVarList() return envVars + secretEnvVars } @@ -156,9 +149,7 @@ class EnvVarConfigBeanFactory { fun checkEnvVars( @Named("checkWorkerConfigs") workerConfigs: WorkerConfigs, ): List { - return workerConfigs.envMap - .map { EnvVar(it.key, it.value, null) } - .toList() + return workerConfigs.envMap.toEnvVarList() } /** @@ -169,9 +160,7 @@ class EnvVarConfigBeanFactory { fun discoverEnvVars( @Named("discoverWorkerConfigs") workerConfigs: WorkerConfigs, ): List { - return workerConfigs.envMap - .map { EnvVar(it.key, it.value, null) } - .toList() + return workerConfigs.envMap.toEnvVarList() } /** @@ -182,9 +171,31 @@ class EnvVarConfigBeanFactory { fun specEnvVars( @Named("specWorkerConfigs") 
workerConfigs: WorkerConfigs, ): List { - return workerConfigs.envMap - .map { EnvVar(it.key, it.value, null) } - .toList() + return workerConfigs.envMap.toEnvVarList() + } + + /** + * The list of env vars to be passed to the connector container we are reading from (the source). + */ + @Singleton + @Named("readEnvVars") + fun readEnvVars( + @Named("replicationWorkerConfigs") workerConfigs: WorkerConfigs, + @Named("featureFlagEnvVars") ffEnvVars: Map, + ): List { + return workerConfigs.envMap.toEnvVarList() + ffEnvVars.toEnvVarList() + } + + /** + * The list of env vars to be passed to the connector container we are writing to (the destination). + */ + @Singleton + @Named("writeEnvVars") + fun writeEnvVars( + @Named("replicationWorkerConfigs") workerConfigs: WorkerConfigs, + @Named("featureFlagEnvVars") ffEnvVars: Map, + ): List { + return workerConfigs.envMap.toEnvVarList() + ffEnvVars.toEnvVarList() } @Singleton @@ -197,10 +208,10 @@ class EnvVarConfigBeanFactory { ): Map { return buildMap { if (bearerTokenSecretName.isNotBlank()) { - put(WORKLOAD_API_BEARER_TOKEN_ENV_VAR, createEnvVarSource(bearerTokenSecretName, bearerTokenSecretKey)) + put(EnvVarConstants.WORKLOAD_API_BEARER_TOKEN_ENV_VAR, createEnvVarSource(bearerTokenSecretName, bearerTokenSecretKey)) } if (keycloakAuthSecretName.isNotBlank()) { - put(KEYCLOAK_CLIENT_SECRET_ENV_VAR, createEnvVarSource(keycloakAuthSecretName, keycloakAuthSecretKey)) + put(EnvVarConstants.KEYCLOAK_CLIENT_SECRET_ENV_VAR, createEnvVarSource(keycloakAuthSecretName, keycloakAuthSecretKey)) } } } @@ -217,8 +228,8 @@ class EnvVarConfigBeanFactory { ): Map { return buildMap { if (awsAssumedRoleSecretName.isNotBlank()) { - put(AWS_ASSUME_ROLE_ACCESS_KEY_ID_ENV_VAR, createEnvVarSource(awsAssumedRoleSecretName, awsAssumedRoleAccessKey)) - put(AWS_ASSUME_ROLE_SECRET_ACCESS_KEY_ENV_VAR, createEnvVarSource(awsAssumedRoleSecretName, awsAssumedRoleSecretKey)) + put(EnvVarConstants.AWS_ASSUME_ROLE_ACCESS_KEY_ID_ENV_VAR, 
createEnvVarSource(awsAssumedRoleSecretName, awsAssumedRoleAccessKey)) + put(EnvVarConstants.AWS_ASSUME_ROLE_SECRET_ACCESS_KEY_ENV_VAR, createEnvVarSource(awsAssumedRoleSecretName, awsAssumedRoleSecretKey)) } } } @@ -295,17 +306,17 @@ class EnvVarConfigBeanFactory { ): Map { val envMap: MutableMap = HashMap() - envMap[INTERNAL_API_HOST_ENV_VAR] = internalApiHost - envMap[AIRBYTE_API_AUTH_HEADER_NAME_ENV_VAR] = apiAuthHeaderName - envMap[AIRBYTE_API_AUTH_HEADER_VALUE_ENV_VAR] = apiAuthHeaderValue - envMap[CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR] = controlPlaneAuthEndpoint - envMap[DATA_PLANE_SERVICE_ACCOUNT_EMAIL_ENV_VAR] = dataPlaneServiceAccountEmail - envMap[DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR] = dataPlaneServiceAccountCredentialsPath - envMap[ACCEPTANCE_TEST_ENABLED_VAR] = java.lang.Boolean.toString(isInTestMode) + envMap[EnvVarConstants.INTERNAL_API_HOST_ENV_VAR] = internalApiHost + envMap[EnvVarConstants.AIRBYTE_API_AUTH_HEADER_NAME_ENV_VAR] = apiAuthHeaderName + envMap[EnvVarConstants.AIRBYTE_API_AUTH_HEADER_VALUE_ENV_VAR] = apiAuthHeaderValue + envMap[EnvVarConstants.CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR] = controlPlaneAuthEndpoint + envMap[EnvVarConstants.DATA_PLANE_SERVICE_ACCOUNT_EMAIL_ENV_VAR] = dataPlaneServiceAccountEmail + envMap[EnvVarConstants.DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR] = dataPlaneServiceAccountCredentialsPath + envMap[EnvVarConstants.ACCEPTANCE_TEST_ENABLED_VAR] = java.lang.Boolean.toString(isInTestMode) // Expected to be present in Cloud for internal api auth - envMap[KEYCLOAK_CLIENT_ID_ENV_VAR] = keycloakAuthClientId - envMap[KEYCLOAK_INTERNAL_REALM_ISSUER_ENV_VAR] = keycloakAuthOpenIdIssuer + envMap[EnvVarConstants.KEYCLOAK_CLIENT_ID_ENV_VAR] = keycloakAuthClientId + envMap[EnvVarConstants.KEYCLOAK_INTERNAL_REALM_ISSUER_ENV_VAR] = keycloakAuthOpenIdIssuer return envMap } @@ -320,9 +331,9 @@ class EnvVarConfigBeanFactory { val envMap: MutableMap = HashMap() if (System.getenv(Environment.ENVIRONMENTS_ENV) != 
null) { - envMap[Environment.ENVIRONMENTS_ENV] = "$WORKER_V2_MICRONAUT_ENV,${System.getenv(Environment.ENVIRONMENTS_ENV)}" + envMap[Environment.ENVIRONMENTS_ENV] = "${EnvVarConstants.WORKER_V2_MICRONAUT_ENV},${System.getenv(Environment.ENVIRONMENTS_ENV)}" } else { - envMap[Environment.ENVIRONMENTS_ENV] = WORKER_V2_MICRONAUT_ENV + envMap[Environment.ENVIRONMENTS_ENV] = EnvVarConstants.WORKER_V2_MICRONAUT_ENV } return envMap @@ -338,19 +349,21 @@ class EnvVarConfigBeanFactory { @Value("\${datadog.agent.port}") dataDogStatsdPort: String, @Value("\${airbyte.metric.client}") metricClient: String, @Value("\${airbyte.metric.should-publish}") shouldPublishMetrics: String, + @Value("\${airbyte.metric.otel-collector-endpoint}") otelCollectorEndPoint: String, @Value("\${datadog.orchestrator.disabled.integrations}") disabledIntegrations: String, ): Map { val envMap: MutableMap = HashMap() - envMap[METRIC_CLIENT_ENV_VAR] = metricClient - envMap[DD_AGENT_HOST_ENV_VAR] = dataDogAgentHost - envMap[DD_SERVICE_ENV_VAR] = "airbyte-container-orchestrator" - envMap[DD_DOGSTATSD_PORT_ENV_VAR] = dataDogStatsdPort - envMap[PUBLISH_METRICS_ENV_VAR] = shouldPublishMetrics - if (System.getenv(DD_ENV_ENV_VAR) != null) { - envMap[DD_ENV_ENV_VAR] = System.getenv(DD_ENV_ENV_VAR) + envMap[EnvVarConstants.METRIC_CLIENT_ENV_VAR] = metricClient + envMap[EnvVarConstants.DD_AGENT_HOST_ENV_VAR] = dataDogAgentHost + envMap[EnvVarConstants.DD_SERVICE_ENV_VAR] = "airbyte-container-orchestrator" + envMap[EnvVarConstants.DD_DOGSTATSD_PORT_ENV_VAR] = dataDogStatsdPort + envMap[EnvVarConstants.PUBLISH_METRICS_ENV_VAR] = shouldPublishMetrics + envMap[EnvVarConstants.OTEL_COLLECTOR_ENDPOINT_ENV_VAR] = otelCollectorEndPoint + if (System.getenv(EnvVarConstants.DD_ENV_ENV_VAR) != null) { + envMap[EnvVarConstants.DD_ENV_ENV_VAR] = System.getenv(EnvVarConstants.DD_ENV_ENV_VAR) } - if (System.getenv(DD_VERSION_ENV_VAR) != null) { - envMap[DD_VERSION_ENV_VAR] = System.getenv(DD_VERSION_ENV_VAR) + if 
(System.getenv(EnvVarConstants.DD_VERSION_ENV_VAR) != null) { + envMap[EnvVarConstants.DD_VERSION_ENV_VAR] = System.getenv(EnvVarConstants.DD_VERSION_ENV_VAR) } // Disable DD agent integrations based on the configuration @@ -358,7 +371,7 @@ class EnvVarConfigBeanFactory { listOf(*disabledIntegrations.split(",".toRegex()).dropLastWhile { it.isEmpty() }.toTypedArray()) .forEach( Consumer { e: String -> - envMap[String.format(DD_INTEGRATION_ENV_VAR_FORMAT, e.trim { it <= ' ' })] = java.lang.Boolean.FALSE.toString() + envMap[String.format(EnvVarConstants.DD_INTEGRATION_ENV_VAR_FORMAT, e.trim { it <= ' ' })] = java.lang.Boolean.FALSE.toString() }, ) } @@ -382,13 +395,13 @@ class EnvVarConfigBeanFactory { @Value("\${airbyte.secret.store.vault.prefix}") vaultPrefix: String, ): Map { return buildMap { - put(SECRET_PERSISTENCE, persistenceType) - put(SECRET_STORE_GCP_PROJECT_ID, gcpProjectId) - put(AWS_SECRET_MANAGER_REGION, awsRegion) - put(AWS_KMS_KEY_ARN, awsKmsKeyArn) - put(AWS_SECRET_MANAGER_SECRET_TAGS, awsTags) - put(VAULT_ADDRESS, vaultAddress) - put(VAULT_PREFIX, vaultPrefix) + put(EnvVarConstants.SECRET_PERSISTENCE, persistenceType) + put(EnvVarConstants.SECRET_STORE_GCP_PROJECT_ID, gcpProjectId) + put(EnvVarConstants.AWS_SECRET_MANAGER_REGION, awsRegion) + put(EnvVarConstants.AWS_KMS_KEY_ARN, awsKmsKeyArn) + put(EnvVarConstants.AWS_SECRET_MANAGER_SECRET_TAGS, awsTags) + put(EnvVarConstants.VAULT_ADDRESS, vaultAddress) + put(EnvVarConstants.VAULT_PREFIX, vaultPrefix) } } @@ -413,16 +426,16 @@ class EnvVarConfigBeanFactory { return buildMap { // Note: If any of the secret ref names or keys are blank kube will fail to create the pod, so we have to manually exclude empties if (gcpCredsRefName.isNotBlank() && gcpCredsRefKey.isNotBlank()) { - put(SECRET_STORE_GCP_CREDENTIALS, createEnvVarSource(gcpCredsRefName, gcpCredsRefKey)) + put(EnvVarConstants.SECRET_STORE_GCP_CREDENTIALS, createEnvVarSource(gcpCredsRefName, gcpCredsRefKey)) } if (awsAccessKeyRefName.isNotBlank() 
&& awsAccessKeyRefKey.isNotBlank()) { - put(AWS_SECRET_MANAGER_ACCESS_KEY_ID, createEnvVarSource(awsAccessKeyRefName, awsAccessKeyRefKey)) + put(EnvVarConstants.AWS_SECRET_MANAGER_ACCESS_KEY_ID, createEnvVarSource(awsAccessKeyRefName, awsAccessKeyRefKey)) } if (awsSecretKeyRefName.isNotBlank() && awsSecretKeyRefKey.isNotBlank()) { - put(AWS_SECRET_MANAGER_SECRET_ACCESS_KEY, createEnvVarSource(awsSecretKeyRefName, awsSecretKeyRefKey)) + put(EnvVarConstants.AWS_SECRET_MANAGER_SECRET_ACCESS_KEY, createEnvVarSource(awsSecretKeyRefName, awsSecretKeyRefKey)) } if (vaultTokenRefName.isNotBlank() && vaultTokenRefKey.isNotBlank()) { - put(VAULT_AUTH_TOKEN, createEnvVarSource(vaultTokenRefName, vaultTokenRefKey)) + put(EnvVarConstants.VAULT_AUTH_TOKEN, createEnvVarSource(vaultTokenRefName, vaultTokenRefKey)) } } } @@ -440,11 +453,11 @@ class EnvVarConfigBeanFactory { @Value("\${airbyte.workload-api.base-path}") workloadApiBasePath: String, ): Map { val envMap: MutableMap = HashMap() - envMap[WORKLOAD_API_HOST_ENV_VAR] = workloadApiBasePath - envMap[WORKLOAD_API_CONNECT_TIMEOUT_SECONDS_ENV_VAR] = workloadApiConnectTimeoutSeconds - envMap[WORKLOAD_API_READ_TIMEOUT_SECONDS_ENV_VAR] = workloadApiReadTimeoutSeconds - envMap[WORKLOAD_API_RETRY_DELAY_SECONDS_ENV_VAR] = workloadApiRetriesDelaySeconds - envMap[WORKLOAD_API_MAX_RETRIES_ENV_VAR] = workloadApiRetriesMax + envMap[EnvVarConstants.WORKLOAD_API_HOST_ENV_VAR] = workloadApiBasePath + envMap[EnvVarConstants.WORKLOAD_API_CONNECT_TIMEOUT_SECONDS_ENV_VAR] = workloadApiConnectTimeoutSeconds + envMap[EnvVarConstants.WORKLOAD_API_READ_TIMEOUT_SECONDS_ENV_VAR] = workloadApiReadTimeoutSeconds + envMap[EnvVarConstants.WORKLOAD_API_RETRY_DELAY_SECONDS_ENV_VAR] = workloadApiRetriesDelaySeconds + envMap[EnvVarConstants.WORKLOAD_API_MAX_RETRIES_ENV_VAR] = workloadApiRetriesMax return envMap } @@ -468,48 +481,4 @@ class EnvVarConfigBeanFactory { AirbyteEnvVar.DATABASE_PASSWORD.toString() to dbPassword, ) } - - companion object { - private 
const val METRIC_CLIENT_ENV_VAR = "METRIC_CLIENT" - private const val DD_AGENT_HOST_ENV_VAR = "DD_AGENT_HOST" - private const val DD_DOGSTATSD_PORT_ENV_VAR = "DD_DOGSTATSD_PORT" - private const val DD_ENV_ENV_VAR = "DD_ENV" - private const val DD_SERVICE_ENV_VAR = "DD_SERVICE" - private const val DD_VERSION_ENV_VAR = "DD_VERSION" - const val JAVA_OPTS_ENV_VAR = "JAVA_OPTS" - private const val PUBLISH_METRICS_ENV_VAR = "PUBLISH_METRICS" - private const val CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR = "CONTROL_PLANE_AUTH_ENDPOINT" - private const val DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR = "DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH" - private const val DATA_PLANE_SERVICE_ACCOUNT_EMAIL_ENV_VAR = "DATA_PLANE_SERVICE_ACCOUNT_EMAIL" - private const val AIRBYTE_API_AUTH_HEADER_NAME_ENV_VAR = "AIRBYTE_API_AUTH_HEADER_NAME" - private const val AIRBYTE_API_AUTH_HEADER_VALUE_ENV_VAR = "AIRBYTE_API_AUTH_HEADER_VALUE" - private const val KEYCLOAK_CLIENT_ID_ENV_VAR = "KEYCLOAK_CLIENT_ID" - private const val KEYCLOAK_INTERNAL_REALM_ISSUER_ENV_VAR = "KEYCLOAK_INTERNAL_REALM_ISSUER" - private const val INTERNAL_API_HOST_ENV_VAR = "INTERNAL_API_HOST" - private const val ACCEPTANCE_TEST_ENABLED_VAR = "ACCEPTANCE_TEST_ENABLED" - private const val DD_INTEGRATION_ENV_VAR_FORMAT = "DD_INTEGRATION_%s_ENABLED" - private const val WORKER_V2_MICRONAUT_ENV = WorkloadConstants.WORKER_V2_MICRONAUT_ENV - private const val WORKLOAD_API_HOST_ENV_VAR = "WORKLOAD_API_HOST" - private const val WORKLOAD_API_CONNECT_TIMEOUT_SECONDS_ENV_VAR = "WORKLOAD_API_CONNECT_TIMEOUT_SECONDS" - private const val WORKLOAD_API_READ_TIMEOUT_SECONDS_ENV_VAR = "WORKLOAD_API_READ_TIMEOUT_SECONDS" - private const val WORKLOAD_API_RETRY_DELAY_SECONDS_ENV_VAR = "WORKLOAD_API_RETRY_DELAY_SECONDS" - private const val WORKLOAD_API_MAX_RETRIES_ENV_VAR = "WORKLOAD_API_MAX_RETRIES" - private const val SECRET_PERSISTENCE = "SECRET_PERSISTENCE" - private const val SECRET_STORE_GCP_PROJECT_ID = "SECRET_STORE_GCP_PROJECT_ID" 
- private const val AWS_SECRET_MANAGER_REGION = "AWS_SECRET_MANAGER_REGION" - private const val AWS_KMS_KEY_ARN = "AWS_KMS_KEY_ARN" - private const val AWS_SECRET_MANAGER_SECRET_TAGS = "AWS_SECRET_MANAGER_SECRET_TAGS" - private const val VAULT_ADDRESS = "VAULT_ADDRESS" - private const val VAULT_PREFIX = "VAULT_PREFIX" - - // secrets - const val AWS_ASSUME_ROLE_ACCESS_KEY_ID_ENV_VAR = "AWS_ASSUME_ROLE_ACCESS_KEY_ID" - const val AWS_ASSUME_ROLE_SECRET_ACCESS_KEY_ENV_VAR = "AWS_ASSUME_ROLE_SECRET_ACCESS_KEY" - const val WORKLOAD_API_BEARER_TOKEN_ENV_VAR = "WORKLOAD_API_BEARER_TOKEN" - const val KEYCLOAK_CLIENT_SECRET_ENV_VAR = "KEYCLOAK_CLIENT_SECRET" - private const val SECRET_STORE_GCP_CREDENTIALS = "SECRET_STORE_GCP_CREDENTIALS" - private const val AWS_SECRET_MANAGER_ACCESS_KEY_ID = "AWS_SECRET_MANAGER_ACCESS_KEY_ID" - private const val AWS_SECRET_MANAGER_SECRET_ACCESS_KEY = "AWS_SECRET_MANAGER_SECRET_ACCESS_KEY" - private const val VAULT_AUTH_TOKEN = "VAULT_AUTH_TOKEN" - } } diff --git a/airbyte-workload-launcher/src/main/kotlin/config/OrchestratorEnvSingleton.kt b/airbyte-workload-launcher/src/main/kotlin/config/OrchestratorEnvSingleton.kt index 99e6f2ee02c..05fff4ab16b 100644 --- a/airbyte-workload-launcher/src/main/kotlin/config/OrchestratorEnvSingleton.kt +++ b/airbyte-workload-launcher/src/main/kotlin/config/OrchestratorEnvSingleton.kt @@ -8,6 +8,7 @@ import io.airbyte.featureflag.Connection import io.airbyte.featureflag.ContainerOrchestratorJavaOpts import io.airbyte.featureflag.FeatureFlagClient import io.airbyte.workers.sync.OrchestratorConstants +import io.airbyte.workload.launcher.constants.EnvVarConstants import io.fabric8.kubernetes.api.model.EnvVar import io.fabric8.kubernetes.api.model.EnvVarSource import io.micronaut.context.annotation.Value @@ -80,9 +81,9 @@ class OrchestratorEnvSingleton( ) { val injectedJavaOpts: String = featureFlagClient.stringVariation(ContainerOrchestratorJavaOpts, Connection(connectionId)) if (injectedJavaOpts.isNotEmpty()) 
{ - envMap[EnvVarConfigBeanFactory.JAVA_OPTS_ENV_VAR] = injectedJavaOpts.trim() + envMap[EnvVarConstants.JAVA_OPTS_ENV_VAR] = injectedJavaOpts.trim() } else { - envMap[EnvVarConfigBeanFactory.JAVA_OPTS_ENV_VAR] = containerOrchestratorJavaOpts + envMap[EnvVarConstants.JAVA_OPTS_ENV_VAR] = containerOrchestratorJavaOpts } } diff --git a/airbyte-workload-launcher/src/main/kotlin/constants/EnvVarConstants.kt b/airbyte-workload-launcher/src/main/kotlin/constants/EnvVarConstants.kt new file mode 100644 index 00000000000..74090d707b1 --- /dev/null +++ b/airbyte-workload-launcher/src/main/kotlin/constants/EnvVarConstants.kt @@ -0,0 +1,55 @@ +package io.airbyte.workload.launcher.constants + +import io.airbyte.workers.workload.WorkloadConstants + +/** + * Env var names and other string constants used to set env vars for the containers we launch. + * + * Over time, we should try to collect the env vars here. Moving this to a shared library is also likely. + */ +object EnvVarConstants { + const val METRIC_CLIENT_ENV_VAR = "METRIC_CLIENT" + const val DD_AGENT_HOST_ENV_VAR = "DD_AGENT_HOST" + const val DD_DOGSTATSD_PORT_ENV_VAR = "DD_DOGSTATSD_PORT" + const val DD_ENV_ENV_VAR = "DD_ENV" + const val DD_SERVICE_ENV_VAR = "DD_SERVICE" + const val DD_VERSION_ENV_VAR = "DD_VERSION" + const val JAVA_OPTS_ENV_VAR = "JAVA_OPTS" + const val PUBLISH_METRICS_ENV_VAR = "PUBLISH_METRICS" + const val CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR = "CONTROL_PLANE_AUTH_ENDPOINT" + const val DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR = "DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH" + const val DATA_PLANE_SERVICE_ACCOUNT_EMAIL_ENV_VAR = "DATA_PLANE_SERVICE_ACCOUNT_EMAIL" + const val AIRBYTE_API_AUTH_HEADER_NAME_ENV_VAR = "AIRBYTE_API_AUTH_HEADER_NAME" + const val AIRBYTE_API_AUTH_HEADER_VALUE_ENV_VAR = "AIRBYTE_API_AUTH_HEADER_VALUE" + const val KEYCLOAK_CLIENT_ID_ENV_VAR = "KEYCLOAK_CLIENT_ID" + const val KEYCLOAK_INTERNAL_REALM_ISSUER_ENV_VAR = "KEYCLOAK_INTERNAL_REALM_ISSUER" + const val 
INTERNAL_API_HOST_ENV_VAR = "INTERNAL_API_HOST" + const val ACCEPTANCE_TEST_ENABLED_VAR = "ACCEPTANCE_TEST_ENABLED" + const val DD_INTEGRATION_ENV_VAR_FORMAT = "DD_INTEGRATION_%s_ENABLED" + const val WORKER_V2_MICRONAUT_ENV = WorkloadConstants.WORKER_V2_MICRONAUT_ENV + const val WORKLOAD_API_HOST_ENV_VAR = "WORKLOAD_API_HOST" + const val WORKLOAD_API_CONNECT_TIMEOUT_SECONDS_ENV_VAR = "WORKLOAD_API_CONNECT_TIMEOUT_SECONDS" + const val WORKLOAD_API_READ_TIMEOUT_SECONDS_ENV_VAR = "WORKLOAD_API_READ_TIMEOUT_SECONDS" + const val WORKLOAD_API_RETRY_DELAY_SECONDS_ENV_VAR = "WORKLOAD_API_RETRY_DELAY_SECONDS" + const val WORKLOAD_API_MAX_RETRIES_ENV_VAR = "WORKLOAD_API_MAX_RETRIES" + const val SECRET_PERSISTENCE = "SECRET_PERSISTENCE" + const val SECRET_STORE_GCP_PROJECT_ID = "SECRET_STORE_GCP_PROJECT_ID" + const val AWS_SECRET_MANAGER_REGION = "AWS_SECRET_MANAGER_REGION" + const val AWS_KMS_KEY_ARN = "AWS_KMS_KEY_ARN" + const val AWS_SECRET_MANAGER_SECRET_TAGS = "AWS_SECRET_MANAGER_SECRET_TAGS" + const val VAULT_ADDRESS = "VAULT_ADDRESS" + const val VAULT_PREFIX = "VAULT_PREFIX" + const val CONCURRENT_SOURCE_STREAM_READ_ENV_VAR = "CONCURRENT_SOURCE_STREAM_READ" + const val USE_STREAM_CAPABLE_STATE_ENV_VAR = "USE_STREAM_CAPABLE_STATE" + const val OTEL_COLLECTOR_ENDPOINT_ENV_VAR = "OTEL_COLLECTOR_ENDPOINT" + + // secrets + const val AWS_ASSUME_ROLE_ACCESS_KEY_ID_ENV_VAR = "AWS_ASSUME_ROLE_ACCESS_KEY_ID" + const val AWS_ASSUME_ROLE_SECRET_ACCESS_KEY_ENV_VAR = "AWS_ASSUME_ROLE_SECRET_ACCESS_KEY" + const val WORKLOAD_API_BEARER_TOKEN_ENV_VAR = "WORKLOAD_API_BEARER_TOKEN" + const val KEYCLOAK_CLIENT_SECRET_ENV_VAR = "KEYCLOAK_CLIENT_SECRET" + const val SECRET_STORE_GCP_CREDENTIALS = "SECRET_STORE_GCP_CREDENTIALS" + const val AWS_SECRET_MANAGER_ACCESS_KEY_ID = "AWS_SECRET_MANAGER_ACCESS_KEY_ID" + const val AWS_SECRET_MANAGER_SECRET_ACCESS_KEY = "AWS_SECRET_MANAGER_SECRET_ACCESS_KEY" + const val VAULT_AUTH_TOKEN = "VAULT_AUTH_TOKEN" +} diff --git 
a/airbyte-workload-launcher/src/main/kotlin/model/MapEnvVarExtensions.kt b/airbyte-workload-launcher/src/main/kotlin/model/MapEnvVarExtensions.kt new file mode 100644 index 00000000000..98f193fa47a --- /dev/null +++ b/airbyte-workload-launcher/src/main/kotlin/model/MapEnvVarExtensions.kt @@ -0,0 +1,18 @@ +package io.airbyte.workload.launcher.model + +import io.fabric8.kubernetes.api.model.EnvVar +import io.fabric8.kubernetes.api.model.EnvVarSource + +/** + * For EnvVar that directly contain their values. + */ +private fun Map.Entry.toEnvVar() = EnvVar(this.key, this.value, null) + +/** + * For EnvVar that contain a reference to their values (EnvVarSource). Usually secrets. + */ +private fun Map.Entry.toRefEnvVar() = EnvVar(this.key, null, this.value) + +fun Map.toEnvVarList() = this.map { it.toEnvVar() }.toList() + +fun Map.toRefEnvVarList() = this.map { it.toRefEnvVar() }.toList() diff --git a/airbyte-workload-launcher/src/main/kotlin/pods/PayloadKubeInputMapper.kt b/airbyte-workload-launcher/src/main/kotlin/pods/PayloadKubeInputMapper.kt index 9499e1c7a8f..f60e66e5182 100644 --- a/airbyte-workload-launcher/src/main/kotlin/pods/PayloadKubeInputMapper.kt +++ b/airbyte-workload-launcher/src/main/kotlin/pods/PayloadKubeInputMapper.kt @@ -4,17 +4,13 @@ import io.airbyte.commons.workers.config.WorkerConfigs import io.airbyte.config.WorkloadPriority import io.airbyte.config.WorkloadType import io.airbyte.featureflag.Connection -import io.airbyte.featureflag.ConnectorApmEnabled import io.airbyte.featureflag.ConnectorSidecarFetchesInputFromInit import io.airbyte.featureflag.ContainerOrchestratorDevImage import io.airbyte.featureflag.Context import io.airbyte.featureflag.FeatureFlagClient -import io.airbyte.featureflag.InjectAwsSecretsToConnectorPods import io.airbyte.featureflag.Multi import io.airbyte.featureflag.Workspace -import io.airbyte.persistence.job.models.IntegrationLauncherConfig import io.airbyte.persistence.job.models.ReplicationInput -import 
io.airbyte.workers.helper.ConnectorApmSupportHelper import io.airbyte.workers.input.getAttemptId import io.airbyte.workers.input.getDestinationResourceReqs import io.airbyte.workers.input.getJobId @@ -32,11 +28,11 @@ import io.airbyte.workers.pod.PodNameGenerator import io.airbyte.workers.process.KubeContainerInfo import io.airbyte.workers.process.KubePodInfo import io.airbyte.workers.process.KubePodProcess -import io.airbyte.workers.process.Metadata.AWS_ASSUME_ROLE_EXTERNAL_ID import io.airbyte.workers.serde.ObjectSerializer import io.airbyte.workload.launcher.model.getAttemptId import io.airbyte.workload.launcher.model.getJobId import io.airbyte.workload.launcher.model.usesCustomConnector +import io.airbyte.workload.launcher.pods.factories.RuntimeEnvVarFactory import io.fabric8.kubernetes.api.model.EnvVar import io.fabric8.kubernetes.api.model.ResourceRequirements import io.micronaut.context.annotation.Value @@ -56,11 +52,11 @@ class PayloadKubeInputMapper( private val podNameGenerator: PodNameGenerator, @Value("\${airbyte.worker.job.kube.namespace}") private val namespace: String?, @Named("orchestratorKubeContainerInfo") private val orchestratorKubeContainerInfo: KubeContainerInfo, - @Named("connectorAwsAssumedRoleSecretEnv") private val connectorAwsAssumedRoleSecretEnvList: List, @Named("replicationWorkerConfigs") private val replicationWorkerConfigs: WorkerConfigs, @Named("checkWorkerConfigs") private val checkWorkerConfigs: WorkerConfigs, @Named("discoverWorkerConfigs") private val discoverWorkerConfigs: WorkerConfigs, @Named("specWorkerConfigs") private val specWorkerConfigs: WorkerConfigs, + private val runTimeEnvVarFactory: RuntimeEnvVarFactory, private val featureFlagClient: FeatureFlagClient, @Named("infraFlagContexts") private val contexts: List, ) { @@ -128,17 +124,11 @@ class PayloadKubeInputMapper( val sourceImage = input.sourceLauncherConfig.dockerImage val sourceReqs = KubePodProcess.buildResourceRequirements(input.getSourceResourceReqs()) - val 
sourceRuntimeEnvVars = - input.sourceLauncherConfig.additionalEnvironmentVariables - ?.map { EnvVar(it.key, it.value, null) } - ?.toList().orEmpty() + getConnectorApmEnvVars(image = sourceImage, context = buildFFContext(workspaceId = input.workspaceId)) + val sourceRuntimeEnvVars = runTimeEnvVarFactory.replicationConnectorEnvVars(input.sourceLauncherConfig) val destinationImage = input.destinationLauncherConfig.dockerImage val destinationReqs = KubePodProcess.buildResourceRequirements(input.getDestinationResourceReqs()) - val destinationRuntimeEnvVars = - input.destinationLauncherConfig.additionalEnvironmentVariables - ?.map { EnvVar(it.key, it.value, null) } - ?.toList().orEmpty() + getConnectorApmEnvVars(image = destinationImage, context = buildFFContext(workspaceId = input.workspaceId)) + val destinationRuntimeEnvVars = runTimeEnvVarFactory.replicationConnectorEnvVars(input.destinationLauncherConfig) val labels = labeler.getReplicationLabels( @@ -204,10 +194,7 @@ class PayloadKubeInputMapper( val fileMap = buildCheckFileMap(workloadId, input, logPath) - val runtimeEnvVars = - resolveAwsAssumedRoleEnvVars(input.launcherConfig) + - EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.CHECK.toString(), null) + - EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null) + val runtimeEnvVars = runTimeEnvVarFactory.checkConnectorEnvVars(input.launcherConfig, workloadId) return ConnectorKubeInput( labeler.getCheckLabels() + sharedLabels, @@ -250,10 +237,7 @@ class PayloadKubeInputMapper( val fileMap = buildDiscoverFileMap(workloadId, input, logPath) - val runtimeEnvVars = - resolveAwsAssumedRoleEnvVars(input.launcherConfig) + - EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.DISCOVER.toString(), null) + - EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null) + val runtimeEnvVars = runTimeEnvVarFactory.discoverConnectorEnvVars(input.launcherConfig, workloadId) return ConnectorKubeInput( labeler.getDiscoverLabels() + sharedLabels, @@ 
-291,11 +275,7 @@ class PayloadKubeInputMapper( val fileMap = buildSpecFileMap(workloadId, input, logPath) - val runtimeEnvVars = - listOf( - EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.SPEC.toString(), null), - EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null), - ) + val runtimeEnvVars = runTimeEnvVarFactory.specConnectorEnvVars(workloadId) return ConnectorKubeInput( labeler.getSpecLabels() + sharedLabels, @@ -308,24 +288,6 @@ class PayloadKubeInputMapper( ) } - private fun resolveAwsAssumedRoleEnvVars(launcherConfig: IntegrationLauncherConfig): List { - // Only inject into connectors we own. - if (launcherConfig.isCustomConnector) { - return listOf() - } - // Only inject into enabled workspaces. - val workspaceEnabled = - launcherConfig.workspaceId != null && - this.featureFlagClient.boolVariation(InjectAwsSecretsToConnectorPods, Workspace(launcherConfig.workspaceId)) - if (!workspaceEnabled) { - return listOf() - } - - val externalIdVar = EnvVar(AWS_ASSUME_ROLE_EXTERNAL_ID, launcherConfig.workspaceId.toString(), null) - - return connectorAwsAssumedRoleSecretEnvList + externalIdVar - } - private fun getNodeSelectors( usesCustomConnector: Boolean, workerConfigs: WorkerConfigs, @@ -419,22 +381,6 @@ class PayloadKubeInputMapper( }, ) } - - private fun getConnectorApmEnvVars( - image: String, - context: Context, - ): List { - val connectorApmEnvVars = mutableListOf() - if (featureFlagClient.boolVariation(ConnectorApmEnabled, context)) { - connectorApmSupportHelper.addApmEnvVars(connectorApmEnvVars) - connectorApmSupportHelper.addServerNameAndVersionToEnvVars(image, connectorApmEnvVars) - } - return connectorApmEnvVars.toList() - } - - companion object { - var connectorApmSupportHelper: ConnectorApmSupportHelper = ConnectorApmSupportHelper() - } } data class OrchestratorKubeInput( diff --git a/airbyte-workload-launcher/src/main/kotlin/pods/factories/ReplicationContainerFactory.kt 
b/airbyte-workload-launcher/src/main/kotlin/pods/factories/ReplicationContainerFactory.kt index 23708476efc..48a76672255 100644 --- a/airbyte-workload-launcher/src/main/kotlin/pods/factories/ReplicationContainerFactory.kt +++ b/airbyte-workload-launcher/src/main/kotlin/pods/factories/ReplicationContainerFactory.kt @@ -13,6 +13,7 @@ import io.fabric8.kubernetes.api.model.EnvVar import io.fabric8.kubernetes.api.model.ResourceRequirements import io.fabric8.kubernetes.api.model.VolumeMount import io.micronaut.context.annotation.Value +import jakarta.inject.Named import jakarta.inject.Singleton import java.util.UUID @@ -20,6 +21,8 @@ import java.util.UUID class ReplicationContainerFactory( private val orchestratorEnvFactory: OrchestratorEnvSingleton, private val workloadSecurityContextProvider: WorkloadSecurityContextProvider, + @Named("readEnvVars") private val sourceEnvVars: List, + @Named("writeEnvVars") private val destinationEnvVars: List, @Value("\${airbyte.worker.job.kube.main.container.image-pull-policy}") private val imagePullPolicy: String, ) { fun createOrchestrator( @@ -57,7 +60,7 @@ class ReplicationContainerFactory( .withImage(image) .withImagePullPolicy(imagePullPolicy) .withCommand("sh", "-c", mainCommand) - .withEnv(runtimeEnvVars) + .withEnv(sourceEnvVars + runtimeEnvVars) .withWorkingDir(SOURCE_DIR) .withVolumeMounts(volumeMounts) .withSecurityContext(workloadSecurityContextProvider.rootlessContainerSecurityContext()) @@ -78,7 +81,7 @@ class ReplicationContainerFactory( .withImage(image) .withImagePullPolicy(imagePullPolicy) .withCommand("sh", "-c", mainCommand) - .withEnv(runtimeEnvVars) + .withEnv(destinationEnvVars + runtimeEnvVars) .withWorkingDir(DEST_DIR) .withVolumeMounts(volumeMounts) .withSecurityContext(workloadSecurityContextProvider.rootlessContainerSecurityContext()) diff --git a/airbyte-workload-launcher/src/main/kotlin/pods/factories/RuntimeEnvVarFactory.kt 
b/airbyte-workload-launcher/src/main/kotlin/pods/factories/RuntimeEnvVarFactory.kt new file mode 100644 index 00000000000..f73474a824c --- /dev/null +++ b/airbyte-workload-launcher/src/main/kotlin/pods/factories/RuntimeEnvVarFactory.kt @@ -0,0 +1,144 @@ +package io.airbyte.workload.launcher.pods.factories + +import com.google.common.annotations.VisibleForTesting +import io.airbyte.config.WorkerEnvConstants +import io.airbyte.config.WorkloadType +import io.airbyte.featureflag.ANONYMOUS +import io.airbyte.featureflag.ConcurrentSourceStreamRead +import io.airbyte.featureflag.Connection +import io.airbyte.featureflag.ConnectorApmEnabled +import io.airbyte.featureflag.Context +import io.airbyte.featureflag.FeatureFlagClient +import io.airbyte.featureflag.InjectAwsSecretsToConnectorPods +import io.airbyte.featureflag.Workspace +import io.airbyte.persistence.job.models.IntegrationLauncherConfig +import io.airbyte.workers.helper.ConnectorApmSupportHelper +import io.airbyte.workers.process.Metadata.AWS_ASSUME_ROLE_EXTERNAL_ID +import io.airbyte.workload.launcher.constants.EnvVarConstants +import io.airbyte.workload.launcher.model.toEnvVarList +import io.fabric8.kubernetes.api.model.EnvVar +import jakarta.inject.Named +import jakarta.inject.Singleton +import java.util.UUID +import io.airbyte.commons.envvar.EnvVar as AirbyteEnvVar + +/** + * Performs dynamic mapping of config to env vars based on runtime inputs. + * For static start time configuration see EnvVarConfigBeanFactory. 
+ */ +@Singleton +class RuntimeEnvVarFactory( + @Named("connectorAwsAssumedRoleSecretEnv") private val connectorAwsAssumedRoleSecretEnvList: List, + private val connectorApmSupportHelper: ConnectorApmSupportHelper, + private val featureFlagClient: FeatureFlagClient, +) { + fun replicationConnectorEnvVars(launcherConfig: IntegrationLauncherConfig): List { + val awsEnvVars = resolveAwsAssumedRoleEnvVars(launcherConfig) + val apmEnvVars = getConnectorApmEnvVars(launcherConfig.dockerImage, Workspace(launcherConfig.workspaceId)) + val configurationEnvVars = getConfigurationEnvVars(launcherConfig.dockerImage, launcherConfig.connectionId ?: ANONYMOUS) + val metadataEnvVars = getMetadataEnvVars(launcherConfig) + val configPassThroughEnv = launcherConfig.additionalEnvironmentVariables?.toEnvVarList().orEmpty() + + return awsEnvVars + apmEnvVars + configurationEnvVars + metadataEnvVars + configPassThroughEnv + } + + fun checkConnectorEnvVars( + launcherConfig: IntegrationLauncherConfig, + workloadId: String, + ): List { + return resolveAwsAssumedRoleEnvVars(launcherConfig) + + EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.CHECK.toString(), null) + + EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null) + } + + fun discoverConnectorEnvVars( + launcherConfig: IntegrationLauncherConfig, + workloadId: String, + ): List { + return resolveAwsAssumedRoleEnvVars(launcherConfig) + + EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.DISCOVER.toString(), null) + + EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null) + } + + fun specConnectorEnvVars(workloadId: String): List { + return listOf( + EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.SPEC.toString(), null), + EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null), + ) + } + + /** + * Env vars to enable APM metrics for the connector if enabled. 
+ */ + @VisibleForTesting + internal fun getConnectorApmEnvVars( + image: String, + context: Context, + ): List { + val connectorApmEnvVars = mutableListOf() + if (featureFlagClient.boolVariation(ConnectorApmEnabled, context)) { + connectorApmSupportHelper.addApmEnvVars(connectorApmEnvVars) + connectorApmSupportHelper.addServerNameAndVersionToEnvVars(image, connectorApmEnvVars) + } + return connectorApmEnvVars.toList() + } + + /** + * Metadata env vars. Unsure of purpose. Copied from AirbyteIntegrationLauncher. + */ + @VisibleForTesting + internal fun getMetadataEnvVars(launcherConfig: IntegrationLauncherConfig): List { + return listOf( + EnvVar(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, launcherConfig.dockerImage, null), + EnvVar(WorkerEnvConstants.WORKER_JOB_ID, launcherConfig.jobId, null), + EnvVar(WorkerEnvConstants.WORKER_JOB_ATTEMPT, launcherConfig.attemptId.toString(), null), + ) + } + + /** + * These theoretically configure runtime connector behavior. Copied from AirbyteIntegrationLauncher. + * Unsure if still necessary. + */ + @VisibleForTesting + internal fun getConfigurationEnvVars( + dockerImage: String, + connectionId: UUID, + ): List { + val envVars = mutableListOf() + envVars.add(EnvVar(EnvVarConstants.USE_STREAM_CAPABLE_STATE_ENV_VAR, true.toString(), null)) + + val concurrentSourceStreamReadEnabled = + dockerImage.startsWith(MYSQL_SOURCE_NAME) && + featureFlagClient.boolVariation(ConcurrentSourceStreamRead, Connection(connectionId)) + + envVars.add(EnvVar(EnvVarConstants.CONCURRENT_SOURCE_STREAM_READ_ENV_VAR, concurrentSourceStreamReadEnabled.toString(), null)) + + return envVars + } + + /** + * Conditionally adds AWS assumed role env vars for use by connector pods. + */ + @VisibleForTesting + internal fun resolveAwsAssumedRoleEnvVars(launcherConfig: IntegrationLauncherConfig): List { + // Only inject into connectors we own. + if (launcherConfig.isCustomConnector) { + return listOf() + } + // Only inject into enabled workspaces. 
+ val workspaceEnabled = + launcherConfig.workspaceId != null && + this.featureFlagClient.boolVariation(InjectAwsSecretsToConnectorPods, Workspace(launcherConfig.workspaceId)) + if (!workspaceEnabled) { + return listOf() + } + + val externalIdVar = EnvVar(AWS_ASSUME_ROLE_EXTERNAL_ID, launcherConfig.workspaceId.toString(), null) + + return connectorAwsAssumedRoleSecretEnvList + externalIdVar + } + + companion object { + const val MYSQL_SOURCE_NAME = "airbyte/source-mysql" + } +} diff --git a/airbyte-workload-launcher/src/main/resources/application.yml b/airbyte-workload-launcher/src/main/resources/application.yml index 46304234292..94988b1f7b0 100644 --- a/airbyte-workload-launcher/src/main/resources/application.yml +++ b/airbyte-workload-launcher/src/main/resources/application.yml @@ -187,6 +187,7 @@ airbyte: metric: client: ${METRIC_CLIENT:} should-publish: ${PUBLISH_METRICS:false} + otel-collector-endpoint: ${OTEL_COLLECTOR_ENDPOINT:} internal-api: auth-header: name: ${AIRBYTE_API_AUTH_HEADER_NAME:} diff --git a/airbyte-workload-launcher/src/test/kotlin/config/EnvVarConfigBeanFactoryTest.kt b/airbyte-workload-launcher/src/test/kotlin/config/EnvVarConfigBeanFactoryTest.kt index 00f3ae72a98..bb05c5f445a 100644 --- a/airbyte-workload-launcher/src/test/kotlin/config/EnvVarConfigBeanFactoryTest.kt +++ b/airbyte-workload-launcher/src/test/kotlin/config/EnvVarConfigBeanFactoryTest.kt @@ -7,11 +7,11 @@ package config import io.airbyte.workers.process.Metadata.AWS_ACCESS_KEY_ID import io.airbyte.workers.process.Metadata.AWS_SECRET_ACCESS_KEY import io.airbyte.workload.launcher.config.EnvVarConfigBeanFactory -import io.airbyte.workload.launcher.config.EnvVarConfigBeanFactory.Companion.AWS_ASSUME_ROLE_ACCESS_KEY_ID_ENV_VAR -import io.airbyte.workload.launcher.config.EnvVarConfigBeanFactory.Companion.AWS_ASSUME_ROLE_SECRET_ACCESS_KEY_ENV_VAR -import io.airbyte.workload.launcher.config.EnvVarConfigBeanFactory.Companion.KEYCLOAK_CLIENT_SECRET_ENV_VAR -import 
io.airbyte.workload.launcher.config.EnvVarConfigBeanFactory.Companion.WORKLOAD_API_BEARER_TOKEN_ENV_VAR import io.airbyte.workload.launcher.config.OrchestratorEnvSingleton +import io.airbyte.workload.launcher.constants.EnvVarConstants.AWS_ASSUME_ROLE_ACCESS_KEY_ID_ENV_VAR +import io.airbyte.workload.launcher.constants.EnvVarConstants.AWS_ASSUME_ROLE_SECRET_ACCESS_KEY_ENV_VAR +import io.airbyte.workload.launcher.constants.EnvVarConstants.KEYCLOAK_CLIENT_SECRET_ENV_VAR +import io.airbyte.workload.launcher.constants.EnvVarConstants.WORKLOAD_API_BEARER_TOKEN_ENV_VAR import io.fabric8.kubernetes.api.model.EnvVarSource import io.fabric8.kubernetes.api.model.SecretKeySelector import io.mockk.every diff --git a/airbyte-workload-launcher/src/test/kotlin/model/MapEnvVarExtensionsTest.kt b/airbyte-workload-launcher/src/test/kotlin/model/MapEnvVarExtensionsTest.kt new file mode 100644 index 00000000000..6d70ab851c0 --- /dev/null +++ b/airbyte-workload-launcher/src/test/kotlin/model/MapEnvVarExtensionsTest.kt @@ -0,0 +1,85 @@ +package io.airbyte.workload.launcher.model + +import io.fabric8.kubernetes.api.model.EnvVar +import io.fabric8.kubernetes.api.model.EnvVarSource +import io.fabric8.kubernetes.api.model.SecretKeySelector +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.MethodSource +import java.util.stream.Stream + +class MapEnvVarExtensionsTest { + @ParameterizedTest + @MethodSource("envVarMatrix") + fun `converts maps of string keys and values to fabric8 EnvVar`(envMap: Map) { + val result = envMap.toEnvVarList() + + assertEquals(envMap.size, result.size) + envMap.entries.forEach { + assertTrue(result.contains(EnvVar(it.key, it.value, null))) + } + } + + @ParameterizedTest + @MethodSource("refEnvVarMatrix") + fun `converts maps of string key and EnvVarSource values to 
fabric8 EnvVar`(envMap: Map) { + val result = envMap.toRefEnvVarList() + + assertEquals(envMap.size, result.size) + envMap.entries.forEach { + assertTrue(result.contains(EnvVar(it.key, null, it.value))) + } + } + + companion object { + @JvmStatic + private fun envVarMatrix(): Stream { + return Stream.of( + Arguments.of(mapOf("cat" to "dog", "asdf" to "ghjk")), + Arguments.of(mapOf("asdf" to "ghjk")), + Arguments.of(mapOf()), + ) + } + + @JvmStatic + private fun refEnvVarMatrix(): Stream { + return Stream.of( + Arguments.of( + mapOf( + "cat" to + EnvVarSource().apply { + secretKeyRef = + SecretKeySelector().apply { + name = "foo" + key = "barr" + } + }, + "asdf" to + EnvVarSource().apply { + secretKeyRef = + SecretKeySelector().apply { + name = "baz" + key = "buzz" + } + }, + ), + ), + Arguments.of( + mapOf( + "asdf" to + EnvVarSource().apply { + secretKeyRef = + SecretKeySelector().apply { + name = "baz" + key = "buzz" + } + }, + ), + ), + Arguments.of(mapOf()), + ) + } + } +} diff --git a/airbyte-workload-launcher/src/test/kotlin/pods/PayloadKubeInputMapperTest.kt b/airbyte-workload-launcher/src/test/kotlin/pods/PayloadKubeInputMapperTest.kt index d20a7606542..9b9cea51c8b 100644 --- a/airbyte-workload-launcher/src/test/kotlin/pods/PayloadKubeInputMapperTest.kt +++ b/airbyte-workload-launcher/src/test/kotlin/pods/PayloadKubeInputMapperTest.kt @@ -1,7 +1,6 @@ package io.airbyte.workload.launcher.pods import com.fasterxml.jackson.databind.JsonNode -import io.airbyte.commons.constants.WorkerConstants import io.airbyte.commons.workers.config.WorkerConfigs import io.airbyte.config.ActorType import io.airbyte.config.ResourceRequirements @@ -12,7 +11,6 @@ import io.airbyte.config.WorkloadType import io.airbyte.featureflag.ConnectorApmEnabled import io.airbyte.featureflag.ConnectorSidecarFetchesInputFromInit import io.airbyte.featureflag.ContainerOrchestratorDevImage -import io.airbyte.featureflag.InjectAwsSecretsToConnectorPods import io.airbyte.featureflag.TestClient 
import io.airbyte.persistence.job.models.IntegrationLauncherConfig import io.airbyte.persistence.job.models.JobRunConfig @@ -33,11 +31,11 @@ import io.airbyte.workers.pod.PodNameGenerator import io.airbyte.workers.process.KubeContainerInfo import io.airbyte.workers.process.KubePodInfo import io.airbyte.workers.process.KubePodProcess -import io.airbyte.workers.process.Metadata.AWS_ASSUME_ROLE_EXTERNAL_ID import io.airbyte.workers.serde.ObjectSerializer import io.airbyte.workload.launcher.model.getActorType import io.airbyte.workload.launcher.model.getAttemptId import io.airbyte.workload.launcher.model.getJobId +import io.airbyte.workload.launcher.pods.factories.RuntimeEnvVarFactory import io.fabric8.kubernetes.api.model.EnvVar import io.mockk.every import io.mockk.mockk @@ -58,10 +56,10 @@ class PayloadKubeInputMapperTest { fun `builds a kube input from a replication payload (orchestrator)`(customConnector: Boolean) { val serializer: ObjectSerializer = mockk() val labeler: PodLabeler = mockk() + val envVarFactory: RuntimeEnvVarFactory = mockk() val namespace = "test-namespace" val podNameGenerator = PodNameGenerator(namespace = namespace) val containerInfo = KubeContainerInfo("img-name", "pull-policy") - val awsAssumedRoleEnv: List = listOf() val replSelectors = mapOf("test-selector" to "normal-repl") val replCustomSelectors = mapOf("test-selector" to "custom-repl") val checkConfigs: WorkerConfigs = mockk() @@ -81,11 +79,11 @@ class PayloadKubeInputMapperTest { podNameGenerator, namespace, containerInfo, - awsAssumedRoleEnv, replConfigs, checkConfigs, discoverConfigs, specConfigs, + envVarFactory, ffClient, listOf(), ) @@ -146,7 +144,7 @@ class PayloadKubeInputMapperTest { val podNameGenerator: PodNameGenerator = mockk() every { podNameGenerator.getReplicationPodName(any(), any()) } returns podName val containerInfo = KubeContainerInfo("img-name", "pull-policy") - val awsAssumedRoleEnv: List = listOf() + val envVarFactory: RuntimeEnvVarFactory = mockk() val 
replSelectors = mapOf("test-selector" to "normal-repl") val replCustomSelectors = mapOf("test-selector" to "custom-repl") val checkConfigs: WorkerConfigs = mockk() @@ -168,11 +166,11 @@ class PayloadKubeInputMapperTest { podNameGenerator, namespace, containerInfo, - awsAssumedRoleEnv, replConfigs, checkConfigs, discoverConfigs, specConfigs, + envVarFactory, ffClient, listOf(), ) @@ -195,59 +193,10 @@ class PayloadKubeInputMapperTest { .withMemoryRequest("300Mi") val srcLauncherConfig = IntegrationLauncherConfig() - .withAdditionalEnvironmentVariables(mapOf("env-1" to "val-1", "env-2" to "val-2")) .withDockerImage("src-docker-img") val destLauncherConfig = IntegrationLauncherConfig() .withDockerImage("dest-docker-img") - .withAdditionalEnvironmentVariables(mapOf("env-3" to "val-3")) - - every { input.getJobId() } returns jobId - every { input.getAttemptId() } returns attemptId - every { input.getOrchestratorResourceReqs() } returns resourceReqs1 - every { input.getSourceResourceReqs() } returns resourceReqs2 - every { input.getDestinationResourceReqs() } returns resourceReqs3 - every { input.usesCustomConnector() } returns customConnector - every { input.jobRunConfig } returns mockk() - every { input.sourceLauncherConfig } returns srcLauncherConfig - every { input.destinationLauncherConfig } returns destLauncherConfig - every { input.connectionId } returns mockk() - every { input.workspaceId } returns mockk() - - val mockSerializedOutput = "Serialized Obj." 
- every { serializer.serialize(any()) } returns mockSerializedOutput - - val replLabels = mapOf("orchestrator" to "labels") - val sharedLabels = mapOf("pass through" to "labels") - every { - labeler.getReplicationLabels( - containerInfo.image, - srcLauncherConfig.dockerImage, - destLauncherConfig.dockerImage, - ) - } returns replLabels - val workloadId = UUID.randomUUID().toString() - val result = mapper.toReplicationKubeInput(workloadId, input, sharedLabels) - - assertEquals(podName, result.podName) - assertEquals(replLabels + sharedLabels, result.labels) - assertEquals(if (customConnector) replCustomSelectors else replSelectors, result.nodeSelectors) - assertEquals(annotations, result.annotations) - assertEquals(containerInfo.image, result.orchestratorImage) - assertEquals(srcLauncherConfig.dockerImage, result.sourceImage) - assertEquals(destLauncherConfig.dockerImage, result.destinationImage) - assertEquals(KubePodProcess.buildResourceRequirements(resourceReqs1), result.orchestratorReqs) - assertEquals(KubePodProcess.buildResourceRequirements(resourceReqs2), result.sourceReqs) - assertEquals(KubePodProcess.buildResourceRequirements(resourceReqs3), result.destinationReqs) - - val expectedOrchestratorRuntimeEnvVars = - listOf( - EnvVar(AirbyteEnvVar.MONO_POD.toString(), true.toString(), null), - EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.SYNC.toString(), null), - EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null), - EnvVar(AirbyteEnvVar.JOB_ID.toString(), jobId, null), - EnvVar(AirbyteEnvVar.ATTEMPT_ID.toString(), attemptId.toString(), null), - ) val expectedSrcRuntimeEnvVars = listOf( EnvVar("env-1", "val-1", null), @@ -257,80 +206,8 @@ class PayloadKubeInputMapperTest { listOf( EnvVar("env-3", "val-3", null), ) - - assertEquals(expectedOrchestratorRuntimeEnvVars, result.orchestratorRuntimeEnvVars) - assertEquals(expectedSrcRuntimeEnvVars, result.sourceRuntimeEnvVars) - assertEquals(expectedDestRuntimeEnvVars, 
result.destinationRuntimeEnvVars) - } - - @ParameterizedTest - @ValueSource(booleans = [true, false]) - fun `builds a kube input from a replication payload with apm support (mono-pod)`(customConnector: Boolean) { - val imageVersion = "1.2.3" - val destinationImageName = "dest-docker-img" - val sourceImageName = "src-docker-img" - val serializer: ObjectSerializer = mockk() - val labeler: PodLabeler = mockk() - val namespace = "test-namespace" - val podName = "a-repl-pod" - val podNameGenerator: PodNameGenerator = mockk() - every { podNameGenerator.getReplicationPodName(any(), any()) } returns podName - val containerInfo = KubeContainerInfo("img-name", "pull-policy") - val awsAssumedRoleEnv: List = listOf() - val replSelectors = mapOf("test-selector" to "normal-repl") - val replCustomSelectors = mapOf("test-selector" to "custom-repl") - val checkConfigs: WorkerConfigs = mockk() - val discoverConfigs: WorkerConfigs = mockk() - val specConfigs: WorkerConfigs = mockk() - val replConfigs: WorkerConfigs = mockk() - every { replConfigs.getworkerKubeNodeSelectors() } returns replSelectors - every { replConfigs.workerIsolatedKubeNodeSelectors } returns Optional.of(replCustomSelectors) - val annotations = mapOf("annotation" to "value2") - every { replConfigs.workerKubeAnnotations } returns annotations - val ffClient: TestClient = mockk() - every { ffClient.stringVariation(ContainerOrchestratorDevImage, any()) } returns "" - every { ffClient.boolVariation(ConnectorApmEnabled, any()) } returns true - - val mapper = - PayloadKubeInputMapper( - serializer, - labeler, - podNameGenerator, - namespace, - containerInfo, - awsAssumedRoleEnv, - replConfigs, - checkConfigs, - discoverConfigs, - specConfigs, - ffClient, - listOf(), - ) - val input: ReplicationInput = mockk() - - mockkStatic("io.airbyte.workers.input.ReplicationInputExtensionsKt") - val jobId = "415" - val attemptId = 7654L - val resourceReqs1 = - ResourceRequirements() - .withCpuLimit("1") - .withMemoryRequest("7Mi") - 
val resourceReqs2 = - ResourceRequirements() - .withCpuLimit("2") - .withMemoryRequest("3Mi") - val resourceReqs3 = - ResourceRequirements() - .withCpuLimit("2") - .withMemoryRequest("300Mi") - val srcLauncherConfig = - IntegrationLauncherConfig() - .withAdditionalEnvironmentVariables(mapOf("env-1" to "val-1", "env-2" to "val-2")) - .withDockerImage("$sourceImageName:$imageVersion") - val destLauncherConfig = - IntegrationLauncherConfig() - .withDockerImage("$destinationImageName:$imageVersion") - .withAdditionalEnvironmentVariables(mapOf("env-3" to "val-3")) + every { envVarFactory.replicationConnectorEnvVars(srcLauncherConfig) } returns expectedSrcRuntimeEnvVars + every { envVarFactory.replicationConnectorEnvVars(destLauncherConfig) } returns expectedDestRuntimeEnvVars every { input.getJobId() } returns jobId every { input.getAttemptId() } returns attemptId @@ -378,39 +255,22 @@ class PayloadKubeInputMapperTest { EnvVar(AirbyteEnvVar.JOB_ID.toString(), jobId, null), EnvVar(AirbyteEnvVar.ATTEMPT_ID.toString(), attemptId.toString(), null), ) - val expectedSrcRuntimeEnvVars = - listOf( - EnvVar("env-1", "val-1", null), - EnvVar("env-2", "val-2", null), - EnvVar(io.airbyte.commons.envvar.EnvVar.JAVA_OPTS.name, WorkerConstants.DD_ENV_VAR, null), - EnvVar(io.airbyte.commons.envvar.EnvVar.DD_SERVICE.name, sourceImageName, null), - EnvVar(io.airbyte.commons.envvar.EnvVar.DD_VERSION.name, imageVersion, null), - ).sortedBy { it.name } - val expectedDestRuntimeEnvVars = - listOf( - EnvVar("env-3", "val-3", null), - EnvVar(io.airbyte.commons.envvar.EnvVar.JAVA_OPTS.name, WorkerConstants.DD_ENV_VAR, null), - EnvVar(io.airbyte.commons.envvar.EnvVar.DD_SERVICE.name, destinationImageName, null), - EnvVar(io.airbyte.commons.envvar.EnvVar.DD_VERSION.name, imageVersion, null), - ).sortedBy { it.name } assertEquals(expectedOrchestratorRuntimeEnvVars, result.orchestratorRuntimeEnvVars) - assertEquals(expectedSrcRuntimeEnvVars, result.sourceRuntimeEnvVars.sortedBy { it.name }) - 
assertEquals(expectedDestRuntimeEnvVars, result.destinationRuntimeEnvVars.sortedBy { it.name }) + assertEquals(expectedSrcRuntimeEnvVars, result.sourceRuntimeEnvVars) + assertEquals(expectedDestRuntimeEnvVars, result.destinationRuntimeEnvVars) } @ParameterizedTest @MethodSource("connectorInputMatrix") fun `builds a kube input from a check payload`( customConnector: Boolean, - assumedRoleEnabled: Boolean, workloadPriority: WorkloadPriority, useFetchingInit: Boolean, ) { val ffClient = TestClient( mapOf( - InjectAwsSecretsToConnectorPods.key to assumedRoleEnabled, ConnectorSidecarFetchesInputFromInit.key to useFetchingInit, ), ) @@ -422,7 +282,7 @@ class PayloadKubeInputMapperTest { val podNameGenerator: PodNameGenerator = mockk() every { podNameGenerator.getCheckPodName(any(), any(), any()) } returns podName val orchestratorContainerInfo = KubeContainerInfo("img-name", "pull-policy") - val awsAssumedRoleEnv: List = listOf(EnvVar("aws-assumed-role", "value", null)) + val envVarFactory: RuntimeEnvVarFactory = mockk() val checkSelectors = mapOf("test-selector" to "normal-check") val pullPolicy = "pull-policy" val checkCustomSelectors = mapOf("test-selector" to "custom-check") @@ -444,11 +304,11 @@ class PayloadKubeInputMapperTest { podNameGenerator, namespace, orchestratorContainerInfo, - awsAssumedRoleEnv, replConfigs, checkConfigs, discoverConfigs, specConfigs, + envVarFactory, ffClient, listOf(), ) @@ -468,6 +328,8 @@ class PayloadKubeInputMapperTest { every { workspaceId } returns workspaceId1 every { priority } returns workloadPriority } + val expectedEnv = listOf(EnvVar("key-1", "value-1", null)) + every { envVarFactory.checkConnectorEnvVars(launcherConfig, workloadId) } returns expectedEnv val jobRunConfig = mockk() val checkInputConfig = mockk() val checkConnectionInput = mockk() @@ -515,32 +377,19 @@ class PayloadKubeInputMapperTest { } } assertEquals(expectedFileMap, result.fileMap) - - val workloadTypeEnvVar = EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), 
WorkloadType.CHECK.toString(), null) - val workloadIdEnvVar = EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null) - assert(result.extraEnv.contains(workloadTypeEnvVar)) - assert(result.extraEnv.contains(workloadIdEnvVar)) - if (!customConnector && assumedRoleEnabled) { - val externalIdVar = EnvVar(AWS_ASSUME_ROLE_EXTERNAL_ID, workspaceId1.toString(), null) - assert(result.extraEnv.contains(externalIdVar)) - awsAssumedRoleEnv.forEach { assert(result.extraEnv.contains(it)) } - } else { - awsAssumedRoleEnv.forEach { assert(!result.extraEnv.contains(it)) } - } + assertEquals(expectedEnv, result.extraEnv) } @ParameterizedTest @MethodSource("connectorInputMatrix") fun `builds a kube input from a discover payload`( customConnector: Boolean, - assumedRoleEnabled: Boolean, workloadPriority: WorkloadPriority, useFetchingInit: Boolean, ) { val ffClient = TestClient( mapOf( - InjectAwsSecretsToConnectorPods.key to assumedRoleEnabled, ConnectorSidecarFetchesInputFromInit.key to useFetchingInit, ), ) @@ -552,7 +401,7 @@ class PayloadKubeInputMapperTest { val podNameGenerator: PodNameGenerator = mockk() every { podNameGenerator.getDiscoverPodName(any(), any(), any()) } returns podName val orchestratorContainerInfo = KubeContainerInfo("img-name", "pull-policy") - val awsAssumedRoleEnv: List = listOf(EnvVar("aws-assumed-role", "value", null)) + val envVarFactory: RuntimeEnvVarFactory = mockk() val checkSelectors = mapOf("test-selector" to "normal-check") val pullPolicy = "pull-policy" val checkCustomSelectors = mapOf("test-selector" to "custom-check") @@ -574,11 +423,11 @@ class PayloadKubeInputMapperTest { podNameGenerator, namespace, orchestratorContainerInfo, - awsAssumedRoleEnv, replConfigs, checkConfigs, discoverConfigs, specConfigs, + envVarFactory, ffClient, listOf(), ) @@ -598,6 +447,8 @@ class PayloadKubeInputMapperTest { every { workspaceId } returns workspaceId1 every { priority } returns workloadPriority } + val expectedEnv = listOf(EnvVar("key-1", "value-1", 
null)) + every { envVarFactory.discoverConnectorEnvVars(launcherConfig, workloadId) } returns expectedEnv val jobRunConfig = mockk() val catalogInputConfig = mockk() val discoverCatalogInput = mockk() @@ -653,19 +504,7 @@ class PayloadKubeInputMapperTest { } } assertEquals(expectedFileMap, result.fileMap) - - val workloadTypeEnvVar = EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.DISCOVER.toString(), null) - val workloadIdEnvVar = EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null) - assert(result.extraEnv.contains(workloadTypeEnvVar)) - assert(result.extraEnv.contains(workloadIdEnvVar)) - - if (!customConnector && assumedRoleEnabled) { - val externalIdVar = EnvVar(AWS_ASSUME_ROLE_EXTERNAL_ID, workspaceId1.toString(), null) - assert(result.extraEnv.contains(externalIdVar)) - awsAssumedRoleEnv.forEach { assert(result.extraEnv.contains(it)) } - } else { - awsAssumedRoleEnv.forEach { assert(!result.extraEnv.contains(it)) } - } + assertEquals(expectedEnv, result.extraEnv) } @ParameterizedTest @@ -688,7 +527,7 @@ class PayloadKubeInputMapperTest { val podNameGenerator: PodNameGenerator = mockk() every { podNameGenerator.getSpecPodName(any(), any(), any()) } returns podName val orchestratorContainerInfo = KubeContainerInfo("img-name", "pull-policy") - val awsAssumedRoleEnv: List = listOf(EnvVar("aws-assumed-role", "value", null)) + val envVarFactory: RuntimeEnvVarFactory = mockk() val checkSelectors = mapOf("test-selector" to "normal-check") val pullPolicy = "pull-policy" val checkCustomSelectors = mapOf("test-selector" to "custom-check") @@ -708,11 +547,11 @@ class PayloadKubeInputMapperTest { podNameGenerator, namespace, orchestratorContainerInfo, - awsAssumedRoleEnv, replConfigs, checkConfigs, discoverConfigs, specConfigs, + envVarFactory, ffClient, listOf(), ) @@ -731,6 +570,8 @@ class PayloadKubeInputMapperTest { every { isCustomConnector } returns customConnector every { workspaceId } returns workspaceId1 } + val expectedEnv = 
listOf(EnvVar("key-1", "value-1", null)) + every { envVarFactory.specConnectorEnvVars(workloadId) } returns expectedEnv val jobRunConfig = mockk() every { input.getJobId() } returns jobId @@ -771,19 +612,20 @@ class PayloadKubeInputMapperTest { } } assertEquals(expectedFileMap, result.fileMap) + assertEquals(expectedEnv, result.extraEnv) } companion object { @JvmStatic private fun connectorInputMatrix(): Stream { return Stream.of( - Arguments.of(true, true, WorkloadPriority.HIGH, true), - Arguments.of(false, true, WorkloadPriority.HIGH, true), - Arguments.of(true, false, WorkloadPriority.HIGH, true), - Arguments.of(false, false, WorkloadPriority.HIGH, true), - Arguments.of(false, false, WorkloadPriority.HIGH, false), - Arguments.of(false, false, WorkloadPriority.DEFAULT, false), - Arguments.of(false, false, WorkloadPriority.DEFAULT, true), + Arguments.of(true, WorkloadPriority.HIGH, true), + Arguments.of(false, WorkloadPriority.HIGH, true), + Arguments.of(true, WorkloadPriority.HIGH, true), + Arguments.of(false, WorkloadPriority.HIGH, true), + Arguments.of(false, WorkloadPriority.HIGH, false), + Arguments.of(false, WorkloadPriority.DEFAULT, false), + Arguments.of(false, WorkloadPriority.DEFAULT, true), ) } } diff --git a/airbyte-workload-launcher/src/test/kotlin/pods/factories/RuntimeEnvVarFactoryTest.kt b/airbyte-workload-launcher/src/test/kotlin/pods/factories/RuntimeEnvVarFactoryTest.kt new file mode 100644 index 00000000000..900d2d8f003 --- /dev/null +++ b/airbyte-workload-launcher/src/test/kotlin/pods/factories/RuntimeEnvVarFactoryTest.kt @@ -0,0 +1,293 @@ +package io.airbyte.workload.launcher.pods.factories + +import io.airbyte.config.WorkerEnvConstants +import io.airbyte.config.WorkloadType +import io.airbyte.featureflag.ConcurrentSourceStreamRead +import io.airbyte.featureflag.Connection +import io.airbyte.featureflag.ConnectorApmEnabled +import io.airbyte.featureflag.InjectAwsSecretsToConnectorPods +import io.airbyte.featureflag.TestClient +import 
io.airbyte.persistence.job.models.IntegrationLauncherConfig +import io.airbyte.workers.helper.ConnectorApmSupportHelper +import io.airbyte.workers.process.Metadata.AWS_ASSUME_ROLE_EXTERNAL_ID +import io.airbyte.workload.launcher.constants.EnvVarConstants +import io.airbyte.workload.launcher.model.toEnvVarList +import io.airbyte.workload.launcher.pods.factories.RuntimeEnvVarFactory.Companion.MYSQL_SOURCE_NAME +import io.airbyte.workload.launcher.pods.factories.RuntimeEnvVarFactoryTest.Fixtures.WORKLOAD_ID +import io.airbyte.workload.launcher.pods.factories.RuntimeEnvVarFactoryTest.Fixtures.connectorAwsAssumedRoleSecretEnvList +import io.airbyte.workload.launcher.pods.factories.RuntimeEnvVarFactoryTest.Fixtures.workspaceId +import io.fabric8.kubernetes.api.model.EnvVar +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.ArgumentMatchers.anyList +import java.util.UUID +import java.util.stream.Stream +import io.airbyte.commons.envvar.EnvVar as AirbyteEnvVar + +class RuntimeEnvVarFactoryTest { + private lateinit var connectorApmSupportHelper: ConnectorApmSupportHelper + + private lateinit var ffClient: TestClient + + private lateinit var factory: RuntimeEnvVarFactory + + @BeforeEach + fun setup() { + connectorApmSupportHelper = mockk() + ffClient = mockk() + every { ffClient.boolVariation(InjectAwsSecretsToConnectorPods, any()) } returns false + + factory = spyk(RuntimeEnvVarFactory(connectorAwsAssumedRoleSecretEnvList, connectorApmSupportHelper, ffClient)) + } + + @Test + fun `does not build aws env vars if custom connector`() { + val config = + IntegrationLauncherConfig() + 
.withIsCustomConnector(true)
                .withWorkspaceId(workspaceId)

        val result = factory.resolveAwsAssumedRoleEnvVars(config)

        assertTrue(result.isEmpty())
    }

    // Flag off + non-custom connector: no AWS env vars are resolved.
    @Test
    fun `does not build aws env vars if ff disabled`() {
        every { ffClient.boolVariation(InjectAwsSecretsToConnectorPods, any()) } returns false
        val config =
            IntegrationLauncherConfig()
                .withIsCustomConnector(false)
                .withWorkspaceId(workspaceId)

        val result = factory.resolveAwsAssumedRoleEnvVars(config)

        assertTrue(result.isEmpty())
    }

    // Flag on + non-custom connector: the fixture secret env list is returned, followed by an
    // external-id var whose value is the workspace id rendered as a string.
    @Test
    fun `builds aws env vars if ff enabled and airbyte connector`() {
        every { ffClient.boolVariation(InjectAwsSecretsToConnectorPods, any()) } returns true
        val config =
            IntegrationLauncherConfig()
                .withIsCustomConnector(false)
                .withWorkspaceId(workspaceId)

        val result = factory.resolveAwsAssumedRoleEnvVars(config)

        val assumedRoleExternalIdEnvVar = EnvVar(AWS_ASSUME_ROLE_EXTERNAL_ID, workspaceId.toString(), null)
        assertEquals(
            connectorAwsAssumedRoleSecretEnvList + assumedRoleExternalIdEnvVar,
            result,
        )
    }

    // NOTE(review): `anyList()` is imported outside this view. If it is Mockito's matcher rather than a
    // MockK one, MockK receives a plain empty list and matches it by equality — confirm the import.
    @Test
    fun `adds apm env vars if enabled (mutative API)`() {
        val image = "image-name"
        val context = Connection(UUID.randomUUID())
        every { ffClient.boolVariation(ConnectorApmEnabled, context) } returns true

        every { connectorApmSupportHelper.addApmEnvVars(anyList()) } returns Unit
        every { connectorApmSupportHelper.addServerNameAndVersionToEnvVars(image, anyList()) } returns Unit

        factory.getConnectorApmEnvVars(image, context)

        // the API is mutative for some reason which means this is the best we can do
        verify { connectorApmSupportHelper.addApmEnvVars(anyList()) }
        verify { connectorApmSupportHelper.addServerNameAndVersionToEnvVars(image, anyList()) }
    }

    // Same setup as the enabled case, but with the flag off neither helper method may be invoked.
    @Test
    fun `does not build apm env vars if disabled (mutative API)`() {
        val image = "image-name"
        val context = Connection(UUID.randomUUID())
        every { ffClient.boolVariation(ConnectorApmEnabled, context) } returns false

        every { connectorApmSupportHelper.addApmEnvVars(anyList()) } returns Unit
        every { connectorApmSupportHelper.addServerNameAndVersionToEnvVars(image, anyList()) } returns Unit

        factory.getConnectorApmEnvVars(image, context)

        // the API is mutative for some reason which means this is the best we can do
        verify(exactly = 0) { connectorApmSupportHelper.addApmEnvVars(anyList()) }
        verify(exactly = 0) { connectorApmSupportHelper.addServerNameAndVersionToEnvVars(image, anyList()) }
    }

    // Image, job id and attempt id from the launcher config are expected as three env vars, in that order.
    @Test
    fun `builds metadata env vars`() {
        val image = "docker-image-name"
        val jobId = "3145"
        val attemptId = 7L
        val config =
            IntegrationLauncherConfig()
                .withDockerImage(image)
                .withJobId(jobId)
                .withAttemptId(attemptId)

        val result = factory.getMetadataEnvVars(config)

        val expected =
            listOf(
                EnvVar(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, image, null),
                EnvVar(WorkerEnvConstants.WORKER_JOB_ID, jobId, null),
                EnvVar(WorkerEnvConstants.WORKER_JOB_ATTEMPT, attemptId.toString(), null),
            )

        assertEquals(expected, result)
    }

    // Every row of the "enabled" matrix must yield CONCURRENT_SOURCE_STREAM_READ = "true".
    @ParameterizedTest
    @MethodSource("concurrentStreamReadEnabledMatrix")
    fun `builds connector configuration env vars (concurrentStreamReadEnabled)`(
        ffEnabled: Boolean,
        sourceImageName: String,
    ) {
        every { ffClient.boolVariation(ConcurrentSourceStreamRead, any()) } returns ffEnabled

        val result = factory.getConfigurationEnvVars(sourceImageName, UUID.randomUUID())

        val expected =
            listOf(
                EnvVar(EnvVarConstants.USE_STREAM_CAPABLE_STATE_ENV_VAR, "true", null),
                EnvVar(EnvVarConstants.CONCURRENT_SOURCE_STREAM_READ_ENV_VAR, "true", null),
            )

        assertEquals(expected, result)
    }

    // Every row of the "disabled" matrix must yield CONCURRENT_SOURCE_STREAM_READ = "false";
    // USE_STREAM_CAPABLE_STATE is still expected to be "true".
    @ParameterizedTest
    @MethodSource("concurrentStreamReadDisabledMatrix")
    fun `builds connector configuration env vars (concurrentStreamReadDisabled)`(
        ffEnabled: Boolean,
        sourceImageName: String,
    ) {
        every { ffClient.boolVariation(ConcurrentSourceStreamRead, any()) } returns ffEnabled

        val result = factory.getConfigurationEnvVars(sourceImageName, UUID.randomUUID())

        val expected =
            listOf(
                EnvVar(EnvVarConstants.USE_STREAM_CAPABLE_STATE_ENV_VAR, "true", null),
                EnvVar(EnvVarConstants.CONCURRENT_SOURCE_STREAM_READ_ENV_VAR, "false", null),
            )

        assertEquals(expected, result)
    }

    // The four sibling builders are stubbed with single-element lists so that this test pins only the
    // concatenation order: aws, apm, configuration, metadata, then the pass-through variables.
    @ParameterizedTest
    @MethodSource("additionalEnvironmentVariablesMatrix")
    fun `builds expected env vars for replication connector container`(passThroughEnvMap: Map<String, String>?) {
        val awsEnvVars = listOf(EnvVar("aws-var", "1", null))
        val apmEnvVars = listOf(EnvVar("apm-var", "2", null))
        val configurationEnvVars = listOf(EnvVar("config-var", "3", null))
        val metadataEnvVars = listOf(EnvVar("metadata-var", "4", null))
        // A null pass-through map contributes no variables.
        val passThroughVars = passThroughEnvMap?.toEnvVarList().orEmpty()
        every { factory.resolveAwsAssumedRoleEnvVars(any()) } returns awsEnvVars
        every { factory.getConnectorApmEnvVars(any(), any()) } returns apmEnvVars
        every { factory.getConfigurationEnvVars(any(), any()) } returns configurationEnvVars
        every { factory.getMetadataEnvVars(any()) } returns metadataEnvVars

        val config =
            IntegrationLauncherConfig()
                .withAdditionalEnvironmentVariables(passThroughEnvMap)
                .withDockerImage("image-name")
                .withWorkspaceId(workspaceId)

        val result = factory.replicationConnectorEnvVars(config)

        val expected = awsEnvVars + apmEnvVars + configurationEnvVars + metadataEnvVars + passThroughVars

        assertEquals(expected, result)
    }

    // Check: stubbed AWS vars first, then OPERATION_TYPE = CHECK and the workload id.
    @Test
    fun `builds expected env vars for check connector container`() {
        every { factory.resolveAwsAssumedRoleEnvVars(any()) } returns connectorAwsAssumedRoleSecretEnvList
        val config =
            IntegrationLauncherConfig()
                .withWorkspaceId(workspaceId)
        val result = factory.checkConnectorEnvVars(config, WORKLOAD_ID)

        assertEquals(
            connectorAwsAssumedRoleSecretEnvList +
                EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.CHECK.toString(), null) +
                EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), WORKLOAD_ID, null),
            result,
        )
    }

    // Discover: stubbed AWS vars first, then OPERATION_TYPE = DISCOVER and the workload id.
    @Test
    fun `builds expected env vars for discover connector container`() {
        every { factory.resolveAwsAssumedRoleEnvVars(any()) } returns connectorAwsAssumedRoleSecretEnvList
        val config =
            IntegrationLauncherConfig()
                .withWorkspaceId(workspaceId)
        val result = factory.discoverConnectorEnvVars(config, WORKLOAD_ID)

        assertEquals(
            connectorAwsAssumedRoleSecretEnvList +
                EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.DISCOVER.toString(), null) +
                EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), WORKLOAD_ID, null),
            result,
        )
    }

    // Spec takes only the workload id, so only OPERATION_TYPE = SPEC and the workload id are expected.
    @Test
    fun `builds expected env vars for spec connector container`() {
        val result = factory.specConnectorEnvVars(WORKLOAD_ID)

        assertEquals(
            listOf(
                EnvVar(AirbyteEnvVar.OPERATION_TYPE.toString(), WorkloadType.SPEC.toString(), null),
                EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), WORKLOAD_ID, null),
            ),
            result,
        )
    }

    /** Constant inputs shared by the tests above. */
    object Fixtures {
        const val WORKLOAD_ID = "test-workload-id"

        // Explicit type instead of `!!`: the platform-typed result of randomUUID() is never null.
        val workspaceId: UUID = UUID.randomUUID()
        val connectorAwsAssumedRoleSecretEnvList = listOf(EnvVar("test", "creds", null))
    }

    companion object {
        /** Rows of (flag value, source image name) for which concurrent stream read is expected to be "true". */
        @JvmStatic
        private fun concurrentStreamReadEnabledMatrix(): Stream<Arguments> {
            return Stream.of(
                Arguments.of(true, MYSQL_SOURCE_NAME),
                Arguments.of(true, MYSQL_SOURCE_NAME + "asdf"),
            )
        }

        /**
         * Rows of (flag value, source image name) for which concurrent stream read is expected to be "false":
         * the flag is on but the name is unrelated or merely ends with [MYSQL_SOURCE_NAME], or the flag is off.
         */
        @JvmStatic
        private fun concurrentStreamReadDisabledMatrix(): Stream<Arguments> {
            return Stream.of(
                Arguments.of(true, "anything else"),
                Arguments.of(true, "asdf" + MYSQL_SOURCE_NAME),
                Arguments.of(false, MYSQL_SOURCE_NAME),
                Arguments.of(false, MYSQL_SOURCE_NAME + "asdf"),
                Arguments.of(false, "anything else"),
            )
        }

        /** Pass-through env maps to exercise: absent (null), one entry, and two entries. */
        @JvmStatic
        private fun additionalEnvironmentVariablesMatrix(): Stream<Arguments> {
            return Stream.of(
                Arguments.of(null),
                Arguments.of(mapOf("key-1" to "value-1")),
                Arguments.of(mapOf("key-1" to "value-1", "key-2" to "value-2")),
            )
        }
    }
}