diff --git a/pkg/storage/tsdb/multilevel_cache.go b/pkg/storage/tsdb/multilevel_cache.go index 59365ad9e0..2511b4259d 100644 --- a/pkg/storage/tsdb/multilevel_cache.go +++ b/pkg/storage/tsdb/multilevel_cache.go @@ -48,10 +48,10 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U misses = keys hits = map[labels.Label][]byte{} - backfillItems := make([][]map[labels.Label][]byte, len(m.caches)-1) + backfillItems := make([]map[labels.Label][]byte, len(m.caches)-1) for i, c := range m.caches { if i < len(m.caches)-1 { - backfillItems[i] = []map[labels.Label][]byte{} + backfillItems[i] = map[labels.Label][]byte{} } if ctx.Err() != nil { return @@ -64,7 +64,7 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U } if i > 0 { - backfillItems[i-1] = append(backfillItems[i-1], h) + backfillItems[i-1] = h } if len(misses) == 0 { @@ -75,14 +75,14 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U defer func() { backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypePostings)) defer backFillTimer.ObserveDuration() - for i, hit := range backfillItems { - for _, values := range hit { - for lbl, b := range values { - if err := m.backfillProcessor.EnqueueAsync(func() { - m.caches[i].StorePostings(blockID, lbl, b, tenant) - }); errors.Is(err, cacheutil.ErrAsyncBufferFull) { - m.backfillDroppedItems.WithLabelValues(cacheTypePostings).Inc() - } + for i, values := range backfillItems { + for lbl, b := range values { + lbl := lbl + b := b + if err := m.backfillProcessor.EnqueueAsync(func() { + m.caches[i].StorePostings(blockID, lbl, b, tenant) + }); errors.Is(err, cacheutil.ErrAsyncBufferFull) { + m.backfillDroppedItems.WithLabelValues(cacheTypePostings).Inc() } } } @@ -148,11 +148,11 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI misses = ids hits = map[storage.SeriesRef][]byte{} - backfillItems := make([][]map[storage.SeriesRef][]byte, len(m.caches)-1) + backfillItems := make([]map[storage.SeriesRef][]byte, len(m.caches)-1) for i, c := range m.caches { if i < len(m.caches)-1 { - backfillItems[i] = []map[storage.SeriesRef][]byte{} + backfillItems[i] = map[storage.SeriesRef][]byte{} } if ctx.Err() != nil { return @@ -165,7 +165,7 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI } if i > 0 && len(h) > 0 { - backfillItems[i-1] = append(backfillItems[i-1], h) + backfillItems[i-1] = h } if len(misses) == 0 { @@ -176,14 +176,14 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI defer func() { backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeSeries)) defer backFillTimer.ObserveDuration() - for i, hit := range backfillItems { - for _, values := range hit { - for ref, b := range values { - if err := m.backfillProcessor.EnqueueAsync(func() { - m.caches[i].StoreSeries(blockID, ref, b, tenant) - }); errors.Is(err, cacheutil.ErrAsyncBufferFull) { - m.backfillDroppedItems.WithLabelValues(cacheTypeSeries).Inc() - } + for i, values := range backfillItems { + for ref, b := range values { + ref := ref + b := b + if err := m.backfillProcessor.EnqueueAsync(func() { + m.caches[i].StoreSeries(blockID, ref, b, tenant) + }); errors.Is(err, cacheutil.ErrAsyncBufferFull) { + m.backfillDroppedItems.WithLabelValues(cacheTypeSeries).Inc() } } } diff --git a/pkg/storage/tsdb/multilevel_cache_test.go b/pkg/storage/tsdb/multilevel_cache_test.go index 9ae5e92972..263735cbde 100644 --- a/pkg/storage/tsdb/multilevel_cache_test.go +++ b/pkg/storage/tsdb/multilevel_cache_test.go @@ -2,6 +2,8 @@ package tsdb import ( "context" + "sort" + "sync" "testing" "github.com/alicebob/miniredis/v2" @@ -107,10 +109,16 @@ func Test_MultiLevelCache(t *testing.T) { Value: "test2", } + l3 := labels.Label{ + Name: "test3", + Value: "test3", + } + matcher, err := labels.NewMatcher(labels.MatchEqual, "name", "value") require.NoError(t, err) v := make([]byte, 100) + v2 := make([]byte, 200) testCases := map[string]struct { m1ExpectedCalls map[string][][]interface{} @@ -181,6 +189,24 @@ func Test_MultiLevelCache(t *testing.T) { cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2}, "") }, }, + "[FetchMultiPostings] should fallback and backfill only the missing keys on l1, multiple items": { + m1ExpectedCalls: map[string][][]interface{}{ + "FetchMultiPostings": {{bID, []labels.Label{l1, l2, l3}}}, + "StorePostings": {{bID, l2, v}, {bID, l3, v2}}, + }, + m2ExpectedCalls: map[string][][]interface{}{ + "FetchMultiPostings": {{bID, []labels.Label{l2, l3}}}, + }, + m1MockedCalls: map[string][]interface{}{ + "FetchMultiPostings": {map[labels.Label][]byte{l1: make([]byte, 1)}, []labels.Label{l2, l3}}, + }, + m2MockedCalls: map[string][]interface{}{ + "FetchMultiPostings": {map[labels.Label][]byte{l2: v, l3: v2}, []labels.Label{}}, + }, + call: func(cache storecache.IndexCache) { + cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2, l3}, "") + }, + }, "[FetchMultiPostings] should not fallback when all hit on l1": { m1ExpectedCalls: map[string][][]interface{}{ "FetchMultiPostings": {{bID, []labels.Label{l1, l2}}}, @@ -216,12 +242,33 @@ func Test_MultiLevelCache(t *testing.T) { "FetchMultiSeries": {map[storage.SeriesRef][]byte{1: v}, []storage.SeriesRef{2}}, }, m2MockedCalls: map[string][]interface{}{ - "FetchMultiSeries": {map[storage.SeriesRef][]byte{2: v}, []storage.SeriesRef{2}}, + "FetchMultiSeries": {map[storage.SeriesRef][]byte{2: v}, []storage.SeriesRef{}}, }, call: func(cache storecache.IndexCache) { cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}, "") }, }, + "[FetchMultiSeries] should fallback and backfill only the missing keys on l1, multiple items": { + m1ExpectedCalls: map[string][][]interface{}{ + "FetchMultiSeries": {{bID, []storage.SeriesRef{1, 2, 3}}}, + "StoreSeries": { + {bID, storage.SeriesRef(2), v}, + {bID, storage.SeriesRef(3), v2}, + }, + }, + m2ExpectedCalls: map[string][][]interface{}{ + "FetchMultiSeries": {{bID, []storage.SeriesRef{2, 3}}}, + }, + m1MockedCalls: map[string][]interface{}{ + "FetchMultiSeries": {map[storage.SeriesRef][]byte{1: v}, []storage.SeriesRef{2, 3}}, + }, + m2MockedCalls: map[string][]interface{}{ + "FetchMultiSeries": {map[storage.SeriesRef][]byte{2: v, 3: v2}, []storage.SeriesRef{}}, + }, + call: func(cache storecache.IndexCache) { + cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2, 3}, "") + }, + }, "[FetchMultiSeries] should not fallback when all hit on l1": { m1ExpectedCalls: map[string][][]interface{}{ "FetchMultiSeries": {{bID, []storage.SeriesRef{1, 2}}}, @@ -273,6 +320,23 @@ func Test_MultiLevelCache(t *testing.T) { mlc := c.(*multiLevelCache) // Wait until async operation finishes. mlc.backfillProcessor.Stop() + // Sort call parameters to make test deterministic. + for k := range m1.calls { + switch k { + case "StorePostings": + sort.Slice(m1.calls[k], func(i, j int) bool { + lbl1 := m1.calls[k][i][1].(labels.Label) + lbl2 := m1.calls[k][j][1].(labels.Label) + return lbl1.Name < lbl2.Name + }) + case "StoreSeries": + sort.Slice(m1.calls[k], func(i, j int) bool { + seriesRef1 := m1.calls[k][i][1].(storage.SeriesRef) + seriesRef2 := m1.calls[k][j][1].(storage.SeriesRef) + return seriesRef1 < seriesRef2 + }) + } + } require.Equal(t, tc.m1ExpectedCalls, m1.calls) require.Equal(t, tc.m2ExpectedCalls, m2.calls) }) @@ -287,15 +351,20 @@ func newMockIndexCache(mockedCalls map[string][]interface{}) *mockIndexCache { } type mockIndexCache struct { + mtx sync.Mutex calls map[string][][]interface{} mockedCalls map[string][]interface{} } func (m *mockIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { + m.mtx.Lock() + defer m.mtx.Unlock() m.calls["StorePostings"] = append(m.calls["StorePostings"], []interface{}{blockID, l, v}) } func (m *mockIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { + m.mtx.Lock() + defer m.mtx.Unlock() m.calls["FetchMultiPostings"] = append(m.calls["FetchMultiPostings"], []interface{}{blockID, keys}) if m, ok := m.mockedCalls["FetchMultiPostings"]; ok { return m[0].(map[labels.Label][]byte), m[1].([]labels.Label) @@ -305,10 +374,14 @@ func (m *mockIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID } func (m *mockIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { + m.mtx.Lock() + defer m.mtx.Unlock() m.calls["StoreExpandedPostings"] = append(m.calls["StoreExpandedPostings"], []interface{}{blockID, matchers, v}) } func (m *mockIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { + m.mtx.Lock() + defer m.mtx.Unlock() m.calls["FetchExpandedPostings"] = append(m.calls["FetchExpandedPostings"], []interface{}{blockID, matchers}) if m, ok := m.mockedCalls["FetchExpandedPostings"]; ok { return m[0].([]byte), m[1].(bool) @@ -318,10 +391,14 @@ func (m *mockIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.U } func (m *mockIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { + m.mtx.Lock() + defer m.mtx.Unlock() m.calls["StoreSeries"] = append(m.calls["StoreSeries"], []interface{}{blockID, id, v}) } func (m *mockIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + m.mtx.Lock() + defer m.mtx.Unlock() m.calls["FetchMultiSeries"] = append(m.calls["FetchMultiSeries"], []interface{}{blockID, ids}) if m, ok := m.mockedCalls["FetchMultiSeries"]; ok { return m[0].(map[storage.SeriesRef][]byte), m[1].([]storage.SeriesRef)