diff --git a/pkg/shp/cmd/build/run.go b/pkg/shp/cmd/build/run.go
index 9eac09c61..ed0b5f7bc 100644
--- a/pkg/shp/cmd/build/run.go
+++ b/pkg/shp/cmd/build/run.go
@@ -22,11 +22,12 @@ import (
 type RunCommand struct {
 	cmd *cobra.Command // cobra command instance
 
-	buildName    string
-	namespace    string
-	buildRunSpec *buildv1alpha1.BuildRunSpec // stores command-line flags
-	follow       bool                        // flag to tail pod logs
-	follower     *follower.Follower
+	buildName     string
+	namespace     string
+	buildRunSpec  *buildv1alpha1.BuildRunSpec // stores command-line flags
+	follow        bool                        // flag to tail pod logs
+	follower      *follower.Follower
+	followerReady chan bool
 }
 
 const buildRunLongDesc = `
@@ -52,6 +53,15 @@ func (r *RunCommand) Complete(params *params.Params, ioStreams *genericclioption
 	r.namespace = params.Namespace()
 
+	if r.follow {
+		var err error
+		// provide empty build run name; will be set in Run()
+		r.follower, err = params.NewFollower(r.cmd.Context(), types.NamespacedName{}, ioStreams)
+		if err != nil {
+			return err
+		}
+		r.followerReady = make(chan bool, 1)
+	}
 	// overwriting build-ref name to use what's on arguments
 	return r.Cmd().Flags().Set(flags.BuildrefNameFlag, r.buildName)
 }
 
@@ -64,6 +74,16 @@ func (r *RunCommand) Validate() error {
 	return nil
 }
 
+// FollowerReady blocks until any log following connections are established in the Run call.
+// Useful if you have code that calls Run on a separate thread and coordination is needed.
+func (r *RunCommand) FollowerReady() bool {
+	if !r.follow {
+		return false
+	}
+	_, closed := <-r.followerReady
+	return !closed
+}
+
 // Run creates a BuildRun resource based on Build's name informed on arguments.
 func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOStreams) error {
 	// resource using GenerateName, which will provide a unique instance
@@ -90,15 +110,8 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
 		return nil
 	}
 
-	// during unit-testing the follower instance will be injected directly, which makes possible to
-	// simulate the pod events without creating a race condition
-	if r.follower == nil {
-		buildRun := types.NamespacedName{Namespace: r.namespace, Name: br.GetName()}
-		r.follower, err = params.NewFollower(ctx, buildRun, ioStreams)
-		if err != nil {
-			return err
-		}
-	}
+	buildRun := types.NamespacedName{Namespace: r.namespace, Name: br.GetName()}
+	r.follower.SetBuildRunName(buildRun)
 
 	// instantiating a pod watcher with a specific label-selector to find the indented pod where the
 	// actual build started by this subcommand is being executed, including the randomized buildrun
@@ -108,7 +121,12 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
 		r.buildName, br.GetName(),
 	)}
 
-	_, err = r.follower.Start(listOpts)
+	err = r.follower.Connect(listOpts)
+	if err != nil {
+		return err
+	}
+	close(r.followerReady)
+	_, err = r.follower.WaitForCompletion()
 	return err
 }
 
diff --git a/pkg/shp/cmd/build/run_test.go b/pkg/shp/cmd/build/run_test.go
index 4d9125f00..fe4a97137 100644
--- a/pkg/shp/cmd/build/run_test.go
+++ b/pkg/shp/cmd/build/run_test.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"strings"
 	"testing"
+	"time"
 
 	buildv1alpha1 "github.com/shipwright-io/build/pkg/apis/build/v1alpha1"
 	shpfake "github.com/shipwright-io/build/pkg/client/clientset/versioned/fake"
@@ -15,7 +16,6 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kruntime "k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/types"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/kubernetes/fake" fakekubetesting "k8s.io/client-go/testing" @@ -80,7 +80,7 @@ func TestStartBuildRunFollowLog(t *testing.T) { }, { name: "timeout", - to: "1s", + to: "1ms", logText: reactor.RequestTimeoutMessage, }, { @@ -146,18 +146,10 @@ func TestStartBuildRunFollowLog(t *testing.T) { if len(test.to) > 0 { pm.Timeout = &test.to } - param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault) + failureDuration := 1 * time.Millisecond + param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault, &failureDuration, &failureDuration) ioStreams, _, out, _ := genericclioptions.NewTestIOStreams() - var err error - cmd.follower, err = param.NewFollower( - cmd.Cmd().Context(), - types.NamespacedName{Namespace: br.GetNamespace(), Name: br.GetName()}, - &ioStreams, - ) - if err != nil { - t.Fatalf("error instantiating follower: %q", err) - } switch { case test.cancelled: @@ -184,6 +176,12 @@ func TestStartBuildRunFollowLog(t *testing.T) { Status: corev1.ConditionFalse, }, } + case test.phase == corev1.PodRunning: + pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{ + State: corev1.ContainerState{ + Running: &corev1.ContainerStateRunning{StartedAt: metav1.Now()}, + }, + }) } cmd.Complete(param, &ioStreams, []string{name}) @@ -200,6 +198,14 @@ func TestStartBuildRunFollowLog(t *testing.T) { } }() + // when employing the Run() method in a multi-threaded capacity, we must make sure + // the underlying Follower/PodWatcher watches are sync'ed and ready for use before + // we start populating the event queue + ready := cmd.FollowerReady() + if !ready { + t.Errorf("%s follower no ready", test.name) + } + if !test.noPodYet { // mimic watch events, bypassing k8s fake client watch hoopla whose plug points are not always useful; pod.Status.Phase = test.phase diff --git a/pkg/shp/cmd/buildrun/cancel_test.go b/pkg/shp/cmd/buildrun/cancel_test.go index 265e4f4b5..5c9aaf407 100644 --- a/pkg/shp/cmd/buildrun/cancel_test.go +++ b/pkg/shp/cmd/buildrun/cancel_test.go @@ -97,7 +97,7 @@ func TestCancelBuildRun(t *testing.T) { // set up context cmd.Cmd().ExecuteC() - param := params.NewParamsForTest(nil, clientset, nil, metav1.NamespaceDefault) + param := params.NewParamsForTest(nil, clientset, nil, metav1.NamespaceDefault, nil, nil) ioStreams, _, _, _ := genericclioptions.NewTestIOStreams() err := cmd.Run(param, &ioStreams) diff --git a/pkg/shp/cmd/buildrun/logs_test.go b/pkg/shp/cmd/buildrun/logs_test.go index 0a7cece54..24d6cea36 100644 --- a/pkg/shp/cmd/buildrun/logs_test.go +++ b/pkg/shp/cmd/buildrun/logs_test.go @@ -42,7 +42,7 @@ func TestStreamBuildLogs(t *testing.T) { clientset := fake.NewSimpleClientset(pod) ioStreams, _, out, _ := genericclioptions.NewTestIOStreams() - param := params.NewParamsForTest(clientset, nil, nil, metav1.NamespaceDefault) + param := params.NewParamsForTest(clientset, nil, nil, metav1.NamespaceDefault, nil, nil) err := cmd.Run(param, &ioStreams) if err != nil { t.Fatalf("%s", err.Error()) @@ -180,7 +180,7 @@ func TestStreamBuildRunFollowLogs(t *testing.T) { if len(test.to) > 0 { pm.Timeout = &test.to } - param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault) + param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault, nil, nil) ioStreams, _, out, _ := genericclioptions.NewTestIOStreams() diff --git a/pkg/shp/cmd/follower/follow.go b/pkg/shp/cmd/follower/follow.go index 
index 949e0de30..a65d60134 100644
--- a/pkg/shp/cmd/follower/follow.go
+++ b/pkg/shp/cmd/follower/follow.go
@@ -37,6 +37,9 @@ type Follower struct {
 	logLock sync.Mutex // avoiding race condition to print logs
 
 	enteredRunningState bool // target pod is running
+
+	failPollInterval time.Duration // for use in the PollImmediate call when processing failed pods
+	failPollTimeout  time.Duration // for use in the PollImmediate call when processing failed pods
 }
 
 // NewFollower returns a Follower instance.
@@ -56,9 +59,11 @@ func NewFollower(
 		clientset:      clientset,
 		buildClientset: buildClientset,
 
-		logTail:         tail.NewTail(ctx, clientset),
-		logLock:         sync.Mutex{},
-		tailLogsStarted: map[string]bool{},
+		logTail:          tail.NewTail(ctx, clientset),
+		logLock:          sync.Mutex{},
+		tailLogsStarted:  map[string]bool{},
+		failPollInterval: 1 * time.Second,
+		failPollTimeout:  15 * time.Second,
 	}
 
 	f.pw.WithOnPodModifiedFn(f.OnEvent)
@@ -68,6 +73,23 @@ func NewFollower(
 	return f
 }
 
+// SetBuildRunName allows for setting the BuildRun name after the call to NewFollower. This helps support
+// auto generation of the BuildRun name from the Build. NOTE: if the BuildRun name
+// is not set prior to the call to WaitForCompletion, the Follower will not function fully once events arrive.
+func (f *Follower) SetBuildRunName(brName types.NamespacedName) {
+	f.buildRun = brName
+}
+
+// SetFailPollInterval overrides the default value used in polling calls
+func (f *Follower) SetFailPollInterval(t time.Duration) {
+	f.failPollInterval = t
+}
+
+// SetFailPollTimeout overrides the default value used in polling calls
+func (f *Follower) SetFailPollTimeout(t time.Duration) {
+	f.failPollTimeout = t
+}
+
 // GetLogLock returns the mutex used for coordinating access to log buffers.
 func (f *Follower) GetLogLock() *sync.Mutex {
 	return &f.logLock
@@ -106,16 +128,20 @@ func (f *Follower) OnEvent(pod *corev1.Pod) error {
 	case corev1.PodRunning:
 		if !f.enteredRunningState {
 			f.Log(fmt.Sprintf("Pod %q in %q state, starting up log tail", pod.GetName(), corev1.PodRunning))
-			f.enteredRunningState = true
-			// graceful time to wait for container start
-			time.Sleep(3 * time.Second)
-			// start tailing container logs
-			f.tailLogs(pod)
+			for _, c := range pod.Status.ContainerStatuses {
+				if c.State.Running != nil && !c.State.Running.StartedAt.IsZero() {
+					f.enteredRunningState = true
+					break
+				}
+			}
+			if f.enteredRunningState {
+				f.tailLogs(pod)
+			}
 		}
 	case corev1.PodFailed:
 		msg := ""
 		var br *buildv1alpha1.BuildRun
-		err := wait.PollImmediate(1*time.Second, 15*time.Second, func() (done bool, err error) {
+		err := wait.PollImmediate(f.failPollInterval, f.failPollTimeout, func() (done bool, err error) {
 			brClient := f.buildClientset.ShipwrightV1alpha1().BuildRuns(pod.Namespace)
 			br, err = brClient.Get(f.ctx, f.buildRun.Name, metav1.GetOptions{})
 			if err != nil {
@@ -205,6 +231,7 @@ func (f *Follower) OnNoPodEventsYet(podList *corev1.PodList) {
 	br, err := brClient.Get(f.ctx, f.buildRun.Name, metav1.GetOptions{})
 	if err != nil {
 		f.Log(fmt.Sprintf("error accessing BuildRun %q: %s", f.buildRun.Name, err.Error()))
+		f.Stop()
 		return
 	}
 
@@ -225,7 +252,7 @@ func (f *Follower) OnNoPodEventsYet(podList *corev1.PodList) {
 		giveUp = true
 		msg = fmt.Sprintf("BuildRun '%s' has been deleted.\n", br.Name)
 	case !br.HasStarted():
-		f.Log(fmt.Sprintf("BuildRun '%s' has been marked as failed.\n", br.Name))
+		f.Log(fmt.Sprintf("BuildRun '%s' has not been marked as started yet.\n", br.Name))
 	}
 	if giveUp {
 		f.Log(msg)
@@ -234,7 +261,20 @@ func (f *Follower) OnNoPodEventsYet(podList *corev1.PodList) {
 	}
 }
 
-// Start initiates the log following for the referenced BuildRun's Pod
-func (f *Follower) Start(lo metav1.ListOptions) (*corev1.Pod, error) {
-	return f.pw.Start(lo)
+func (f *Follower) Connect(lo metav1.ListOptions) error {
+	return f.pw.Connect(lo)
+}
+
+// WaitForCompletion initiates the log following for the referenced BuildRun's Pod
+func (f *Follower) WaitForCompletion() (*corev1.Pod, error) {
+	return f.pw.WaitForCompletion()
+}
+
+// Start is a convenience method for capturing the use of both Connect and WaitForCompletion
+func (f *Follower) Start(listOpts metav1.ListOptions) (*corev1.Pod, error) {
+	err := f.Connect(listOpts)
+	if err != nil {
+		return nil, err
+	}
+	return f.WaitForCompletion()
 }
diff --git a/pkg/shp/params/params.go b/pkg/shp/params/params.go
index 9bb72712a..7d42add8d 100644
--- a/pkg/shp/params/params.go
+++ b/pkg/shp/params/params.go
@@ -48,6 +48,9 @@ type Params struct {
 	configFlags *genericclioptions.ConfigFlags
 
 	namespace string
+
+	failPollInterval *time.Duration
+	failPollTimeout  *time.Duration
 }
 
 // AddFlags accepts flags and adds program global flags to it
@@ -181,6 +184,12 @@ func (p *Params) NewFollower(
 	}
 
 	p.follower = follower.NewFollower(ctx, br, ioStreams, pw, clientset, buildClientset)
+	if p.failPollTimeout != nil {
+		p.follower.SetFailPollTimeout(*p.failPollTimeout)
+	}
+	if p.failPollInterval != nil {
+		p.follower.SetFailPollInterval(*p.failPollInterval)
+	}
 
 	return p.follower, nil
 }
@@ -198,11 +207,16 @@ func NewParamsForTest(clientset kubernetes.Interface,
 	shpClientset buildclientset.Interface,
 	configFlags *genericclioptions.ConfigFlags,
 	namespace string,
+	failPollInterval *time.Duration,
+	failPollTimeout *time.Duration,
+
 ) *Params {
 	return &Params{
-		clientset:      clientset,
-		buildClientset: shpClientset,
-		configFlags:    configFlags,
-		namespace:      namespace,
+		clientset:        clientset,
+		buildClientset:   shpClientset,
+		configFlags:      configFlags,
+		namespace:        namespace,
+		failPollInterval: failPollInterval,
+		failPollTimeout:  failPollTimeout,
 	}
 }
diff --git a/pkg/shp/reactor/pod_watcher.go b/pkg/shp/reactor/pod_watcher.go
index df565c57b..ab08ddbb2 100644
--- a/pkg/shp/reactor/pod_watcher.go
+++ b/pkg/shp/reactor/pod_watcher.go
@@ -120,15 +120,22 @@ func (p *PodWatcher) handleEvent(pod *corev1.Pod, event watch.Event) error {
 	return nil
 }
 
-// Start runs the event loop based on a watch instantiated against informed pod. In case of errors
-// the loop is interrupted.
-func (p *PodWatcher) Start(listOpts metav1.ListOptions) (*corev1.Pod, error) {
+// Connect is the first of two methods called by Start, and it handles the creation of the watch based on the list options provided.
+// Separating out Connect from Start helps deal with the fake k8s clients, which are used by the unit tests, and the capabilities of their Watch implementation.
+func (p *PodWatcher) Connect(listOpts metav1.ListOptions) error {
 	p.listOpts = listOpts
 	w, err := p.clientset.CoreV1().Pods(p.ns).Watch(p.ctx, listOpts)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	p.watcher = w
+	return nil
+}
+
+// WaitForCompletion is the second of two methods called by Start, and it runs the event loop based on the watch instantiated (by Connect) against the informed pod. In case of errors
+// the loop is interrupted. Separating out WaitForCompletion from Start helps deal with the fake k8s clients, which are used by the unit tests,
+// and the capabilities of their Watch implementation.
+func (p *PodWatcher) WaitForCompletion() (*corev1.Pod, error) {
 	for {
 		select {
 		// handling the regular pod modification events, which should trigger calling event functions
@@ -199,6 +206,15 @@ func (p *PodWatcher) Start(listOpts metav1.ListOptions) (*corev1.Pod, error) {
 	}
 }
 
+// Start is a convenience method for capturing the use of both Connect and WaitForCompletion
+func (p *PodWatcher) Start(listOpts metav1.ListOptions) (*corev1.Pod, error) {
+	err := p.Connect(listOpts)
+	if err != nil {
+		return nil, err
+	}
+	return p.WaitForCompletion()
+}
+
 // Stop closes the stop channel, and stops the execution loop.
 func (p *PodWatcher) Stop() {
 	// employ sync because of observed 'panic: close of closed channel' when running build run log following
diff --git a/pkg/shp/reactor/pod_watcher_test.go b/pkg/shp/reactor/pod_watcher_test.go
index a63cb309b..543ad4382 100644
--- a/pkg/shp/reactor/pod_watcher_test.go
+++ b/pkg/shp/reactor/pod_watcher_test.go
@@ -20,7 +20,7 @@ func Test_PodWatcher_RequestTimeout(t *testing.T) {
 
 	clientset := fake.NewSimpleClientset()
 
-	pw, err := NewPodWatcher(ctx, time.Second, clientset, metav1.NamespaceDefault)
+	pw, err := NewPodWatcher(ctx, time.Millisecond, clientset, metav1.NamespaceDefault)
 	g.Expect(err).To(BeNil())
 
 	called := false
@@ -35,7 +35,7 @@ func Test_PodWatcher_ContextTimeout(t *testing.T) {
 	g := NewWithT(t)
 	ctx := context.TODO()
-	ctxWithDeadline, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second))
+	ctxWithDeadline, cancel := context.WithDeadline(ctx, time.Now().Add(time.Millisecond))
 	defer cancel()
 
 	clientset := fake.NewSimpleClientset()
@@ -157,7 +157,6 @@ func Test_PodWatcherEvents(t *testing.T) {
 	g.Expect(err).To(BeNil())
 
 	eventsCh := make(chan string, 5)
-	eventsDoneCh := make(chan bool, 1)
 
 	skipPODFn := "SkipPodFn"
 	onPodAddedFn := "OnPodAddedFn"
@@ -180,13 +179,17 @@ func Test_PodWatcherEvents(t *testing.T) {
 		return nil
 	})
 
+	// with the multi-threaded nature of this test, and the lack of thread safety and reliability of the k8s fake watch clients,
+	// we cannot use pw.Start directly, and must call what it calls separately
+	err = pw.Connect(metav1.ListOptions{})
+	g.Expect(err).To(BeNil())
+
 	// executing the event loop in the background, and waiting for the stop channel before inspecting
 	// for errors
 	go func() {
-		_, err := pw.Start(metav1.ListOptions{})
+		_, err := pw.WaitForCompletion()
 		<-pw.stopCh
 		g.Expect(err).To(BeNil())
-		eventsDoneCh <- true
 	}()
 
 	pod := &corev1.Pod{
@@ -197,39 +200,46 @@ func Test_PodWatcherEvents(t *testing.T) {
 	}
 
 	// making modifications in the pod, making sure all events are exercised, thus the events channel
-	// should be populated
+	// should be populated; also, we send and receive events in a single threaded fashion given observed fragility
+	// with the k8s fake client watch implementation, as well as guidance on the limited scope of its intent
 	podClient := clientset.CoreV1().Pods(metav1.NamespaceDefault)
 
-	t.Run("pod-is-added", func(t *testing.T) {
-		var err error
-		pod, err = podClient.Create(ctx, pod, metav1.CreateOptions{})
-		g.Expect(err).To(BeNil())
-	})
+	pod, err = podClient.Create(ctx, pod, metav1.CreateOptions{})
+	g.Expect(err).To(BeNil())
 
-	t.Run("pod-is-modified", func(t *testing.T) {
-		pod.SetLabels(map[string]string{"label": "value"})
+	val, ok := <-eventsCh
+	validateEventChannelData(val, skipPODFn, "add", ok, t)
+	val, ok = <-eventsCh
+	validateEventChannelData(val, onPodAddedFn, "add", ok, t)
 
-		var err error
-		pod, err = podClient.Update(ctx, pod, metav1.UpdateOptions{})
-		g.Expect(err).To(BeNil())
-	})
+	pod.SetLabels(map[string]string{"label": "value"})
 
-	t.Run("pod-is-deleted", func(t *testing.T) {
-		err := podClient.Delete(ctx, pod.GetName(), metav1.DeleteOptions{})
-		g.Expect(err).To(BeNil())
-	})
+	pod, err = podClient.Update(ctx, pod, metav1.UpdateOptions{})
+	g.Expect(err).To(BeNil())
 
-	// stopping event-loop running in the background, after waiting for events to arrive on events
-	// channel, and before running assertions, it waits for eventsDoneCh as well
-	<-eventsCh
+	val, ok = <-eventsCh
+	validateEventChannelData(val, skipPODFn, "mod", ok, t)
+	val, ok = <-eventsCh
+	validateEventChannelData(val, onPodModifiedFn, "mod", ok, t)
+
+	err = podClient.Delete(ctx, pod.GetName(), metav1.DeleteOptions{})
+	g.Expect(err).To(BeNil())
+
+	val, ok = <-eventsCh
+	validateEventChannelData(val, skipPODFn, "del", ok, t)
+	val, ok = <-eventsCh
+	validateEventChannelData(val, onPodDeletedFn, "del", ok, t)
+
+	close(eventsCh)
 	pw.Stop()
-	<-eventsDoneCh
 
-	// asserting that all events have been exercised, by inspecting the function names sent over the
-	// events channel
-	g.Eventually(eventsCh).Should(Receive(&skipPODFn))
-	g.Eventually(eventsCh).Should(Receive(&onPodAddedFn))
-	g.Eventually(eventsCh).Should(Receive(&onPodModifiedFn))
-	// sometimes it is slow to get these when running go test with race detection
-	g.Eventually(eventsCh, 10*time.Second).Should(Receive(&onPodDeletedFn))
+}
+
+func validateEventChannelData(got, expected, verb string, ok bool, t *testing.T) {
+	if !ok {
+		t.Fatalf("test channel closed unexpectedly on %s", verb)
+	}
+	if got != expected {
+		t.Fatalf("test channel %s value was %s instead of %s", verb, got, expected)
+	}
 }
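
Reviewer note (not part of the patch): the sketch below illustrates the intent of the new Connect/WaitForCompletion split on reactor.PodWatcher outside of the diff context. It assumes the NewPodWatcher(ctx, timeout, clientset, namespace) constructor exercised by the tests above; the module path, the function name examplePodWatcherUsage, and the label selector are illustrative assumptions only.

package example // hypothetical sketch, not part of the patch

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"

	"github.com/shipwright-io/cli/pkg/shp/reactor" // assumed import path
)

// examplePodWatcherUsage drives the two-step API: Connect creates the watch
// synchronously, so connection errors surface before the blocking event loop;
// WaitForCompletion then runs the loop until the pod finishes or another
// goroutine calls Stop().
func examplePodWatcherUsage() error {
	ctx := context.TODO()
	clientset := fake.NewSimpleClientset() // any kubernetes.Interface works here

	pw, err := reactor.NewPodWatcher(ctx, 30*time.Second, clientset, metav1.NamespaceDefault)
	if err != nil {
		return err
	}

	// step 1: establish the watch; failures are returned immediately to the caller
	if err := pw.Connect(metav1.ListOptions{LabelSelector: "build.shipwright.io/name=example"}); err != nil {
		return err
	}

	// step 2: block on the event loop that Start used to run directly
	pod, err := pw.WaitForCompletion()
	if err != nil {
		return err
	}
	fmt.Printf("pod %q finished\n", pod.GetName())
	return nil
}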
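
Likewise, a rough sketch (again hypothetical, mirroring the pattern in run_test.go above) of the coordination that RunCommand.FollowerReady() enables when Run() executes on its own goroutine; the helper name runAndWaitForFollower and the assumed import paths are illustrative.

package example // hypothetical sketch, not part of the patch

import (
	"fmt"

	"k8s.io/cli-runtime/pkg/genericclioptions"

	"github.com/shipwright-io/cli/pkg/shp/cmd/build" // assumed import path
	"github.com/shipwright-io/cli/pkg/shp/params"    // assumed import path
)

// runAndWaitForFollower starts Run() in the background and blocks until the follower's
// watch is connected, so the caller can safely produce pod events afterwards.
// Note: per the patch, Run only closes the ready channel after Connect succeeds, so an
// early Run failure would leave FollowerReady blocked; real callers should guard for that.
func runAndWaitForFollower(cmd *build.RunCommand, param *params.Params, ioStreams *genericclioptions.IOStreams) error {
	runErrCh := make(chan error, 1)
	go func() {
		// Run connects the follower, closes the ready channel, then blocks in WaitForCompletion
		runErrCh <- cmd.Run(param, ioStreams)
	}()

	// FollowerReady returns true only once Run has called Connect and closed the ready channel
	if !cmd.FollowerReady() {
		return fmt.Errorf("follower was not set up; was --follow requested?")
	}

	// ... create or mutate the BuildRun pod here; the follower will observe the events ...

	return <-runErrCh
}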