add integration test for graceful shutdowns which release leader leases
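For context, the lease release being tested is the behavior that client-go's leader election provides when `ReleaseOnCancel` is enabled: during a graceful shutdown, the leader writes an empty `holderIdentity` back to its Lease so a replacement can take over immediately, which is exactly what the test below asserts. The following is a minimal illustrative sketch of that client-go mechanism, not Pinniped's actual implementation; the namespace, lease name, identity, and timing values are assumptions.

// Minimal sketch (assumed names/values, not Pinniped's code): a process that releases
// its leader election lease during graceful shutdown via client-go's ReleaseOnCancel.
package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	// Cancel this context on SIGTERM so that a graceful shutdown triggers the lease release.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, os.Interrupt)
	defer stop()

	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}

	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Namespace: "pinniped-supervisor", Name: "pinniped-supervisor"}, // assumed
		Client:     kubernetes.NewForConfigOrDie(config).CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: os.Getenv("HOSTNAME")}, // the pod's name
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
		ReleaseOnCancel: true, // write back an empty holderIdentity on shutdown
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { <-ctx.Done() }, // do work while leading
			OnStoppedLeading: func() {},
		},
	})
}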
@@ -0,0 +1,220 @@
// Copyright 2023 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
	"bytes"
	"context"
	"io"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/strings/slices"

	"go.pinniped.dev/test/testlib"
)

// TestPodShutdown_Disruptive is intended to test that the Supervisor and Concierge pods can
// perform a graceful shutdown. Most importantly, the leader pods should give up their leases
// before they die.
// Never run this test in parallel since deleting the pods is disruptive, see main_test.go.
func TestPodShutdown_Disruptive(t *testing.T) {
	env := testlib.IntegrationEnv(t, testlib.SkipPodRestartAssertions())

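	// The kube-cert-agent pods are created by the Concierge itself rather than by the
	// Concierge Deployment, so they are excluded below when watching the Concierge app's
	// pods get scaled down and back up.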
	testShutdownAllPodsOfApp(t, env.ConciergeNamespace, env.ConciergeAppName, "-kube-cert-agent-")
	testShutdownAllPodsOfApp(t, env.SupervisorNamespace, env.SupervisorAppName, "")
}

func testShutdownAllPodsOfApp(t *testing.T, namespace string, appName string, ignorePodsWithNameSubstring string) {
	// The app should have some pods running initially.
	initialPods := getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
	require.Greater(t, len(initialPods), 0)

	// Make a list of pod names.
	initialPodNames := make([]string, len(initialPods))
	for i, pod := range initialPods {
		initialPodNames[i] = pod.Name
	}

	// Start tailing the logs of all the pods in background goroutines. This struct will keep track
	// of each background log tail.
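	// Note that the tails must be started while the pods are still alive, because pod logs
	// can no longer be fetched once the pods have been deleted.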
	type podLog struct {
		pod        corev1.Pod
		tailDoneCh chan struct{}
		logsBuf    *bytes.Buffer
	}
	podLogs := make([]*podLog, len(initialPods))
	for i, pod := range initialPods {
		tailDoneCh, logTailBuf := tailFollowPodLogs(t, pod)
		podLogs[i] = &podLog{
			pod:        pod,
			tailDoneCh: tailDoneCh,
			logsBuf:    logTailBuf,
		}
	}

	// The leader election lease should contain the name of one of the initial pods as the holder.
	waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
		func(holder string) bool { return holder != "" && slices.Contains(initialPodNames, holder) }, 2*time.Minute)

	// Scale down the deployment's number of replicas to 0, which will shut down all the pods.
	originalScale := updateDeploymentScale(t, namespace, appName, 0)

	t.Cleanup(func() {
		// When the test is over, restore the deployment to the original scale.
		updateDeploymentScale(t, namespace, appName, originalScale)

		// Wait for all the new pods to be running.
		var newPods []corev1.Pod
		testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
			newPods = getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
			requireEventually.Len(newPods, originalScale, "wanted pods to return to original scale")
		}, 2*time.Minute, 200*time.Millisecond)

		// Make a list of pod names.
		newPodNames := make([]string, len(newPods))
		for i, pod := range newPods {
			newPodNames[i] = pod.Name
		}

		// The leader election lease should contain the name of one of the new pods as the holder.
		waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
			func(holder string) bool { return holder != "" && slices.Contains(newPodNames, holder) }, 2*time.Minute)
	})

	// The previous scale should have equaled the actual number of running pods from before we adjusted the scale.
	require.Equal(t, len(initialPods), originalScale)

	// Now that we have adjusted the scale to 0, the pods should go away.
	// Our pods are intended to gracefully shut down within a few seconds, so fail unless it happens fairly quickly.
	testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
		pods := getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
		requireEventually.Len(pods, 0, "wanted no pods but found some")
	}, 20*time.Second, 200*time.Millisecond)

	// Look for some interesting log messages in each of the now-dead pod's logs.
	for _, pl := range podLogs {
		// Wait for the logs of the now-dead pod to be finished collecting.
		t.Logf("waiting for tail of pod logs for pod %q", pl.pod.Name)
		<-pl.tailDoneCh
		// Assert that the Kubernetes generic apiserver library has started and finished a graceful
		// shutdown according to its log messages. This is to make sure that the whole graceful shutdown
		// process was performed successfully and without being blocked.
		require.Containsf(t, pl.logsBuf.String(), `"[graceful-termination] shutdown event","name":"ShutdownInitiated"`,
			"did not find expected message in pod log for pod %q", pl.pod.Name)
		require.Containsf(t, pl.logsBuf.String(), `"[graceful-termination] apiserver is exiting\n"`,
			"did not find expected message in pod log for pod %q", pl.pod.Name)
	}

	// The leader election lease should already contain the empty string as the holder, because the old leader
	// pod should have given up the lease during its graceful shutdown.
	waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
		func(holder string) bool { return holder == "" }, 200*time.Millisecond)
}

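// getRunningPodsByNamePrefix returns all running pods in the namespace whose names start with
// podNamePrefix, optionally excluding any whose names contain podNameExcludeSubstring.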
func getRunningPodsByNamePrefix(
	t *testing.T,
	namespace string,
	podNamePrefix string,
	podNameExcludeSubstring string,
) (foundPods []corev1.Pod) {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
	defer cancel()
	client := testlib.NewKubernetesClientset(t)

	pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
	require.NoError(t, err)

	for _, pod := range pods.Items {
		if !strings.HasPrefix(pod.Name, podNamePrefix) {
			continue
		}
		if podNameExcludeSubstring != "" && strings.Contains(pod.Name, podNameExcludeSubstring) {
			continue
		}
		if pod.Status.Phase != corev1.PodRunning {
			continue
		}
		foundPods = append(foundPods, pod)
	}

	return foundPods
}

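// updateDeploymentScale updates the named Deployment's number of replicas via its scale
// subresource and returns the previous number of replicas.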
func updateDeploymentScale(t *testing.T, namespace string, deploymentName string, newScale int) int {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
	defer cancel()
	client := testlib.NewKubernetesClientset(t)

	initialScale, err := client.AppsV1().Deployments(namespace).GetScale(ctx, deploymentName, metav1.GetOptions{})
	require.NoError(t, err)

	desiredScale := initialScale.DeepCopy()
	desiredScale.Spec.Replicas = int32(newScale)
	updatedScale, err := client.AppsV1().Deployments(namespace).UpdateScale(ctx, deploymentName, desiredScale, metav1.UpdateOptions{})
	require.NoError(t, err)
	t.Logf("updated scale of Deployment %s/%s from %d to %d",
		namespace, deploymentName, initialScale.Spec.Replicas, updatedScale.Spec.Replicas)

	return int(initialScale.Spec.Replicas)
}

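// tailFollowPodLogs streams the logs of the given pod from a background goroutine until the pod
// terminates. It returns a channel which is closed when the stream has ended, along with a
// buffer holding the collected logs, which is safe to read only after the channel is closed.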
func tailFollowPodLogs(t *testing.T, pod corev1.Pod) (chan struct{}, *bytes.Buffer) {
	t.Helper()
	done := make(chan struct{})
	var buf bytes.Buffer

	go func() {
		// At the end of this block, signal that we are done writing to the returned buf,
		// so it is now safe to read the logs from the returned buf.
		defer close(done)

		client := testlib.NewKubernetesClientset(t)

		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
		defer cancel()

		req := client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{
			Follow: true, // keep streaming until completion
		})

		body, err := req.Stream(ctx)
		if !assert.NoError(t, err) {
			return // body would be nil here, so do not try to read or close it
		}

		// This copy should block until the pod dies or the context expires.
		n, err := io.Copy(&buf, body)
		assert.NoError(t, err)
		assert.Greater(t, n, int64(0), "got empty pod log, but wanted log with content")

		assert.NoError(t, body.Close())
	}()

	return done, &buf
}

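// waitForLeaderElectionLeaseToHaveHolderIdentity polls the named Lease until its
// spec.holderIdentity satisfies the given predicate, failing the test if that does not
// happen within waitDuration.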
func waitForLeaderElectionLeaseToHaveHolderIdentity(
	t *testing.T,
	namespace string,
	leaseName string,
	holderIdentityPredicate func(string) bool,
	waitDuration time.Duration,
) {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()
	client := testlib.NewKubernetesClientset(t)

	testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
		lease, err := client.CoordinationV1().Leases(namespace).Get(ctx, leaseName, metav1.GetOptions{})
		requireEventually.NoError(err)
		// Guard against a nil holderIdentity so the polling loop fails rather than panics.
		requireEventually.NotNil(lease.Spec.HolderIdentity)
		requireEventually.True(holderIdentityPredicate(*lease.Spec.HolderIdentity))
	}, waitDuration, 200*time.Millisecond)
}