From 42028f7c750a1e3244c73b1060a8e026d23e4932 Mon Sep 17 00:00:00 2001 From: Charlie Le Date: Mon, 6 Jan 2025 11:45:51 -0800 Subject: [PATCH] kv/etcd: Fix race condition within unit test for Watch method (#6479) Copied from https://github.com/grafana/dskit/pull/336 Fixes a race condition during setup for Client.Watch tests where a condition variable broadcast was missed by the caller waiting on the condition variable. Verified by running the tests many times with a timeout: ``` go test -timeout=10s -count=10000 -run=TestMockKV_Watch ./pkg/ring/kv/etcd/ ``` Fixes: - https://github.com/cortexproject/cortex/issues/6478 - https://github.com/cortexproject/cortex/issues/5564 Signed-off-by: Charlie Le --- pkg/ring/kv/etcd/mock_test.go | 44 +++++++++++++++++------------------ 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/pkg/ring/kv/etcd/mock_test.go b/pkg/ring/kv/etcd/mock_test.go index 58bf917528..4089aa9732 100644 --- a/pkg/ring/kv/etcd/mock_test.go +++ b/pkg/ring/kv/etcd/mock_test.go @@ -306,16 +306,20 @@ func TestMockKV_Watch(t *testing.T) { // returned wait group. setupWatchTest := func(key string, prefix bool) (*mockKV, context.CancelFunc, chan *clientv3.Event, *sync.WaitGroup) { kv := newMockKV() - // Use a condition to make sure the goroutine has started using the watch before + // Use a WaitGroup to make sure the goroutine has started using the watch before // we do anything to the mockKV that would emit an event the watcher is expecting - cond := sync.NewCond(&sync.Mutex{}) - wg := sync.WaitGroup{} + started := sync.WaitGroup{} + // Use another WaitGroup so that callers can tell when the channel returned by watch // method is closed and the watch is complete. 
+ complete := sync.WaitGroup{} + ch := make(chan *clientv3.Event) ctx, cancel := context.WithCancel(context.Background()) - wg.Add(1) + started.Add(1) + complete.Add(1) go func() { - defer wg.Done() + defer complete.Done() var ops []clientv3.OpOption if prefix { @@ -323,7 +327,7 @@ func TestMockKV_Watch(t *testing.T) { } watch := kv.Watch(ctx, key, ops...) - cond.Broadcast() + started.Done() for e := range watch { if len(e.Events) > 0 { @@ -332,33 +336,29 @@ func TestMockKV_Watch(t *testing.T) { } }() - // Wait for the watcher goroutine to start actually watching - cond.L.Lock() - cond.Wait() - cond.L.Unlock() - - return kv, cancel, ch, &wg + started.Wait() + return kv, cancel, ch, &complete } - t.Run("watch stopped by context", func(t *testing.T) { + t.Run("watch stopped by context", func(*testing.T) { // Ensure we can use the cancel method of the context given to the watch // to stop the watch - _, cancel, _, wg := setupWatchTest("/bar", false) + _, cancel, _, complete := setupWatchTest("/bar", false) cancel() - wg.Wait() + complete.Wait() }) - t.Run("watch stopped by close", func(t *testing.T) { + t.Run("watch stopped by close", func(*testing.T) { // Ensure we can use the Close method of the mockKV given to the watch // to stop the watch - kv, _, _, wg := setupWatchTest("/bar", false) + kv, _, _, complete := setupWatchTest("/bar", false) _ = kv.Close() - wg.Wait() + complete.Wait() }) t.Run("watch exact key", func(t *testing.T) { // watch for events with key "/bar" and send them via the channel - kv, cancel, ch, wg := setupWatchTest("/bar", false) + kv, cancel, ch, complete := setupWatchTest("/bar", false) _, err := kv.Put(context.Background(), "/foo", "1") require.NoError(t, err) @@ -371,12 +371,12 @@ func TestMockKV_Watch(t *testing.T) { assert.Equal(t, []byte("/bar"), event.Kv.Key) cancel() - wg.Wait() + complete.Wait() }) t.Run("watch prefix match", func(t *testing.T) { // watch for events with the prefix "/b" and send them via the channel - kv, cancel, ch, 
wg := setupWatchTest("/b", true) + kv, cancel, ch, complete := setupWatchTest("/b", true) _, err := kv.Delete(context.Background(), "/foo") require.NoError(t, err) @@ -389,6 +389,6 @@ func TestMockKV_Watch(t *testing.T) { assert.Equal(t, []byte("/bar"), event.Kv.Key) cancel() - wg.Wait() + complete.Wait() }) }