Skip to content

Commit

Permalink
Fix async backfill loop issue (#5670)
Browse files Browse the repository at this point in the history
* fix async backfill loop issue

Signed-off-by: Ben Ye <[email protected]>

* address data race

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Nov 27, 2023
1 parent 84b982c commit 2d7b13a
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 23 deletions.
44 changes: 22 additions & 22 deletions pkg/storage/tsdb/multilevel_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U

misses = keys
hits = map[labels.Label][]byte{}
backfillItems := make([][]map[labels.Label][]byte, len(m.caches)-1)
backfillItems := make([]map[labels.Label][]byte, len(m.caches)-1)
for i, c := range m.caches {
if i < len(m.caches)-1 {
backfillItems[i] = []map[labels.Label][]byte{}
backfillItems[i] = map[labels.Label][]byte{}
}
if ctx.Err() != nil {
return
Expand All @@ -64,7 +64,7 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U
}

if i > 0 {
backfillItems[i-1] = append(backfillItems[i-1], h)
backfillItems[i-1] = h
}

if len(misses) == 0 {
Expand All @@ -75,14 +75,14 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U
defer func() {
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypePostings))
defer backFillTimer.ObserveDuration()
for i, hit := range backfillItems {
for _, values := range hit {
for lbl, b := range values {
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i].StorePostings(blockID, lbl, b, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypePostings).Inc()
}
for i, values := range backfillItems {
for lbl, b := range values {
lbl := lbl
b := b
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i].StorePostings(blockID, lbl, b, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypePostings).Inc()
}
}
}
Expand Down Expand Up @@ -148,11 +148,11 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI

misses = ids
hits = map[storage.SeriesRef][]byte{}
backfillItems := make([][]map[storage.SeriesRef][]byte, len(m.caches)-1)
backfillItems := make([]map[storage.SeriesRef][]byte, len(m.caches)-1)

for i, c := range m.caches {
if i < len(m.caches)-1 {
backfillItems[i] = []map[storage.SeriesRef][]byte{}
backfillItems[i] = map[storage.SeriesRef][]byte{}
}
if ctx.Err() != nil {
return
Expand All @@ -165,7 +165,7 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
}

if i > 0 && len(h) > 0 {
backfillItems[i-1] = append(backfillItems[i-1], h)
backfillItems[i-1] = h
}

if len(misses) == 0 {
Expand All @@ -176,14 +176,14 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
defer func() {
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeSeries))
defer backFillTimer.ObserveDuration()
for i, hit := range backfillItems {
for _, values := range hit {
for ref, b := range values {
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i].StoreSeries(blockID, ref, b, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypeSeries).Inc()
}
for i, values := range backfillItems {
for ref, b := range values {
ref := ref
b := b
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i].StoreSeries(blockID, ref, b, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypeSeries).Inc()
}
}
}
Expand Down
79 changes: 78 additions & 1 deletion pkg/storage/tsdb/multilevel_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package tsdb

import (
"context"
"sort"
"sync"
"testing"

"github.com/alicebob/miniredis/v2"
Expand Down Expand Up @@ -107,10 +109,16 @@ func Test_MultiLevelCache(t *testing.T) {
Value: "test2",
}

l3 := labels.Label{
Name: "test3",
Value: "test3",
}

matcher, err := labels.NewMatcher(labels.MatchEqual, "name", "value")
require.NoError(t, err)

v := make([]byte, 100)
v2 := make([]byte, 200)

testCases := map[string]struct {
m1ExpectedCalls map[string][][]interface{}
Expand Down Expand Up @@ -181,6 +189,24 @@ func Test_MultiLevelCache(t *testing.T) {
cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2}, "")
},
},
"[FetchMultiPostings] should fallback and backfill only the missing keys on l1, multiple items": {
m1ExpectedCalls: map[string][][]interface{}{
"FetchMultiPostings": {{bID, []labels.Label{l1, l2, l3}}},
"StorePostings": {{bID, l2, v}, {bID, l3, v2}},
},
m2ExpectedCalls: map[string][][]interface{}{
"FetchMultiPostings": {{bID, []labels.Label{l2, l3}}},
},
m1MockedCalls: map[string][]interface{}{
"FetchMultiPostings": {map[labels.Label][]byte{l1: make([]byte, 1)}, []labels.Label{l2, l3}},
},
m2MockedCalls: map[string][]interface{}{
"FetchMultiPostings": {map[labels.Label][]byte{l2: v, l3: v2}, []labels.Label{}},
},
call: func(cache storecache.IndexCache) {
cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2, l3}, "")
},
},
"[FetchMultiPostings] should not fallback when all hit on l1": {
m1ExpectedCalls: map[string][][]interface{}{
"FetchMultiPostings": {{bID, []labels.Label{l1, l2}}},
Expand Down Expand Up @@ -216,12 +242,33 @@ func Test_MultiLevelCache(t *testing.T) {
"FetchMultiSeries": {map[storage.SeriesRef][]byte{1: v}, []storage.SeriesRef{2}},
},
m2MockedCalls: map[string][]interface{}{
"FetchMultiSeries": {map[storage.SeriesRef][]byte{2: v}, []storage.SeriesRef{2}},
"FetchMultiSeries": {map[storage.SeriesRef][]byte{2: v}, []storage.SeriesRef{}},
},
call: func(cache storecache.IndexCache) {
cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2}, "")
},
},
"[FetchMultiSeries] should fallback and backfill only the missing keys on l1, multiple items": {
m1ExpectedCalls: map[string][][]interface{}{
"FetchMultiSeries": {{bID, []storage.SeriesRef{1, 2, 3}}},
"StoreSeries": {
{bID, storage.SeriesRef(2), v},
{bID, storage.SeriesRef(3), v2},
},
},
m2ExpectedCalls: map[string][][]interface{}{
"FetchMultiSeries": {{bID, []storage.SeriesRef{2, 3}}},
},
m1MockedCalls: map[string][]interface{}{
"FetchMultiSeries": {map[storage.SeriesRef][]byte{1: v}, []storage.SeriesRef{2, 3}},
},
m2MockedCalls: map[string][]interface{}{
"FetchMultiSeries": {map[storage.SeriesRef][]byte{2: v, 3: v2}, []storage.SeriesRef{}},
},
call: func(cache storecache.IndexCache) {
cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2, 3}, "")
},
},
"[FetchMultiSeries] should not fallback when all hit on l1": {
m1ExpectedCalls: map[string][][]interface{}{
"FetchMultiSeries": {{bID, []storage.SeriesRef{1, 2}}},
Expand Down Expand Up @@ -273,6 +320,23 @@ func Test_MultiLevelCache(t *testing.T) {
mlc := c.(*multiLevelCache)
// Wait until async operation finishes.
mlc.backfillProcessor.Stop()
// Sort call parameters to make test deterministic.
for k := range m1.calls {
switch k {
case "StorePostings":
sort.Slice(m1.calls[k], func(i, j int) bool {
lbl1 := m1.calls[k][i][1].(labels.Label)
lbl2 := m1.calls[k][j][1].(labels.Label)
return lbl1.Name < lbl2.Name
})
case "StoreSeries":
sort.Slice(m1.calls[k], func(i, j int) bool {
seriesRef1 := m1.calls[k][i][1].(storage.SeriesRef)
seriesRef2 := m1.calls[k][j][1].(storage.SeriesRef)
return seriesRef1 < seriesRef2
})
}
}
require.Equal(t, tc.m1ExpectedCalls, m1.calls)
require.Equal(t, tc.m2ExpectedCalls, m2.calls)
})
Expand All @@ -287,15 +351,20 @@ func newMockIndexCache(mockedCalls map[string][]interface{}) *mockIndexCache {
}

// mockIndexCache is a recording test double for the index cache: each method
// appends its arguments to calls, and the Fetch* methods return canned values
// from mockedCalls when present.
type mockIndexCache struct {
mtx sync.Mutex // guards calls — Store* runs on async backfill goroutines, so recording must be synchronized
calls map[string][][]interface{} // method name -> recorded argument tuples, one per invocation
mockedCalls map[string][]interface{} // method name -> canned return values for Fetch* methods
}

// StorePostings records the call's block ID, label, and value so the test can
// assert on them later; tenant is accepted but not recorded.
func (m *mockIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
	recorded := []interface{}{blockID, l, v}
	m.mtx.Lock()
	m.calls["StorePostings"] = append(m.calls["StorePostings"], recorded)
	m.mtx.Unlock()
}

func (m *mockIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.calls["FetchMultiPostings"] = append(m.calls["FetchMultiPostings"], []interface{}{blockID, keys})
if m, ok := m.mockedCalls["FetchMultiPostings"]; ok {
return m[0].(map[labels.Label][]byte), m[1].([]labels.Label)
Expand All @@ -305,10 +374,14 @@ func (m *mockIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID
}

// StoreExpandedPostings records the call's block ID, matchers, and value so
// the test can assert on them later; tenant is accepted but not recorded.
func (m *mockIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
	recorded := []interface{}{blockID, matchers, v}
	m.mtx.Lock()
	m.calls["StoreExpandedPostings"] = append(m.calls["StoreExpandedPostings"], recorded)
	m.mtx.Unlock()
}

func (m *mockIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.calls["FetchExpandedPostings"] = append(m.calls["FetchExpandedPostings"], []interface{}{blockID, matchers})
if m, ok := m.mockedCalls["FetchExpandedPostings"]; ok {
return m[0].([]byte), m[1].(bool)
Expand All @@ -318,10 +391,14 @@ func (m *mockIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.U
}

// StoreSeries records the call's block ID, series ref, and value so the test
// can assert on them later; tenant is accepted but not recorded.
func (m *mockIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
	recorded := []interface{}{blockID, id, v}
	m.mtx.Lock()
	m.calls["StoreSeries"] = append(m.calls["StoreSeries"], recorded)
	m.mtx.Unlock()
}

func (m *mockIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.calls["FetchMultiSeries"] = append(m.calls["FetchMultiSeries"], []interface{}{blockID, ids})
if m, ok := m.mockedCalls["FetchMultiSeries"]; ok {
return m[0].(map[storage.SeriesRef][]byte), m[1].([]storage.SeriesRef)
Expand Down

0 comments on commit 2d7b13a

Please sign in to comment.