diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 57027cf2a4..801bfcfc1d 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -5,7 +5,6 @@ import ( "context" "flag" "slices" - "strconv" "strings" "sync" "time" @@ -166,7 +165,7 @@ func (c *blocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context } func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) { - var seed string + var seed int cache := c.blocksCache // If is a head block, lets add the seed on the cache key so we can @@ -208,8 +207,8 @@ func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd return nil, 0, err } - key := cacheKey(seed, blockID, ms...) - promise, loaded := cache.getPromiseForKey(key, fetch) + key := cacheKey(blockID, ms...) + promise, loaded := cache.getPromiseForKey(seed, key, fetch) if loaded { c.metrics.CacheHits.WithLabelValues(cache.name).Inc() } @@ -231,11 +230,11 @@ func (c *blocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage. } } -func (c *blocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string { +func (c *blocksPostingsForMatchersCache) getSeedForMetricName(metricName string) int { return c.seedByHash.getSeed(c.userId, metricName) } -func cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string { +func cacheKey(blockID ulid.ULID, ms ...*labels.Matcher) string { slices.SortFunc(ms, func(i, j *labels.Matcher) int { if i.Type != j.Type { return int(i.Type - j.Type) @@ -254,14 +253,12 @@ func cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string { sepLen = 1 ) - size := len(seed) + len(blockID.String()) + 2*sepLen + size := len(blockID.String()) + sepLen for _, m := range ms { size += len(m.Name) + len(m.Value) + typeLen + sepLen } sb := strings.Builder{} sb.Grow(size) - sb.WriteString(seed) - sb.WriteByte('|') sb.WriteString(blockID.String()) sb.WriteByte('|') for _, m := range ms { @@ -300,13 +297,13 @@ func newSeedByHash(size int) *seedByHash { } } -func (s *seedByHash) getSeed(userId string, v string) string { +func (s *seedByHash) getSeed(userId string, v string) int { h := memHashString(userId, v) i := h % uint64(len(s.seedByHash)) l := i % uint64(len(s.strippedLock)) s.strippedLock[l].RLock() defer s.strippedLock[l].RUnlock() - return strconv.Itoa(s.seedByHash[i]) + return s.seedByHash[i] } func (s *seedByHash) incrementSeed(userId string, v string) { @@ -360,9 +357,10 @@ func (c *fifoCache[V]) expire() { } } -func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) { +func (c *fifoCache[V]) getPromiseForKey(seed int, k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) { r := &cacheEntryPromise[V]{ done: make(chan struct{}), + seed: seed, } defer close(r.done) @@ -385,15 +383,39 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error) // If the promise is already in the cache, lets wait it to fetch the data. <-loaded.(*cacheEntryPromise[V]).done - // If is cached but is expired, lets try to replace the cache value. - if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) { - c.metrics.CacheEvicts.WithLabelValues(c.name, "expired").Inc() - r.v, r.sizeBytes, r.err = fetch() - r.sizeBytes += int64(len(k)) - c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes) - loaded = r - r.ts = c.timeNow() - ok = false + var reason string + invalidated, expired := false, false + + switch { + // If the seed from the cached promise is not equal with the incoming sample, it means that the cache key was invalidated + case loaded.(*cacheEntryPromise[V]).seed != seed: + invalidated = true + reason = "invalidated" + case loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()): + expired = true + reason = "expired" + } + + if invalidated || expired { + c.metrics.CacheEvicts.WithLabelValues(c.name, reason).Inc() + + // If the cache is invalid of expired, lets try to replace its value + if c.cachedValues.CompareAndSwap(k, loaded, r) { + r.v, r.sizeBytes, r.err = fetch() + r.sizeBytes += int64(len(k)) + c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes) + loaded = r + r.ts = c.timeNow() + ok = false + } else if invalidated { + // If we cannot perform the swap, it indicates that another goroutine is attempting to set the cache key concurrently. + // In this scenario, fetch the key if it was invalidated, as we cannot be certain whether the other goroutine holds + // the most up-to-date value. Loading from the cache in this state may result in returning a stale value. + r.v, r.sizeBytes, r.err = fetch() + r.sizeBytes += int64(len(k)) + r.ts = c.timeNow() + loaded = r + } } } @@ -459,6 +481,7 @@ func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) { type cacheEntryPromise[V any] struct { ts time.Time sizeBytes int64 + seed int done chan struct{} v V diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index 3517ec6e9c..b37c31e0f9 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -18,7 +18,6 @@ import ( func TestCacheKey(t *testing.T) { blockID := ulid.MustNew(1, nil) - seed := "seed123" matchers := []*labels.Matcher{ { Type: labels.MatchEqual, @@ -41,8 +40,8 @@ func TestCacheKey(t *testing.T) { Value: "value_4", }, } - r := cacheKey(seed, blockID, matchers...) - require.Equal(t, "seed123|00000000010000000000000000|name_1=value_1|name_2!=value_2|name_3=~value_4|name_5!~value_4|", r) + r := cacheKey(blockID, matchers...) + require.Equal(t, "00000000010000000000000000|name_1=value_1|name_2!=value_2|name_3=~value_4|name_5!~value_4|", r) } func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) { @@ -67,7 +66,7 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) { for i := 0; i < 100; i++ { go func() { defer wg.Done() - cache.getPromiseForKey("key1", fetchFunc) + cache.getPromiseForKey(1, "key1", fetchFunc) }() } @@ -82,7 +81,7 @@ func TestFifoCacheDisabled(t *testing.T) { m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) timeNow := time.Now cache := newFifoCache[int](cfg, "test", m, timeNow) - old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) { + old, loaded := cache.getPromiseForKey(1, "key1", func() (int, int64, error) { return 1, 0, nil }) require.False(t, loaded) @@ -91,7 +90,6 @@ func TestFifoCacheDisabled(t *testing.T) { } func TestFifoCacheExpire(t *testing.T) { - keySize := 20 numberOfKeys := 100 @@ -128,17 +126,24 @@ func TestFifoCacheExpire(t *testing.T) { for i := 0; i < numberOfKeys; i++ { key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) - p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) { + p, loaded := cache.getPromiseForKey(1, key, func() (int, int64, error) { return 1, 8, nil }) require.False(t, loaded) require.Equal(t, 1, p.v) require.True(t, cache.contains(key)) - p, loaded = cache.getPromiseForKey(key, func() (int, int64, error) { - return 1, 0, nil + p, loaded = cache.getPromiseForKey(1, key, func() (int, int64, error) { + return 1, 8, nil }) require.True(t, loaded) require.Equal(t, 1, p.v) + + // Changing seed and make sure the key is reloaded + p, loaded = cache.getPromiseForKey(2, key, func() (int, int64, error) { + return 2, 8, nil + }) + require.False(t, loaded) + require.Equal(t, 2, p.v) } totalCacheSize := 0 @@ -156,8 +161,9 @@ func TestFifoCacheExpire(t *testing.T) { err := testutil.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(` # HELP cortex_ingester_expanded_postings_cache_evicts Total number of evictions in the cache, excluding items that got evicted due to TTL. # TYPE cortex_ingester_expanded_postings_cache_evicts counter + cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="invalidated"} %v cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="full"} %v -`, numberOfKeys-c.expectedFinalItems)), "cortex_ingester_expanded_postings_cache_evicts") +`, numberOfKeys, numberOfKeys-c.expectedFinalItems)), "cortex_ingester_expanded_postings_cache_evicts") require.NoError(t, err) } @@ -170,7 +176,7 @@ func TestFifoCacheExpire(t *testing.T) { for i := 0; i < numberOfKeys; i++ { key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) originalSize := cache.cachedBytes - p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) { + p, loaded := cache.getPromiseForKey(2, key, func() (int, int64, error) { return 2, 18, nil }) require.False(t, loaded) @@ -183,15 +189,16 @@ func TestFifoCacheExpire(t *testing.T) { err := testutil.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(` # HELP cortex_ingester_expanded_postings_cache_evicts Total number of evictions in the cache, excluding items that got evicted due to TTL. # TYPE cortex_ingester_expanded_postings_cache_evicts counter + cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="invalidated"} %v cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="expired"} %v -`, numberOfKeys)), "cortex_ingester_expanded_postings_cache_evicts") +`, numberOfKeys, numberOfKeys)), "cortex_ingester_expanded_postings_cache_evicts") require.NoError(t, err) cache.timeNow = func() time.Time { return timeNow().Add(5 * c.cfg.Ttl) } - cache.getPromiseForKey("newKwy", func() (int, int64, error) { + cache.getPromiseForKey(1, "newKwy", func() (int, int64, error) { return 2, 18, nil }) @@ -200,7 +207,8 @@ func TestFifoCacheExpire(t *testing.T) { # HELP cortex_ingester_expanded_postings_cache_evicts Total number of evictions in the cache, excluding items that got evicted due to TTL. # TYPE cortex_ingester_expanded_postings_cache_evicts counter cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="expired"} %v -`, numberOfKeys*2)), "cortex_ingester_expanded_postings_cache_evicts") + cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="invalidated"} %v +`, numberOfKeys*2, numberOfKeys)), "cortex_ingester_expanded_postings_cache_evicts") require.NoError(t, err) } })