Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use merge policy instead of fallbacks for KubeResourceConfig #309

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@

import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import java.util.Objects;
import java.util.Optional;

/**
* Encapsulates the configuration that is specific to Kubernetes. This is meant for the
* WorkerConfigsProvider to be reading configs, not for direct use as fallback logic isn't
* implemented here.
* WorkerConfigsProvider to be reading configs, not for direct use as merge logic isn't implemented
* here. Important: we cannot distinguish between empty and non-existent environment variables in
* this context, so we treat empty and non-existing strings as the same for our update logic. We use
* the "<EMPTY>" literal to represent an empty string.
*/
@EachProperty("airbyte.worker.kube-job-configs")
public final class KubeResourceConfig {

public static final String EMPTY_VALUE = "<EMPTY>";

private final String name;
private String annotations;
private String labels;
Expand All @@ -28,36 +34,50 @@ public KubeResourceConfig(@Parameter final String name) {
this.name = name;
}

public KubeResourceConfig merge(KubeResourceConfig other) {
var merged = new KubeResourceConfig(name);

merged.setAnnotations(useOtherIfEmpty(annotations, other.annotations));
merged.setLabels(useOtherIfEmpty(labels, other.labels));
merged.setNodeSelectors(useOtherIfEmpty(nodeSelectors, other.nodeSelectors));
merged.setCpuLimit(useOtherIfEmpty(cpuLimit, other.cpuLimit));
merged.setCpuRequest(useOtherIfEmpty(cpuRequest, other.cpuRequest));
merged.setMemoryLimit(useOtherIfEmpty(memoryLimit, other.memoryLimit));
merged.setMemoryRequest(useOtherIfEmpty(memoryRequest, other.memoryRequest));

return merged;
}

public String getName() {
return name;
}

public String getAnnotations() {
return annotations;
return resolveEmpty(annotations);
}

public String getLabels() {
return labels;
return resolveEmpty(labels);
}

public String getNodeSelectors() {
return nodeSelectors;
return resolveEmpty(nodeSelectors);
}

public String getCpuLimit() {
return cpuLimit;
return resolveEmpty(cpuLimit);
}

public String getCpuRequest() {
return cpuRequest;
return resolveEmpty(cpuRequest);
}

public String getMemoryLimit() {
return memoryLimit;
return resolveEmpty(memoryLimit);
}

public String getMemoryRequest() {
return memoryRequest;
return resolveEmpty(memoryRequest);
}

public void setAnnotations(final String annotations) {
Expand Down Expand Up @@ -88,4 +108,13 @@ public void setMemoryRequest(final String memoryRequest) {
this.memoryRequest = memoryRequest;
}

private static String useOtherIfEmpty(final String value, final String defaultValue) {
return (value == null || value.isBlank()) ? defaultValue : value;
}

private static String resolveEmpty(final String value) {
// Let's no return null values as it can be ambiguous
return (Objects.equals(value, EMPTY_VALUE)) ? "" : Optional.ofNullable(value).orElse("");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ public static ResourceSubType fromValue(final String value) {

@Singleton
record WorkerConfigsDefaults(WorkerEnvironment workerEnvironment,
@Named("default") KubeResourceConfig defaultKubeResourceConfig,
List<TolerationPOJO> jobKubeTolerations,
@Value("${airbyte.worker.isolated.kube.node-selectors}") String isolatedNodeSelectors,
@Value("${airbyte.worker.isolated.kube.use-custom-node-selector}") boolean useCustomNodeSelector,
Expand Down Expand Up @@ -231,22 +230,13 @@ private WorkerConfigs getConfig(final KubeResourceKey key) {
final Map<String, String> isolatedNodeSelectors = splitKVPairsFromEnvString(workerConfigsDefaults.isolatedNodeSelectors);
validateIsolatedPoolConfigInitialization(workerConfigsDefaults.useCustomNodeSelector(), isolatedNodeSelectors);

// if annotations are not defined for this specific resource, then fallback to the default
// resource's annotations
final Map<String, String> annotations;
if (Strings.isNullOrEmpty(kubeResourceConfig.getAnnotations())) {
annotations = splitKVPairsFromEnvString(workerConfigsDefaults.defaultKubeResourceConfig.getAnnotations());
} else {
annotations = splitKVPairsFromEnvString(kubeResourceConfig.getAnnotations());
}

Comment on lines -234 to -242
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove custom fallbacks since we don't need them anymore

return new WorkerConfigs(
workerConfigsDefaults.workerEnvironment(),
getResourceRequirementsFrom(kubeResourceConfig, workerConfigsDefaults.defaultKubeResourceConfig()),
getResourceRequirementsFrom(kubeResourceConfig),
workerConfigsDefaults.jobKubeTolerations(),
splitKVPairsFromEnvString(kubeResourceConfig.getNodeSelectors()),
workerConfigsDefaults.useCustomNodeSelector() ? Optional.of(isolatedNodeSelectors) : Optional.empty(),
annotations,
splitKVPairsFromEnvString(kubeResourceConfig.getAnnotations()),
splitKVPairsFromEnvString(kubeResourceConfig.getLabels()),
workerConfigsDefaults.mainContainerImagePullSecret(),
workerConfigsDefaults.mainContainerImagePullPolicy(),
Expand Down Expand Up @@ -279,18 +269,20 @@ public ResourceRequirements getResourceRequirements(final ResourceRequirementsTy
* Look up resource configs given a key.
* <p>
* We are storing configs in a tree like structure. Look up should be handled as such. Keeping in
* mind that we have defaults we want to fallback to, we should perform a complete scan of the
* configs until we find a match to make sure we do not overlook a match.
* mind that we have defaults we want to merge with, we should perform a complete scan of the
* configs, so we can update our result config.
*/
private Optional<KubeResourceConfig> getKubeResourceConfig(final KubeResourceKey key) {
// Look up by actual variant
final var resultWithVariant = getKubeResourceConfigByType(kubeResourceConfigs.get(key.variant), key);
if (resultWithVariant.isPresent()) {
return resultWithVariant;
var defaultConfig = getKubeResourceConfigByType(kubeResourceConfigs.get(DEFAULT_VARIANT), key);
if (Objects.equals(key.variant, DEFAULT_VARIANT)) { // fast track
return defaultConfig;
}

// no match with exact variant found, try again with the default.
return getKubeResourceConfigByType(kubeResourceConfigs.get(DEFAULT_VARIANT), key);
final var variantConfig = getKubeResourceConfigByType(kubeResourceConfigs.get(key.variant), key);
if (defaultConfig.isEmpty()) {
return variantConfig;
}
return variantConfig.map(kubeResourceConfig -> kubeResourceConfig.merge(defaultConfig.get())).or(() -> defaultConfig);
}

private static Optional<KubeResourceConfig> getKubeResourceConfigByType(
Expand All @@ -300,14 +292,16 @@ private static Optional<KubeResourceConfig> getKubeResourceConfigByType(
return Optional.empty();
}

// Look up by actual type
final var resultWithType = getKubeResourceConfigBySubType(configs.get(key.type), key);
if (resultWithType.isPresent()) {
return resultWithType;
var defaultConfig = getKubeResourceConfigBySubType(configs.get(ResourceType.DEFAULT), key);
if (Objects.equals(key.type, ResourceType.DEFAULT)) { // fast track
return defaultConfig;
}

// no match with exact type found, try again with the default.
return getKubeResourceConfigBySubType(configs.get(ResourceType.DEFAULT), key);
final var typeConfig = getKubeResourceConfigBySubType(configs.get(key.type), key);
if (defaultConfig.isEmpty()) {
return typeConfig;
}
return typeConfig.map(kubeResourceConfig -> kubeResourceConfig.merge(defaultConfig.get())).or(() -> defaultConfig);
}

private static Optional<KubeResourceConfig> getKubeResourceConfigBySubType(final Map<ResourceSubType, KubeResourceConfig> configBySubType,
Expand All @@ -316,10 +310,16 @@ private static Optional<KubeResourceConfig> getKubeResourceConfigBySubType(final
return Optional.empty();
}

// Lookup by actual sub type
final var config = configBySubType.get(key.subType);
// if we didn't find a match, try again with the default
return Optional.ofNullable(config != null ? config : configBySubType.get(ResourceSubType.DEFAULT));
var defaultConfig = Optional.ofNullable(configBySubType.get(ResourceSubType.DEFAULT));
if (Objects.equals(key.subType, ResourceSubType.DEFAULT)) { // fast track
return defaultConfig;
}

final var subTypeConfig = Optional.ofNullable(configBySubType.get(key.subType));
if (defaultConfig.isEmpty()) {
return subTypeConfig;
}
return subTypeConfig.map(kubeResourceConfig -> kubeResourceConfig.merge(defaultConfig.get())).or(() -> defaultConfig);
}

private void validateIsolatedPoolConfigInitialization(final boolean useCustomNodeSelector, final Map<String, String> isolatedNodeSelectors) {
Expand Down Expand Up @@ -369,16 +369,12 @@ private Map<String, String> splitKVPairsFromEnvString(final String input) {
.collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim()));
}

private ResourceRequirements getResourceRequirementsFrom(final KubeResourceConfig kubeResourceConfig, final KubeResourceConfig defaultConfig) {
private ResourceRequirements getResourceRequirementsFrom(final KubeResourceConfig kubeResourceConfig) {
return new ResourceRequirements()
.withCpuLimit(useDefaultIfEmpty(kubeResourceConfig.getCpuLimit(), defaultConfig.getCpuLimit()))
.withCpuRequest(useDefaultIfEmpty(kubeResourceConfig.getCpuRequest(), defaultConfig.getCpuRequest()))
.withMemoryLimit(useDefaultIfEmpty(kubeResourceConfig.getMemoryLimit(), defaultConfig.getMemoryLimit()))
.withMemoryRequest(useDefaultIfEmpty(kubeResourceConfig.getMemoryRequest(), defaultConfig.getMemoryRequest()));
}

private static String useDefaultIfEmpty(final String value, final String defaultValue) {
return (value == null || value.isBlank()) ? defaultValue : value;
.withCpuLimit(kubeResourceConfig.getCpuLimit())
.withCpuRequest(kubeResourceConfig.getCpuRequest())
.withMemoryLimit(kubeResourceConfig.getMemoryLimit())
.withMemoryRequest(kubeResourceConfig.getMemoryRequest());
}
Comment on lines -372 to 378
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove custom fallbacks since we don't need them anymore


}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.commons.workers.config;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import io.airbyte.commons.workers.config.WorkerConfigsProvider.ResourceType;
import io.airbyte.config.ResourceRequirements;
Expand Down Expand Up @@ -59,7 +58,7 @@ void verifyTestConfigIsLoaded() {
@Test
void testKubeConfigIsReadingAllTheFields() {
assertEquals("check", checkKubeResourceConfig.getName());
assertEquals("check annotations", checkKubeResourceConfig.getAnnotations());
assertEquals("check_annotation_key=check_annotation_value", checkKubeResourceConfig.getAnnotations());
assertEquals("check labels", checkKubeResourceConfig.getLabels());
assertEquals("check node-selectors", checkKubeResourceConfig.getNodeSelectors());
assertEquals("check cpu limit", checkKubeResourceConfig.getCpuLimit());
Expand All @@ -71,11 +70,11 @@ void testKubeConfigIsReadingAllTheFields() {
@Test
void testDefaultFieldBehavior() {
assertEquals("spec", specKubeResourceConfig.getName());
assertEquals("spec annotations", specKubeResourceConfig.getAnnotations());
assertEquals("", specKubeResourceConfig.getAnnotations());
assertEquals("spec labels", specKubeResourceConfig.getLabels());
assertEquals("spec node selectors", specKubeResourceConfig.getNodeSelectors());
assertNull(specKubeResourceConfig.getCpuLimit());
assertNull(specKubeResourceConfig.getCpuRequest());
assertEquals("", specKubeResourceConfig.getCpuLimit());
assertEquals("", specKubeResourceConfig.getCpuRequest());
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We consider nulls as empty strings for KubeResourceConfig now

assertEquals("spec memory limit", specKubeResourceConfig.getMemoryLimit());
assertEquals("", specKubeResourceConfig.getMemoryRequest());
}
Expand All @@ -85,9 +84,9 @@ void checkWorkerConfigProvider() {
final WorkerConfigs specKubeConfig = workerConfigsProvider.getConfig(ResourceType.SPEC);

assertEquals("default cpu limit", specKubeConfig.getResourceRequirements().getCpuLimit());
assertEquals("", specKubeConfig.getResourceRequirements().getCpuRequest());
assertEquals("default cpu request", specKubeConfig.getResourceRequirements().getCpuRequest());
assertEquals("spec memory limit", specKubeConfig.getResourceRequirements().getMemoryLimit());
assertEquals("", specKubeConfig.getResourceRequirements().getMemoryRequest());
assertEquals("default memory request", specKubeConfig.getResourceRequirements().getMemoryRequest());
}

@Test
Expand Down Expand Up @@ -120,18 +119,22 @@ void testVariantLookups() {
testVariant);
final ResourceRequirements testSourceDatabase = workerConfigsProvider.getResourceRequirements(ResourceRequirementsType.SOURCE, Optional.of(
"database"), testVariant);
final WorkerConfigs checkConfig = workerConfigsProvider.getConfig(ResourceType.CHECK);
final WorkerConfigs specConfig = workerConfigsProvider.getConfig(ResourceType.SPEC);

// Testing the variant override lookup
assertEquals("5", testSourceApi.getCpuLimit());
assertEquals("10", testSourceDatabase.getCpuLimit());
assertEquals("default cpu limit", sourceApi.getCpuLimit());
assertEquals("default cpu limit", sourceDatabase.getCpuLimit());
assertEquals(Map.of("check_annotation_key", "check_annotation_value"), checkConfig.getWorkerKubeAnnotations());

// Verifying the default inheritance
assertEquals("0.5", sourceApi.getCpuRequest());
assertEquals("1", sourceDatabase.getCpuRequest());
assertEquals("", testSourceApi.getCpuRequest());
assertEquals("", testSourceDatabase.getCpuRequest());
assertEquals(Map.of("default_annotation_key", "default_annotation_value"), specConfig.getWorkerKubeAnnotations());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@ airbyte:
worker:
kube-job-configs:
default:
annotations: default_annotation_key=default_annotation_value
cpu-limit: default cpu limit
cpu-request: ${JOB_MAIN_CONTAINER_CPU_REQUEST:}
memory-request: ${JOB_MAIN_CONTAINER_MEMORY_REQUEST:}
cpu-request: default cpu request
memory-request: default memory request
Comment on lines +10 to +11
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous test cases weren't consistent

check:
annotations: ${CHECK_JOB_KUBE_ANNOTATIONS:check annotations}
annotations: ${CHECK_JOB_KUBE_ANNOTATIONS:check_annotation_key=check_annotation_value}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't check annotation overrides earlier

labels: ${CHECK_JOB_KUBE_LABELS:check labels}
node-selectors: ${CHECK_JOB_KUBE_NODE_SELECTORS:check node-selectors}
cpu-limit: ${CHECK_JOB_MAIN_CONTAINER_CPU_LIMIT:check cpu limit}
cpu-request: ${CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST:check cpu request}
memory-limit: ${CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT:check mem limit}
memory-request: ${CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST:check mem request}
spec:
annotations: ${SPEC_JOB_KUBE_ANNOTATIONS:spec annotations}
annotations: ${SPEC_JOB_KUBE_ANNOTATIONS:}
labels: ${SPEC_JOB_KUBE_LABELS:spec labels}
node-selectors: ${SPEC_JOB_KUBE_NODE_SELECTORS:spec node selectors}
memory-limit: spec memory limit
Expand All @@ -38,6 +39,7 @@ airbyte:
cpu-request: 42
micronauttest-source:
cpu-limit: 5
cpu-request: <EMPTY> # Disable inheritance, force empty value
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be careful: that's a breaking change

micronauttest-source-database:
cpu-limit: 10
mappingtest-source-stderr:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ airbyte:
kube-job-configs:
default:
annotations: ${JOB_KUBE_ANNOTATIONS:}
labels: ${JOB_KUBE_LABELS:}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node-selectors: ${JOB_KUBE_NODE_SELECTORS:}
cpu-limit: ${JOB_MAIN_CONTAINER_CPU_LIMIT:}
cpu-request: ${JOB_MAIN_CONTAINER_CPU_REQUEST:}
Expand Down
Loading