Using a single seed array for expanded postings cache on ingesters (#6365)

* Using a single seed array for expanded postings cache on ingesters

Signed-off-by: alanprot <[email protected]>

* using tenant id to calculate the seeds hash

Signed-off-by: alanprot <[email protected]>

* Adding cache isolation test

Signed-off-by: alanprot <[email protected]>

* add test for memHashString

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
alanprot authored Nov 22, 2024
1 parent 0dc2e33 commit c46aec6
Showing 4 changed files with 169 additions and 44 deletions.
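Before the file diffs, a minimal, self-contained sketch of the mechanism this commit builds on. The names here (seedStore, cacheKey) are illustrative and not the Cortex API, and the hash uses the standard library's hash/fnv so the snippet runs on its own; the commit itself uses github.com/segmentio/fasthash/fnv1a. Each cached expanded-postings entry embeds a per-(tenant, metric) seed in its cache key, all seeds live in one fixed-size array shared by every tenant, and expiring a series simply increments the seed so later lookups compute a different key:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strconv"
	"sync"
)

// seedStore is a simplified stand-in for the seedByHash type added in this
// commit: one shared array of seeds indexed by hash(tenantID, metricName),
// guarded by striped read/write locks.
type seedStore struct {
	locks []sync.RWMutex
	seeds []int
}

func newSeedStore(size, stripes int) *seedStore {
	return &seedStore{seeds: make([]int, size), locks: make([]sync.RWMutex, stripes)}
}

func (s *seedStore) hash(tenant, metric string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(tenant)) // tenant goes into the hash so equal metric names hash differently per tenant
	h.Write([]byte(metric))
	return h.Sum64()
}

func (s *seedStore) getSeed(tenant, metric string) string {
	h := s.hash(tenant, metric)
	i, l := h%uint64(len(s.seeds)), h%uint64(len(s.locks))
	s.locks[l].RLock()
	defer s.locks[l].RUnlock()
	return strconv.Itoa(s.seeds[i])
}

func (s *seedStore) incrementSeed(tenant, metric string) {
	h := s.hash(tenant, metric)
	i, l := h%uint64(len(s.seeds)), h%uint64(len(s.locks))
	s.locks[l].Lock()
	defer s.locks[l].Unlock()
	s.seeds[i]++
}

// The seed is embedded in the cache key, so bumping it invalidates every
// cached postings entry for that (tenant, metric) pair: stale entries are
// never looked up again and eventually age out of the FIFO cache.
func cacheKey(seed, matchers string) string {
	return seed + "|" + matchers
}

func main() {
	s := newSeedStore(1<<20, 512)

	k1 := cacheKey(s.getSeed("user-1", "foo"), `__name__="foo"`)
	s.incrementSeed("user-1", "foo") // e.g. a series of metric "foo" was added or expired
	k2 := cacheKey(s.getSeed("user-1", "foo"), `__name__="foo"`)

	fmt.Println(k1, k2, k1 == k2) // keys differ, so the stale entry is never hit
}
```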
20 changes: 11 additions & 9 deletions pkg/ingester/ingester.go
@@ -238,6 +238,8 @@ type Ingester struct {

inflightQueryRequests atomic.Int64
maxInflightQueryRequests util_math.MaxTracker

expandedPostingsCacheFactory *cortex_tsdb.ExpandedPostingsCacheFactory
}

// Shipper interface is used to have an easy way to mock it in tests.
@@ -697,12 +699,13 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
}

i := &Ingester{
cfg: cfg,
limits: limits,
usersMetadata: map[string]*userMetricsMetadata{},
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
cfg: cfg,
limits: limits,
usersMetadata: map[string]*userMetricsMetadata{},
TSDBState: newTSDBState(bucketClient, registerer),
logger: logger,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
expandedPostingsCacheFactory: cortex_tsdb.NewExpandedPostingsCacheFactory(cfg.BlocksStorageConfig.TSDB.PostingsCache),
}
i.metrics = newIngesterMetrics(registerer,
false,
@@ -2174,9 +2177,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
blockRanges := i.cfg.BlocksStorageConfig.TSDB.BlockRanges.ToMilliseconds()

var postingCache cortex_tsdb.ExpandedPostingsCache
if i.cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled || i.cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled {
logutil.WarnExperimentalUse("expanded postings cache")
postingCache = cortex_tsdb.NewBlocksPostingsForMatchersCache(i.cfg.BlocksStorageConfig.TSDB.PostingsCache, i.metrics.expandedPostingsCacheMetrics)
if i.expandedPostingsCacheFactory != nil {
postingCache = i.expandedPostingsCacheFactory.NewExpandedPostingsCache(userID, i.metrics.expandedPostingsCacheMetrics)
}

userDB := &userTSDB{
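The ingester.go hunks above do three things: they store a single ExpandedPostingsCacheFactory on the Ingester, build it once in New() from the TSDB postings-cache config (the factory is nil when both the head and blocks caches are disabled, which is also where the experimental-use warning now lives), and have createTSDB() request a per-tenant cache from it. Below is a hedged, self-contained sketch of that shape with made-up names, since the real Cortex types are not reproduced here:

```go
package main

import "fmt"

// Illustrative only: one factory owns the shared seed array for the whole
// process and hands out per-tenant caches that all point at it.
type sharedSeeds struct{ seeds []int }

type tenantPostingsCache struct {
	tenantID string
	seeds    *sharedSeeds // shared across all tenants
}

type postingsCacheFactory struct{ seeds *sharedSeeds }

func newPostingsCacheFactory(headEnabled, blocksEnabled bool, seedSize int) *postingsCacheFactory {
	if !headEnabled && !blocksEnabled {
		return nil // mirrors NewExpandedPostingsCacheFactory returning nil when the cache is fully disabled
	}
	return &postingsCacheFactory{seeds: &sharedSeeds{seeds: make([]int, seedSize)}}
}

func (f *postingsCacheFactory) newTenantCache(tenantID string) *tenantPostingsCache {
	return &tenantPostingsCache{tenantID: tenantID, seeds: f.seeds}
}

func main() {
	factory := newPostingsCacheFactory(true, true, 2*1024*1024) // built once, at ingester construction time
	a := factory.newTenantCache("user-1")                       // built lazily, when each tenant's TSDB is opened
	b := factory.newTenantCache("user-2")
	fmt.Println(a.seeds == b.seeds) // true: one seed array per process, not one per tenant
}
```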
59 changes: 59 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -5080,6 +5080,65 @@ func TestIngester_instanceLimitsMetrics(t *testing.T) {
`), "cortex_ingester_instance_limits"))
}

func TestExpendedPostingsCacheIsolation(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
cfg.LifecyclerConfig.JoinAfter = 0
cfg.BlocksStorageConfig.TSDB.PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig{
SeedSize: 1, // lets make sure all metric names collide
Head: cortex_tsdb.PostingsCacheConfig{
Enabled: true,
},
Blocks: cortex_tsdb.PostingsCacheConfig{
Enabled: true,
},
}

r := prometheus.NewRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, r)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

numberOfTenants := 100
wg := sync.WaitGroup{}
wg.Add(numberOfTenants)

for j := 0; j < numberOfTenants; j++ {
go func() {
defer wg.Done()
userId := fmt.Sprintf("user%v", j)
ctx := user.InjectOrgID(context.Background(), userId)
_, err = i.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(labels.MetricName, "foo", "userId", userId)}, []cortexpb.Sample{{Value: 2, TimestampMs: 4 * 60 * 60 * 1000}}, nil, nil, cortexpb.API))
require.NoError(t, err)
}()
}

wg.Wait()

wg.Add(numberOfTenants)
for j := 0; j < numberOfTenants; j++ {
go func() {
defer wg.Done()
userId := fmt.Sprintf("user%v", j)
ctx := user.InjectOrgID(context.Background(), userId)
s := &mockQueryStreamServer{ctx: ctx}

err := i.QueryStream(&client.QueryRequest{
StartTimestampMs: 0,
EndTimestampMs: math.MaxInt64,
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}},
}, s)
require.NoError(t, err)
require.Len(t, s.series, 1)
require.Len(t, s.series[0].Labels, 2)
require.Equal(t, userId, cortexpb.FromLabelAdaptersToLabels(s.series[0].Labels).Get("userId"))
}()
}
wg.Wait()
}

func TestExpendedPostingsCache(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
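The new TestExpendedPostingsCacheIsolation above sets SeedSize to 1 so every (tenant, metric name) pair maps to the same seed slot, then pushes and queries the same metric name for 100 tenants concurrently and asserts that each tenant gets back only its own single series. The property being tested: a collision in the shared seed array can only cause a spurious seed bump, and therefore an extra cache miss, never a cross-tenant result, because the head and blocks FIFO caches are still created per tenant. A small sketch of the slot arithmetic (stdlib hash/fnv here for self-containment; the commit uses fnv1a from segmentio/fasthash):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// slot mimics how a (tenant, metric) pair is mapped onto the shared seed array.
func slot(tenant, metric string, seedSize uint64) uint64 {
	h := fnv.New64a()
	h.Write([]byte(tenant))
	h.Write([]byte(metric))
	return h.Sum64() % seedSize
}

func main() {
	tenants := []string{"user0", "user1", "user2"}

	// SeedSize = 1, as in the isolation test: every pair lands on slot 0, so
	// tenants share a seed and bump each other's cache keys (extra misses only).
	for _, t := range tenants {
		fmt.Println(t, "seedSize=1 ->", slot(t, "foo", 1))
	}

	// With a default-sized array the same pairs spread across many slots.
	for _, t := range tenants {
		fmt.Println(t, "seedSize=2Mi ->", slot(t, "foo", 2*1024*1024))
	}
}
```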
116 changes: 81 additions & 35 deletions pkg/storage/tsdb/expanded_postings_cache.go
@@ -10,16 +10,17 @@ import (
"sync"
"time"

"github.com/cespare/xxhash/v2"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/segmentio/fasthash/fnv1a"

"github.com/cortexproject/cortex/pkg/util/extract"
logutil "github.com/cortexproject/cortex/pkg/util/log"
)

var (
@@ -29,8 +30,8 @@ var (

const (
// size of the seed array. Each seed is a 64bits int (8 bytes)
// totaling 8mb
seedArraySize = 1024 * 1024
// totaling 16mb
seedArraySize = 2 * 1024 * 1024

numOfSeedsStripes = 512
)
@@ -67,7 +68,9 @@ type TSDBPostingsCacheConfig struct {
Head PostingsCacheConfig `yaml:"head" doc:"description=If enabled, ingesters will cache expanded postings for the head block. Only queries with with an equal matcher for metric __name__ are cached."`
Blocks PostingsCacheConfig `yaml:"blocks" doc:"description=If enabled, ingesters will cache expanded postings for the compacted blocks. The cache is shared between all blocks."`

// The configurations below are used only for testing purpose
PostingsForMatchers func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) `yaml:"-"`
SeedSize int `yaml:"-"`
timeNow func() time.Time `yaml:"-"`
}

@@ -89,25 +92,48 @@ func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f
f.BoolVar(&cfg.Enabled, prefix+"expanded_postings_cache."+block+".enabled", false, "Whether the postings cache is enabled or not")
}

type ExpandedPostingsCacheFactory struct {
seedByHash *seedByHash
cfg TSDBPostingsCacheConfig
}

func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPostingsCacheFactory {
if cfg.Head.Enabled || cfg.Blocks.Enabled {
if cfg.SeedSize == 0 {
cfg.SeedSize = seedArraySize
}
logutil.WarnExperimentalUse("expanded postings cache")
return &ExpandedPostingsCacheFactory{
cfg: cfg,
seedByHash: newSeedByHash(cfg.SeedSize),
}
}

return nil
}

func (f *ExpandedPostingsCacheFactory) NewExpandedPostingsCache(userId string, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache {
return newBlocksPostingsForMatchersCache(userId, f.cfg, metrics, f.seedByHash)
}

type ExpandedPostingsCache interface {
PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
ExpireSeries(metric labels.Labels)
}

type BlocksPostingsForMatchersCache struct {
strippedLock []sync.RWMutex

headCache *fifoCache[[]storage.SeriesRef]
blocksCache *fifoCache[[]storage.SeriesRef]
type blocksPostingsForMatchersCache struct {
userId string

headSeedByMetricName []int
headCache *fifoCache[[]storage.SeriesRef]
blocksCache *fifoCache[[]storage.SeriesRef]
postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
timeNow func() time.Time

metrics *ExpandedPostingsCacheMetrics
metrics *ExpandedPostingsCacheMetrics
seedByHash *seedByHash
}

func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache {
func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash) ExpandedPostingsCache {
if cfg.PostingsForMatchers == nil {
cfg.PostingsForMatchers = tsdb.PostingsForMatchers
}
@@ -116,36 +142,30 @@ func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *Exp
cfg.timeNow = time.Now
}

return &BlocksPostingsForMatchersCache{
return &blocksPostingsForMatchersCache{
headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
headSeedByMetricName: make([]int, seedArraySize),
strippedLock: make([]sync.RWMutex, numOfSeedsStripes),
postingsForMatchersFunc: cfg.PostingsForMatchers,
timeNow: cfg.timeNow,
metrics: metrics,
seedByHash: seedByHash,
userId: userId,
}
}

func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) {
func (c *blocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) {
metricName, err := extract.MetricNameFromLabels(metric)
if err != nil {
return
}

h := MemHashString(metricName)
i := h % uint64(len(c.headSeedByMetricName))
l := h % uint64(len(c.strippedLock))
c.strippedLock[l].Lock()
defer c.strippedLock[l].Unlock()
c.headSeedByMetricName[i]++
c.seedByHash.incrementSeed(c.userId, metricName)
}

func (c *BlocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
func (c *blocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
return c.fetchPostings(blockID, ix, ms...)(ctx)
}

func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) {
func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) {
var seed string
cache := c.blocksCache

@@ -197,7 +217,7 @@ func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd
return c.result(promise)
}

func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) {
func (c *blocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) {
return func(ctx context.Context) (index.Postings, error) {
select {
case <-ctx.Done():
@@ -211,16 +231,11 @@ func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.
}
}

func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string {
h := MemHashString(metricName)
i := h % uint64(len(c.headSeedByMetricName))
l := h % uint64(len(c.strippedLock))
c.strippedLock[l].RLock()
defer c.strippedLock[l].RUnlock()
return strconv.Itoa(c.headSeedByMetricName[i])
func (c *blocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string {
return c.seedByHash.getSeed(c.userId, metricName)
}

func (c *BlocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string {
func (c *blocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string {
slices.SortFunc(ms, func(i, j *labels.Matcher) int {
if i.Type != j.Type {
return int(i.Type - j.Type)
@@ -272,6 +287,36 @@ func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) {
return "", false
}

type seedByHash struct {
strippedLock []sync.RWMutex
seedByHash []int
}

func newSeedByHash(size int) *seedByHash {
return &seedByHash{
seedByHash: make([]int, size),
strippedLock: make([]sync.RWMutex, numOfSeedsStripes),
}
}

func (s *seedByHash) getSeed(userId string, v string) string {
h := memHashString(userId, v)
i := h % uint64(len(s.seedByHash))
l := h % uint64(len(s.strippedLock))
s.strippedLock[l].RLock()
defer s.strippedLock[l].RUnlock()
return strconv.Itoa(s.seedByHash[i])
}

func (s *seedByHash) incrementSeed(userId string, v string) {
h := memHashString(userId, v)
i := h % uint64(len(s.seedByHash))
l := h % uint64(len(s.strippedLock))
s.strippedLock[l].Lock()
defer s.strippedLock[l].Unlock()
s.seedByHash[i]++
}

type fifoCache[V any] struct {
cfg PostingsCacheConfig
cachedValues *sync.Map
@@ -425,6 +470,7 @@ func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool
return r >= ttl
}

func MemHashString(str string) uint64 {
return xxhash.Sum64(yoloBuf(str))
func memHashString(userId, v string) uint64 {
h := fnv1a.HashString64(userId)
return fnv1a.AddString64(h, v)
}
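Summarizing the expanded_postings_cache.go hunks above: the per-cache headSeedByMetricName array and its striped locks move into a process-wide seedByHash shared through the factory; the exported MemHashString (xxhash over the metric name alone) becomes the unexported memHashString (fnv1a over the tenant ID plus the metric name); and the seeds go from a 1024*1024-entry array of 8-byte ints per tenant (about 8 MB each) to a single 2*1024*1024-entry array (about 16 MB for the whole process, regardless of tenant count). For reference, a runnable reproduction of the new hash using the same fasthash package the diff imports:

```go
package main

import (
	"fmt"

	"github.com/segmentio/fasthash/fnv1a"
)

// Mirrors memHashString from the diff: hash the tenant ID first, then fold in
// the metric name, so identical metric names hash differently per tenant.
func memHashString(userID, v string) uint64 {
	h := fnv1a.HashString64(userID)
	return fnv1a.AddString64(h, v)
}

func main() {
	const seedArraySize = 2 * 1024 * 1024 // 2Mi seeds of 8 bytes each, roughly 16 MB shared process-wide
	for _, user := range []string{"user-1", "user-2"} {
		h := memHashString(user, "foo")
		fmt.Printf("%s: hash=%d slot=%d\n", user, h, h%seedArraySize)
	}
}
```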
18 changes: 18 additions & 0 deletions pkg/storage/tsdb/expanded_postings_cache_test.go
@@ -176,6 +176,24 @@ func TestFifoCacheExpire(t *testing.T) {
}
}

func Test_memHashString(test *testing.T) {
numberOfTenants := 200
numberOfMetrics := 100
occurrences := map[uint64]int{}

for k := 0; k < 10; k++ {
for j := 0; j < numberOfMetrics; j++ {
metricName := fmt.Sprintf("metricName%v", j)
for i := 0; i < numberOfTenants; i++ {
userId := fmt.Sprintf("user%v", i)
occurrences[memHashString(userId, metricName)]++
}
}

require.Len(test, occurrences, numberOfMetrics*numberOfTenants)
}
}

func RepeatStringIfNeeded(seed string, length int) string {
if len(seed) > length {
return seed
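Assuming a standard checkout of the repository, the tests added in this commit can be run directly with the Go tooling, for example `go test ./pkg/storage/tsdb/ -run Test_memHashString` and `go test ./pkg/ingester/ -run TestExpendedPostingsCacheIsolation`.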
