diff --git a/.gitignore b/.gitignore index a6e0049c2f6..d8aaa2373bd 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ data .vscode **/gmon.out static_checker_reports/ +bin/ # Logs acceptance_tests_logs/ diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 6b45d870e3d..b8463a76787 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -381,6 +381,7 @@ public KubePodProcess(final boolean isOrchestrator, final KubernetesClient fabricClient, final String podName, final String namespace, + final String serviceAccount, final String image, final String imagePullPolicy, final String sidecarImagePullPolicy, @@ -534,11 +535,9 @@ public KubePodProcess(final boolean isOrchestrator, .withLabels(labels) .withAnnotations(annotations) .endMetadata() - .withNewSpec(); - - if (isOrchestrator) { - podBuilder = podBuilder.withServiceAccount("airbyte-admin").withAutomountServiceAccountToken(true); - } + .withNewSpec() + .withServiceAccount(isOrchestrator ? "airbyte-admin" : serviceAccount) + .withAutomountServiceAccountToken(true); List pullSecrets = imagePullSecrets .stream() diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 08d86ba9548..8333734d15f 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -31,6 +31,7 @@ public class KubeProcessFactory implements ProcessFactory { private final WorkerConfigs workerConfigs; private final String namespace; + private final String serviceAccount; private final KubernetesClient fabricClient; private final String kubeHeartbeatUrl; private final String processRunnerHost; @@ -41,12 +42,14 @@ public class KubeProcessFactory implements ProcessFactory { */ public KubeProcessFactory(final WorkerConfigs workerConfigs, final String namespace, + final String serviceAccount, final KubernetesClient fabricClient, final String kubeHeartbeatUrl, final boolean isOrchestrator) { this( workerConfigs, namespace, + serviceAccount, fabricClient, kubeHeartbeatUrl, Exceptions.toRuntime(() -> InetAddress.getLocalHost().getHostAddress()), @@ -67,12 +70,14 @@ public KubeProcessFactory(final WorkerConfigs workerConfigs, @VisibleForTesting public KubeProcessFactory(final WorkerConfigs workerConfigs, final String namespace, + final String serviceAccount, final KubernetesClient fabricClient, final String kubeHeartbeatUrl, final String processRunnerHost, final boolean isOrchestrator) { this.workerConfigs = workerConfigs; this.namespace = namespace; + this.serviceAccount = serviceAccount; this.fabricClient = fabricClient; this.kubeHeartbeatUrl = kubeHeartbeatUrl; this.processRunnerHost = processRunnerHost; @@ -123,6 +128,7 @@ public Process create( fabricClient, podName, namespace, + serviceAccount, imageName, workerConfigs.getJobImagePullPolicy(), workerConfigs.getSidecarImagePullPolicy(), diff --git a/airbyte-commons/src/main/resources/log4j2.xml b/airbyte-commons/src/main/resources/log4j2.xml index 22d52667d0e..8504ecb08cd 100644 --- a/airbyte-commons/src/main/resources/log4j2.xml +++ b/airbyte-commons/src/main/resources/log4j2.xml @@ -12,8 +12,6 @@ ${sys:S3_LOG_BUCKET:-${env:S3_LOG_BUCKET}} ${sys:S3_LOG_BUCKET_REGION:-${env:S3_LOG_BUCKET_REGION}} - ${sys:AWS_ACCESS_KEY_ID:-${env:AWS_ACCESS_KEY_ID}} - ${sys:AWS_SECRET_ACCESS_KEY:-${env:AWS_SECRET_ACCESS_KEY}} ${sys:S3_MINIO_ENDPOINT:-${env:S3_MINIO_ENDPOINT}} ${sys:S3_PATH_STYLE_ACCESS:-${env:S3_PATH_STYLE_ACCESS}} @@ -102,7 +100,6 @@ verbose="true" stagingBufferAge="1" s3Bucket="${s3-bucket}" s3Path="job-logging${ctx:cloud_job_log_path}" s3Region="${s3-region}" - s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}" s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}" gcpStorageBucket="${gcs-log-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:cloud_job_log_path}"> @@ -126,7 +123,6 @@ verbose="true" stagingBufferAge="1" s3Bucket="${s3-bucket}" s3Path="job-logging${ctx:cloud_job_log_path}" s3Region="${s3-region}" - s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}" s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}" gcpStorageBucket="${gcs-log-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:cloud_job_log_path}"> @@ -168,7 +164,6 @@ diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index 22e19bae966..d4670fc09e9 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -540,6 +540,11 @@ public interface Configs { */ String getJobKubeNamespace(); + /** + * Define the Kubernetes service account Job pods are created in. + */ + String getJobKubeServiceAccount(); + // Logging/Monitoring/Tracking /** diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index e853487ebd6..286bacadd94 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -106,6 +106,7 @@ public class EnvConfigs implements Configs { private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS"; private static final String TEMPORAL_HISTORY_RETENTION_IN_DAYS = "TEMPORAL_HISTORY_RETENTION_IN_DAYS"; public static final String JOB_KUBE_NAMESPACE = "JOB_KUBE_NAMESPACE"; + public static final String JOB_KUBE_SERVICE_ACCOUNT = "JOB_KUBE_SERVICE_ACCOUNT"; public static final String JOB_MAIN_CONTAINER_CPU_REQUEST = "JOB_MAIN_CONTAINER_CPU_REQUEST"; public static final String JOB_MAIN_CONTAINER_CPU_LIMIT = "JOB_MAIN_CONTAINER_CPU_LIMIT"; public static final String JOB_MAIN_CONTAINER_MEMORY_REQUEST = "JOB_MAIN_CONTAINER_MEMORY_REQUEST"; @@ -204,6 +205,7 @@ public class EnvConfigs implements Configs { private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache"; private static final String DEFAULT_GITHUB_STORE_BRANCH = "master"; private static final String DEFAULT_JOB_KUBE_NAMESPACE = "default"; + private static final String DEFAULT_JOB_KUBE_SERVICE_ACCOUNT = "default"; private static final String DEFAULT_JOB_CPU_REQUIREMENT = null; private static final String DEFAULT_JOB_MEMORY_REQUIREMENT = null; private static final String DEFAULT_JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"; @@ -832,6 +834,11 @@ public String getJobKubeNamespace() { return getEnvOrDefault(JOB_KUBE_NAMESPACE, DEFAULT_JOB_KUBE_NAMESPACE); } + @Override + public String getJobKubeServiceAccount() { + return getEnvOrDefault(JOB_KUBE_SERVICE_ACCOUNT, DEFAULT_JOB_KUBE_SERVICE_ACCOUNT); + } + @Override public String getJobMainContainerCpuRequest() { return getEnvOrDefault(JOB_MAIN_CONTAINER_CPU_REQUEST, DEFAULT_JOB_CPU_REQUIREMENT); diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/storage/DefaultS3ClientFactory.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/storage/DefaultS3ClientFactory.java index 5120f4d852b..7971422f88e 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/storage/DefaultS3ClientFactory.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/storage/DefaultS3ClientFactory.java @@ -8,7 +8,7 @@ import io.airbyte.config.storage.CloudStorageConfigs.S3ApiWorkerStorageConfig; import io.airbyte.config.storage.CloudStorageConfigs.S3Config; import java.util.function.Supplier; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; @@ -33,8 +33,6 @@ private static void validate(final S3Config config) { } static void validateBase(final S3ApiWorkerStorageConfig s3BaseConfig) { - Preconditions.checkArgument(!s3BaseConfig.getAwsAccessKey().isBlank()); - Preconditions.checkArgument(!s3BaseConfig.getAwsSecretAccessKey().isBlank()); Preconditions.checkArgument(!s3BaseConfig.getBucketName().isBlank()); Preconditions.checkArgument(!s3BaseConfig.getBucketName().isBlank()); } @@ -42,7 +40,7 @@ static void validateBase(final S3ApiWorkerStorageConfig s3BaseConfig) { @Override public S3Client get() { final var builder = S3Client.builder(); - builder.credentialsProvider(() -> AwsBasicCredentials.create(s3Config.getAwsAccessKey(), s3Config.getAwsSecretAccessKey())); + builder.credentialsProvider(DefaultAWSCredentialsProviderChain()); builder.region(Region.of(s3Config.getRegion())); return builder.build(); } diff --git a/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/CloudLogsClientTest.java b/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/CloudLogsClientTest.java index 06a8cdc70f3..325ec37b432 100644 --- a/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/CloudLogsClientTest.java +++ b/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/CloudLogsClientTest.java @@ -31,8 +31,8 @@ void createCloudLogClientTestMinio() { void createCloudLogClientTestAws() { final var configs = new LogConfigs(Optional.of(CloudStorageConfigs.s3(new S3Config( "test-bucket", - "access-key", - "access-key-secret", + null, + null, "us-east-1")))); assertEquals(S3Logs.class, CloudLogs.createCloudLogClient(configs).getClass()); diff --git a/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/S3LogsTest.java b/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/S3LogsTest.java index f43e257c842..d89276897ea 100644 --- a/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/S3LogsTest.java +++ b/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/S3LogsTest.java @@ -30,8 +30,8 @@ class S3LogsTest { private static final LogConfigs LOG_CONFIGS = new LogConfigs(Optional.of(CloudStorageConfigs.s3(new CloudStorageConfigs.S3Config( System.getenv(LogClientSingleton.S3_LOG_BUCKET), - System.getenv(LogClientSingleton.AWS_ACCESS_KEY_ID), - System.getenv(LogClientSingleton.AWS_SECRET_ACCESS_KEY), + null, + null, System.getenv(LogClientSingleton.S3_LOG_BUCKET_REGION))))); private S3Client s3Client; diff --git a/airbyte-config/config-models/src/test/java/io/airbyte/config/storage/DefaultS3ClientFactoryTest.java b/airbyte-config/config-models/src/test/java/io/airbyte/config/storage/DefaultS3ClientFactoryTest.java index d70f08a545e..f4d66036599 100644 --- a/airbyte-config/config-models/src/test/java/io/airbyte/config/storage/DefaultS3ClientFactoryTest.java +++ b/airbyte-config/config-models/src/test/java/io/airbyte/config/storage/DefaultS3ClientFactoryTest.java @@ -16,8 +16,8 @@ class DefaultS3ClientFactoryTest { @Test void testS3() { final var s3Config = Mockito.mock(S3Config.class); - Mockito.when(s3Config.getAwsAccessKey()).thenReturn("access-key"); - Mockito.when(s3Config.getAwsSecretAccessKey()).thenReturn("access-key-secret"); + Mockito.when(s3Config.getAwsAccessKey()).thenReturn(null); + Mockito.when(s3Config.getAwsSecretAccessKey()).thenReturn(null); Mockito.when(s3Config.getBucketName()).thenReturn("test-bucket"); Mockito.when(s3Config.getRegion()).thenReturn("us-east-1"); @@ -28,8 +28,8 @@ void testS3() { void testS3RegionNotSet() { final var s3Config = Mockito.mock(S3Config.class); // Missing bucket and access key. - Mockito.when(s3Config.getAwsAccessKey()).thenReturn(""); - Mockito.when(s3Config.getAwsSecretAccessKey()).thenReturn("access-key-secret"); + Mockito.when(s3Config.getAwsAccessKey()).thenReturn(null); + Mockito.when(s3Config.getAwsSecretAccessKey()).thenReturn(null); Mockito.when(s3Config.getBucketName()).thenReturn(""); Mockito.when(s3Config.getRegion()).thenReturn(""); diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactory.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactory.java index 74004a3e802..d0e4e5f275d 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactory.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/config/ContainerOrchestratorFactory.java @@ -89,6 +89,7 @@ ProcessFactory kubeProcessFactory( return new KubeProcessFactory( workerConfigs, configs.getJobKubeNamespace(), + configs.getJobKubeServiceAccount(), new DefaultKubernetesClient(), kubeHeartbeatUrl, false); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ProcessFactoryBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ProcessFactoryBeanFactory.java index 33d6b618738..74f08917dca 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ProcessFactoryBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ProcessFactoryBeanFactory.java @@ -54,10 +54,12 @@ public ProcessFactory checkDockerProcessFactory( public ProcessFactory checkKubernetesProcessFactory( @Named("checkWorkerConfigs") final WorkerConfigs workerConfigs, @Value("${airbyte.worker.job.kube.namespace}") final String kubernetesNamespace, + @Value("${airbyte.worker.job.kube.service.account") final String kubernetesServiceAccount, @Value("${micronaut.server.port}") final Integer serverPort) throws UnknownHostException { return createKubernetesProcessFactory(workerConfigs, kubernetesNamespace, + kubernetesServiceAccount, serverPort); } @@ -86,10 +88,12 @@ public ProcessFactory defaultDockerProcessFactory( public ProcessFactory defaultKubernetesProcessFactory( @Named("defaultWorkerConfigs") final WorkerConfigs workerConfigs, @Value("${airbyte.worker.job.kube.namespace}") final String kubernetesNamespace, + @Value("${airbyte.worker.job.kube.service.account") final String kubernetesServiceAccount, @Value("${micronaut.server.port}") final Integer serverPort) throws UnknownHostException { return createKubernetesProcessFactory(workerConfigs, kubernetesNamespace, + kubernetesServiceAccount, serverPort); } @@ -118,10 +122,12 @@ public ProcessFactory discoverDockerProcessFactory( public ProcessFactory discoverKubernetesProcessFactory( @Named("discoverWorkerConfigs") final WorkerConfigs workerConfigs, @Value("${airbyte.worker.job.kube.namespace}") final String kubernetesNamespace, + @Value("${airbyte.worker.job.kube.service.account") final String kubernetesServiceAccount, @Value("${micronaut.server.port}") final Integer serverPort) throws UnknownHostException { return createKubernetesProcessFactory(workerConfigs, kubernetesNamespace, + kubernetesServiceAccount, serverPort); } @@ -150,10 +156,12 @@ public ProcessFactory replicationDockerProcessFactory( public ProcessFactory replicationKubernetesProcessFactory( @Named("replicationWorkerConfigs") final WorkerConfigs workerConfigs, @Value("${airbyte.worker.job.kube.namespace}") final String kubernetesNamespace, + @Value("${airbyte.worker.job.kube.service.account") final String kubernetesServiceAccount, @Value("${micronaut.server.port}") final Integer serverPort) throws UnknownHostException { return createKubernetesProcessFactory(workerConfigs, kubernetesNamespace, + kubernetesServiceAccount, serverPort); } @@ -184,10 +192,12 @@ public ProcessFactory specDockerProcessFactory( public ProcessFactory specKubernetesProcessFactory( @Named("specWorkerConfigs") final WorkerConfigs workerConfigs, @Value("${airbyte.worker.job.kube.namespace}") final String kubernetesNamespace, + @Value("${airbyte.worker.job.kube.service.account") final String kubernetesServiceAccount, @Value("${micronaut.server.port}") final Integer serverPort) throws UnknownHostException { return createKubernetesProcessFactory(workerConfigs, kubernetesNamespace, + kubernetesServiceAccount, serverPort); } @@ -207,6 +217,7 @@ private ProcessFactory createDockerProcessFactory(final WorkerConfigs workerConf private ProcessFactory createKubernetesProcessFactory(final WorkerConfigs workerConfigs, final String kubernetesNamespace, + final String kubernetesServiceAccount, final Integer serverPort) throws UnknownHostException { final KubernetesClient fabricClient = new DefaultKubernetesClient(); @@ -214,6 +225,7 @@ private ProcessFactory createKubernetesProcessFactory(final WorkerConfigs worker final String kubeHeartbeatUrl = localIp + ":" + serverPort; return new KubeProcessFactory(workerConfigs, kubernetesNamespace, + kubernetesServiceAccount, fabricClient, kubeHeartbeatUrl, false); diff --git a/build.gradle b/build.gradle index bb4b47dd159..6815af4e615 100644 --- a/build.gradle +++ b/build.gradle @@ -414,9 +414,11 @@ subprojects { subproj -> // Dependencies for logging to cloud storage, as well as the various clients used to do so. implementation libs.appender.log4j2 implementation libs.aws.java.sdk.s3 + implementation libs.aws.java.sdk.sts implementation libs.google.cloud.storage - implementation libs.s3 + implementation(platform(libs.aws-bom)) + implementation libs.bundles.aws // Lombok dependencies compileOnly libs.lombok diff --git a/charts/airbyte-worker/templates/deployment.yaml b/charts/airbyte-worker/templates/deployment.yaml index e37bf9af02e..2215695ee4b 100644 --- a/charts/airbyte-worker/templates/deployment.yaml +++ b/charts/airbyte-worker/templates/deployment.yaml @@ -131,6 +131,8 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + - name: JOB_KUBE_NAMESPACE + value: "{{ .Values.global.jobs.kube.service_account }}" {{- if $.Values.global.jobs.kube.annotations }} - name: JOB_KUBE_ANNOTATIONS valueFrom: diff --git a/charts/airbyte-worker/values.yaml b/charts/airbyte-worker/values.yaml index 8396d727e7b..45c00872af7 100644 --- a/charts/airbyte-worker/values.yaml +++ b/charts/airbyte-worker/values.yaml @@ -82,6 +82,8 @@ global: limits: {} kube: + + service_account: default ## JOB_KUBE_ANNOTATIONS ## pod annotations of the sync job and the default pod annotations fallback for others jobs ## jobs.kube.annotations [object] key/value annotations applied to kube jobs diff --git a/charts/airbyte/values.yaml b/charts/airbyte/values.yaml index a3c280b7cbe..8265b966a84 100644 --- a/charts/airbyte/values.yaml +++ b/charts/airbyte/values.yaml @@ -105,6 +105,7 @@ global: limits: {} kube: + service_account: default ## JOB_KUBE_ANNOTATIONS ## pod annotations of the sync job and the default pod annotations fallback for others jobs ## jobs.kube.annotations [object] key/value annotations applied to kube jobs diff --git a/deps.toml b/deps.toml index 07b0608c1dc..42bd1d114c0 100644 --- a/deps.toml +++ b/deps.toml @@ -43,6 +43,7 @@ apache-commons-text = { module = "org.apache.commons:commons-text", version = "1 appender-log4j2 = { module = "com.therealvan:appender-log4j2", version = "3.6.0" } assertj-core = { module = "org.assertj:assertj-core", version = "3.21.0" } aws-java-sdk-s3 = { module = "com.amazonaws:aws-java-sdk-s3", version = "1.12.6" } +aws-java-sdk-sts = {module = "com.amazonaws:aws-java-sdk-sts", version = "1.12.6"} byte-buddy = { module = "net.bytebuddy:byte-buddy", version = "1.12.14" } commons-io = { module = "commons-io:commons-io", version.ref = "commons_io" } connectors-testcontainers = { module = "org.testcontainers:testcontainers", version.ref = "connectors-testcontainers" } @@ -105,7 +106,11 @@ postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql" quartz-scheduler = { module = "org.quartz-scheduler:quartz", version = "2.3.2" } reactor-core = { module = "io.projectreactor:reactor-core", version.ref = "reactor" } reactor-test = { module = "io.projectreactor:reactor-test", version.ref = "reactor" } -s3 = { module = "software.amazon.awssdk:s3", version = "2.16.84" } +aws-bom = "software.amazon.awssdk:bom:2.16.84" +aws-s3 = { module = "software.amazon.awssdk:s3" } +aws-sts = { module = "software.amazon.awssdk:sts" } +aws-auth = { module = "software.amazon.awssdk:auth" } +aws-iam = { module = "software.amazon.awssdk:iam" } segment-java-analytics = { module = "com.segment.analytics.java:analytics", version.ref = "segment" } sentry-java = { module = "io.sentry:sentry", version.ref = "sentry" } slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } @@ -148,6 +153,7 @@ micronaut-validation = { module = "io.micronaut:micronaut-validation", version.r [bundles] apache = ["apache-commons", "apache-commons-lang"] +aws = ["aws-s3","aws-sts","aws-auth","aws-iam"] datadog = ["datadog-trace-api", "datadog-trace-ot"] jackson = ["jackson-databind", "jackson-annotations", "jackson-dataformat", "jackson-datatype"] junit = ["junit-jupiter-api", "junit-jupiter-params", "mockito-junit-jupiter"]