From 6715dc9515a63d733cb1cad9eeb39b11cb15ea66 Mon Sep 17 00:00:00 2001 From: Mike Herwig Date: Thu, 9 Jan 2025 16:46:48 +0100 Subject: [PATCH] refac: removed k8s informer code and streamlined k8s client creation --- .../listener/SubscriptionResourceEvent.java | 2 +- .../SubscriptionResourceEventBroadcaster.java | 2 +- .../kubernetes/InformerStoreInitHandler.java | 91 -------- .../kubernetes/InformerStoreInitSupport.java | 13 -- .../kubernetes/KubernetesClientBuilder.java | 74 +++++++ .../kubernetes/KubernetesClientWrapper.java | 88 -------- .../kubernetes/PodResourceListener.java | 106 --------- .../config/KubernetesProperties.java | 26 +++ .../kubernetes/util/InformerWrapper.java | 28 --- .../model/meta/CircuitBreakerMessage.java | 5 - .../subscription}/Subscription.java | 2 +- .../subscription}/SubscriptionResource.java | 2 +- .../SubscriptionResourceSpec.java | 2 +- .../SubscriptionResourceStatus.java | 2 +- .../subscription}/SubscriptionTrigger.java | 2 +- .../repository/MessageStateMongoRepo.java | 1 - .../{kubernetes => }/util/RoverToken.java | 2 +- .../cache/service/JsonCacheServiceTest.java | 5 +- .../pandora/horizon/cache/util/QueryTest.java | 2 +- .../KubernetesClientConfiguration.java | 202 ++---------------- .../cache/JsonCacheAutoconfiguration.java | 2 +- 21 files changed, 129 insertions(+), 530 deletions(-) delete mode 100644 horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/InformerStoreInitHandler.java delete mode 100644 horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/InformerStoreInitSupport.java create mode 100644 horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/KubernetesClientBuilder.java delete mode 100644 horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/KubernetesClientWrapper.java delete mode 100644 horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/PodResourceListener.java create mode 100644 horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/config/KubernetesProperties.java delete mode 100644 horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/util/InformerWrapper.java rename horizon-core/src/main/java/de/telekom/eni/pandora/horizon/{kubernetes/resource => model/subscription}/Subscription.java (95%) rename horizon-core/src/main/java/de/telekom/eni/pandora/horizon/{kubernetes/resource => model/subscription}/SubscriptionResource.java (87%) rename horizon-core/src/main/java/de/telekom/eni/pandora/horizon/{kubernetes/resource => model/subscription}/SubscriptionResourceSpec.java (87%) rename horizon-core/src/main/java/de/telekom/eni/pandora/horizon/{kubernetes/resource => model/subscription}/SubscriptionResourceStatus.java (84%) rename horizon-core/src/main/java/de/telekom/eni/pandora/horizon/{kubernetes/resource => model/subscription}/SubscriptionTrigger.java (91%) rename horizon-core/src/main/java/de/telekom/eni/pandora/horizon/{kubernetes => }/util/RoverToken.java (87%) diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/cache/listener/SubscriptionResourceEvent.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/cache/listener/SubscriptionResourceEvent.java index 9631ef3..c3b2440 100644 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/cache/listener/SubscriptionResourceEvent.java +++ b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/cache/listener/SubscriptionResourceEvent.java @@ -6,7 +6,7 @@ import com.hazelcast.core.EntryEvent; import com.hazelcast.core.HazelcastJsonValue; -import de.telekom.eni.pandora.horizon.kubernetes.resource.SubscriptionResource; +import de.telekom.eni.pandora.horizon.model.subscription.SubscriptionResource; public class SubscriptionResourceEvent extends AbstractHazelcastJsonEvent { public SubscriptionResourceEvent(SubscriptionResource value, SubscriptionResource oldValue, EntryEvent entryEvent) { diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/cache/listener/SubscriptionResourceEventBroadcaster.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/cache/listener/SubscriptionResourceEventBroadcaster.java index b2020ba..6d0b2f2 100644 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/cache/listener/SubscriptionResourceEventBroadcaster.java +++ b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/cache/listener/SubscriptionResourceEventBroadcaster.java @@ -8,7 +8,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.hazelcast.core.EntryEvent; import com.hazelcast.core.HazelcastJsonValue; -import de.telekom.eni.pandora.horizon.kubernetes.resource.SubscriptionResource; +import de.telekom.eni.pandora.horizon.model.subscription.SubscriptionResource; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationEventPublisher; diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/InformerStoreInitHandler.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/InformerStoreInitHandler.java deleted file mode 100644 index b442309..0000000 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/InformerStoreInitHandler.java +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2024 Deutsche Telekom IT GmbH -// -// SPDX-License-Identifier: Apache-2.0 - -package de.telekom.eni.pandora.horizon.kubernetes; - -import de.telekom.eni.pandora.horizon.kubernetes.util.InformerWrapper; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -@Slf4j -public class InformerStoreInitHandler { - - private static final int CACHE_SYNC_PHASE_TIMEOUT_MINUTES = 30; - - private final KubernetesClientWrapper kubernetesClientWrapper; - - private final AtomicBoolean fullySynced = new AtomicBoolean(false); - - @Getter - private ConcurrentHashMap initalSyncedStats = new ConcurrentHashMap<>(); - - public InformerStoreInitHandler(KubernetesClientWrapper kubernetesClientWrapper) { - this.kubernetesClientWrapper = kubernetesClientWrapper; - } - - public void handle(List informers) { - final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(informers); - - new Thread(() -> { - final int waitTimeMs = 1000; - var waitedMs = 0; - - while (!queue.isEmpty()) { - if (waitedMs >= CACHE_SYNC_PHASE_TIMEOUT_MINUTES * 60 * 1000) { - log.info("Cache syncing phase takes longer than {} minutes, we assume it's fully synced.", CACHE_SYNC_PHASE_TIMEOUT_MINUTES); - break; - } - - var informer = queue.poll(); - - if (!sync(informer)) { - queue.add(informer); - } - - try { - Thread.sleep(waitTimeMs); - } catch (InterruptedException ignored) { - break; - } - - waitedMs += waitTimeMs; - } - - if (!Thread.interrupted()) { - fullySynced.compareAndSet(false, true); - log.info("Cache syncing phase ended."); - } - }).start(); - } - - public boolean isFullySynced() { - return fullySynced.get(); - } - - private boolean sync(InformerWrapper informer) { - log.debug("Syncing {}", informer.getInformer().getApiTypeClass().getSimpleName()); - - try { - var items = kubernetesClientWrapper.get(informer.getInformer().getApiTypeClass(), informer.getNamespace()); - - Optional.ofNullable(informer.getEventHandler()).ifPresent(i -> { - i.addAll(items); - }); - - log.info("Synced {}", informer.getInformer().getApiTypeClass().getSimpleName()); - } catch (Exception e) { - log.error(e.getMessage()); - - return false; - } - - return true; - } -} diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/InformerStoreInitSupport.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/InformerStoreInitSupport.java deleted file mode 100644 index a4cf2a6..0000000 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/InformerStoreInitSupport.java +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2024 Deutsche Telekom IT GmbH -// -// SPDX-License-Identifier: Apache-2.0 - -package de.telekom.eni.pandora.horizon.kubernetes; - -import io.fabric8.kubernetes.api.model.HasMetadata; - -import java.util.List; - -public interface InformerStoreInitSupport { - void addAll(List list); -} diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/KubernetesClientBuilder.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/KubernetesClientBuilder.java new file mode 100644 index 0000000..8bf821c --- /dev/null +++ b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/KubernetesClientBuilder.java @@ -0,0 +1,74 @@ +package de.telekom.eni.pandora.horizon.kubernetes; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; +import de.telekom.eni.pandora.horizon.exception.CouldNotConstructKubernetesClientException; +import de.telekom.eni.pandora.horizon.kubernetes.config.KubernetesProperties; +import de.telekom.eni.pandora.horizon.util.RoverToken; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Base64; + +public class KubernetesClientBuilder { + + private final KubernetesProperties kubernetesProperties; + + public KubernetesClientBuilder(KubernetesProperties kubernetesProperties) { + this.kubernetesProperties = kubernetesProperties; + } + + private Config parseRoverToken(String roverToken) throws JsonProcessingException { + byte[] decodedBytes = Base64.getDecoder().decode(roverToken); + + RoverToken tokenObject = new YAMLMapper().readValue(new String(decodedBytes), RoverToken.class); + + return new ConfigBuilder() + .withMasterUrl(tokenObject.getMasterUrl()) + .withOauthToken(tokenObject.getToken()) + .withCaCertData(tokenObject.getCaCertificate()) + .withDisableHostnameVerification(true) + .build(); + } + private Config withCustomSettings(Config config) { + config.setRequestTimeout(kubernetesProperties.getRequestTimeoutInMs()); + config.setConnectionTimeout(kubernetesProperties.getConnectionTimeoutMs()); + + return config; + } + + public KubernetesClient createClientFromRoverToken(String roverToken) throws CouldNotConstructKubernetesClientException { + try { + var config = parseRoverToken(roverToken); + + return new DefaultKubernetesClient(withCustomSettings(config)); + } catch (Exception e) { + throw new CouldNotConstructKubernetesClientException("Error: Rover token could not be parsed.", e); + } + } + + public KubernetesClient createClientFromeKubeconfigFile(String path) throws CouldNotConstructKubernetesClientException { + try { + var configFile = new File(path); + var configYAML = String.join("\n", Files.readAllLines(configFile.toPath(), StandardCharsets.UTF_8)); + + var config = Config.fromKubeconfig(configYAML); + + return new DefaultKubernetesClient(withCustomSettings(config)); + } catch (IOException e) { + throw new CouldNotConstructKubernetesClientException(String.format("Error: Kubernetes config %1s could not be processed.", path), e); + } + } + + public KubernetesClient createDefaultClient() { + var config = new ConfigBuilder().build(); + + return new DefaultKubernetesClient(withCustomSettings(config)); + } +} diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/KubernetesClientWrapper.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/KubernetesClientWrapper.java deleted file mode 100644 index 2344a09..0000000 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/KubernetesClientWrapper.java +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2024 Deutsche Telekom IT GmbH -// -// SPDX-License-Identifier: Apache-2.0 - -package de.telekom.eni.pandora.horizon.kubernetes; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.api.model.KubernetesResourceList; -import io.fabric8.kubernetes.api.model.ListMeta; -import io.fabric8.kubernetes.api.model.ListOptions; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientException; -import lombok.Getter; -import org.apache.commons.lang3.StringUtils; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -public class KubernetesClientWrapper { - - @Getter - private final KubernetesClient kubernetesClient; - - public KubernetesClientWrapper(KubernetesClient kubernetesClient) { - this.kubernetesClient = kubernetesClient; - } - - public int count(Class clazz) { - return count(clazz, null); - } - - public List get(Class clazz, String namespace) { - var list = new ArrayList(); - - var limit = 100L; - - var listOptions = new ListOptions(); - listOptions.setLimit(limit); - listOptions.setContinue(null); - - do { - KubernetesResourceList l; - try { - if (StringUtils.isNotBlank(namespace)) { - l = kubernetesClient.resources(clazz).inNamespace(namespace).list(listOptions); - } else { - l = kubernetesClient.resources(clazz).list(listOptions); - } - - list.addAll(l.getItems()); - - listOptions.setContinue(l.getMetadata().getContinue()); - } catch (KubernetesClientException e) { - if (e.getCode() != 404) { - throw e; - } - } - } while (StringUtils.isNotBlank(listOptions.getContinue())); - - return list; - } - - public int count(Class clazz, String namespace) { - var limit = 1L; - - var listOptions = new ListOptions(); - listOptions.setLimit(limit); - listOptions.setContinue(null); - - ListMeta listMeta; - try { - if (StringUtils.isNotBlank(namespace)) { - listMeta = kubernetesClient.resources(clazz).inNamespace(namespace).list(listOptions).getMetadata(); - } else { - listMeta = kubernetesClient.resources(clazz).list(listOptions).getMetadata(); - } - } catch (KubernetesClientException e) { - if (e.getCode() == 404) { - return 0; - } - - throw e; - } - - return Optional.ofNullable(listMeta.getRemainingItemCount()).map(n -> n + limit).orElse(0L).intValue(); - } -} diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/PodResourceListener.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/PodResourceListener.java deleted file mode 100644 index 8ed73a0..0000000 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/PodResourceListener.java +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2024 Deutsche Telekom IT GmbH -// -// SPDX-License-Identifier: Apache-2.0 - -package de.telekom.eni.pandora.horizon.kubernetes; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.api.model.KubernetesResourceList; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; -import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; -import io.fabric8.kubernetes.client.dsl.Resource; -import io.fabric8.kubernetes.client.dsl.base.OperationContext; -import io.fabric8.kubernetes.client.informers.ResourceEventHandler; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.fabric8.kubernetes.client.informers.SharedInformerFactory; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; - -import java.util.List; -import java.util.Map; - -@Slf4j -public class PodResourceListener { - - private SharedInformerFactory informerFactory; - - final KubernetesClient kubernetesClient; - - final ResourceEventHandler podHandler; - - final long resyncPeriodInMs; - - final String namespace; - - final String appName; - - - public PodResourceListener(KubernetesClient kubernetesClient, ResourceEventHandler podHandler, long resyncPeriodInMs, String namespace, String appName) { - this.kubernetesClient = kubernetesClient; - this.podHandler = podHandler; - this.resyncPeriodInMs = resyncPeriodInMs; - this.namespace = namespace; - this.appName = appName; - } - - public void init() { - informerFactory = kubernetesClient.informers(); - informerFactory.addSharedInformerEventListener(ex -> log.error("Exception occurred while starting the informers: {}", ex.getMessage())); - - SharedIndexInformer informer = createSharedIndexInformerFor(Pod.class, resyncPeriodInMs); - informer.addEventHandler(podHandler); - } - - public List getAllPods() { - var resources = kubernetesClient.resources(Pod.class); - NonNamespaceOperation, Resource> nonNamespaceOperation = null; - if (StringUtils.isNotBlank(namespace)) { - nonNamespaceOperation = resources.inNamespace(namespace); - } - - FilterWatchListDeletable> filterWatchListDeletable = null; - if (StringUtils.isNotBlank(appName)) { - if(nonNamespaceOperation != null) { - filterWatchListDeletable = nonNamespaceOperation.withLabels(Map.of("app", appName)); - } else { - filterWatchListDeletable = resources.withLabels(Map.of("app", appName)); - } - } - - if(filterWatchListDeletable != null) { - return filterWatchListDeletable.list().getItems(); - } else if(nonNamespaceOperation != null) { - return nonNamespaceOperation.list().getItems(); - } else { - return resources.list().getItems(); - } - } - - private SharedIndexInformer createSharedIndexInformerFor(Class clazz, Long timeToWait) { - var context = new OperationContext(); - if (StringUtils.isNotBlank(namespace)) { - log.info("Restricting pod watching to namespace {}", namespace); - context = context.withNamespace(namespace); - } - - if (StringUtils.isNotBlank(appName)) { - log.info("Restricting pod watching to labels app={}", appName); - context = context.withLabels(Map.of("app", appName)); - } - - - return informerFactory.sharedIndexInformerFor(clazz, context, timeToWait); - } - - public void start() { - log.info("Starting all registered pod informers"); - informerFactory.startAllRegisteredInformers(); - } - - public void stop() { - log.info("Stopping all registered pod informers"); - informerFactory.stopAllRegisteredInformers(false); - } -} diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/config/KubernetesProperties.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/config/KubernetesProperties.java new file mode 100644 index 0000000..d967bc5 --- /dev/null +++ b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/config/KubernetesProperties.java @@ -0,0 +1,26 @@ +// Copyright 2024 Deutsche Telekom IT GmbH +// +// SPDX-License-Identifier: Apache-2.0 + +package de.telekom.eni.pandora.horizon.kubernetes.config; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@Data +@ConfigurationProperties("horizon.kubernetes") +public class KubernetesProperties { + + @Value("${rover.token:}") + private String roverToken; + + @Value("${kubeConfigPath:}") + private String kubeConfigPath; + + @Value("${requestTimeoutMs:120000}") + private int requestTimeoutInMs; + + @Value("${connectionTimeoutMs:120000}") + private int connectionTimeoutMs; +} diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/util/InformerWrapper.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/util/InformerWrapper.java deleted file mode 100644 index e9925c1..0000000 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/util/InformerWrapper.java +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2024 Deutsche Telekom IT GmbH -// -// SPDX-License-Identifier: Apache-2.0 - -package de.telekom.eni.pandora.horizon.kubernetes.util; - -import de.telekom.eni.pandora.horizon.kubernetes.InformerStoreInitSupport; -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import lombok.Getter; -import lombok.Setter; - -@Getter -public class InformerWrapper { - - - private final SharedIndexInformer informer; - - @Setter - private InformerStoreInitSupport eventHandler; - - private final String namespace; - - public InformerWrapper(SharedIndexInformer informer, String namespace) { - this.informer = informer; - this.namespace = namespace; - } -} diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/meta/CircuitBreakerMessage.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/meta/CircuitBreakerMessage.java index 0af5fac..aea51c6 100755 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/meta/CircuitBreakerMessage.java +++ b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/meta/CircuitBreakerMessage.java @@ -6,14 +6,9 @@ import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import de.telekom.eni.pandora.horizon.model.common.Cacheable; -import lombok.AllArgsConstructor; import lombok.Getter; -import lombok.NoArgsConstructor; import lombok.Setter; -import java.io.Serial; -import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Date; diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/Subscription.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/Subscription.java similarity index 95% rename from horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/Subscription.java rename to horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/Subscription.java index db02673..271c100 100644 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/Subscription.java +++ b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/Subscription.java @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 -package de.telekom.eni.pandora.horizon.kubernetes.resource; +package de.telekom.eni.pandora.horizon.model.subscription; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.EqualsAndHashCode; diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/SubscriptionResource.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/SubscriptionResource.java similarity index 87% rename from horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/SubscriptionResource.java rename to horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/SubscriptionResource.java index 44dd4f4..9992f86 100644 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/SubscriptionResource.java +++ b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/SubscriptionResource.java @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 -package de.telekom.eni.pandora.horizon.kubernetes.resource; +package de.telekom.eni.pandora.horizon.model.subscription; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/SubscriptionResourceSpec.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/SubscriptionResourceSpec.java similarity index 87% rename from horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/SubscriptionResourceSpec.java rename to horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/SubscriptionResourceSpec.java index 591762c..3d1bd90 100644 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/SubscriptionResourceSpec.java +++ b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/SubscriptionResourceSpec.java @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 -package de.telekom.eni.pandora.horizon.kubernetes.resource; +package de.telekom.eni.pandora.horizon.model.subscription; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/SubscriptionResourceStatus.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/SubscriptionResourceStatus.java similarity index 84% rename from horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/SubscriptionResourceStatus.java rename to horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/SubscriptionResourceStatus.java index dc699f5..1188a53 100644 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/SubscriptionResourceStatus.java +++ b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/SubscriptionResourceStatus.java @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 -package de.telekom.eni.pandora.horizon.kubernetes.resource; +package de.telekom.eni.pandora.horizon.model.subscription; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/SubscriptionTrigger.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/SubscriptionTrigger.java similarity index 91% rename from horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/SubscriptionTrigger.java rename to horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/SubscriptionTrigger.java index 2cf8404..13613fe 100644 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/resource/SubscriptionTrigger.java +++ b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/model/subscription/SubscriptionTrigger.java @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 -package de.telekom.eni.pandora.horizon.kubernetes.resource; +package de.telekom.eni.pandora.horizon.model.subscription; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import de.telekom.jsonfilter.operator.Operator; diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/mongo/repository/MessageStateMongoRepo.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/mongo/repository/MessageStateMongoRepo.java index bea389b..fb00368 100644 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/mongo/repository/MessageStateMongoRepo.java +++ b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/mongo/repository/MessageStateMongoRepo.java @@ -7,7 +7,6 @@ import de.telekom.eni.pandora.horizon.model.event.DeliveryType; import de.telekom.eni.pandora.horizon.model.event.Status; import de.telekom.eni.pandora.horizon.mongo.model.MessageStateMongoDocument; -import org.bson.types.ObjectId; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Slice; import org.springframework.data.mongodb.repository.MongoRepository; diff --git a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/util/RoverToken.java b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/util/RoverToken.java similarity index 87% rename from horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/util/RoverToken.java rename to horizon-core/src/main/java/de/telekom/eni/pandora/horizon/util/RoverToken.java index 3beaf8a..b0c4392 100644 --- a/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/kubernetes/util/RoverToken.java +++ b/horizon-core/src/main/java/de/telekom/eni/pandora/horizon/util/RoverToken.java @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 -package de.telekom.eni.pandora.horizon.kubernetes.util; +package de.telekom.eni.pandora.horizon.util; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; diff --git a/horizon-core/src/test/java/de/telekom/eni/pandora/horizon/cache/service/JsonCacheServiceTest.java b/horizon-core/src/test/java/de/telekom/eni/pandora/horizon/cache/service/JsonCacheServiceTest.java index 9d49735..57bc8c5 100644 --- a/horizon-core/src/test/java/de/telekom/eni/pandora/horizon/cache/service/JsonCacheServiceTest.java +++ b/horizon-core/src/test/java/de/telekom/eni/pandora/horizon/cache/service/JsonCacheServiceTest.java @@ -11,7 +11,10 @@ import de.telekom.eni.pandora.horizon.autoconfigure.cache.CacheAutoConfiguration; import de.telekom.eni.pandora.horizon.cache.util.Query; import de.telekom.eni.pandora.horizon.model.dummy.CacheDummy; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; diff --git a/horizon-core/src/test/java/de/telekom/eni/pandora/horizon/cache/util/QueryTest.java b/horizon-core/src/test/java/de/telekom/eni/pandora/horizon/cache/util/QueryTest.java index c53b7dc..4677d14 100644 --- a/horizon-core/src/test/java/de/telekom/eni/pandora/horizon/cache/util/QueryTest.java +++ b/horizon-core/src/test/java/de/telekom/eni/pandora/horizon/cache/util/QueryTest.java @@ -4,7 +4,7 @@ package de.telekom.eni.pandora.horizon.cache.util; -import de.telekom.eni.pandora.horizon.kubernetes.resource.Subscription; +import de.telekom.eni.pandora.horizon.model.subscription.Subscription; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; diff --git a/horizon-spring-boot-autoconfigure/src/main/java/de/telekom/eni/pandora/horizon/autoconfigure/KubernetesClientConfiguration.java b/horizon-spring-boot-autoconfigure/src/main/java/de/telekom/eni/pandora/horizon/autoconfigure/KubernetesClientConfiguration.java index df0b4f8..ae00d21 100644 --- a/horizon-spring-boot-autoconfigure/src/main/java/de/telekom/eni/pandora/horizon/autoconfigure/KubernetesClientConfiguration.java +++ b/horizon-spring-boot-autoconfigure/src/main/java/de/telekom/eni/pandora/horizon/autoconfigure/KubernetesClientConfiguration.java @@ -4,167 +4,51 @@ package de.telekom.eni.pandora.horizon.autoconfigure; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.module.SimpleModule; -import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; import de.telekom.eni.pandora.horizon.exception.CouldNotConstructKubernetesClientException; -import de.telekom.eni.pandora.horizon.kubernetes.InformerStoreInitHandler; -import de.telekom.eni.pandora.horizon.kubernetes.KubernetesClientWrapper; -import de.telekom.eni.pandora.horizon.kubernetes.PodResourceListener; -import de.telekom.eni.pandora.horizon.kubernetes.resource.SubscriptionResource; -import de.telekom.eni.pandora.horizon.kubernetes.util.RoverToken; +import de.telekom.eni.pandora.horizon.kubernetes.KubernetesClientBuilder; +import de.telekom.eni.pandora.horizon.kubernetes.config.KubernetesProperties; import de.telekom.jsonfilter.operator.Operator; import de.telekom.jsonfilter.serde.OperatorDeserializer; import de.telekom.jsonfilter.serde.OperatorSerializer; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.ConfigBuilder; -import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.utils.Serialization; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.io.*; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.util.Base64; -import java.util.stream.Collectors; - -@ConditionalOnProperty(value = "kubernetes.enabled") @Configuration +@ConditionalOnProperty(value = "horizon.kubernetes.enabled") +@EnableConfigurationProperties({KubernetesProperties.class}) @Slf4j public class KubernetesClientConfiguration { - private static final String KUBERNETES_DEV_CONFIG_FILE_PATH = "kubernetes/config/config.laptop-dev-dev-system"; - - @Value("${spring.profiles.active:}") - private String activeProfile; - - @Value("${kubernetes.rover.token:}") - private String roverToken; - - @Value("${kubernetes.kubeConfigPath:}") - private String kubeConfigPath; - - @Value("${kubernetes.informer.resyncperiod.ms:600000}") - private long resyncPeriodInMs; - - @Value("${kubernetes.informer.namespace:}") - private String namespace; - - @Value("${kubernetes.informer.pods.namespace:}") - private String podsNamespace; - - @Value("${kubernetes.informer.pods.appname:}") - private String appName; - - @Value("${kubernetes.requestTimeoutMs:120000}") - private int requestTimeoutInMs; - - @Value("${kubernetes.connectionTimeoutMs:120000}") - private int connectionTimeoutMs; - - private String getResourceFileAsString(String fileName) throws IOException { - var cl = Thread.currentThread().getContextClassLoader(); - - try (var is = cl.getResourceAsStream(fileName)) { - if (is == null) { - return null; - } - - try (var isr = new InputStreamReader(is, StandardCharsets.UTF_8); BufferedReader reader = new BufferedReader(isr)) { - return reader.lines().collect(Collectors.joining(System.lineSeparator())); - } - } - } - - private Config parseRoverToken(String roverToken) throws JsonProcessingException { - byte[] decodedBytes = Base64.getDecoder().decode(roverToken); - - RoverToken tokenObject = new YAMLMapper().readValue(new String(decodedBytes), RoverToken.class); - - return new ConfigBuilder() - .withMasterUrl(tokenObject.getMasterUrl()) - .withOauthToken(tokenObject.getToken()) - .withCaCertData(tokenObject.getCaCertificate()) - .withDisableHostnameVerification(true) - .build(); - } - - private KubernetesClient createClientFromRoverToken(String roverToken) throws CouldNotConstructKubernetesClientException { - try { - var config = parseRoverToken(roverToken); - - return new DefaultKubernetesClient(withCustomSettings(config)); - } catch (Exception e) { - throw new CouldNotConstructKubernetesClientException("Error: Rover token could not be parsed.", e); - } - } - - private KubernetesClient createClientFromBundledConfig() throws CouldNotConstructKubernetesClientException { - try { - var configYAML = getResourceFileAsString(KUBERNETES_DEV_CONFIG_FILE_PATH); - if (configYAML == null) { - throw new FileNotFoundException(String.format("Error: Kubernetes config %1s could not be found.", KUBERNETES_DEV_CONFIG_FILE_PATH)); - } - - var config = Config.fromKubeconfig(configYAML); - - return new DefaultKubernetesClient(withCustomSettings(config)); - } catch (Exception e) { - throw new CouldNotConstructKubernetesClientException(String.format("Error: Kubernetes config %1s could not be processed.", KUBERNETES_DEV_CONFIG_FILE_PATH), e); - } - } - - private KubernetesClient createClientFromeKubeconfigFile(String path) throws CouldNotConstructKubernetesClientException { - try { - var configFile = new File(path); - var configYAML = String.join("\n", Files.readAllLines(configFile.toPath(), StandardCharsets.UTF_8)); - - var config = Config.fromKubeconfig(configYAML); - - return new DefaultKubernetesClient(withCustomSettings(config)); - } catch (IOException e) { - throw new CouldNotConstructKubernetesClientException(String.format("Error: Kubernetes config %1s could not be processed.", path), e); - } - } - - private Config withCustomSettings(Config config) { - config.setRequestTimeout(requestTimeoutInMs); - config.setConnectionTimeout(connectionTimeoutMs); - - return config; - } - @Bean - public KubernetesClient kubernetesClient() throws CouldNotConstructKubernetesClientException { + public KubernetesClient kubernetesClient(KubernetesProperties kubernetesProperties) throws CouldNotConstructKubernetesClientException { + var builder = new KubernetesClientBuilder(kubernetesProperties); + KubernetesClient client; + var roverToken = kubernetesProperties.getRoverToken(); + var kubeConfigPath = kubernetesProperties.getKubeConfigPath(); + if (!StringUtils.isBlank(roverToken)) { - client = createClientFromRoverToken(roverToken); + client = builder.createClientFromRoverToken(roverToken); log.info("Using configured rover token for configuring the Kubernetes client"); } else if (!StringUtils.isBlank(kubeConfigPath)) { - client = createClientFromeKubeconfigFile(kubeConfigPath); + client = builder.createClientFromeKubeconfigFile(kubeConfigPath); log.info("Using configured Kubernetes config file ({}) for configuring the Kubernetes client", kubeConfigPath); - } else if ("dev".equals(activeProfile) || "test".equals(activeProfile)) { - client = createClientFromBundledConfig(); - log.info("Using bundled Kubernetes dev config"); } else { - var config = new ConfigBuilder().build(); - client = new DefaultKubernetesClient(withCustomSettings(config)); - + client = builder.createDefaultClient(); log.info("Using default Kubernetes config"); } log.info("Using cluster {}", client.getConfiguration().getMasterUrl()); + // used for serialize/deserialize filter operators in subscriptions SimpleModule operatorModule = new SimpleModule(); operatorModule.addDeserializer(Operator.class, new OperatorDeserializer()); operatorModule.addSerializer(Operator.class, new OperatorSerializer()); @@ -172,60 +56,4 @@ public KubernetesClient kubernetesClient() throws CouldNotConstructKubernetesCli return client; } - - @Bean - @ConditionalOnMissingBean(value = SubscriptionResource.class, parameterizedContainer = ResourceEventHandler.class) - public ResourceEventHandler defaultEventHandler() { - return new ResourceEventHandler<>() { - - @Override - public void onAdd(SubscriptionResource obj) { - log.warn("This is the default SubscriptionResource ResourceEventHandler!"); - } - - @Override - public void onUpdate(SubscriptionResource oldObj, SubscriptionResource newObj) { - log.warn("This is the default SubscriptionResource ResourceEventHandler!"); - } - - @Override - public void onDelete(SubscriptionResource obj, boolean deletedFinalStateUnknown) { - log.warn("This is the default SubscriptionResource ResourceEventHandler!"); - } - }; - } - - @Bean - @ConditionalOnMissingBean(value = Pod.class, parameterizedContainer = ResourceEventHandler.class) - public ResourceEventHandler defaultPodEventHandler() { - return new ResourceEventHandler<>() { - - @Override - public void onAdd(Pod obj) { - log.warn("This is the default Pod ResourceEventHandler!"); - } - - @Override - public void onUpdate(Pod oldObj, Pod newObj) { - log.warn("This is the default Pod ResourceEventHandler!"); - } - - @Override - public void onDelete(Pod obj, boolean deletedFinalStateUnknown) { - log.warn("This is the default Pod ResourceEventHandler!"); - } - }; - } - - @Bean - public InformerStoreInitHandler cacheInitHandler(KubernetesClient kubernetesClient) { - return new InformerStoreInitHandler(new KubernetesClientWrapper(kubernetesClient)); - } - - @Bean - public PodResourceListener podResourceListener(KubernetesClient kubernetesClient, ResourceEventHandler podEventHandler) { - var listener = new PodResourceListener(kubernetesClient, podEventHandler, resyncPeriodInMs, podsNamespace, appName); - listener.init(); - return listener; - } } \ No newline at end of file diff --git a/horizon-spring-boot-autoconfigure/src/main/java/de/telekom/eni/pandora/horizon/autoconfigure/cache/JsonCacheAutoconfiguration.java b/horizon-spring-boot-autoconfigure/src/main/java/de/telekom/eni/pandora/horizon/autoconfigure/cache/JsonCacheAutoconfiguration.java index d490d0b..50561a7 100644 --- a/horizon-spring-boot-autoconfigure/src/main/java/de/telekom/eni/pandora/horizon/autoconfigure/cache/JsonCacheAutoconfiguration.java +++ b/horizon-spring-boot-autoconfigure/src/main/java/de/telekom/eni/pandora/horizon/autoconfigure/cache/JsonCacheAutoconfiguration.java @@ -12,8 +12,8 @@ import com.hazelcast.map.IMap; import de.telekom.eni.pandora.horizon.cache.listener.SubscriptionResourceEventBroadcaster; import de.telekom.eni.pandora.horizon.cache.service.JsonCacheService; -import de.telekom.eni.pandora.horizon.kubernetes.resource.SubscriptionResource; import de.telekom.eni.pandora.horizon.model.meta.CircuitBreakerMessage; +import de.telekom.eni.pandora.horizon.model.subscription.SubscriptionResource; import de.telekom.jsonfilter.operator.Operator; import de.telekom.jsonfilter.serde.OperatorDeserializer; import de.telekom.jsonfilter.serde.OperatorSerializer;