From 80e3fd60e95e1f404134593deba0c82c7616fd01 Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 2 Jul 2024 16:21:49 -0700 Subject: [PATCH] Metrics Labels String Interning --- docs/configuration/config-file-reference.md | 4 ++ pkg/ingester/ingester.go | 23 +++++- pkg/ingester/lifecycle_test.go | 1 + pkg/util/strings.go | 78 +++++++++++++++++++++ 4 files changed, 104 insertions(+), 2 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index ab0570c67ec..4f995c58c65 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2971,6 +2971,10 @@ instance_limits: # Customize the message contained in limit errors # CLI flag: -ingester.admin-limit-message [admin_limit_message: <string> | default = "please contact administrator to raise it"] + +# Experimental: Enable string interning for metrics labels. +# CLI flag: -ingester.labels-string-interning +[labels_string_interning_enabled: <boolean> | default = false] ``` ### `ingester_client_config` diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index e0673410491..dd2fdb961d1 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -134,6 +134,8 @@ type Config struct { // For admin contact details AdminLimitMessage string `yaml:"admin_limit_message"` + + LabelsStringInterningEnabled bool `yaml:"labels_string_interning_enabled"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -158,6 +160,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.AdminLimitMessage, "ingester.admin-limit-message", "please contact administrator to raise it", "Customize the message contained in limit errors") + f.BoolVar(&cfg.LabelsStringInterningEnabled, "ingester.labels-string-interning", false, "Experimental: Enable string interning for metrics labels.") } func (cfg *Config) Validate() error { @@ -165,6 +168,10 @@ func (cfg *Config) Validate() error { return err } + if
cfg.LabelsStringInterningEnabled { + logutil.WarnExperimentalUse("String interning for metrics labels Enabled") + } + return nil } @@ -296,6 +303,10 @@ type userTSDB struct { // Cached shipped blocks. shippedBlocksMtx sync.Mutex shippedBlocks map[ulid.ULID]struct{} + + // Used to dedup strings and keep a single reference in memory + labelsStringInterningEnabled bool + interner util.Interner } // Explicitly wrapping the tsdb.DB functions that we use. @@ -425,6 +436,9 @@ func (u *userTSDB) PostCreation(metric labels.Labels) { } u.seriesInMetric.increaseSeriesForMetric(metricName) u.labelSetCounter.increaseSeriesLabelSet(u, metric) + if u.labelsStringInterningEnabled { + metric.InternStrings(u.interner.Intern) + } } // PostDeletion implements SeriesLifecycleCallback interface. @@ -439,6 +453,9 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels) } u.seriesInMetric.decreaseSeriesForMetric(metricName) u.labelSetCounter.decreaseSeriesLabelSet(u, metric) + if u.labelsStringInterningEnabled { + metric.ReleaseStrings(u.interner.Release) + } } } @@ -2047,8 +2064,10 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), - instanceLimitsFn: i.getInstanceLimits, - instanceSeriesCount: &i.TSDBState.seriesCount, + instanceLimitsFn: i.getInstanceLimits, + instanceSeriesCount: &i.TSDBState.seriesCount, + interner: util.NewInterner(), + labelsStringInterningEnabled: i.cfg.LabelsStringInterningEnabled, } enableExemplars := false diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index d69be2f7795..efa739b4264 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -42,6 +42,7 @@ func defaultIngesterTestConfig(t testing.TB) Config { cfg.LifecyclerConfig.ID = "localhost" cfg.LifecyclerConfig.FinalSleep = 0 cfg.ActiveSeriesMetricsEnabled = true + 
// Interner deduplicates strings so that equal values can share one stored copy.
//
// Each Intern call for a non-empty string takes one reference on it; callers
// are expected to balance it with one Release once the string is no longer
// needed, so that unused entries can be dropped from the pool.
type Interner interface {
	// Intern returns the canonical copy of s and takes one reference on it.
	Intern(s string) string
	// Release drops one reference on s; unknown strings are ignored.
	Release(s string)
}

// NewInterner returns a new Interner to be used to intern strings.
// Based on https://github.com/prometheus/prometheus/blob/726ed124e4468d0274ba89b0934a6cc8c975532d/storage/remote/intern.go#L51
func NewInterner() Interner {
	return &pool{
		pool: map[string]*entry{},
	}
}

// pool is the reference-counted Interner implementation.
//
// Locking protocol:
//   - reference counts are only modified while mtx is held (a read lock is
//     enough, the counter itself is atomic);
//   - map entries are only inserted or removed while the write lock is held.
//
// Because the write lock excludes every reader, a goroutine that holds it and
// observes a non-positive count knows no other goroutine is about to take a
// reference on that entry, which makes removal safe.
type pool struct {
	mtx  sync.RWMutex
	pool map[string]*entry
}

// entry is a single interned string together with its reference count.
type entry struct {
	refs atomic.Int64

	s string
}

// newEntry returns an entry for s with a reference count of zero.
func newEntry(s string) *entry {
	return &entry{s: s}
}

// Intern returns the interned string. It returns the canonical representation of string.
//
// The empty string is returned as is and is never stored in the pool.
func (p *pool) Intern(s string) string {
	if s == "" {
		return ""
	}

	// Fast path: the string is already known. The reference is taken before
	// the read lock is dropped; otherwise a concurrent Release could remove
	// the entry between the lookup and the increment, leaving this caller with
	// a reference on an entry that is no longer in the pool.
	// Add(1)/Add(-1) are used rather than Inc/Dec so the code does not depend
	// on which atomic.Int64 flavour (go.uber.org/atomic or sync/atomic) is in scope.
	p.mtx.RLock()
	interned, ok := p.pool[s]
	if ok {
		interned.refs.Add(1)
	}
	p.mtx.RUnlock()
	if ok {
		return interned.s
	}

	// Slow path: insert under the write lock, re-checking first because another
	// goroutine may have inserted the same string while no lock was held.
	p.mtx.Lock()
	defer p.mtx.Unlock()
	if interned, ok := p.pool[s]; ok {
		interned.refs.Add(1)
		return interned.s
	}

	fresh := newEntry(s)
	fresh.refs.Store(1)
	p.pool[s] = fresh
	return s
}

// Release releases a reference of the string `s`.
// If the reference count drops to 0 (or below, after an unbalanced Release),
// the string `s` is removed from the pool.
func (p *pool) Release(s string) {
	// Decrement under the read lock so the count always belongs to the entry
	// that is currently stored in the map.
	p.mtx.RLock()
	interned, ok := p.pool[s]
	var refs int64
	if ok {
		refs = interned.refs.Add(-1)
	}
	p.mtx.RUnlock()

	if !ok || refs > 0 {
		return
	}

	p.mtx.Lock()
	defer p.mtx.Unlock()
	// Between dropping the read lock and getting here the entry may have been
	// re-interned (count is positive again) or removed and replaced by a newer
	// entry for the same string; in both cases it must be left alone.
	if current, found := p.pool[s]; found && current == interned && current.refs.Load() <= 0 {
		delete(p.pool, s)
	}
}