Skip to content

Commit

Permalink
update pod watcher to handle all events occurring before the pod watc…
Browse files Browse the repository at this point in the history
…h is started
  • Loading branch information
gabemontero committed Jan 27, 2022
1 parent 4320ccf commit ff8e81d
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/shp/cmd/build/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func TestStartBuildRunFollowLog(t *testing.T) {
pod.Status.Phase = test.phase
cmd.follower.OnEvent(pod)
} else {
cmd.follower.OnNoPodEventsYet()
cmd.follower.OnNoPodEventsYet(nil)
}
checkLog(test.name, test.logText, cmd, out, t)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/shp/cmd/buildrun/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func TestStreamBuildRunFollowLogs(t *testing.T) {
pod.Status.Phase = test.phase
cmd.follower.OnEvent(pod)
} else {
cmd.follower.OnNoPodEventsYet()
cmd.follower.OnNoPodEventsYet(nil)
}
checkLog(test.name, test.logText, cmd, out, t)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/shp/cmd/follower/follow.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,13 @@ func (f *Follower) OnTimeout(msg string) {
}

// OnNoPodEventsYet reacts to the pod watcher telling us it has not received any pod events for our build run
func (f *Follower) OnNoPodEventsYet() {
func (f *Follower) OnNoPodEventsYet(podList *corev1.PodList) {
f.Log(fmt.Sprintf("BuildRun %q log following has not observed any pod events yet.\n", f.buildRun.Name))
if podList != nil && len(podList.Items) > 0 {
f.Log(fmt.Sprintf("BuildRun %q's Pod completed before the log following's watch was established.\n", f.buildRun.Name))
f.OnEvent(&podList.Items[0])
return
}
brClient := f.buildClientset.ShipwrightV1alpha1().BuildRuns(f.buildRun.Namespace)
br, err := brClient.Get(f.ctx, f.buildRun.Name, metav1.GetOptions{})
if err != nil {
Expand Down
15 changes: 12 additions & 3 deletions pkg/shp/reactor/pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type PodWatcher struct {
clientset kubernetes.Interface
ns string
watcher watch.Interface // client watch instance
listOpts metav1.ListOptions

noPodEventsYetFn []NoPodEventsYetFn
toPodFn []TimeoutPodFn
Expand All @@ -51,8 +52,9 @@ type OnPodEventFn func(pod *corev1.Pod) error
// TimeoutPodFn when either the context or request timeout expires before the Pod finishes
type TimeoutPodFn func(msg string)

// NoPodEventsYetFn when the watch has not received the create event within a reasonable time
type NoPodEventsYetFn func()
// NoPodEventsYetFn when the watch has not received the create event within a reasonable time,
// where a PodList is also provided in the off chance the Pod completed before the Watch was started.
type NoPodEventsYetFn func(podList *corev1.PodList)

// WithSkipPodFn sets the skip function instance.
func (p *PodWatcher) WithSkipPodFn(fn SkipPodFn) *PodWatcher {
Expand Down Expand Up @@ -121,6 +123,7 @@ func (p *PodWatcher) handleEvent(pod *corev1.Pod, event watch.Event) error {
// Start runs the event loop based on a watch instantiated against informed pod. In case of errors
// the loop is interrupted.
func (p *PodWatcher) Start(listOpts metav1.ListOptions) (*corev1.Pod, error) {
p.listOpts = listOpts
w, err := p.clientset.CoreV1().Pods(p.ns).Watch(p.ctx, listOpts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -178,8 +181,14 @@ func (p *PodWatcher) Start(listOpts metav1.ListOptions) (*corev1.Pod, error) {
// a lot of the relevant constants in github.com/k8s/k8s, which is a hassle to vendor in, prototypes
// felt fragile
case <-p.eventTicker.C:
// for the narrow edge case where the final event for the Pod occurs before the
// watch can be established, we list the pods and if we find any, call noPodEventsYetFn.
// Reminder, if we do get events, this ticker is stopped/cancelled
podList, _ := p.clientset.CoreV1().Pods(p.ns).List(p.ctx, p.listOpts)
// no need to return the error here, calling the no pod events listener is more important and it
// more than likely will treat a nil/empty PodList the same regardless
for _, fn := range p.noPodEventsYetFn {
fn()
fn(podList)
}

// watching over stop channel to stop the event loop on demand.
Expand Down
61 changes: 60 additions & 1 deletion pkg/shp/reactor/pod_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package reactor

import (
"context"
kruntime "k8s.io/apimachinery/pkg/runtime"
fakekubetesting "k8s.io/client-go/testing"
"math"
"testing"
"time"
Expand Down Expand Up @@ -65,7 +67,7 @@ func Test_PodWatcher_NotCalledYet(t *testing.T) {
eventsDoneCh := make(chan bool, 1)

called := false
pw.WithNoPodEventsYetFn(func() {
pw.WithNoPodEventsYetFn(func(podList *corev1.PodList) {
called = true
eventsCh <- true
})
Expand All @@ -88,6 +90,63 @@ func Test_PodWatcher_NotCalledYet(t *testing.T) {
}
}

func Test_PodWatcher_NotCalledYet_AllEventsBeforeWatchStart(t *testing.T) {
// we separate this test out from the other events given the
// lazy check we have for not getting pod events
g := NewWithT(t)
ctx := context.TODO()

clientset := fake.NewSimpleClientset()
// set up lister that will return pod, but we don't create/update a Pod, thus we do not trigger
// events on the watch; mimics a Pod completing before the watch is established.
listReactorFunc := func(action fakekubetesting.Action) (handled bool, ret kruntime.Object, err error) {
pod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "pod",
},
}
return true, &corev1.PodList{Items: []corev1.Pod{pod}}, nil
}
clientset.PrependReactor("list", "pods", listReactorFunc)

pw, err := NewPodWatcher(ctx, math.MaxInt64, clientset, metav1.NamespaceDefault)
g.Expect(err).To(BeNil())

eventsCh := make(chan bool, 1)
eventsDoneCh := make(chan bool, 1)

noEventsCalled := false
podListProvided := false
pw.WithNoPodEventsYetFn(func(podList *corev1.PodList) {
noEventsCalled = true
if podList != nil {
podListProvided = true
}
eventsCh <- true
})

// executing the event loop in the background, and waiting for the stop channel before inspecting
// for errors
go func() {
_, err := pw.Start(metav1.ListOptions{})
<-pw.stopCh
g.Expect(err).To(BeNil())
eventsDoneCh <- true
}()

<-eventsCh
pw.Stop()
<-eventsDoneCh

if !noEventsCalled {
t.Fatal("noEventsCalled still false")
}
if !podListProvided {
t.Fatal("podListProvided still false")
}
}

func Test_PodWatcherEvents(t *testing.T) {
g := NewWithT(t)
ctx := context.TODO()
Expand Down

0 comments on commit ff8e81d

Please sign in to comment.