diff --git a/src/main/java/de/telekom/horizon/pulsar/ApplicationShutdownListener.java b/src/main/java/de/telekom/horizon/pulsar/ApplicationShutdownListener.java new file mode 100644 index 0000000..d7d3919 --- /dev/null +++ b/src/main/java/de/telekom/horizon/pulsar/ApplicationShutdownListener.java @@ -0,0 +1,27 @@ +package de.telekom.horizon.pulsar; + +import de.telekom.horizon.pulsar.actuator.StopActiveConnectionsEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.stereotype.Component; + +@Component +public class ApplicationShutdownListener implements ApplicationListener { + + private final ApplicationEventPublisher applicationEventPublisher; + + public ApplicationShutdownListener(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + + /* + Will execute even before any @PreStop invocation + */ + @Override + public void onApplicationEvent(ContextClosedEvent event) { + var message = "Got PreStop request. Terminating all connections..."; + + applicationEventPublisher.publishEvent(new StopActiveConnectionsEvent(this, message)); + } +} \ No newline at end of file diff --git a/src/main/java/de/telekom/horizon/pulsar/actuator/HorizonPreStopActuatorEndpoint.java b/src/main/java/de/telekom/horizon/pulsar/actuator/HorizonPreStopActuatorEndpoint.java index 737827d..1294b71 100644 --- a/src/main/java/de/telekom/horizon/pulsar/actuator/HorizonPreStopActuatorEndpoint.java +++ b/src/main/java/de/telekom/horizon/pulsar/actuator/HorizonPreStopActuatorEndpoint.java @@ -4,24 +4,78 @@ package de.telekom.horizon.pulsar.actuator; +import de.telekom.horizon.pulsar.config.PodConfig; +import io.fabric8.kubernetes.api.model.EndpointAddress; +import io.fabric8.kubernetes.client.KubernetesClient; +import lombok.extern.slf4j.Slf4j; import org.springframework.boot.actuate.endpoint.annotation.Endpoint; import org.springframework.boot.actuate.endpoint.annotation.WriteOperation; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Slf4j @Component @Endpoint(id = "horizon-prestop") public class HorizonPreStopActuatorEndpoint { - private final ApplicationEventPublisher applicationEventPublisher; + private final PodConfig podConfig; + + private final KubernetesClient kubernetesClient; - public HorizonPreStopActuatorEndpoint(ApplicationEventPublisher applicationEventPublisher) { - this.applicationEventPublisher = applicationEventPublisher; + public HorizonPreStopActuatorEndpoint(PodConfig podConfig, KubernetesClient kubernetesClient) { + this.podConfig = podConfig; + this.kubernetesClient = kubernetesClient; } @WriteOperation public void handlePreStop() { - var event = new HorizonPreStopEvent(this, "Got PreStop request. Terminating all connections..."); - applicationEventPublisher.publishEvent(event); + waitUntil(() -> !isEndpointRegistered(), 10, TimeUnit.SECONDS, 1000); + } + + @FunctionalInterface + public interface CheckCondition { + boolean check(); + } + + public static void waitUntil(CheckCondition condition, long timeout, TimeUnit unit, long delayMillis) { + long timeoutNanos = unit.toNanos(timeout); + long startTime = System.nanoTime(); + + while (System.nanoTime() - startTime < timeoutNanos) { + try { + if (condition.check()) { + return; // Exit as soon as the condition is true + } + Thread.sleep(delayMillis); // Avoid busy-waiting + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Restore the interrupted status + throw new RuntimeException("Thread was interrupted", e); + } + } + } + + public boolean isEndpointRegistered() { + try { + var endpoints = kubernetesClient.endpoints().inNamespace(podConfig.getPodNamespace()).withName(podConfig.getServiceName()).get(); + + if (endpoints.getSubsets() != null) { + for (var subset : endpoints.getSubsets()) { + List addresses = subset.getAddresses() + .stream() + .map(EndpointAddress::getIp) + .toList(); + + if (addresses.contains(podConfig.getPodIp())) { + return true; // Pod's IP exists in the endpoints list + } + } + } + } catch (Exception e) { + log.error("Could not check endpoint status", e); + } + + return false; } } \ No newline at end of file diff --git a/src/main/java/de/telekom/horizon/pulsar/actuator/HorizonPreStopEvent.java b/src/main/java/de/telekom/horizon/pulsar/actuator/StopActiveConnectionsEvent.java similarity index 70% rename from src/main/java/de/telekom/horizon/pulsar/actuator/HorizonPreStopEvent.java rename to src/main/java/de/telekom/horizon/pulsar/actuator/StopActiveConnectionsEvent.java index 8bf66a0..7012d3f 100644 --- a/src/main/java/de/telekom/horizon/pulsar/actuator/HorizonPreStopEvent.java +++ b/src/main/java/de/telekom/horizon/pulsar/actuator/StopActiveConnectionsEvent.java @@ -1,19 +1,19 @@ -// Copyright 2024 Deutsche Telekom IT GmbH -// -// SPDX-License-Identifier: Apache-2.0 - -package de.telekom.horizon.pulsar.actuator; - -import org.springframework.context.ApplicationEvent; - -public class HorizonPreStopEvent extends ApplicationEvent { - private String message; - - public HorizonPreStopEvent(Object source, String message) { - super(source); - this.message = message; - } - public String getMessage() { - return message; - } +// Copyright 2024 Deutsche Telekom IT GmbH +// +// SPDX-License-Identifier: Apache-2.0 + +package de.telekom.horizon.pulsar.actuator; + +import org.springframework.context.ApplicationEvent; + +public class StopActiveConnectionsEvent extends ApplicationEvent { + private String message; + + public StopActiveConnectionsEvent(Object source, String message) { + super(source); + this.message = message; + } + public String getMessage() { + return message; + } } \ No newline at end of file diff --git a/src/main/java/de/telekom/horizon/pulsar/cache/ConnectionCache.java b/src/main/java/de/telekom/horizon/pulsar/cache/ConnectionCache.java index d34f1b8..a478780 100644 --- a/src/main/java/de/telekom/horizon/pulsar/cache/ConnectionCache.java +++ b/src/main/java/de/telekom/horizon/pulsar/cache/ConnectionCache.java @@ -8,7 +8,7 @@ import com.hazelcast.topic.ITopic; import com.hazelcast.topic.Message; import com.hazelcast.topic.MessageListener; -import de.telekom.horizon.pulsar.actuator.HorizonPreStopEvent; +import de.telekom.horizon.pulsar.actuator.StopActiveConnectionsEvent; import de.telekom.horizon.pulsar.helper.WorkerClaim; import de.telekom.horizon.pulsar.service.SseTask; import lombok.extern.slf4j.Slf4j; @@ -94,7 +94,7 @@ public void claimConnectionForSubscription(String subscriptionId, SseTask connec } @EventListener - public void handleHorizonPreStopEvent(HorizonPreStopEvent event) { + public void handleHorizonPreStopEvent(StopActiveConnectionsEvent event) { log.info(event.getMessage()); terminateAllConnections(); } diff --git a/src/main/java/de/telekom/horizon/pulsar/config/PodConfig.java b/src/main/java/de/telekom/horizon/pulsar/config/PodConfig.java new file mode 100644 index 0000000..d08309e --- /dev/null +++ b/src/main/java/de/telekom/horizon/pulsar/config/PodConfig.java @@ -0,0 +1,19 @@ +package de.telekom.horizon.pulsar.config; + +import lombok.Getter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Getter +@Configuration +public class PodConfig { + + @Value("${pod.ip}") + private String podIp; + + @Value("${pod.namespace}") + private String podNamespace; + + @Value("${pod.service-name}") + private String serviceName; +}