From 7362057c07c84ae1482beae94e44b4105098168d Mon Sep 17 00:00:00 2001 From: vircoys Date: Wed, 11 Dec 2024 17:23:50 +0800 Subject: [PATCH] Fix concurrency issues caused by Pipeline `cache` function --- pipeline/ptinput/plcache/cache.go | 3 +++ pipeline/ptinput/plcache/cache_test.go | 29 ++++++++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/pipeline/ptinput/plcache/cache.go b/pipeline/ptinput/plcache/cache.go index 11ff718b..34197ad7 100644 --- a/pipeline/ptinput/plcache/cache.go +++ b/pipeline/ptinput/plcache/cache.go @@ -134,6 +134,9 @@ func (c *Cache) getPosAndCircle(d time.Duration) (pos, circle int) { } func (c *Cache) Get(key string) (any, bool, error) { + c.mu.RLock() + defer c.mu.RUnlock() + select { case <-c.stopChannel: return nil, false, ErrClosed diff --git a/pipeline/ptinput/plcache/cache_test.go b/pipeline/ptinput/plcache/cache_test.go index 366489d8..d1cd0ebf 100644 --- a/pipeline/ptinput/plcache/cache_test.go +++ b/pipeline/ptinput/plcache/cache_test.go @@ -1,6 +1,7 @@ package plcache import ( + "sync" "testing" "time" @@ -52,3 +53,31 @@ func TestCache_Stop(t *testing.T) { assert.NotNil(t, err1, "err1 is nil") assert.NotNil(t, err2, "err2 is nil") } + +func TestCache(t *testing.T) { + cache, _ := NewCache(time.Second, 10) + + times := 10_0000 + + var wg sync.WaitGroup + fnSet := func() { + for i := 0; i < times; i++ { + cache.Set("k", "v", time.Millisecond*10) + } + wg.Done() + } + + fnGet := func() { + for i := 0; i < times; i++ { + cache.Get("k1") + } + wg.Done() + } + + for i := 0; i < 50; i++ { + wg.Add(2) + go fnSet() + go fnGet() + } + wg.Wait() +}