Skip to content

Commit

Permalink
add integration test for graceful shutdowns which release leader leases
Browse files Browse the repository at this point in the history
  • Loading branch information
cfryanr committed Sep 25, 2023
1 parent ca6c29e commit 5e06c6d
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 20 deletions.
12 changes: 6 additions & 6 deletions test/integration/main_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020-2021 the Pinniped contributors. All Rights Reserved.
// Copyright 2020-2023 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package integration
Expand Down Expand Up @@ -65,8 +65,8 @@ func splitIntegrationTestsIntoBuckets(m *testing.M) {
serialTest := testing.InternalTest{
Name: "TestIntegrationSerial",
F: func(t *testing.T) {
_ = testlib.IntegrationEnv(t) // make sure these tests do not run during unit tests
t.Parallel() // outer test always runs in parallel for this bucket
testlib.SkipUnlessIntegration(t) // make sure these tests do not run during unit tests
t.Parallel() // outer test always runs in parallel for this bucket

for _, test := range serialTests {
test := test
Expand All @@ -80,8 +80,8 @@ func splitIntegrationTestsIntoBuckets(m *testing.M) {
parallelTest := testing.InternalTest{
Name: "TestIntegrationParallel",
F: func(t *testing.T) {
_ = testlib.IntegrationEnv(t) // make sure these tests do not run during unit tests
t.Parallel() // outer test always runs in parallel for this bucket
testlib.SkipUnlessIntegration(t) // make sure these tests do not run during unit tests
t.Parallel() // outer test always runs in parallel for this bucket

for _, test := range parallelTests {
test := test
Expand All @@ -97,7 +97,7 @@ func splitIntegrationTestsIntoBuckets(m *testing.M) {
disruptiveTest := testing.InternalTest{
Name: "TestIntegrationDisruptive",
F: func(t *testing.T) {
_ = testlib.IntegrationEnv(t) // make sure these tests do not run during unit tests
testlib.SkipUnlessIntegration(t) // make sure these tests do not run during unit tests
// outer test never runs in parallel for this bucket

for _, test := range disruptiveTests {
Expand Down
232 changes: 232 additions & 0 deletions test/integration/pod_shutdown_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// Copyright 2023 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
"bytes"
"context"
"io"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/strings/slices"

"go.pinniped.dev/test/testlib"
)

// TestPodShutdown_Disruptive is intended to test that the Supervisor and Concierge pods can
// perform a graceful shutdown. Most importantly, the leader pods should give up their leases
// before they die.
// Never run this test in parallel since deleting the pods is disruptive, see main_test.go.
func TestPodShutdown_Disruptive(t *testing.T) {
// Only run this test in CI on Kind clusters, because something about restarting the pods
// in this test breaks the "kubectl port-forward" commands that we are using in CI for
// AKS, EKS, and GKE clusters. The Go code that we wrote for graceful pod shutdown should
// not be sensitive to which distribution it runs on, so running this test only on Kind
// should give us sufficient coverage for what we are trying to test here.
env := testlib.IntegrationEnv(t, testlib.SkipPodRestartAssertions()).
WithKubeDistribution(testlib.KindDistro)

testShutdownAllPodsOfApp(t, env, env.ConciergeNamespace, env.ConciergeAppName, "-kube-cert-agent-")
testShutdownAllPodsOfApp(t, env, env.SupervisorNamespace, env.SupervisorAppName, "")
}

func testShutdownAllPodsOfApp(t *testing.T, env *testlib.TestEnv, namespace string, appName string, ignorePodsWithNameSubstring string) {
// Precondition: the app should have some pods running initially.
initialPods := getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
require.Greater(t, len(initialPods), 0)

// Precondition: the leader election lease should contain the name of one of the initial pods as the lease's holder.
waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
func(holder string) bool { return holder != "" && slices.Contains(namesOfPods(initialPods), holder) }, 2*time.Minute)

// Start tailing the logs of all the pods in background goroutines. This struct will keep track
// of each background log tail.
type podLog struct {
pod corev1.Pod // which pod's logs are being tailed
tailDoneCh chan struct{} // this channel will be closed when it is safe to read from logsBuf
logsBuf *bytes.Buffer // the text of the logs will be put in this buffer
}
podLogs := make([]*podLog, 0)
// Skip tailing pod logs for test runs that are using alternate group suffixes. There seems to be a bug in our
// kubeclient package which causes an "unable to find resp serialier" (sic) error for pod log API responses when
// the middleware is active. Since we do not tail pod logs in production code (or anywhere else at this time),
// we don't need to fix that bug right now just for this test.
if env.APIGroupSuffix == "pinniped.dev" {
// For each pod, start tailing its logs.
for _, pod := range initialPods {
tailDoneCh, logTailBuf := tailFollowPodLogs(t, pod)
podLogs = append(podLogs, &podLog{
pod: pod,
tailDoneCh: tailDoneCh,
logsBuf: logTailBuf,
})
}
}

// Scale down the deployment's number of replicas to 0, which will shut down all the pods.
originalScale := updateDeploymentScale(t, namespace, appName, 0)

// When the test is over, restore the deployment to the original scale.
t.Cleanup(func() {
updateDeploymentScale(t, namespace, appName, originalScale)

// Wait for all the new pods to be running.
var newPods []corev1.Pod
testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
newPods = getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
requireEventually.Len(newPods, originalScale, "wanted pods to return to original scale")
}, 2*time.Minute, 200*time.Millisecond)

// After a short time, leader election should have finished and the lease should contain the name of
// one of the new pods as the lease's holder.
waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
func(holder string) bool { return holder != "" && slices.Contains(namesOfPods(newPods), holder) }, 1*time.Minute)

t.Logf("new pod of Deployment %s/%s has acquired the leader election lease", namespace, appName)
})

// Double check: the deployment's previous scale should have equaled the actual number of running pods from
// the start of the test (before we scaled down).
require.Equal(t, len(initialPods), originalScale)

// Now that we have adjusted the scale to 0, the pods should go away.
// Our pods are intended to gracefully shut down within a few seconds, so fail unless it happens fairly quickly.
testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
pods := getRunningPodsByNamePrefix(t, namespace, appName+"-", ignorePodsWithNameSubstring)
requireEventually.Len(pods, 0, "wanted no pods but found some")
}, 20*time.Second, 200*time.Millisecond)

// Look for some interesting log messages in each of the now-dead pod's logs, if we started tailing them above.
for _, pl := range podLogs {
// Wait for the logs of the now-dead pod to be finished collecting.
t.Logf("waiting for tail of pod logs for pod %q", pl.pod.Name)
<-pl.tailDoneCh
// Assert that the Kubernetes generic apiserver library has started and finished a graceful
// shutdown according to its log messages. This is to make sure that the whole graceful shutdown
// process was performed successfully and without being blocked.
require.Containsf(t, pl.logsBuf.String(), `"[graceful-termination] shutdown event","name":"ShutdownInitiated"`,
"did not find expected message in pod log for pod %q", pl.pod.Name)
require.Containsf(t, pl.logsBuf.String(), `"[graceful-termination] apiserver is exiting\n"`,
"did not find expected message in pod log for pod %q", pl.pod.Name)
t.Logf("found expected graceful-termination messages in the logs of pod %q", pl.pod.Name)
}

// The leader election lease should already contain the empty string as the holder, because the old leader
// pod should have given up the lease during its graceful shutdown.
waitForLeaderElectionLeaseToHaveHolderIdentity(t, namespace, appName,
func(holder string) bool { return holder == "" }, 200*time.Millisecond)
}

// Given a list of pods, return a list of their names.
func namesOfPods(pods []corev1.Pod) []string {
names := make([]string, len(pods))
for i, pod := range pods {
names[i] = pod.Name
}
return names
}

func getRunningPodsByNamePrefix(
t *testing.T,
namespace string,
podNamePrefix string,
podNameExcludeSubstring string,
) (foundPods []corev1.Pod) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
client := testlib.NewKubernetesClientset(t)

pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
require.NoError(t, err)

for _, pod := range pods.Items {
if !strings.HasPrefix(pod.Name, podNamePrefix) {
continue
}
if podNameExcludeSubstring != "" && strings.Contains(pod.Name, podNameExcludeSubstring) {
continue
}
if pod.Status.Phase != corev1.PodRunning {
continue
}
foundPods = append(foundPods, pod)
}

return foundPods
}

func updateDeploymentScale(t *testing.T, namespace string, deploymentName string, newScale int) int {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
client := testlib.NewKubernetesClientset(t)

initialScale, err := client.AppsV1().Deployments(namespace).GetScale(ctx, deploymentName, metav1.GetOptions{})
require.NoError(t, err)

desiredScale := initialScale.DeepCopy()
desiredScale.Spec.Replicas = int32(newScale)
updatedScale, err := client.AppsV1().Deployments(namespace).UpdateScale(ctx, deploymentName, desiredScale, metav1.UpdateOptions{})
require.NoError(t, err)
t.Logf("updated scale of Deployment %s/%s from %d to %d",
namespace, deploymentName, initialScale.Spec.Replicas, updatedScale.Spec.Replicas)

return int(initialScale.Spec.Replicas)
}

func tailFollowPodLogs(t *testing.T, pod corev1.Pod) (chan struct{}, *bytes.Buffer) {
t.Helper()
done := make(chan struct{})
var buf bytes.Buffer
client := testlib.NewKubernetesClientset(t)

go func() {
// At the end of this block, signal that we are done writing to the returned buf,
// so it is now safe to read the logs from the returned buf.
defer close(done)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

req := client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{
Follow: true, // keep streaming until completion
})

// This line should block until the pod dies or the context expires.
body, err := req.Stream(ctx)
require.NoError(t, err)

_, err = io.Copy(&buf, body)
require.NoError(t, err)

require.NoError(t, body.Close())
}()

return done, &buf
}

func waitForLeaderElectionLeaseToHaveHolderIdentity(
t *testing.T,
namespace string,
leaseName string,
holderIdentityPredicate func(string) bool,
waitDuration time.Duration,
) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
client := testlib.NewKubernetesClientset(t)

testlib.RequireEventually(t, func(requireEventually *require.Assertions) {
lease, err := client.CoordinationV1().Leases(namespace).Get(ctx, leaseName, metav1.GetOptions{})
requireEventually.NoError(err)
requireEventually.True(holderIdentityPredicate(*lease.Spec.HolderIdentity))
}, waitDuration, 200*time.Millisecond)
}
45 changes: 34 additions & 11 deletions test/testlib/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
type TestEnv struct {
t *testing.T

skipPodRestartAssertions bool

ToolsNamespace string `json:"toolsNamespace"`
ConciergeNamespace string `json:"conciergeNamespace"`
SupervisorNamespace string `json:"supervisorNamespace"`
Expand Down Expand Up @@ -120,15 +122,28 @@ func (e *TestEnv) ProxyEnv() []string {
// environment parsing N times per test and so that any implicit assertions happen only once.
var memoizedTestEnvsByTest sync.Map //nolint:gochecknoglobals

type TestEnvOption func(env *TestEnv)

// SkipPodRestartAssertions is a functional option that can be passed to IntegrationEnv()
// to skip using the implicit assertions which check that no pods get restarted during tests.
// Please using this sparingly, since most pod restarts are caused by unintentional crashes
// and should therefore cause tests to fail.
func SkipPodRestartAssertions() TestEnvOption {
return func(t *TestEnv) {
t.skipPodRestartAssertions = true
t.t.Log("skipping pod restart assertions for test", t.t.Name())
}
}

// IntegrationEnv gets the integration test environment from OS environment variables. This
// method also implies SkipUnlessIntegration().
func IntegrationEnv(t *testing.T) *TestEnv {
func IntegrationEnv(t *testing.T, opts ...TestEnvOption) *TestEnv {
if existing, exists := memoizedTestEnvsByTest.Load(t); exists {
return existing.(*TestEnv)
}

t.Helper()
skipUnlessIntegration(t)
SkipUnlessIntegration(t)

capabilitiesDescriptionYAML := os.Getenv("PINNIPED_TEST_CLUSTER_CAPABILITY_YAML")
capabilitiesDescriptionFile := os.Getenv("PINNIPED_TEST_CLUSTER_CAPABILITY_FILE")
Expand All @@ -142,18 +157,26 @@ func IntegrationEnv(t *testing.T) *TestEnv {
require.NoError(t, err)
}

var result TestEnv
err := yaml.Unmarshal([]byte(capabilitiesDescriptionYAML), &result)
var testEnv TestEnv
err := yaml.Unmarshal([]byte(capabilitiesDescriptionYAML), &testEnv)
require.NoErrorf(t, err, "capabilities specification was invalid YAML")

loadEnvVars(t, &result)
result.t = t
memoizedTestEnvsByTest.Store(t, &result)
loadEnvVars(t, &testEnv)
testEnv.t = t

for _, opt := range opts {
opt(&testEnv)
}

memoizedTestEnvsByTest.Store(t, &testEnv)

// By default, in every integration test, assert that no pods in our namespaces restart during the test.
if !testEnv.skipPodRestartAssertions {
assertNoRestartsDuringTest(t, testEnv.ConciergeNamespace, "!pinniped.dev/test")
assertNoRestartsDuringTest(t, testEnv.SupervisorNamespace, "!pinniped.dev/test")
}

// In every integration test, assert that no pods in our namespaces restart during the test.
assertNoRestartsDuringTest(t, result.ConciergeNamespace, "!pinniped.dev/test")
assertNoRestartsDuringTest(t, result.SupervisorNamespace, "!pinniped.dev/test")
return &result
return &testEnv
}

func needEnv(t *testing.T, key string) string {
Expand Down
6 changes: 3 additions & 3 deletions test/testlib/skip.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright 2020-2022 the Pinniped contributors. All Rights Reserved.
// Copyright 2020-2023 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package testlib

import "testing"

// skipUnlessIntegration skips the current test if `-short` has been passed to `go test`.
func skipUnlessIntegration(t *testing.T) {
// SkipUnlessIntegration skips the current test if `-short` has been passed to `go test`.
func SkipUnlessIntegration(t *testing.T) {
t.Helper()

if testing.Short() {
Expand Down

0 comments on commit 5e06c6d

Please sign in to comment.