Add distributor per labelset received samples metric (#6443)
* add distributor per labelset ingestion rate metric

Signed-off-by: Ben Ye <[email protected]>

* changelog

Signed-off-by: Ben Ye <[email protected]>

* update metrics synchronously

Signed-off-by: Ben Ye <[email protected]>

* allocate label set counter only if there are matching limits for the series

Signed-off-by: Ben Ye <[email protected]>

* fix counter initialization

Signed-off-by: Ben Ye <[email protected]>

* fix lock on active users

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
yeya24 authored Dec 19, 2024
1 parent 0b4041b commit cfe7ac3
Showing 7 changed files with 418 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -51,6 +51,7 @@
* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406
* [ENHANCEMENT] Ingester: If a limit per label set entry doesn't have any labels, use it as the default partition to catch all series that don't match any other label set entries. #6435
* [ENHANCEMENT] Querier: Add new `cortex_querier_codec_response_size` metric to track the size of the encoded query responses from queriers. #6444
* [ENHANCEMENT] Distributor: Added `cortex_distributor_received_samples_per_labelset_total` metric to calculate ingestion rate per label set. #6443
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled. #6271
63 changes: 62 additions & 1 deletion pkg/distributor/distributor.go
@@ -64,6 +64,8 @@ const (

clearStaleIngesterMetricsInterval = time.Minute

labelSetMetricsTickInterval = 30 * time.Second

// mergeSlicesParallelism is a constant for how many goroutines we should use to merge slices, and
// it was based on empirical observation: see BenchmarkMergeSlicesParallel
mergeSlicesParallelism = 8
@@ -107,6 +109,7 @@ type Distributor struct {
// Metrics
queryDuration *instrument.HistogramCollector
receivedSamples *prometheus.CounterVec
receivedSamplesPerLabelSet *prometheus.CounterVec
receivedExemplars *prometheus.CounterVec
receivedMetadata *prometheus.CounterVec
incomingSamples *prometheus.CounterVec
@@ -125,6 +128,9 @@ type Distributor struct {
validateMetrics *validation.ValidateMetrics

asyncExecutor util.AsyncExecutor

// Tracks the label sets received from each user.
labelSetTracker *labelSetTracker
}

// Config contains the configuration required to
@@ -302,6 +308,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Name: "distributor_received_samples_total",
Help: "The total number of received samples, excluding rejected and deduped samples.",
}, []string{"user", "type"}),
receivedSamplesPerLabelSet: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_received_samples_per_labelset_total",
Help: "The total number of received samples per label set, excluding rejected and deduped samples.",
}, []string{"user", "type", "labelset"}),
receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_received_exemplars_total",
@@ -377,6 +388,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
asyncExecutor: util.NewNoOpExecutor(),
}

d.labelSetTracker = newLabelSetTracker(d.receivedSamplesPerLabelSet)

if cfg.NumPushWorkers > 0 {
util_log.WarnExperimentalUse("Distributor: using goroutine worker pool")
d.asyncExecutor = util.NewWorkerPool("distributor", cfg.NumPushWorkers, reg)
@@ -449,6 +462,9 @@ func (d *Distributor) running(ctx context.Context) error {
staleIngesterMetricTicker := time.NewTicker(clearStaleIngesterMetricsInterval)
defer staleIngesterMetricTicker.Stop()

labelSetMetricsTicker := time.NewTicker(labelSetMetricsTickInterval)
defer labelSetMetricsTicker.Stop()

for {
select {
case <-ctx.Done():
@@ -460,6 +476,9 @@
case <-staleIngesterMetricTicker.C:
d.cleanStaleIngesterMetrics()

case <-labelSetMetricsTicker.C:
d.updateLabelSetMetrics()

case err := <-d.subservicesWatcher.Chan():
return errors.Wrap(err, "distributor subservice failed")
}
@@ -486,6 +505,10 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err)
}

if err := util.DeleteMatchingLabels(d.receivedSamplesPerLabelSet, map[string]string{"user": userID}); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_samples_per_labelset_total metric for user", "user", userID, "err", err)
}

validation.DeletePerUserValidationMetrics(d.validateMetrics, userID, d.log)
}

@@ -777,6 +800,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return &cortexpb.WriteResponse{}, firstPartialErr
}

func (d *Distributor) updateLabelSetMetrics() {
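// Build the set of label set limit hashes currently configured for each active user;
// the tracker uses it to drop tracker entries and metrics for label sets whose limits no longer exist.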
activeUserSet := make(map[string]map[uint64]struct{})
for _, user := range d.activeUsers.ActiveUsers() {
limits := d.limits.LimitsPerLabelSet(user)
activeUserSet[user] = make(map[uint64]struct{}, len(limits))
for _, l := range limits {
activeUserSet[user][l.Hash] = struct{}{}
}
}

d.labelSetTracker.updateMetrics(activeUserSet)
}

func (d *Distributor) cleanStaleIngesterMetrics() {
healthy, unhealthy, err := d.ingestersRing.GetAllInstanceDescs(ring.WriteNoExtend)
if err != nil {
@@ -888,8 +924,12 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
validatedFloatSamples := 0
validatedHistogramSamples := 0
validatedExemplars := 0
limitsPerLabelSet := d.limits.LimitsPerLabelSet(userID)

var firstPartialErr error
var (
labelSetCounters map[uint64]*samplesLabelSetEntry
firstPartialErr error
)

latestSampleTimestampMs := int64(0)
defer func() {
@@ -1005,12 +1045,33 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
continue
}

matchedLabelSetLimits := validation.LimitsPerLabelSetsForSeries(limitsPerLabelSet, cortexpb.FromLabelAdaptersToLabels(validatedSeries.Labels))
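// Lazily allocate the per-label-set counters only when this request contains at least one series matching a label set limit.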
if len(matchedLabelSetLimits) > 0 && labelSetCounters == nil {
// TODO: use pool.
labelSetCounters = make(map[uint64]*samplesLabelSetEntry, len(matchedLabelSetLimits))
}
for _, l := range matchedLabelSetLimits {
if c, exists := labelSetCounters[l.Hash]; exists {
c.floatSamples += int64(len(ts.Samples))
c.histogramSamples += int64(len(ts.Histograms))
} else {
labelSetCounters[l.Hash] = &samplesLabelSetEntry{
floatSamples: int64(len(ts.Samples)),
histogramSamples: int64(len(ts.Histograms)),
labels: l.LabelSet,
}
}
}

seriesKeys = append(seriesKeys, key)
validatedTimeseries = append(validatedTimeseries, validatedSeries)
validatedFloatSamples += len(ts.Samples)
validatedHistogramSamples += len(ts.Histograms)
validatedExemplars += len(ts.Exemplars)
}
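// Flush the per-label-set counts accumulated for this request to the shared tracker, which updates the metric.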
for h, counter := range labelSetCounters {
d.labelSetTracker.increaseSamplesLabelSet(userID, h, counter.labels, counter.floatSamples, counter.histogramSamples)
}
return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
}

118 changes: 118 additions & 0 deletions pkg/distributor/distributor_test.go
@@ -420,6 +420,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
"cortex_distributor_metadata_in_total",
"cortex_distributor_non_ha_samples_received_total",
"cortex_distributor_latest_seen_sample_timestamp_seconds",
"cortex_distributor_received_samples_per_labelset_total",
}

allMetrics := append(removedMetrics, permanentMetrics...)
Expand All @@ -438,6 +439,8 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
d.nonHASamples.WithLabelValues("userA").Add(5)
d.dedupedSamples.WithLabelValues("userA", "cluster1").Inc() // We cannot clean this metric
d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111)
d.receivedSamplesPerLabelSet.WithLabelValues("userA", sampleMetricTypeFloat, "{}").Add(5)
d.receivedSamplesPerLabelSet.WithLabelValues("userA", sampleMetricTypeHistogram, "{}").Add(10)

h, _, _ := r.GetAllInstanceDescs(ring.WriteNoExtend)
ingId0, _ := r.GetInstanceIdByAddr(h[0].Addr)
@@ -473,6 +476,11 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
cortex_distributor_received_metadata_total{user="userA"} 5
cortex_distributor_received_metadata_total{user="userB"} 10
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_per_labelset_total counter
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="userA"} 5
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="userA"} 10
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userA"} 5
@@ -4081,6 +4089,116 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing
}
}

func TestDistributor_PushLabelSetMetrics(t *testing.T) {
t.Parallel()
inputSeries := []labels.Labels{
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
{
{Name: "__name__", Value: "bar"},
{Name: "cluster", Value: "one"},
},
{
{Name: "__name__", Value: "bar"},
{Name: "cluster", Value: "two"},
},
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "three"},
},
}

var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.LimitsPerLabelSet = []validation.LimitsPerLabelSet{
{Hash: 0, LabelSet: labels.FromStrings("cluster", "one")},
{Hash: 1, LabelSet: labels.FromStrings("cluster", "two")},
{Hash: 2, LabelSet: labels.EmptyLabels()},
}

ds, _, regs, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})
reg := regs[0]

// Push the series to the distributor
id := "user"
req := mockWriteRequest(inputSeries, 1, 1, false)
ctx := user.InjectOrgID(context.Background(), id)
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_per_labelset_total counter
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 1
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 1
`), "cortex_distributor_received_samples_per_labelset_total"))

// Push more series.
inputSeries = []labels.Labels{
{
{Name: "__name__", Value: "baz"},
{Name: "cluster", Value: "two"},
},
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "four"},
},
}
// Write the same request twice for different users.
req = mockWriteRequest(inputSeries, 1, 1, false)
ctx2 := user.InjectOrgID(context.Background(), "user2")
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)
req = mockWriteRequest(inputSeries, 1, 1, false)
_, err = ds[0].Push(ctx2, req)
require.NoError(t, err)
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_per_labelset_total counter
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 2
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user2"} 1
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1
`), "cortex_distributor_received_samples_per_labelset_total"))

// Remove existing limits and add new limits
limits.LimitsPerLabelSet = []validation.LimitsPerLabelSet{
{Hash: 3, LabelSet: labels.FromStrings("cluster", "three")},
{Hash: 4, LabelSet: labels.FromStrings("cluster", "four")},
{Hash: 2, LabelSet: labels.EmptyLabels()},
}
ds[0].limits, err = validation.NewOverrides(limits, nil)
require.NoError(t, err)
ds[0].updateLabelSetMetrics()
// Old label set metrics are removed. New label set metrics will be added when
// new requests come in.
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_per_labelset_total counter
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1
`), "cortex_distributor_received_samples_per_labelset_total"))

// Metrics from `user` got removed but `user2` metric should remain.
ds[0].cleanupInactiveUser(id)
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_per_labelset_total counter
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1
`), "cortex_distributor_received_samples_per_labelset_total"))
}

func countMockIngestersCalls(ingesters []*mockIngester, name string) int {
count := 0
for i := 0; i < len(ingesters); i++ {
95 changes: 95 additions & 0 deletions pkg/distributor/metrics.go
@@ -0,0 +1,95 @@
package distributor

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/cortexproject/cortex/pkg/util"
)

const (
numMetricShards = 128
)

type labelSetTracker struct {
receivedSamplesPerLabelSet *prometheus.CounterVec

shards []*labelSetCounterShard
}

func newLabelSetTracker(receivedSamplesPerLabelSet *prometheus.CounterVec) *labelSetTracker {
shards := make([]*labelSetCounterShard, 0, numMetricShards)
for i := 0; i < numMetricShards; i++ {
shards = append(shards, &labelSetCounterShard{
RWMutex: &sync.RWMutex{},
userLabelSets: map[string]map[uint64]labels.Labels{},
})
}
return &labelSetTracker{shards: shards, receivedSamplesPerLabelSet: receivedSamplesPerLabelSet}
}

type labelSetCounterShard struct {
*sync.RWMutex
userLabelSets map[string]map[uint64]labels.Labels
}

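// samplesLabelSetEntry accumulates the float and histogram sample counts of a single matched label set limit within one push request.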
type samplesLabelSetEntry struct {
floatSamples int64
histogramSamples int64
labels labels.Labels
}

func (m *labelSetTracker) increaseSamplesLabelSet(userId string, hash uint64, labelSet labels.Labels, floatSamples, histogramSamples int64) {
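// Re-hash the label set hash to pick a shard so lock contention is spread across numMetricShards shards.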
s := m.shards[util.HashFP(model.Fingerprint(hash))%numMetricShards]
s.Lock()
if userEntry, ok := s.userLabelSets[userId]; ok {
if _, ok2 := userEntry[hash]; !ok2 {
userEntry[hash] = labelSet
}
} else {
s.userLabelSets[userId] = map[uint64]labels.Labels{hash: labelSet}
}
// Unlock before we update metrics.
s.Unlock()

labelSetStr := labelSet.String()
if floatSamples > 0 {
m.receivedSamplesPerLabelSet.WithLabelValues(userId, sampleMetricTypeFloat, labelSetStr).Add(float64(floatSamples))
}
if histogramSamples > 0 {
m.receivedSamplesPerLabelSet.WithLabelValues(userId, sampleMetricTypeHistogram, labelSetStr).Add(float64(histogramSamples))
}
}

// updateMetrics cleans up dangling users and label sets from the tracker, as well as their metrics.
func (m *labelSetTracker) updateMetrics(userSet map[string]map[uint64]struct{}) {
for i := 0; i < numMetricShards; i++ {
shard := m.shards[i]
shard.Lock()

for user, userEntry := range shard.userLabelSets {
limits, ok := userSet[user]
if !ok {
// If the user has been removed, its metrics are deleted in the cleanupInactiveUser loop,
// so skip deleting metrics here.
delete(shard.userLabelSets, user)
continue
}
for h, lbls := range userEntry {
// This limit no longer exists.
if _, ok := limits[h]; !ok {
delete(userEntry, h)
labelSetStr := lbls.String()
m.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeFloat, labelSetStr)
m.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeHistogram, labelSetStr)
continue
}
}
}

shard.Unlock()
}
}
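
Below is a rough usage sketch of the new labelSetTracker, written as if it were a test in package distributor; it is not part of this commit. It assumes the unexported identifiers above compile alongside it in the same package, uses only the standard client_golang and Prometheus labels packages, and the test name, user, hashes, and sample counts are made up for illustration.

package distributor

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/prometheus/prometheus/model/labels"
)

// TestLabelSetTrackerSketch is a hypothetical example of the tracker lifecycle.
func TestLabelSetTrackerSketch(t *testing.T) {
	// Same shape as the distributor's receivedSamplesPerLabelSet metric.
	vec := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "distributor_received_samples_per_labelset_total",
		Help:      "The total number of received samples per label set, excluding rejected and deduped samples.",
	}, []string{"user", "type", "labelset"})
	tracker := newLabelSetTracker(vec)

	// Two label set limits for the same user, identified by their hashes.
	tracker.increaseSamplesLabelSet("user-1", 1, labels.FromStrings("cluster", "one"), 10, 0) // 10 float samples
	tracker.increaseSamplesLabelSet("user-1", 2, labels.FromStrings("cluster", "two"), 0, 5)  // 5 histogram samples

	// Simulate a limits reload where only the label set with hash 1 survives:
	// the tracker entry and metric series for hash 2 are removed.
	tracker.updateMetrics(map[string]map[uint64]struct{}{
		"user-1": {1: {}},
	})

	if got := testutil.CollectAndCount(vec, "cortex_distributor_received_samples_per_labelset_total"); got != 1 {
		t.Fatalf("expected 1 remaining series, got %d", got)
	}
}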