From c46aec6c7a7b0937f9aa0bef9e7ff62ca0b1167b Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 22 Nov 2024 11:36:26 -0800 Subject: [PATCH] Using a single seed array for expanded postings cache on ingesters (#6365) * Using a single seed array for expanded postings cache on ingesters Signed-off-by: alanprot * using tenant id to calculate the seeds hash Signed-off-by: alanprot * Adding cache isolation test Signed-off-by: alanprot * add test for memHashString Signed-off-by: alanprot --------- Signed-off-by: alanprot --- pkg/ingester/ingester.go | 20 +-- pkg/ingester/ingester_test.go | 59 +++++++++ pkg/storage/tsdb/expanded_postings_cache.go | 116 ++++++++++++------ .../tsdb/expanded_postings_cache_test.go | 18 +++ 4 files changed, 169 insertions(+), 44 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 00dd1337ce..6f2aea2e0d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -238,6 +238,8 @@ type Ingester struct { inflightQueryRequests atomic.Int64 maxInflightQueryRequests util_math.MaxTracker + + expandedPostingsCacheFactory *cortex_tsdb.ExpandedPostingsCacheFactory } // Shipper interface is used to have an easy way to mock it in tests. @@ -697,12 +699,13 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe } i := &Ingester{ - cfg: cfg, - limits: limits, - usersMetadata: map[string]*userMetricsMetadata{}, - TSDBState: newTSDBState(bucketClient, registerer), - logger: logger, - ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), + cfg: cfg, + limits: limits, + usersMetadata: map[string]*userMetricsMetadata{}, + TSDBState: newTSDBState(bucketClient, registerer), + logger: logger, + ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), + expandedPostingsCacheFactory: cortex_tsdb.NewExpandedPostingsCacheFactory(cfg.BlocksStorageConfig.TSDB.PostingsCache), } i.metrics = newIngesterMetrics(registerer, false, @@ -2174,9 +2177,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { blockRanges := i.cfg.BlocksStorageConfig.TSDB.BlockRanges.ToMilliseconds() var postingCache cortex_tsdb.ExpandedPostingsCache - if i.cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled || i.cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled { - logutil.WarnExperimentalUse("expanded postings cache") - postingCache = cortex_tsdb.NewBlocksPostingsForMatchersCache(i.cfg.BlocksStorageConfig.TSDB.PostingsCache, i.metrics.expandedPostingsCacheMetrics) + if i.expandedPostingsCacheFactory != nil { + postingCache = i.expandedPostingsCacheFactory.NewExpandedPostingsCache(userID, i.metrics.expandedPostingsCacheMetrics) } userDB := &userTSDB{ diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 2f7a4c7cf6..6f1f145a9e 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5080,6 +5080,65 @@ func TestIngester_instanceLimitsMetrics(t *testing.T) { `), "cortex_ingester_instance_limits")) } +func TestExpendedPostingsCacheIsolation(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} + cfg.LifecyclerConfig.JoinAfter = 0 + cfg.BlocksStorageConfig.TSDB.PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig{ + SeedSize: 1, // lets make sure all metric names collide + Head: cortex_tsdb.PostingsCacheConfig{ + Enabled: true, + }, + Blocks: cortex_tsdb.PostingsCacheConfig{ + Enabled: true, + }, + } + + r := prometheus.NewRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, r) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + numberOfTenants := 100 + wg := sync.WaitGroup{} + wg.Add(numberOfTenants) + + for j := 0; j < numberOfTenants; j++ { + go func() { + defer wg.Done() + userId := fmt.Sprintf("user%v", j) + ctx := user.InjectOrgID(context.Background(), userId) + _, err = i.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userId)}, []cortexpb.Sample{{Value: 2, TimestampMs: 4 * 60 * 60 * 1000}}, nil, nil, cortexpb.API)) + require.NoError(t, err) + }() + } + + wg.Wait() + + wg.Add(numberOfTenants) + for j := 0; j < numberOfTenants; j++ { + go func() { + defer wg.Done() + userId := fmt.Sprintf("user%v", j) + ctx := user.InjectOrgID(context.Background(), userId) + s := &mockQueryStreamServer{ctx: ctx} + + err := i.QueryStream(&client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: math.MaxInt64, + Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}}, + }, s) + require.NoError(t, err) + require.Len(t, s.series, 1) + require.Len(t, s.series[0].Labels, 2) + require.Equal(t, userId, cortexpb.FromLabelAdaptersToLabels(s.series[0].Labels).Get("userId")) + }() + } + wg.Wait() +} + func TestExpendedPostingsCache(t *testing.T) { cfg := defaultIngesterTestConfig(t) cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 59af6d879f..ae9cbea2ee 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -10,7 +10,6 @@ import ( "sync" "time" - "github.com/cespare/xxhash/v2" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -18,8 +17,10 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" + "github.com/segmentio/fasthash/fnv1a" "github.com/cortexproject/cortex/pkg/util/extract" + logutil "github.com/cortexproject/cortex/pkg/util/log" ) var ( @@ -29,8 +30,8 @@ var ( const ( // size of the seed array. Each seed is a 64bits int (8 bytes) - // totaling 8mb - seedArraySize = 1024 * 1024 + // totaling 16mb + seedArraySize = 2 * 1024 * 1024 numOfSeedsStripes = 512 ) @@ -67,7 +68,9 @@ type TSDBPostingsCacheConfig struct { Head PostingsCacheConfig `yaml:"head" doc:"description=If enabled, ingesters will cache expanded postings for the head block. Only queries with with an equal matcher for metric __name__ are cached."` Blocks PostingsCacheConfig `yaml:"blocks" doc:"description=If enabled, ingesters will cache expanded postings for the compacted blocks. The cache is shared between all blocks."` + // The configurations below are used only for testing purpose PostingsForMatchers func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) `yaml:"-"` + SeedSize int `yaml:"-"` timeNow func() time.Time `yaml:"-"` } @@ -89,25 +92,48 @@ func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f f.BoolVar(&cfg.Enabled, prefix+"expanded_postings_cache."+block+".enabled", false, "Whether the postings cache is enabled or not") } +type ExpandedPostingsCacheFactory struct { + seedByHash *seedByHash + cfg TSDBPostingsCacheConfig +} + +func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPostingsCacheFactory { + if cfg.Head.Enabled || cfg.Blocks.Enabled { + if cfg.SeedSize == 0 { + cfg.SeedSize = seedArraySize + } + logutil.WarnExperimentalUse("expanded postings cache") + return &ExpandedPostingsCacheFactory{ + cfg: cfg, + seedByHash: newSeedByHash(cfg.SeedSize), + } + } + + return nil +} + +func (f *ExpandedPostingsCacheFactory) NewExpandedPostingsCache(userId string, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache { + return newBlocksPostingsForMatchersCache(userId, f.cfg, metrics, f.seedByHash) +} + type ExpandedPostingsCache interface { PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) ExpireSeries(metric labels.Labels) } -type BlocksPostingsForMatchersCache struct { - strippedLock []sync.RWMutex - - headCache *fifoCache[[]storage.SeriesRef] - blocksCache *fifoCache[[]storage.SeriesRef] +type blocksPostingsForMatchersCache struct { + userId string - headSeedByMetricName []int + headCache *fifoCache[[]storage.SeriesRef] + blocksCache *fifoCache[[]storage.SeriesRef] postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) timeNow func() time.Time - metrics *ExpandedPostingsCacheMetrics + metrics *ExpandedPostingsCacheMetrics + seedByHash *seedByHash } -func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache { +func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash) ExpandedPostingsCache { if cfg.PostingsForMatchers == nil { cfg.PostingsForMatchers = tsdb.PostingsForMatchers } @@ -116,36 +142,30 @@ func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *Exp cfg.timeNow = time.Now } - return &BlocksPostingsForMatchersCache{ + return &blocksPostingsForMatchersCache{ headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow), blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow), - headSeedByMetricName: make([]int, seedArraySize), - strippedLock: make([]sync.RWMutex, numOfSeedsStripes), postingsForMatchersFunc: cfg.PostingsForMatchers, timeNow: cfg.timeNow, metrics: metrics, + seedByHash: seedByHash, + userId: userId, } } -func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) { +func (c *blocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) { metricName, err := extract.MetricNameFromLabels(metric) if err != nil { return } - - h := MemHashString(metricName) - i := h % uint64(len(c.headSeedByMetricName)) - l := h % uint64(len(c.strippedLock)) - c.strippedLock[l].Lock() - defer c.strippedLock[l].Unlock() - c.headSeedByMetricName[i]++ + c.seedByHash.incrementSeed(c.userId, metricName) } -func (c *BlocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) { +func (c *blocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) { return c.fetchPostings(blockID, ix, ms...)(ctx) } -func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) { +func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) { var seed string cache := c.blocksCache @@ -197,7 +217,7 @@ func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd return c.result(promise) } -func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) { +func (c *blocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) { return func(ctx context.Context) (index.Postings, error) { select { case <-ctx.Done(): @@ -211,16 +231,11 @@ func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage. } } -func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string { - h := MemHashString(metricName) - i := h % uint64(len(c.headSeedByMetricName)) - l := h % uint64(len(c.strippedLock)) - c.strippedLock[l].RLock() - defer c.strippedLock[l].RUnlock() - return strconv.Itoa(c.headSeedByMetricName[i]) +func (c *blocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string { + return c.seedByHash.getSeed(c.userId, metricName) } -func (c *BlocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string { +func (c *blocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string { slices.SortFunc(ms, func(i, j *labels.Matcher) int { if i.Type != j.Type { return int(i.Type - j.Type) @@ -272,6 +287,36 @@ func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) { return "", false } +type seedByHash struct { + strippedLock []sync.RWMutex + seedByHash []int +} + +func newSeedByHash(size int) *seedByHash { + return &seedByHash{ + seedByHash: make([]int, size), + strippedLock: make([]sync.RWMutex, numOfSeedsStripes), + } +} + +func (s *seedByHash) getSeed(userId string, v string) string { + h := memHashString(userId, v) + i := h % uint64(len(s.seedByHash)) + l := h % uint64(len(s.strippedLock)) + s.strippedLock[l].RLock() + defer s.strippedLock[l].RUnlock() + return strconv.Itoa(s.seedByHash[i]) +} + +func (s *seedByHash) incrementSeed(userId string, v string) { + h := memHashString(userId, v) + i := h % uint64(len(s.seedByHash)) + l := h % uint64(len(s.strippedLock)) + s.strippedLock[l].Lock() + defer s.strippedLock[l].Unlock() + s.seedByHash[i]++ +} + type fifoCache[V any] struct { cfg PostingsCacheConfig cachedValues *sync.Map @@ -425,6 +470,7 @@ func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool return r >= ttl } -func MemHashString(str string) uint64 { - return xxhash.Sum64(yoloBuf(str)) +func memHashString(userId, v string) uint64 { + h := fnv1a.HashString64(userId) + return fnv1a.AddString64(h, v) } diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index db821736a3..6a9476072a 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -176,6 +176,24 @@ func TestFifoCacheExpire(t *testing.T) { } } +func Test_memHashString(test *testing.T) { + numberOfTenants := 200 + numberOfMetrics := 100 + occurrences := map[uint64]int{} + + for k := 0; k < 10; k++ { + for j := 0; j < numberOfMetrics; j++ { + metricName := fmt.Sprintf("metricName%v", j) + for i := 0; i < numberOfTenants; i++ { + userId := fmt.Sprintf("user%v", i) + occurrences[memHashString(userId, metricName)]++ + } + } + + require.Len(test, occurrences, numberOfMetrics*numberOfTenants) + } +} + func RepeatStringIfNeeded(seed string, length int) string { if len(seed) > length { return seed