Skip to content

Commit

Permalink
Metrics Labels String Interning
Browse files Browse the repository at this point in the history
  • Loading branch information
alanprot committed Jul 2, 2024
1 parent ea7a076 commit 80e3fd6
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 2 deletions.
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2971,6 +2971,10 @@ instance_limits:
# Customize the message contained in limit errors
# CLI flag: -ingester.admin-limit-message
[admin_limit_message: <string> | default = "please contact administrator to raise it"]
# Experimental: Enable string interning for metrics labels.
# CLI flag: -ingester.labels-string-interning
[labels_string_interning_enabled: <boolean> | default = false]
```

### `ingester_client_config`
Expand Down
23 changes: 21 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ type Config struct {

// For admin contact details
AdminLimitMessage string `yaml:"admin_limit_message"`

LabelsStringInterningEnabled bool `yaml:"labels_string_interning_enabled"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -158,13 +160,18 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.StringVar(&cfg.AdminLimitMessage, "ingester.admin-limit-message", "please contact administrator to raise it", "Customize the message contained in limit errors")

f.BoolVar(&cfg.LabelsStringInterningEnabled, "ingester.labels-string-interning", false, "Experimental: Enable string interning for metrics labels.")
}

func (cfg *Config) Validate() error {
if err := cfg.LifecyclerConfig.Validate(); err != nil {
return err
}

if cfg.LabelsStringInterningEnabled {
logutil.WarnExperimentalUse("String interning for metrics labels Enabled")
}

return nil
}

Expand Down Expand Up @@ -296,6 +303,10 @@ type userTSDB struct {
// Cached shipped blocks.
shippedBlocksMtx sync.Mutex
shippedBlocks map[ulid.ULID]struct{}

// Used to dedup strings and keep a single reference in memory
labelsStringInterningEnabled bool
interner util.Interner
}

// Explicitly wrapping the tsdb.DB functions that we use.
Expand Down Expand Up @@ -425,6 +436,9 @@ func (u *userTSDB) PostCreation(metric labels.Labels) {
}
u.seriesInMetric.increaseSeriesForMetric(metricName)
u.labelSetCounter.increaseSeriesLabelSet(u, metric)
if u.labelsStringInterningEnabled {
metric.InternStrings(u.interner.Intern)
}
}

// PostDeletion implements SeriesLifecycleCallback interface.
Expand All @@ -439,6 +453,9 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels)
}
u.seriesInMetric.decreaseSeriesForMetric(metricName)
u.labelSetCounter.decreaseSeriesLabelSet(u, metric)
if u.labelsStringInterningEnabled {
metric.ReleaseStrings(u.interner.Release)
}
}
}

Expand Down Expand Up @@ -2047,8 +2064,10 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),

instanceLimitsFn: i.getInstanceLimits,
instanceSeriesCount: &i.TSDBState.seriesCount,
instanceLimitsFn: i.getInstanceLimits,
instanceSeriesCount: &i.TSDBState.seriesCount,
interner: util.NewInterner(),
labelsStringInterningEnabled: i.cfg.LabelsStringInterningEnabled,
}

enableExemplars := false
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func defaultIngesterTestConfig(t testing.TB) Config {
cfg.LifecyclerConfig.ID = "localhost"
cfg.LifecyclerConfig.FinalSleep = 0
cfg.ActiveSeriesMetricsEnabled = true
cfg.LabelsStringInterningEnabled = true
return cfg
}

Expand Down
78 changes: 78 additions & 0 deletions pkg/util/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"unsafe"

"github.com/bboreham/go-loser"
"go.uber.org/atomic"
)

// StringsContain returns true if the search value is within the list of input values.
Expand Down Expand Up @@ -139,3 +140,80 @@ func MergeSortedSlices(ctx context.Context, a ...[]string) ([]string, error) {
}
return r, nil
}

type Interner interface {
Intern(s string) string
Release(s string)
}

// NewInterner returns a new Interner to be used to intern strings.
// Based on https://github.com/prometheus/prometheus/blob/726ed124e4468d0274ba89b0934a6cc8c975532d/storage/remote/intern.go#L51
func NewInterner() Interner {
return &pool{
pool: map[string]*entry{},
}
}

type pool struct {
mtx sync.RWMutex
pool map[string]*entry
}

type entry struct {
refs atomic.Int64

s string
}

func newEntry(s string) *entry {
return &entry{s: s}
}

// Intern returns the interned string. It returns the canonical representation of string.
func (p *pool) Intern(s string) string {
if s == "" {
return ""
}

p.mtx.RLock()
interned, ok := p.pool[s]
p.mtx.RUnlock()
if ok {
interned.refs.Inc()
return interned.s
}
p.mtx.Lock()
defer p.mtx.Unlock()
if interned, ok := p.pool[s]; ok {
interned.refs.Inc()
return interned.s
}

p.pool[s] = newEntry(s)
p.pool[s].refs.Store(1)
return s
}

// Release releases a reference of the string `s`.
// If the reference count become 0, the string `s` is removed from the memory
func (p *pool) Release(s string) {
p.mtx.RLock()
interned, ok := p.pool[s]
p.mtx.RUnlock()

if !ok {
return
}

refs := interned.refs.Dec()
if refs > 0 {
return
}

p.mtx.Lock()
defer p.mtx.Unlock()
if interned.refs.Load() != 0 {
return
}
delete(p.pool, s)
}

0 comments on commit 80e3fd6

Please sign in to comment.