Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add distributor per labelset received samples metric #6443

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406
* [ENHANCEMENT] Ingester: If a limit per label set entry doesn't have any label, use it as the default partition to catch all series that doesn't match any other label sets entries. #6435
* [ENHANCEMENT] Querier: Add new `cortex_querier_codec_response_size` metric to track the size of the encoded query responses from queriers. #6444
* [ENHANCEMENT] Distributor: Added `cortex_distributor_received_samples_per_labelset_total` metric to calculate ingestion rate per label set. #6443
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled. #6271
Expand Down
54 changes: 54 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const (

clearStaleIngesterMetricsInterval = time.Minute

labelSetMetricsTickInterval = 30 * time.Second

// mergeSlicesParallelism is a constant of how much go routines we should use to merge slices, and
// it was based on empirical observation: See BenchmarkMergeSlicesParallel
mergeSlicesParallelism = 8
Expand Down Expand Up @@ -107,6 +109,7 @@ type Distributor struct {
// Metrics
queryDuration *instrument.HistogramCollector
receivedSamples *prometheus.CounterVec
receivedSamplesPerLabelSet *prometheus.CounterVec
receivedExemplars *prometheus.CounterVec
receivedMetadata *prometheus.CounterVec
incomingSamples *prometheus.CounterVec
Expand All @@ -125,6 +128,9 @@ type Distributor struct {
validateMetrics *validation.ValidateMetrics

asyncExecutor util.AsyncExecutor

// Map to track label sets from user.
labelSetTracker *labelSetTracker
}

// Config contains the configuration required to
Expand Down Expand Up @@ -302,6 +308,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Name: "distributor_received_samples_total",
Help: "The total number of received samples, excluding rejected and deduped samples.",
}, []string{"user", "type"}),
receivedSamplesPerLabelSet: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_received_samples_per_labelset_total",
Help: "The total number of received samples per label set, excluding rejected and deduped samples.",
}, []string{"user", "type", "labelset"}),
receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_received_exemplars_total",
Expand Down Expand Up @@ -377,6 +388,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
asyncExecutor: util.NewNoOpExecutor(),
}

d.labelSetTracker = newLabelSetTracker(d.receivedSamplesPerLabelSet)

if cfg.NumPushWorkers > 0 {
util_log.WarnExperimentalUse("Distributor: using goroutine worker pool")
d.asyncExecutor = util.NewWorkerPool("distributor", cfg.NumPushWorkers, reg)
Expand Down Expand Up @@ -449,6 +462,9 @@ func (d *Distributor) running(ctx context.Context) error {
staleIngesterMetricTicker := time.NewTicker(clearStaleIngesterMetricsInterval)
defer staleIngesterMetricTicker.Stop()

labelSetMetricsTicker := time.NewTicker(labelSetMetricsTickInterval)
defer labelSetMetricsTicker.Stop()

for {
select {
case <-ctx.Done():
Expand All @@ -460,6 +476,9 @@ func (d *Distributor) running(ctx context.Context) error {
case <-staleIngesterMetricTicker.C:
d.cleanStaleIngesterMetrics()

case <-labelSetMetricsTicker.C:
d.updateLabelSetMetrics()

case err := <-d.subservicesWatcher.Chan():
return errors.Wrap(err, "distributor subservice failed")
}
Expand All @@ -486,6 +505,10 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err)
}

if err := util.DeleteMatchingLabels(d.receivedSamplesPerLabelSet, map[string]string{"user": userID}); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_samples_per_labelset_total metric for user", "user", userID, "err", err)
}

validation.DeletePerUserValidationMetrics(d.validateMetrics, userID, d.log)
}

Expand Down Expand Up @@ -777,6 +800,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return &cortexpb.WriteResponse{}, firstPartialErr
}

func (d *Distributor) updateLabelSetMetrics() {
activeUserSet := make(map[string]map[uint64]struct{})
for _, user := range d.activeUsers.ActiveUsers() {
limits := d.limits.LimitsPerLabelSet(user)
activeUserSet[user] = make(map[uint64]struct{}, len(limits))
for _, l := range limits {
activeUserSet[user][l.Hash] = struct{}{}
}
}

d.labelSetTracker.updateMetrics(activeUserSet)
}

func (d *Distributor) cleanStaleIngesterMetrics() {
healthy, unhealthy, err := d.ingestersRing.GetAllInstanceDescs(ring.WriteNoExtend)
if err != nil {
Expand Down Expand Up @@ -888,7 +924,9 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
validatedFloatSamples := 0
validatedHistogramSamples := 0
validatedExemplars := 0
limitsPerLabelSet := d.limits.LimitsPerLabelSet(userID)

labelSetCounters := make(map[uint64]*samplesLabelSetEntry)
var firstPartialErr error

latestSampleTimestampMs := int64(0)
Expand Down Expand Up @@ -1005,12 +1043,28 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
continue
}

for _, l := range validation.LimitsPerLabelSetsForSeries(limitsPerLabelSet, cortexpb.FromLabelAdaptersToLabels(validatedSeries.Labels)) {
if c, exists := labelSetCounters[l.Hash]; exists {
c.floatSamples += int64(len(ts.Samples))
c.histogramSamples += int64(len(ts.Histograms))
} else {
labelSetCounters[l.Hash] = &samplesLabelSetEntry{
floatSamples: int64(len(ts.Samples)),
histogramSamples: int64(len(ts.Histograms)),
labels: l.LabelSet,
}
}
}

seriesKeys = append(seriesKeys, key)
validatedTimeseries = append(validatedTimeseries, validatedSeries)
validatedFloatSamples += len(ts.Samples)
validatedHistogramSamples += len(ts.Histograms)
validatedExemplars += len(ts.Exemplars)
}
for h, counter := range labelSetCounters {
d.labelSetTracker.increaseSamplesLabelSet(userID, h, counter.labels, counter.floatSamples, counter.histogramSamples)
}
return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
}

Expand Down
118 changes: 118 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
"cortex_distributor_metadata_in_total",
"cortex_distributor_non_ha_samples_received_total",
"cortex_distributor_latest_seen_sample_timestamp_seconds",
"cortex_distributor_received_samples_per_labelset_total",
}

allMetrics := append(removedMetrics, permanentMetrics...)
Expand All @@ -438,6 +439,8 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
d.nonHASamples.WithLabelValues("userA").Add(5)
d.dedupedSamples.WithLabelValues("userA", "cluster1").Inc() // We cannot clean this metric
d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111)
d.receivedSamplesPerLabelSet.WithLabelValues("userA", sampleMetricTypeFloat, "{}").Add(5)
d.receivedSamplesPerLabelSet.WithLabelValues("userA", sampleMetricTypeHistogram, "{}").Add(10)

h, _, _ := r.GetAllInstanceDescs(ring.WriteNoExtend)
ingId0, _ := r.GetInstanceIdByAddr(h[0].Addr)
Expand Down Expand Up @@ -473,6 +476,11 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
cortex_distributor_received_metadata_total{user="userA"} 5
cortex_distributor_received_metadata_total{user="userB"} 10

# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_per_labelset_total counter
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="userA"} 5
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="userA"} 10

# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userA"} 5
Expand Down Expand Up @@ -4081,6 +4089,116 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing
}
}

func TestDistributor_PushLabelSetMetrics(t *testing.T) {
t.Parallel()
inputSeries := []labels.Labels{
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
{
{Name: "__name__", Value: "bar"},
{Name: "cluster", Value: "one"},
},
{
{Name: "__name__", Value: "bar"},
{Name: "cluster", Value: "two"},
},
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "three"},
},
}

var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.LimitsPerLabelSet = []validation.LimitsPerLabelSet{
{Hash: 0, LabelSet: labels.FromStrings("cluster", "one")},
{Hash: 1, LabelSet: labels.FromStrings("cluster", "two")},
{Hash: 2, LabelSet: labels.EmptyLabels()},
}

ds, _, regs, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})
reg := regs[0]

// Push the series to the distributor
id := "user"
req := mockWriteRequest(inputSeries, 1, 1, false)
ctx := user.InjectOrgID(context.Background(), id)
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_per_labelset_total counter
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 1
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 1
`), "cortex_distributor_received_samples_per_labelset_total"))

// Push more series.
inputSeries = []labels.Labels{
{
{Name: "__name__", Value: "baz"},
{Name: "cluster", Value: "two"},
},
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "four"},
},
}
// Write the same request twice for different users.
req = mockWriteRequest(inputSeries, 1, 1, false)
ctx2 := user.InjectOrgID(context.Background(), "user2")
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)
req = mockWriteRequest(inputSeries, 1, 1, false)
_, err = ds[0].Push(ctx2, req)
require.NoError(t, err)
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_per_labelset_total counter
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 2
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user2"} 1
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1
`), "cortex_distributor_received_samples_per_labelset_total"))

// Remove existing limits and add new limits
limits.LimitsPerLabelSet = []validation.LimitsPerLabelSet{
{Hash: 3, LabelSet: labels.FromStrings("cluster", "three")},
{Hash: 4, LabelSet: labels.FromStrings("cluster", "four")},
{Hash: 2, LabelSet: labels.EmptyLabels()},
}
ds[0].limits, err = validation.NewOverrides(limits, nil)
require.NoError(t, err)
ds[0].updateLabelSetMetrics()
// Old label set metrics are removed. New label set metrics will be added when
// new requests come in.
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_per_labelset_total counter
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1
`), "cortex_distributor_received_samples_per_labelset_total"))

// Metrics from `user` got removed but `user2` metric should remain.
ds[0].cleanupInactiveUser(id)
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_per_labelset_total counter
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1
`), "cortex_distributor_received_samples_per_labelset_total"))
}

func countMockIngestersCalls(ingesters []*mockIngester, name string) int {
count := 0
for i := 0; i < len(ingesters); i++ {
Expand Down
95 changes: 95 additions & 0 deletions pkg/distributor/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package distributor

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/cortexproject/cortex/pkg/util"
)

const (
numMetricShards = 128
)

type labelSetTracker struct {
receivedSamplesPerLabelSet *prometheus.CounterVec

shards []*labelSetCounterShard
}

func newLabelSetTracker(receivedSamplesPerLabelSet *prometheus.CounterVec) *labelSetTracker {
shards := make([]*labelSetCounterShard, 0, numMetricShards)
for i := 0; i < numMetricShards; i++ {
shards = append(shards, &labelSetCounterShard{
RWMutex: &sync.RWMutex{},
userLabelSets: map[string]map[uint64]labels.Labels{},
})
}
return &labelSetTracker{shards: shards, receivedSamplesPerLabelSet: receivedSamplesPerLabelSet}
}

type labelSetCounterShard struct {
*sync.RWMutex
userLabelSets map[string]map[uint64]labels.Labels
}

type samplesLabelSetEntry struct {
floatSamples int64
histogramSamples int64
labels labels.Labels
}

func (m *labelSetTracker) increaseSamplesLabelSet(userId string, hash uint64, labelSet labels.Labels, floatSamples, histogramSamples int64) {
s := m.shards[util.HashFP(model.Fingerprint(hash))%numMetricShards]
s.Lock()
if userEntry, ok := s.userLabelSets[userId]; ok {
if _, ok2 := userEntry[hash]; !ok2 {
userEntry[hash] = labelSet
}
} else {
s.userLabelSets[userId] = map[uint64]labels.Labels{hash: labelSet}
}
// Unlock before we update metrics.
s.Unlock()

labelSetStr := labelSet.String()
if floatSamples > 0 {
m.receivedSamplesPerLabelSet.WithLabelValues(userId, sampleMetricTypeFloat, labelSetStr).Add(float64(floatSamples))
}
if histogramSamples > 0 {
m.receivedSamplesPerLabelSet.WithLabelValues(userId, sampleMetricTypeHistogram, labelSetStr).Add(float64(histogramSamples))
}
}

// Clean up dangling user and label set from the tracker as well as metrics.
func (m *labelSetTracker) updateMetrics(userSet map[string]map[uint64]struct{}) {
for i := 0; i < numMetricShards; i++ {
shard := m.shards[i]
shard.Lock()

for user, userEntry := range shard.userLabelSets {
limits, ok := userSet[user]
if !ok {
// If user is removed, we will delete user metrics in cleanupInactiveUser loop
// so skip deleting metrics here.
delete(shard.userLabelSets, user)
continue
}
for h, lbls := range userEntry {
// This limit no longer exists.
if _, ok := limits[h]; !ok {
delete(userEntry, h)
labelSetStr := lbls.String()
m.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeFloat, labelSetStr)
m.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeHistogram, labelSetStr)
continue
}
}
}

shard.Unlock()
}
}
Loading
Loading