Skip to content

Commit

Permalink
Fix concurrency issues caused by Pipeline cache function (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
vircoys authored Dec 11, 2024
1 parent 2793865 commit ef1c166
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pipeline/ptinput/plcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func (c *Cache) getPosAndCircle(d time.Duration) (pos, circle int) {
}

func (c *Cache) Get(key string) (any, bool, error) {
c.mu.RLock()
defer c.mu.RUnlock()

select {
case <-c.stopChannel:
return nil, false, ErrClosed
Expand Down
29 changes: 29 additions & 0 deletions pipeline/ptinput/plcache/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package plcache

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -52,3 +53,31 @@ func TestCache_Stop(t *testing.T) {
assert.NotNil(t, err1, "err1 is nil")
assert.NotNil(t, err2, "err2 is nil")
}

func TestCache(t *testing.T) {
cache, _ := NewCache(time.Second, 10)

times := 10_0000

var wg sync.WaitGroup
fnSet := func() {
for i := 0; i < times; i++ {
cache.Set("k", "v", time.Millisecond*10)
}
wg.Done()
}

fnGet := func() {
for i := 0; i < times; i++ {
cache.Get("k1")
}
wg.Done()
}

for i := 0; i < 50; i++ {
wg.Add(2)
go fnSet()
go fnGet()
}
wg.Wait()
}

0 comments on commit ef1c166

Please sign in to comment.