Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Core: Support for IRSA when logging on EKS and passing role to Dest/Source Pods #177

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ data
.vscode
**/gmon.out
static_checker_reports/
bin/

# Logs
acceptance_tests_logs/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ public KubePodProcess(final boolean isOrchestrator,
final KubernetesClient fabricClient,
final String podName,
final String namespace,
final String serviceAccount,
final String image,
final String imagePullPolicy,
final String sidecarImagePullPolicy,
Expand Down Expand Up @@ -534,11 +535,9 @@ public KubePodProcess(final boolean isOrchestrator,
.withLabels(labels)
.withAnnotations(annotations)
.endMetadata()
.withNewSpec();

if (isOrchestrator) {
podBuilder = podBuilder.withServiceAccount("airbyte-admin").withAutomountServiceAccountToken(true);
}
.withNewSpec()
.withServiceAccount(isOrchestrator ? "airbyte-admin" : serviceAccount)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benmoriceau added a comment to the previous PR:

Should we add a dedicated variable for the orchestrator SA, as well as the one added here, in order to have something like:
.withServiceAccount(isOrchestrator ? orchestratorServiceAccount : serviceAccount). If we go that way, we need to make sure that the default value is "airbyte-admin".

.withAutomountServiceAccountToken(true);

List<LocalObjectReference> pullSecrets = imagePullSecrets
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class KubeProcessFactory implements ProcessFactory {

private final WorkerConfigs workerConfigs;
private final String namespace;
private final String serviceAccount;
private final KubernetesClient fabricClient;
private final String kubeHeartbeatUrl;
private final String processRunnerHost;
Expand All @@ -41,12 +42,14 @@ public class KubeProcessFactory implements ProcessFactory {
*/
public KubeProcessFactory(final WorkerConfigs workerConfigs,
final String namespace,
final String serviceAccount,
final KubernetesClient fabricClient,
final String kubeHeartbeatUrl,
final boolean isOrchestrator) {
this(
workerConfigs,
namespace,
serviceAccount,
fabricClient,
kubeHeartbeatUrl,
Exceptions.toRuntime(() -> InetAddress.getLocalHost().getHostAddress()),
Expand All @@ -67,12 +70,14 @@ public KubeProcessFactory(final WorkerConfigs workerConfigs,
@VisibleForTesting
public KubeProcessFactory(final WorkerConfigs workerConfigs,
final String namespace,
final String serviceAccount,
final KubernetesClient fabricClient,
final String kubeHeartbeatUrl,
final String processRunnerHost,
final boolean isOrchestrator) {
this.workerConfigs = workerConfigs;
this.namespace = namespace;
this.serviceAccount = serviceAccount;
this.fabricClient = fabricClient;
this.kubeHeartbeatUrl = kubeHeartbeatUrl;
this.processRunnerHost = processRunnerHost;
Expand Down Expand Up @@ -123,6 +128,7 @@ public Process create(
fabricClient,
podName,
namespace,
serviceAccount,
imageName,
workerConfigs.getJobImagePullPolicy(),
workerConfigs.getSidecarImagePullPolicy(),
Expand Down
5 changes: 0 additions & 5 deletions airbyte-commons/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

<Property name="s3-bucket">${sys:S3_LOG_BUCKET:-${env:S3_LOG_BUCKET}}</Property>
<Property name="s3-region">${sys:S3_LOG_BUCKET_REGION:-${env:S3_LOG_BUCKET_REGION}}</Property>
<Property name="s3-aws-key">${sys:AWS_ACCESS_KEY_ID:-${env:AWS_ACCESS_KEY_ID}}</Property>
<Property name="s3-aws-secret">${sys:AWS_SECRET_ACCESS_KEY:-${env:AWS_SECRET_ACCESS_KEY}}</Property>
<Property name="s3-minio-endpoint">${sys:S3_MINIO_ENDPOINT:-${env:S3_MINIO_ENDPOINT}}</Property>
<Property name="s3-path-style-access">${sys:S3_PATH_STYLE_ACCESS:-${env:S3_PATH_STYLE_ACCESS}}</Property>

Expand Down Expand Up @@ -102,7 +100,6 @@
verbose="true"
stagingBufferAge="1"
s3Bucket="${s3-bucket}" s3Path="job-logging${ctx:cloud_job_log_path}" s3Region="${s3-region}"
s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}"
s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}"
gcpStorageBucket="${gcs-log-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:cloud_job_log_path}">
<PatternLayout pattern="${default-pattern}"/>
Expand All @@ -126,7 +123,6 @@
verbose="true"
stagingBufferAge="1"
s3Bucket="${s3-bucket}" s3Path="job-logging${ctx:cloud_job_log_path}" s3Region="${s3-region}"
s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}"
s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}"
gcpStorageBucket="${gcs-log-bucket}" gcpStorageBlobNamePrefix="job-logging${ctx:cloud_job_log_path}">
<PatternLayout pattern="${simple-pattern}"/>
Expand Down Expand Up @@ -168,7 +164,6 @@
<Log4j2Appender name="app-logging/${ctx:cloud_workspace_app_root}/"
stagingBufferAge="1"
s3Bucket="${s3-bucket}" s3Path="app-logging${ctx:cloud_workspace_app_root}" s3Region="${s3-region}"
s3AwsKey="${s3-aws-key}" s3AwsSecret="${s3-aws-secret}"
s3ServiceEndpoint="${s3-minio-endpoint}" s3PathStyleAccess="${s3-path-style-access}"
gcpStorageBucket="${gcs-log-bucket}" gcpStorageBlobNamePrefix="app-logging${ctx:cloud_workspace_app_root}">
<PatternLayout pattern="${default-pattern}"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ public interface Configs {
*/
String getJobKubeNamespace();

/**
 * Define the Kubernetes service account that Job pods are created with.
 *
 * @return the service account name to set on Job pods
 */
String getJobKubeServiceAccount();

// Logging/Monitoring/Tracking

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public class EnvConfigs implements Configs {
private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS";
private static final String TEMPORAL_HISTORY_RETENTION_IN_DAYS = "TEMPORAL_HISTORY_RETENTION_IN_DAYS";
public static final String JOB_KUBE_NAMESPACE = "JOB_KUBE_NAMESPACE";
public static final String JOB_KUBE_SERVICE_ACCOUNT = "JOB_KUBE_SERVICE_ACCOUNT";
public static final String JOB_MAIN_CONTAINER_CPU_REQUEST = "JOB_MAIN_CONTAINER_CPU_REQUEST";
public static final String JOB_MAIN_CONTAINER_CPU_LIMIT = "JOB_MAIN_CONTAINER_CPU_LIMIT";
public static final String JOB_MAIN_CONTAINER_MEMORY_REQUEST = "JOB_MAIN_CONTAINER_MEMORY_REQUEST";
Expand Down Expand Up @@ -204,6 +205,7 @@ public class EnvConfigs implements Configs {
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
private static final String DEFAULT_GITHUB_STORE_BRANCH = "master";
private static final String DEFAULT_JOB_KUBE_NAMESPACE = "default";
private static final String DEFAULT_JOB_KUBE_SERVICE_ACCOUNT = "default";
private static final String DEFAULT_JOB_CPU_REQUIREMENT = null;
private static final String DEFAULT_JOB_MEMORY_REQUIREMENT = null;
private static final String DEFAULT_JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent";
Expand Down Expand Up @@ -832,6 +834,11 @@ public String getJobKubeNamespace() {
return getEnvOrDefault(JOB_KUBE_NAMESPACE, DEFAULT_JOB_KUBE_NAMESPACE);
}

/**
 * Looks up the JOB_KUBE_SERVICE_ACCOUNT setting through getEnvOrDefault, passing
 * DEFAULT_JOB_KUBE_SERVICE_ACCOUNT ("default") as the fallback value.
 *
 * @return the service account name for job pods
 */
@Override
public String getJobKubeServiceAccount() {
  final String jobServiceAccount = getEnvOrDefault(JOB_KUBE_SERVICE_ACCOUNT, DEFAULT_JOB_KUBE_SERVICE_ACCOUNT);
  return jobServiceAccount;
}

@Override
public String getJobMainContainerCpuRequest() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_CPU_REQUEST, DEFAULT_JOB_CPU_REQUIREMENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import io.airbyte.config.storage.CloudStorageConfigs.S3ApiWorkerStorageConfig;
import io.airbyte.config.storage.CloudStorageConfigs.S3Config;
import java.util.function.Supplier;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

Expand All @@ -33,16 +33,14 @@ private static void validate(final S3Config config) {
}

/**
 * Checks the fields shared by the S3-API storage configurations: the access key,
 * the secret access key and the bucket name must each be non-blank.
 *
 * @param s3BaseConfig the configuration to check
 * @throws IllegalArgumentException (from Preconditions.checkArgument) when a checked value is blank
 */
static void validateBase(final S3ApiWorkerStorageConfig s3BaseConfig) {
  // NOTE(review): isBlank() is invoked directly on the getter results, so a null access key or
  // secret raises a NullPointerException here. If static keys become optional (e.g. credentials
  // resolved from the environment), these two checks need to be relaxed - confirm with callers.
  Preconditions.checkArgument(!s3BaseConfig.getAwsAccessKey().isBlank());
  Preconditions.checkArgument(!s3BaseConfig.getAwsSecretAccessKey().isBlank());
  // The bucket-name check was previously duplicated; one check is sufficient.
  Preconditions.checkArgument(!s3BaseConfig.getBucketName().isBlank());
}

/**
 * Builds an S3 client for the region configured in {@code s3Config}.
 *
 * <p>Credentials are resolved by the AWS SDK v2 default provider chain rather than from
 * statically configured keys. The previous code passed the SDK v1
 * {@code DefaultAWSCredentialsProviderChain} (without {@code new}) to the v2 builder, which
 * does not compile; the v2 builder needs a v2 credentials provider.
 *
 * @return a newly built {@link S3Client}
 */
@Override
public S3Client get() {
  // NOTE(review): presumably the default chain picks up web-identity (IRSA) credentials when the
  // STS module is on the classpath - confirm against the deployed dependency set.
  return S3Client.builder()
      .credentialsProvider(DefaultCredentialsProvider.create())
      .region(Region.of(s3Config.getRegion()))
      .build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ void createCloudLogClientTestMinio() {
void createCloudLogClientTestAws() {
final var configs = new LogConfigs(Optional.of(CloudStorageConfigs.s3(new S3Config(
"test-bucket",
"access-key",
"access-key-secret",
null,
null,
"us-east-1"))));

assertEquals(S3Logs.class, CloudLogs.createCloudLogClient(configs).getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class S3LogsTest {

private static final LogConfigs LOG_CONFIGS = new LogConfigs(Optional.of(CloudStorageConfigs.s3(new CloudStorageConfigs.S3Config(
System.getenv(LogClientSingleton.S3_LOG_BUCKET),
System.getenv(LogClientSingleton.AWS_ACCESS_KEY_ID),
System.getenv(LogClientSingleton.AWS_SECRET_ACCESS_KEY),
null,
null,
System.getenv(LogClientSingleton.S3_LOG_BUCKET_REGION)))));

private S3Client s3Client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class DefaultS3ClientFactoryTest {
@Test
void testS3() {
final var s3Config = Mockito.mock(S3Config.class);
Mockito.when(s3Config.getAwsAccessKey()).thenReturn("access-key");
Mockito.when(s3Config.getAwsSecretAccessKey()).thenReturn("access-key-secret");
Mockito.when(s3Config.getAwsAccessKey()).thenReturn(null);
Mockito.when(s3Config.getAwsSecretAccessKey()).thenReturn(null);
Mockito.when(s3Config.getBucketName()).thenReturn("test-bucket");
Mockito.when(s3Config.getRegion()).thenReturn("us-east-1");

Expand All @@ -28,8 +28,8 @@ void testS3() {
void testS3RegionNotSet() {
final var s3Config = Mockito.mock(S3Config.class);
// Missing bucket and access key.
Mockito.when(s3Config.getAwsAccessKey()).thenReturn("");
Mockito.when(s3Config.getAwsSecretAccessKey()).thenReturn("access-key-secret");
Mockito.when(s3Config.getAwsAccessKey()).thenReturn(null);
Mockito.when(s3Config.getAwsSecretAccessKey()).thenReturn(null);
Mockito.when(s3Config.getBucketName()).thenReturn("");
Mockito.when(s3Config.getRegion()).thenReturn("");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ ProcessFactory kubeProcessFactory(
return new KubeProcessFactory(
workerConfigs,
configs.getJobKubeNamespace(),
configs.getJobKubeServiceAccount(),
new DefaultKubernetesClient(),
kubeHeartbeatUrl,
false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ public ProcessFactory checkDockerProcessFactory(
public ProcessFactory checkKubernetesProcessFactory(
@Named("checkWorkerConfigs") final WorkerConfigs workerConfigs,
@Value("${airbyte.worker.job.kube.namespace}") final String kubernetesNamespace,
@Value("${airbyte.worker.job.kube.service.account") final String kubernetesServiceAccount,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm not wrong, the property placeholder here is missing its closing brace (`}`).

@Value("${micronaut.server.port}") final Integer serverPort)
throws UnknownHostException {
return createKubernetesProcessFactory(workerConfigs,
kubernetesNamespace,
kubernetesServiceAccount,
serverPort);
}

Expand Down Expand Up @@ -86,10 +88,12 @@ public ProcessFactory defaultDockerProcessFactory(
// Creates the Kubernetes-backed ProcessFactory for the "defaultWorkerConfigs" worker configuration.
// The namespace, service account and server port are injected from configuration properties and
// forwarded unchanged to createKubernetesProcessFactory, which may throw UnknownHostException.
public ProcessFactory defaultKubernetesProcessFactory(
    @Named("defaultWorkerConfigs") final WorkerConfigs workerConfigs,
    @Value("${airbyte.worker.job.kube.namespace}") final String kubernetesNamespace,
    // Fixed: the placeholder was missing its closing '}' and so was not a well-formed ${...} expression.
    // NOTE(review): the property must also be defined (or defaulted) in the worker's application.yaml - confirm.
    @Value("${airbyte.worker.job.kube.service.account}") final String kubernetesServiceAccount,
    @Value("${micronaut.server.port}") final Integer serverPort)
    throws UnknownHostException {
  return createKubernetesProcessFactory(workerConfigs,
      kubernetesNamespace,
      kubernetesServiceAccount,
      serverPort);
}

Expand Down Expand Up @@ -118,10 +122,12 @@ public ProcessFactory discoverDockerProcessFactory(
public ProcessFactory discoverKubernetesProcessFactory(
@Named("discoverWorkerConfigs") final WorkerConfigs workerConfigs,
@Value("${airbyte.worker.job.kube.namespace}") final String kubernetesNamespace,
@Value("${airbyte.worker.job.kube.service.account") final String kubernetesServiceAccount,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benmoriceau left a comment:

This value is missing from the application.yaml file in the resources folder of the airbyte-worker module. This will cause an issue when the process factory is created.

@Value("${micronaut.server.port}") final Integer serverPort)
throws UnknownHostException {
return createKubernetesProcessFactory(workerConfigs,
kubernetesNamespace,
kubernetesServiceAccount,
serverPort);
}

Expand Down Expand Up @@ -150,10 +156,12 @@ public ProcessFactory replicationDockerProcessFactory(
// Creates the Kubernetes-backed ProcessFactory for the "replicationWorkerConfigs" worker configuration.
// The namespace, service account and server port are injected from configuration properties and
// forwarded unchanged to createKubernetesProcessFactory, which may throw UnknownHostException.
public ProcessFactory replicationKubernetesProcessFactory(
    @Named("replicationWorkerConfigs") final WorkerConfigs workerConfigs,
    @Value("${airbyte.worker.job.kube.namespace}") final String kubernetesNamespace,
    // Fixed: the placeholder was missing its closing '}' and so was not a well-formed ${...} expression.
    // NOTE(review): the property must also be defined (or defaulted) in the worker's application.yaml - confirm.
    @Value("${airbyte.worker.job.kube.service.account}") final String kubernetesServiceAccount,
    @Value("${micronaut.server.port}") final Integer serverPort)
    throws UnknownHostException {
  return createKubernetesProcessFactory(workerConfigs,
      kubernetesNamespace,
      kubernetesServiceAccount,
      serverPort);
}

Expand Down Expand Up @@ -184,10 +192,12 @@ public ProcessFactory specDockerProcessFactory(
// Creates the Kubernetes-backed ProcessFactory for the "specWorkerConfigs" worker configuration.
// The namespace, service account and server port are injected from configuration properties and
// forwarded unchanged to createKubernetesProcessFactory, which may throw UnknownHostException.
public ProcessFactory specKubernetesProcessFactory(
    @Named("specWorkerConfigs") final WorkerConfigs workerConfigs,
    @Value("${airbyte.worker.job.kube.namespace}") final String kubernetesNamespace,
    // Fixed: the placeholder was missing its closing '}' and so was not a well-formed ${...} expression.
    // NOTE(review): the property must also be defined (or defaulted) in the worker's application.yaml - confirm.
    @Value("${airbyte.worker.job.kube.service.account}") final String kubernetesServiceAccount,
    @Value("${micronaut.server.port}") final Integer serverPort)
    throws UnknownHostException {
  return createKubernetesProcessFactory(workerConfigs,
      kubernetesNamespace,
      kubernetesServiceAccount,
      serverPort);
}

Expand All @@ -207,13 +217,15 @@ private ProcessFactory createDockerProcessFactory(final WorkerConfigs workerConf

private ProcessFactory createKubernetesProcessFactory(final WorkerConfigs workerConfigs,
final String kubernetesNamespace,
final String kubernetesServiceAccount,
final Integer serverPort)
throws UnknownHostException {
final KubernetesClient fabricClient = new DefaultKubernetesClient();
final String localIp = InetAddress.getLocalHost().getHostAddress();
final String kubeHeartbeatUrl = localIp + ":" + serverPort;
return new KubeProcessFactory(workerConfigs,
kubernetesNamespace,
kubernetesServiceAccount,
fabricClient,
kubeHeartbeatUrl,
false);
Expand Down
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,11 @@ subprojects { subproj ->
// Dependencies for logging to cloud storage, as well as the various clients used to do so.
implementation libs.appender.log4j2
implementation libs.aws.java.sdk.s3
implementation libs.aws.java.sdk.sts
implementation libs.google.cloud.storage

implementation libs.s3
implementation(platform(libs.aws-bom))
implementation libs.bundles.aws

// Lombok dependencies
compileOnly libs.lombok
Expand Down
2 changes: 2 additions & 0 deletions charts/airbyte-worker/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: JOB_KUBE_NAMESPACE
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment from previous PR by @mjstel:

This is probably a typo and should be JOB_KUBE_SERVICE_ACCOUNT right?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- name: JOB_KUBE_NAMESPACE
- name: JOB_KUBE_SERVICE_ACCOUNT

value: "{{ .Values.global.jobs.kube.service_account }}"
{{- if $.Values.global.jobs.kube.annotations }}
- name: JOB_KUBE_ANNOTATIONS
valueFrom:
Expand Down
2 changes: 2 additions & 0 deletions charts/airbyte-worker/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ global:
limits: {}

kube:

service_account: default
## JOB_KUBE_ANNOTATIONS
## pod annotations of the sync job and the default pod annotations fallback for others jobs
## jobs.kube.annotations [object] key/value annotations applied to kube jobs
Expand Down
1 change: 1 addition & 0 deletions charts/airbyte/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ global:
limits: {}

kube:
service_account: default
## JOB_KUBE_ANNOTATIONS
## pod annotations of the sync job and the default pod annotations fallback for others jobs
## jobs.kube.annotations [object] key/value annotations applied to kube jobs
Expand Down
8 changes: 7 additions & 1 deletion deps.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ apache-commons-text = { module = "org.apache.commons:commons-text", version = "1
appender-log4j2 = { module = "com.therealvan:appender-log4j2", version = "3.6.0" }
assertj-core = { module = "org.assertj:assertj-core", version = "3.21.0" }
aws-java-sdk-s3 = { module = "com.amazonaws:aws-java-sdk-s3", version = "1.12.6" }
aws-java-sdk-sts = {module = "com.amazonaws:aws-java-sdk-sts", version = "1.12.6"}
byte-buddy = { module = "net.bytebuddy:byte-buddy", version = "1.12.14" }
commons-io = { module = "commons-io:commons-io", version.ref = "commons_io" }
connectors-testcontainers = { module = "org.testcontainers:testcontainers", version.ref = "connectors-testcontainers" }
Expand Down Expand Up @@ -105,7 +106,11 @@ postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql"
quartz-scheduler = { module = "org.quartz-scheduler:quartz", version = "2.3.2" }
reactor-core = { module = "io.projectreactor:reactor-core", version.ref = "reactor" }
reactor-test = { module = "io.projectreactor:reactor-test", version.ref = "reactor" }
s3 = { module = "software.amazon.awssdk:s3", version = "2.16.84" }
aws-bom = "software.amazon.awssdk:bom:2.16.84"
aws-s3 = { module = "software.amazon.awssdk:s3" }
aws-sts = { module = "software.amazon.awssdk:sts" }
aws-auth = { module = "software.amazon.awssdk:auth" }
aws-iam = { module = "software.amazon.awssdk:iam" }
segment-java-analytics = { module = "com.segment.analytics.java:analytics", version.ref = "segment" }
sentry-java = { module = "io.sentry:sentry", version.ref = "sentry" }
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
Expand Down Expand Up @@ -148,6 +153,7 @@ micronaut-validation = { module = "io.micronaut:micronaut-validation", version.r

[bundles]
apache = ["apache-commons", "apache-commons-lang"]
aws = ["aws-s3","aws-sts","aws-auth","aws-iam"]
datadog = ["datadog-trace-api", "datadog-trace-ot"]
jackson = ["jackson-databind", "jackson-annotations", "jackson-dataformat", "jackson-datatype"]
junit = ["junit-jupiter-api", "junit-jupiter-params", "mockito-junit-jupiter"]
Expand Down