Skip to content

Commit

Permalink
feat: alternative approach on handling graceful shutdown and prestop …
Browse files Browse the repository at this point in the history
…hook
  • Loading branch information
mherwig committed Jan 9, 2025
1 parent e497007 commit eade874
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package de.telekom.horizon.pulsar;

import de.telekom.horizon.pulsar.actuator.StopActiveConnectionsEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;

@Component
public class ApplicationShutdownListener implements ApplicationListener<ContextClosedEvent> {

private final ApplicationEventPublisher applicationEventPublisher;

public ApplicationShutdownListener(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}

/*
Will execute even before any @PreStop invocation
*/
@Override
public void onApplicationEvent(ContextClosedEvent event) {
var message = "Got PreStop request. Terminating all connections...";

applicationEventPublisher.publishEvent(new StopActiveConnectionsEvent(this, message));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,78 @@

package de.telekom.horizon.pulsar.actuator;

import de.telekom.horizon.pulsar.config.PodConfig;
import io.fabric8.kubernetes.api.model.EndpointAddress;
import io.fabric8.kubernetes.client.KubernetesClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.WriteOperation;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
@Endpoint(id = "horizon-prestop")
public class HorizonPreStopActuatorEndpoint {

private final ApplicationEventPublisher applicationEventPublisher;
private final PodConfig podConfig;

private final KubernetesClient kubernetesClient;

public HorizonPreStopActuatorEndpoint(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
public HorizonPreStopActuatorEndpoint(PodConfig podConfig, KubernetesClient kubernetesClient) {
this.podConfig = podConfig;
this.kubernetesClient = kubernetesClient;
}

@WriteOperation
public void handlePreStop() {
var event = new HorizonPreStopEvent(this, "Got PreStop request. Terminating all connections...");
applicationEventPublisher.publishEvent(event);
waitUntil(() -> !isEndpointRegistered(), 10, TimeUnit.SECONDS, 1000);
}

@FunctionalInterface
public interface CheckCondition {
boolean check();
}

public static void waitUntil(CheckCondition condition, long timeout, TimeUnit unit, long delayMillis) {
long timeoutNanos = unit.toNanos(timeout);
long startTime = System.nanoTime();

while (System.nanoTime() - startTime < timeoutNanos) {
try {
if (condition.check()) {
return; // Exit as soon as the condition is true
}
Thread.sleep(delayMillis); // Avoid busy-waiting
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupted status
throw new RuntimeException("Thread was interrupted", e);
}
}
}

public boolean isEndpointRegistered() {
try {
var endpoints = kubernetesClient.endpoints().inNamespace(podConfig.getPodNamespace()).withName(podConfig.getServiceName()).get();

if (endpoints.getSubsets() != null) {
for (var subset : endpoints.getSubsets()) {
List<String> addresses = subset.getAddresses()
.stream()
.map(EndpointAddress::getIp)
.toList();

if (addresses.contains(podConfig.getPodIp())) {
return true; // Pod's IP exists in the endpoints list
}
}
}
} catch (Exception e) {
log.error("Could not check endpoint status", e);
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
// Copyright 2024 Deutsche Telekom IT GmbH
//
// SPDX-License-Identifier: Apache-2.0

package de.telekom.horizon.pulsar.actuator;

import org.springframework.context.ApplicationEvent;

public class HorizonPreStopEvent extends ApplicationEvent {
private String message;

public HorizonPreStopEvent(Object source, String message) {
super(source);
this.message = message;
}
public String getMessage() {
return message;
}
// Copyright 2024 Deutsche Telekom IT GmbH
//
// SPDX-License-Identifier: Apache-2.0

package de.telekom.horizon.pulsar.actuator;

import org.springframework.context.ApplicationEvent;

public class StopActiveConnectionsEvent extends ApplicationEvent {
private String message;

public StopActiveConnectionsEvent(Object source, String message) {
super(source);
this.message = message;
}
public String getMessage() {
return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.hazelcast.topic.ITopic;
import com.hazelcast.topic.Message;
import com.hazelcast.topic.MessageListener;
import de.telekom.horizon.pulsar.actuator.HorizonPreStopEvent;
import de.telekom.horizon.pulsar.actuator.StopActiveConnectionsEvent;
import de.telekom.horizon.pulsar.helper.WorkerClaim;
import de.telekom.horizon.pulsar.service.SseTask;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -94,7 +94,7 @@ public void claimConnectionForSubscription(String subscriptionId, SseTask connec
}

@EventListener
public void handleHorizonPreStopEvent(HorizonPreStopEvent event) {
public void handleHorizonPreStopEvent(StopActiveConnectionsEvent event) {
log.info(event.getMessage());
terminateAllConnections();
}
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/de/telekom/horizon/pulsar/config/PodConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package de.telekom.horizon.pulsar.config;

import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Getter
@Configuration
public class PodConfig {

@Value("${pod.ip}")
private String podIp;

@Value("${pod.namespace}")
private String podNamespace;

@Value("${pod.service-name}")
private String serviceName;
}

0 comments on commit eade874

Please sign in to comment.