Skip to content

Commit

Permalink
kv/etcd: Fix race condition within unit test for Watch method (cortex…
Browse files Browse the repository at this point in the history
…project#6479)

Copied from grafana/dskit#336

Fixes a race condition during setup for Client.Watch tests where a
conditional variable broadcast was missed by the caller waiting on
the conditional variable.

Verified by running the tests many times with a timeout:

```
go test -timeout=10s -count=10000 -run=TestMockKV_Watch ./pkg/ring/kv/etcd/
```

Fixes:
- cortexproject#6478
- cortexproject#5564

Signed-off-by: Charlie Le <[email protected]>
  • Loading branch information
CharlieTLe authored Jan 6, 2025
1 parent a8ee1a2 commit 42028f7
Showing 1 changed file with 22 additions and 22 deletions.
44 changes: 22 additions & 22 deletions pkg/ring/kv/etcd/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,24 +306,28 @@ func TestMockKV_Watch(t *testing.T) {
// returned wait group.
setupWatchTest := func(key string, prefix bool) (*mockKV, context.CancelFunc, chan *clientv3.Event, *sync.WaitGroup) {
kv := newMockKV()
// Use a condition to make sure the goroutine has started using the watch before
// Use a WaitGroup to make sure the goroutine has started using the watch before
// we do anything to the mockKV that would emit an event the watcher is expecting
cond := sync.NewCond(&sync.Mutex{})
wg := sync.WaitGroup{}
started := sync.WaitGroup{}
// Use another WaitGroup so that callers can tell when the channel returned by watch
// method is closed and the watch is complete.
complete := sync.WaitGroup{}

ch := make(chan *clientv3.Event)
ctx, cancel := context.WithCancel(context.Background())

wg.Add(1)
started.Add(1)
complete.Add(1)
go func() {
defer wg.Done()
defer complete.Done()

var ops []clientv3.OpOption
if prefix {
ops = []clientv3.OpOption{clientv3.WithPrefix()}
}

watch := kv.Watch(ctx, key, ops...)
cond.Broadcast()
started.Done()

for e := range watch {
if len(e.Events) > 0 {
Expand All @@ -332,33 +336,29 @@ func TestMockKV_Watch(t *testing.T) {
}
}()

// Wait for the watcher goroutine to start actually watching
cond.L.Lock()
cond.Wait()
cond.L.Unlock()

return kv, cancel, ch, &wg
started.Wait()
return kv, cancel, ch, &complete
}

t.Run("watch stopped by context", func(t *testing.T) {
t.Run("watch stopped by context", func(*testing.T) {
// Ensure we can use the cancel method of the context given to the watch
// to stop the watch
_, cancel, _, wg := setupWatchTest("/bar", false)
_, cancel, _, complete := setupWatchTest("/bar", false)
cancel()
wg.Wait()
complete.Wait()
})

t.Run("watch stopped by close", func(t *testing.T) {
t.Run("watch stopped by close", func(*testing.T) {
// Ensure we can use the Close method of the mockKV given to the watch
// to stop the watch
kv, _, _, wg := setupWatchTest("/bar", false)
kv, _, _, complete := setupWatchTest("/bar", false)
_ = kv.Close()
wg.Wait()
complete.Wait()
})

t.Run("watch exact key", func(t *testing.T) {
// watch for events with key "/bar" and send them via the channel
kv, cancel, ch, wg := setupWatchTest("/bar", false)
kv, cancel, ch, complete := setupWatchTest("/bar", false)

_, err := kv.Put(context.Background(), "/foo", "1")
require.NoError(t, err)
Expand All @@ -371,12 +371,12 @@ func TestMockKV_Watch(t *testing.T) {
assert.Equal(t, []byte("/bar"), event.Kv.Key)

cancel()
wg.Wait()
complete.Wait()
})

t.Run("watch prefix match", func(t *testing.T) {
// watch for events with the prefix "/b" and send them via the channel
kv, cancel, ch, wg := setupWatchTest("/b", true)
kv, cancel, ch, complete := setupWatchTest("/b", true)

_, err := kv.Delete(context.Background(), "/foo")
require.NoError(t, err)
Expand All @@ -389,6 +389,6 @@ func TestMockKV_Watch(t *testing.T) {
assert.Equal(t, []byte("/bar"), event.Kv.Key)

cancel()
wg.Wait()
complete.Wait()
})
}

0 comments on commit 42028f7

Please sign in to comment.