Skip to content

Commit

Permalink
Fix: PostingCache promise should fetch data only once (#6314)
Browse files Browse the repository at this point in the history
* Fix: PostingCache promise should fetch data only once

Signed-off-by: alanprot <[email protected]>

* fix

Signed-off-by: alanprot <[email protected]>

* fix test using default prometheus registerer

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Nov 6, 2024
1 parent 2e5488a commit f658039
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 33 deletions.
35 changes: 16 additions & 19 deletions pkg/storage/tsdb/expanded_postings_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,17 @@ func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd
return c.result(promise)
}

func (c *BlocksPostingsForMatchersCache) result(promise *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) {
func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) {
return func(ctx context.Context) (index.Postings, error) {
ids, err := promise.result(ctx)
return index.NewListPostings(ids), err
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ce.done:
if ctx.Err() != nil {
return nil, ctx.Err()
}
return index.NewListPostings(ce.v), ce.err
}
}
}

Expand Down Expand Up @@ -327,9 +334,12 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
c.expire()
}

// If is cached but is expired, lets try to replace the cache value
if ok && loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) {
if c.cachedValues.CompareAndSwap(k, loaded, r) {
if ok {
// If the promise is already in the cache, lets wait it to fetch the data.
<-loaded.(*cacheEntryPromise[V]).done

// If is cached but is expired, lets try to replace the cache value.
if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
r.v, r.sizeBytes, r.err = fetch()
r.sizeBytes += int64(len(k))
c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes)
Expand Down Expand Up @@ -404,19 +414,6 @@ type cacheEntryPromise[V any] struct {
err error
}

func (ce *cacheEntryPromise[V]) result(ctx context.Context) (V, error) {
select {
case <-ctx.Done():
return ce.v, ctx.Err()
case <-ce.done:
if ctx.Err() != nil {
return ce.v, ctx.Err()
}

return ce.v, ce.err
}
}

func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool {
ts := ce.ts
r := now.Sub(ts)
Expand Down
52 changes: 38 additions & 14 deletions pkg/storage/tsdb/expanded_postings_cache_test.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,59 @@
package tsdb

import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
cfg := PostingsCacheConfig{
Enabled: true,
Ttl: time.Hour,
MaxBytes: 10 << 20,
}
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
cache := newFifoCache[int](cfg, "test", m, time.Now)
calls := atomic.Int64{}
concurrency := 100
wg := sync.WaitGroup{}
wg.Add(concurrency)

fetchFunc := func() (int, int64, error) {
calls.Inc()
time.Sleep(100 * time.Millisecond)
return 0, 0, nil
}

for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
cache.getPromiseForKey("key1", fetchFunc)
}()
}

wg.Wait()
require.Equal(t, int64(1), calls.Load())

}

func TestFifoCacheDisabled(t *testing.T) {
cfg := PostingsCacheConfig{}
cfg.Enabled = false
m := NewPostingCacheMetrics(prometheus.DefaultRegisterer)
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
timeNow := time.Now
cache := newFifoCache[int](cfg, "test", m, timeNow)
old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) {
return 1, 0, nil
})
require.False(t, loaded)
v, err := old.result(context.Background())
require.NoError(t, err)
require.Equal(t, 1, v)
require.Equal(t, 1, old.v)
require.False(t, cache.contains("key1"))
}

Expand Down Expand Up @@ -68,17 +98,13 @@ func TestFifoCacheExpire(t *testing.T) {
return 1, 8, nil
})
require.False(t, loaded)
v, err := p.result(context.Background())
require.NoError(t, err)
require.Equal(t, 1, v)
require.Equal(t, 1, p.v)
require.True(t, cache.contains(key))
p, loaded = cache.getPromiseForKey(key, func() (int, int64, error) {
return 1, 0, nil
})
require.True(t, loaded)
v, err = p.result(context.Background())
require.NoError(t, err)
require.Equal(t, 1, v)
require.Equal(t, 1, p.v)
}

totalCacheSize := 0
Expand All @@ -104,10 +130,8 @@ func TestFifoCacheExpire(t *testing.T) {
return 2, 18, nil
})
require.False(t, loaded)
v, err := p.result(context.Background())
require.NoError(t, err)
// New value
require.Equal(t, 2, v)
require.Equal(t, 2, p.v)
// Total Size Updated
require.Equal(t, originalSize+10, cache.cachedBytes)
}
Expand Down

0 comments on commit f658039

Please sign in to comment.