Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix flakes in log following related unit tests #112

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions pkg/shp/cmd/build/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
type RunCommand struct {
cmd *cobra.Command // cobra command instance

buildName string
namespace string
buildRunSpec *buildv1alpha1.BuildRunSpec // stores command-line flags
follow bool // flag to tail pod logs
follower *follower.Follower
buildName string
namespace string
buildRunSpec *buildv1alpha1.BuildRunSpec // stores command-line flags
follow bool // flag to tail pod logs
follower *follower.Follower
followerReady chan bool
}

const buildRunLongDesc = `
Expand All @@ -52,6 +53,15 @@ func (r *RunCommand) Complete(params *params.Params, ioStreams *genericclioption

r.namespace = params.Namespace()

if r.follow {
var err error
// provide empty build run name; will be set in Run()
r.follower, err = params.NewFollower(r.cmd.Context(), types.NamespacedName{}, ioStreams)
if err != nil {
return err
}
r.followerReady = make(chan bool, 1)
}
// overwriting build-ref name to use what's on arguments
return r.Cmd().Flags().Set(flags.BuildrefNameFlag, r.buildName)
}
Expand All @@ -64,6 +74,16 @@ func (r *RunCommand) Validate() error {
return nil
}

// FollowerReady blocks until the log following connection has been established by Run, which
// signals readiness by closing the followerReady channel once the follower has connected.
// It is useful when Run is invoked on a separate goroutine and the caller needs to coordinate
// with it. It returns false immediately when log following is not enabled; otherwise it
// returns true once the channel has been closed.
func (r *RunCommand) FollowerReady() bool {
	if r.follow {
		// the second receive value is false only when the channel is closed and drained,
		// which is the readiness signal; a value actually received is not treated as ready
		_, received := <-r.followerReady
		return !received
	}
	return false
}

// Run creates a BuildRun resource based on Build's name informed on arguments.
func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOStreams) error {
// resource using GenerateName, which will provide a unique instance
Expand All @@ -90,15 +110,8 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
return nil
}

// during unit-testing the follower instance will be injected directly, which makes possible to
// simulate the pod events without creating a race condition
if r.follower == nil {
buildRun := types.NamespacedName{Namespace: r.namespace, Name: br.GetName()}
r.follower, err = params.NewFollower(ctx, buildRun, ioStreams)
if err != nil {
return err
}
}
buildRun := types.NamespacedName{Namespace: r.namespace, Name: br.GetName()}
r.follower.SetBuildRunName(buildRun)

// instantiating a pod watcher with a specific label-selector to find the intended pod where the
// actual build started by this subcommand is being executed, including the randomized buildrun
Expand All @@ -108,7 +121,12 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
r.buildName,
br.GetName(),
)}
_, err = r.follower.Start(listOpts)
err = r.follower.Connect(listOpts)
if err != nil {
return err
}
close(r.followerReady)
_, err = r.follower.WaitForCompletion()
return err
}

Expand Down
30 changes: 18 additions & 12 deletions pkg/shp/cmd/build/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"strings"
"testing"
"time"

buildv1alpha1 "github.com/shipwright-io/build/pkg/apis/build/v1alpha1"
shpfake "github.com/shipwright-io/build/pkg/client/clientset/versioned/fake"
Expand All @@ -15,7 +16,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes/fake"
fakekubetesting "k8s.io/client-go/testing"
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestStartBuildRunFollowLog(t *testing.T) {
},
{
name: "timeout",
to: "1s",
to: "1ms",
logText: reactor.RequestTimeoutMessage,
},
{
Expand Down Expand Up @@ -146,18 +146,10 @@ func TestStartBuildRunFollowLog(t *testing.T) {
if len(test.to) > 0 {
pm.Timeout = &test.to
}
param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault)
failureDuration := 1 * time.Millisecond
param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault, &failureDuration, &failureDuration)

ioStreams, _, out, _ := genericclioptions.NewTestIOStreams()
var err error
cmd.follower, err = param.NewFollower(
cmd.Cmd().Context(),
types.NamespacedName{Namespace: br.GetNamespace(), Name: br.GetName()},
&ioStreams,
)
if err != nil {
t.Fatalf("error instantiating follower: %q", err)
}

switch {
case test.cancelled:
Expand All @@ -184,6 +176,12 @@ func TestStartBuildRunFollowLog(t *testing.T) {
Status: corev1.ConditionFalse,
},
}
case test.phase == corev1.PodRunning:
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{StartedAt: metav1.Now()},
},
})
}

cmd.Complete(param, &ioStreams, []string{name})
Expand All @@ -200,6 +198,14 @@ func TestStartBuildRunFollowLog(t *testing.T) {
}
}()

// when employing the Run() method in a multi-threaded capacity, we must make sure
// the underlying Follower/PodWatcher watches are sync'ed and ready for use before
// we start populating the event queue
ready := cmd.FollowerReady()
if !ready {
t.Errorf("%s follower no ready", test.name)
}

if !test.noPodYet {
// mimic watch events, bypassing k8s fake client watch hoopla whose plug points are not always useful;
pod.Status.Phase = test.phase
Expand Down
2 changes: 1 addition & 1 deletion pkg/shp/cmd/buildrun/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestCancelBuildRun(t *testing.T) {

// set up context
cmd.Cmd().ExecuteC()
param := params.NewParamsForTest(nil, clientset, nil, metav1.NamespaceDefault)
param := params.NewParamsForTest(nil, clientset, nil, metav1.NamespaceDefault, nil, nil)

ioStreams, _, _, _ := genericclioptions.NewTestIOStreams()
err := cmd.Run(param, &ioStreams)
Expand Down
4 changes: 2 additions & 2 deletions pkg/shp/cmd/buildrun/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestStreamBuildLogs(t *testing.T) {

clientset := fake.NewSimpleClientset(pod)
ioStreams, _, out, _ := genericclioptions.NewTestIOStreams()
param := params.NewParamsForTest(clientset, nil, nil, metav1.NamespaceDefault)
param := params.NewParamsForTest(clientset, nil, nil, metav1.NamespaceDefault, nil, nil)
err := cmd.Run(param, &ioStreams)
if err != nil {
t.Fatalf("%s", err.Error())
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestStreamBuildRunFollowLogs(t *testing.T) {
if len(test.to) > 0 {
pm.Timeout = &test.to
}
param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault)
param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault, nil, nil)

ioStreams, _, out, _ := genericclioptions.NewTestIOStreams()

Expand Down
66 changes: 53 additions & 13 deletions pkg/shp/cmd/follower/follow.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type Follower struct {

logLock sync.Mutex // avoiding race condition to print logs
enteredRunningState bool // target pod is running

failPollInterval time.Duration // for use in the PollInterval call when processing failed pods
failPollTimeout time.Duration // for use in the PollInterval call when processing failed pods
}

// NewFollower returns a Follower instance.
Expand All @@ -56,9 +59,11 @@ func NewFollower(
clientset: clientset,
buildClientset: buildClientset,

logTail: tail.NewTail(ctx, clientset),
logLock: sync.Mutex{},
tailLogsStarted: map[string]bool{},
logTail: tail.NewTail(ctx, clientset),
logLock: sync.Mutex{},
tailLogsStarted: map[string]bool{},
failPollInterval: 1 * time.Second,
failPollTimeout: 15 * time.Second,
}

f.pw.WithOnPodModifiedFn(f.OnEvent)
Expand All @@ -68,6 +73,23 @@ func NewFollower(
return f
}

// SetBuildRunName sets the BuildRun name after the call to NewFollower. This supports callers
// that only learn the auto-generated BuildRun name after the Follower has been created.
// NOTE: if the BuildRun name is not set prior to the call to WaitForCompletion, the Follower
// will not function fully once events arrive.
func (f *Follower) SetBuildRunName(brName types.NamespacedName) {
f.buildRun = brName
}

// SetFailPollInterval overrides the interval used by the polling call that runs when a failed
// pod is processed. NewFollower sets the default to one second.
func (f *Follower) SetFailPollInterval(t time.Duration) {
f.failPollInterval = t
}

// SetFailPollTimeout overrides the timeout used by the polling call that runs when a failed
// pod is processed. NewFollower sets the default to fifteen seconds.
func (f *Follower) SetFailPollTimeout(t time.Duration) {
f.failPollTimeout = t
}

// GetLogLock returns the mutex used for coordinating access to log buffers.
func (f *Follower) GetLogLock() *sync.Mutex {
return &f.logLock
Expand Down Expand Up @@ -106,16 +128,20 @@ func (f *Follower) OnEvent(pod *corev1.Pod) error {
case corev1.PodRunning:
if !f.enteredRunningState {
f.Log(fmt.Sprintf("Pod %q in %q state, starting up log tail", pod.GetName(), corev1.PodRunning))
f.enteredRunningState = true
// graceful time to wait for container start
time.Sleep(3 * time.Second)
// start tailing container logs
f.tailLogs(pod)
for _, c := range pod.Status.ContainerStatuses {
if c.State.Running != nil && !c.State.Running.StartedAt.IsZero() {
f.enteredRunningState = true
break
}
}
if f.enteredRunningState {
f.tailLogs(pod)
}
}
case corev1.PodFailed:
msg := ""
var br *buildv1alpha1.BuildRun
err := wait.PollImmediate(1*time.Second, 15*time.Second, func() (done bool, err error) {
err := wait.PollImmediate(f.failPollInterval, f.failPollTimeout, func() (done bool, err error) {
brClient := f.buildClientset.ShipwrightV1alpha1().BuildRuns(pod.Namespace)
br, err = brClient.Get(f.ctx, f.buildRun.Name, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -205,6 +231,7 @@ func (f *Follower) OnNoPodEventsYet(podList *corev1.PodList) {
br, err := brClient.Get(f.ctx, f.buildRun.Name, metav1.GetOptions{})
if err != nil {
f.Log(fmt.Sprintf("error accessing BuildRun %q: %s", f.buildRun.Name, err.Error()))
f.Stop()
return
}

Expand All @@ -225,7 +252,7 @@ func (f *Follower) OnNoPodEventsYet(podList *corev1.PodList) {
giveUp = true
msg = fmt.Sprintf("BuildRun '%s' has been deleted.\n", br.Name)
case !br.HasStarted():
f.Log(fmt.Sprintf("BuildRun '%s' has been marked as failed.\n", br.Name))
f.Log(fmt.Sprintf("BuildRun '%s' has not been marked as started yet.\n", br.Name))
}
if giveUp {
f.Log(msg)
Expand All @@ -234,7 +261,20 @@ func (f *Follower) OnNoPodEventsYet(podList *corev1.PodList) {
}
}

// Start initiates the log following for the referenced BuildRun's Pod
func (f *Follower) Start(lo metav1.ListOptions) (*corev1.Pod, error) {
return f.pw.Start(lo)
// Connect establishes the pod watch for the given list options by delegating to the underlying
// pod watcher. It does not process any events; call WaitForCompletion afterwards for that.
func (f *Follower) Connect(lo metav1.ListOptions) error {
return f.pw.Connect(lo)
}

// WaitForCompletion follows the referenced BuildRun's Pod by delegating to the underlying pod
// watcher's WaitForCompletion, and returns that call's result unchanged. Connect is expected
// to have been called first so that the watch exists.
func (f *Follower) WaitForCompletion() (*corev1.Pod, error) {
return f.pw.WaitForCompletion()
}

// Start is a convenience method that calls Connect with the given list options and, when the
// connection succeeds, WaitForCompletion. A Connect error is returned as-is with a nil pod.
func (f *Follower) Start(listOpts metav1.ListOptions) (*corev1.Pod, error) {
	if err := f.Connect(listOpts); err != nil {
		return nil, err
	}
	return f.WaitForCompletion()
}
22 changes: 18 additions & 4 deletions pkg/shp/params/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Params struct {

configFlags *genericclioptions.ConfigFlags
namespace string

failPollInterval *time.Duration
failPollTimeout *time.Duration
}

// AddFlags accepts flags and adds program global flags to it
Expand Down Expand Up @@ -181,6 +184,12 @@ func (p *Params) NewFollower(
}

p.follower = follower.NewFollower(ctx, br, ioStreams, pw, clientset, buildClientset)
if p.failPollTimeout != nil {
p.follower.SetFailPollTimeout(*p.failPollTimeout)
}
if p.failPollInterval != nil {
p.follower.SetFailPollInterval(*p.failPollInterval)
}
return p.follower, nil
}

Expand All @@ -198,11 +207,16 @@ func NewParamsForTest(clientset kubernetes.Interface,
shpClientset buildclientset.Interface,
configFlags *genericclioptions.ConfigFlags,
namespace string,
failPollInterval *time.Duration,
failPollTimeout *time.Duration,

) *Params {
return &Params{
clientset: clientset,
buildClientset: shpClientset,
configFlags: configFlags,
namespace: namespace,
clientset: clientset,
buildClientset: shpClientset,
configFlags: configFlags,
namespace: namespace,
failPollInterval: failPollInterval,
failPollTimeout: failPollTimeout,
}
}
24 changes: 20 additions & 4 deletions pkg/shp/reactor/pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,22 @@ func (p *PodWatcher) handleEvent(pod *corev1.Pod, event watch.Event) error {
return nil
}

// Start runs the event loop based on a watch instantiated against informed pod. In case of errors
// the loop is interrupted.
func (p *PodWatcher) Start(listOpts metav1.ListOptions) (*corev1.Pod, error) {
// Connect is the first of two methods called by Start, and it handles the creation of the watch based on the list options provided.
// Separating out Connect from Start helps deal with the fake k8s clients, which are used by the unit tests, and the capabilities of their Watch implementation.
func (p *PodWatcher) Connect(listOpts metav1.ListOptions) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be great to add a doc-comment explaining how the connect helps the multi-threading, etc.

p.listOpts = listOpts
w, err := p.clientset.CoreV1().Pods(p.ns).Watch(p.ctx, listOpts)
if err != nil {
return nil, err
return err
}
p.watcher = w
return nil
}

// WaitForCompletion is the second of two methods called by Start, and it runs the event loop based on the watch instantiated (by Connect) against informed pod. In case of errors
// the loop is interrupted. Separating out WaitForCompletion from Start helps deal with the fake k8s clients, which are used by the unit tests,
// and the capabilities of their Watch implementation.
func (p *PodWatcher) WaitForCompletion() (*corev1.Pod, error) {
for {
select {
// handling the regular pod modification events, which should trigger calling event functions
Expand Down Expand Up @@ -199,6 +206,15 @@ func (p *PodWatcher) Start(listOpts metav1.ListOptions) (*corev1.Pod, error) {
}
}

// Start is a convenience method that calls Connect with the given list options and, when the
// watch is created successfully, WaitForCompletion. A Connect error is returned as-is with a
// nil pod.
func (p *PodWatcher) Start(listOpts metav1.ListOptions) (*corev1.Pod, error) {
	if err := p.Connect(listOpts); err != nil {
		return nil, err
	}
	return p.WaitForCompletion()
}

// Stop closes the stop channel, and stops the execution loop.
func (p *PodWatcher) Stop() {
// employ sync because of observed 'panic: close of closed channel' when running build run log following
Expand Down
Loading