Skip to content

Commit

Permalink
[release-v1.12] Wait for events with poll interval after finished eve…
Browse files Browse the repository at this point in the history
…nt received (knative#7668) (#540)

* Wait for events with poll interval after finished event received (knative#7668)

* Wait for events with poll interval after finished event received

* Use PollUntilContextCancel

* Change default timeout to 30 seconds

* Call cancel

* Simplify

* Use wait.PollImmediate

The function PollUntilContextTimeout is not yet available in
k8s.io/apimachinery v0.26.5

---------

Co-authored-by: Martin Gencur <[email protected]>
  • Loading branch information
openshift-cherrypick-robot and mgencur authored Feb 28, 2024
1 parent 6a034ac commit 6e70211
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 12 deletions.
3 changes: 2 additions & 1 deletion test/upgrade/prober/wathola/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func defaultValues() *Config {
Receiver: ReceiverConfig{
Port: port,
Teardown: ReceiverTeardownConfig{
Duration: 3 * time.Second,
Duration: 60 * time.Second,
Interval: 1 * time.Second,
},
Progress: ReceiverProgressConfig{
Duration: time.Second,
Expand Down
3 changes: 3 additions & 0 deletions test/upgrade/prober/wathola/config/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ func TestDefaultValues(t *testing.T) {
assert.Condition(t, func() (success bool) {
return Instance.Receiver.Teardown.Duration.Seconds() >= 1
})
assert.Condition(t, func() (success bool) {
return Instance.Receiver.Teardown.Interval.Seconds() >= 1
})
}
1 change: 1 addition & 0 deletions test/upgrade/prober/wathola/config/structure.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// ReceiverTeardownConfig holds config receiver teardown
type ReceiverTeardownConfig struct {
Duration time.Duration
Interval time.Duration
}

// ReceiverProgressConfig holds config receiver progress reporting
Expand Down
26 changes: 15 additions & 11 deletions test/upgrade/prober/wathola/event/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/eventing/test/upgrade/prober/wathola/config"
)

Expand Down Expand Up @@ -104,23 +105,26 @@ func (f *finishedStore) RegisterFinished(finished *Finished) {
f.eventsSent = finished.EventsSent
f.totalRequests = finished.TotalRequests
log.Infof("finish event received, expecting %d event ware propagated", finished.EventsSent)
d := config.Instance.Receiver.Teardown.Duration
log.Infof("waiting additional %v to be sure all events came", d)
time.Sleep(d)
receivedEvents := f.steps.Count()

if receivedEvents != finished.EventsSent &&
// If sending was interrupted, tolerate one more received
// event as there's no way to check if the last event is delivered or not.
!(finished.SendingInterrupted && receivedEvents == finished.EventsSent+1) {
timeout := config.Instance.Receiver.Teardown.Duration
interval := config.Instance.Receiver.Teardown.Interval

log.Infof("waiting additional %v to be sure all events came", timeout)

if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
return f.steps.Count() == finished.EventsSent ||
// If sending was interrupted, tolerate one more received
// event as there's no way to check if the last event is delivered or not.
(finished.SendingInterrupted && f.steps.Count() == finished.EventsSent+1), nil
}); err != nil {
f.errors.throwUnexpected("expecting to have %v unique events received, "+
"but received %v unique events", finished.EventsSent, receivedEvents)
"but received %v unique events", finished.EventsSent, f.steps.Count())
f.reportViolations(finished)
f.errors.state = Failed
} else {
log.Infof("properly received %d unique events", receivedEvents)
log.Infof("properly received %d unique events", f.steps.Count())
f.errors.state = Success
}

// check down time
for _, unavailablePeriod := range finished.UnavailablePeriods {
if unavailablePeriod.Period > config.Instance.Receiver.Errors.UnavailablePeriodToReport {
Expand Down

0 comments on commit 6e70211

Please sign in to comment.