diff --git a/CHANGELOG.md b/CHANGELOG.md index bf025c719d..7da08bf6ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ * [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406 * [ENHANCEMENT] Ingester: If a limit per label set entry doesn't have any label, use it as the default partition to catch all series that doesn't match any other label sets entries. #6435 * [ENHANCEMENT] Querier: Add new `cortex_querier_codec_response_size` metric to track the size of the encoded query responses from queriers. #6444 +* [ENHANCEMENT] Distributor: Added `cortex_distributor_received_samples_per_labelset_total` metric to calculate ingestion rate per label set. #6443 * [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224 * [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326 * [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled. #6271 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 05a57a6213..eb25969978 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -64,6 +64,8 @@ const ( clearStaleIngesterMetricsInterval = time.Minute + labelSetMetricsTickInterval = 30 * time.Second + // mergeSlicesParallelism is a constant of how much go routines we should use to merge slices, and // it was based on empirical observation: See BenchmarkMergeSlicesParallel mergeSlicesParallelism = 8 @@ -107,6 +109,7 @@ type Distributor struct { // Metrics queryDuration *instrument.HistogramCollector receivedSamples *prometheus.CounterVec + receivedSamplesPerLabelSet *prometheus.CounterVec receivedExemplars *prometheus.CounterVec receivedMetadata *prometheus.CounterVec incomingSamples *prometheus.CounterVec @@ -125,6 +128,9 @@ type Distributor struct { validateMetrics *validation.ValidateMetrics asyncExecutor util.AsyncExecutor + + // Map to track label sets from user. + labelSetTracker *labelSetTracker } // Config contains the configuration required to @@ -302,6 +308,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove Name: "distributor_received_samples_total", Help: "The total number of received samples, excluding rejected and deduped samples.", }, []string{"user", "type"}), + receivedSamplesPerLabelSet: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "distributor_received_samples_per_labelset_total", + Help: "The total number of received samples per label set, excluding rejected and deduped samples.", + }, []string{"user", "type", "labelset"}), receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "distributor_received_exemplars_total", @@ -377,6 +388,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove asyncExecutor: util.NewNoOpExecutor(), } + d.labelSetTracker = newLabelSetTracker(d.receivedSamplesPerLabelSet) + if cfg.NumPushWorkers > 0 { util_log.WarnExperimentalUse("Distributor: using goroutine worker pool") d.asyncExecutor = util.NewWorkerPool("distributor", cfg.NumPushWorkers, reg) @@ -449,6 +462,9 @@ func (d *Distributor) running(ctx context.Context) error { staleIngesterMetricTicker := time.NewTicker(clearStaleIngesterMetricsInterval) defer staleIngesterMetricTicker.Stop() + labelSetMetricsTicker := time.NewTicker(labelSetMetricsTickInterval) + defer labelSetMetricsTicker.Stop() + for { select { case <-ctx.Done(): @@ -460,6 +476,9 @@ func (d *Distributor) running(ctx context.Context) error { case <-staleIngesterMetricTicker.C: d.cleanStaleIngesterMetrics() + case <-labelSetMetricsTicker.C: + d.updateLabelSetMetrics() + case err := <-d.subservicesWatcher.Chan(): return errors.Wrap(err, "distributor subservice failed") } @@ -486,6 +505,10 @@ func (d *Distributor) cleanupInactiveUser(userID string) { level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err) } + if err := util.DeleteMatchingLabels(d.receivedSamplesPerLabelSet, map[string]string{"user": userID}); err != nil { + level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_samples_per_labelset_total metric for user", "user", userID, "err", err) + } + validation.DeletePerUserValidationMetrics(d.validateMetrics, userID, d.log) } @@ -777,6 +800,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co return &cortexpb.WriteResponse{}, firstPartialErr } +func (d *Distributor) updateLabelSetMetrics() { + activeUserSet := make(map[string]map[uint64]struct{}) + for _, user := range d.activeUsers.ActiveUsers() { + limits := d.limits.LimitsPerLabelSet(user) + activeUserSet[user] = make(map[uint64]struct{}, len(limits)) + for _, l := range limits { + activeUserSet[user][l.Hash] = struct{}{} + } + } + + d.labelSetTracker.updateMetrics(activeUserSet) +} + func (d *Distributor) cleanStaleIngesterMetrics() { healthy, unhealthy, err := d.ingestersRing.GetAllInstanceDescs(ring.WriteNoExtend) if err != nil { @@ -888,8 +924,12 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write validatedFloatSamples := 0 validatedHistogramSamples := 0 validatedExemplars := 0 + limitsPerLabelSet := d.limits.LimitsPerLabelSet(userID) - var firstPartialErr error + var ( + labelSetCounters map[uint64]*samplesLabelSetEntry + firstPartialErr error + ) latestSampleTimestampMs := int64(0) defer func() { @@ -1005,12 +1045,33 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write continue } + matchedLabelSetLimits := validation.LimitsPerLabelSetsForSeries(limitsPerLabelSet, cortexpb.FromLabelAdaptersToLabels(validatedSeries.Labels)) + if len(matchedLabelSetLimits) > 0 && labelSetCounters == nil { + // TODO: use pool. + labelSetCounters = make(map[uint64]*samplesLabelSetEntry, len(matchedLabelSetLimits)) + } + for _, l := range matchedLabelSetLimits { + if c, exists := labelSetCounters[l.Hash]; exists { + c.floatSamples += int64(len(ts.Samples)) + c.histogramSamples += int64(len(ts.Histograms)) + } else { + labelSetCounters[l.Hash] = &samplesLabelSetEntry{ + floatSamples: int64(len(ts.Samples)), + histogramSamples: int64(len(ts.Histograms)), + labels: l.LabelSet, + } + } + } + seriesKeys = append(seriesKeys, key) validatedTimeseries = append(validatedTimeseries, validatedSeries) validatedFloatSamples += len(ts.Samples) validatedHistogramSamples += len(ts.Histograms) validatedExemplars += len(ts.Exemplars) } + for h, counter := range labelSetCounters { + d.labelSetTracker.increaseSamplesLabelSet(userID, h, counter.labels, counter.floatSamples, counter.histogramSamples) + } return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 2c95f82788..4ac187ffd7 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -420,6 +420,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) { "cortex_distributor_metadata_in_total", "cortex_distributor_non_ha_samples_received_total", "cortex_distributor_latest_seen_sample_timestamp_seconds", + "cortex_distributor_received_samples_per_labelset_total", } allMetrics := append(removedMetrics, permanentMetrics...) @@ -438,6 +439,8 @@ func TestDistributor_MetricsCleanup(t *testing.T) { d.nonHASamples.WithLabelValues("userA").Add(5) d.dedupedSamples.WithLabelValues("userA", "cluster1").Inc() // We cannot clean this metric d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111) + d.receivedSamplesPerLabelSet.WithLabelValues("userA", sampleMetricTypeFloat, "{}").Add(5) + d.receivedSamplesPerLabelSet.WithLabelValues("userA", sampleMetricTypeHistogram, "{}").Add(10) h, _, _ := r.GetAllInstanceDescs(ring.WriteNoExtend) ingId0, _ := r.GetInstanceIdByAddr(h[0].Addr) @@ -473,6 +476,11 @@ func TestDistributor_MetricsCleanup(t *testing.T) { cortex_distributor_received_metadata_total{user="userA"} 5 cortex_distributor_received_metadata_total{user="userB"} 10 + # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="userA"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="userA"} 10 + # HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples. # TYPE cortex_distributor_received_samples_total counter cortex_distributor_received_samples_total{type="float",user="userA"} 5 @@ -4081,6 +4089,116 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing } } +func TestDistributor_PushLabelSetMetrics(t *testing.T) { + t.Parallel() + inputSeries := []labels.Labels{ + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "one"}, + }, + { + {Name: "__name__", Value: "bar"}, + {Name: "cluster", Value: "one"}, + }, + { + {Name: "__name__", Value: "bar"}, + {Name: "cluster", Value: "two"}, + }, + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "three"}, + }, + } + + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.LimitsPerLabelSet = []validation.LimitsPerLabelSet{ + {Hash: 0, LabelSet: labels.FromStrings("cluster", "one")}, + {Hash: 1, LabelSet: labels.FromStrings("cluster", "two")}, + {Hash: 2, LabelSet: labels.EmptyLabels()}, + } + + ds, _, regs, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) + reg := regs[0] + + // Push the series to the distributor + id := "user" + req := mockWriteRequest(inputSeries, 1, 1, false) + ctx := user.InjectOrgID(context.Background(), id) + _, err = ds[0].Push(ctx, req) + require.NoError(t, err) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 1 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 1 + `), "cortex_distributor_received_samples_per_labelset_total")) + + // Push more series. + inputSeries = []labels.Labels{ + { + {Name: "__name__", Value: "baz"}, + {Name: "cluster", Value: "two"}, + }, + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "four"}, + }, + } + // Write the same request twice for different users. + req = mockWriteRequest(inputSeries, 1, 1, false) + ctx2 := user.InjectOrgID(context.Background(), "user2") + _, err = ds[0].Push(ctx, req) + require.NoError(t, err) + req = mockWriteRequest(inputSeries, 1, 1, false) + _, err = ds[0].Push(ctx2, req) + require.NoError(t, err) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 2 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user2"} 1 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1 + `), "cortex_distributor_received_samples_per_labelset_total")) + + // Remove existing limits and add new limits + limits.LimitsPerLabelSet = []validation.LimitsPerLabelSet{ + {Hash: 3, LabelSet: labels.FromStrings("cluster", "three")}, + {Hash: 4, LabelSet: labels.FromStrings("cluster", "four")}, + {Hash: 2, LabelSet: labels.EmptyLabels()}, + } + ds[0].limits, err = validation.NewOverrides(limits, nil) + require.NoError(t, err) + ds[0].updateLabelSetMetrics() + // Old label set metrics are removed. New label set metrics will be added when + // new requests come in. + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1 + `), "cortex_distributor_received_samples_per_labelset_total")) + + // Metrics from `user` got removed but `user2` metric should remain. + ds[0].cleanupInactiveUser(id) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1 + `), "cortex_distributor_received_samples_per_labelset_total")) +} + func countMockIngestersCalls(ingesters []*mockIngester, name string) int { count := 0 for i := 0; i < len(ingesters); i++ { diff --git a/pkg/distributor/metrics.go b/pkg/distributor/metrics.go new file mode 100644 index 0000000000..786ab954c2 --- /dev/null +++ b/pkg/distributor/metrics.go @@ -0,0 +1,95 @@ +package distributor + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + + "github.com/cortexproject/cortex/pkg/util" +) + +const ( + numMetricShards = 128 +) + +type labelSetTracker struct { + receivedSamplesPerLabelSet *prometheus.CounterVec + + shards []*labelSetCounterShard +} + +func newLabelSetTracker(receivedSamplesPerLabelSet *prometheus.CounterVec) *labelSetTracker { + shards := make([]*labelSetCounterShard, 0, numMetricShards) + for i := 0; i < numMetricShards; i++ { + shards = append(shards, &labelSetCounterShard{ + RWMutex: &sync.RWMutex{}, + userLabelSets: map[string]map[uint64]labels.Labels{}, + }) + } + return &labelSetTracker{shards: shards, receivedSamplesPerLabelSet: receivedSamplesPerLabelSet} +} + +type labelSetCounterShard struct { + *sync.RWMutex + userLabelSets map[string]map[uint64]labels.Labels +} + +type samplesLabelSetEntry struct { + floatSamples int64 + histogramSamples int64 + labels labels.Labels +} + +func (m *labelSetTracker) increaseSamplesLabelSet(userId string, hash uint64, labelSet labels.Labels, floatSamples, histogramSamples int64) { + s := m.shards[util.HashFP(model.Fingerprint(hash))%numMetricShards] + s.Lock() + if userEntry, ok := s.userLabelSets[userId]; ok { + if _, ok2 := userEntry[hash]; !ok2 { + userEntry[hash] = labelSet + } + } else { + s.userLabelSets[userId] = map[uint64]labels.Labels{hash: labelSet} + } + // Unlock before we update metrics. + s.Unlock() + + labelSetStr := labelSet.String() + if floatSamples > 0 { + m.receivedSamplesPerLabelSet.WithLabelValues(userId, sampleMetricTypeFloat, labelSetStr).Add(float64(floatSamples)) + } + if histogramSamples > 0 { + m.receivedSamplesPerLabelSet.WithLabelValues(userId, sampleMetricTypeHistogram, labelSetStr).Add(float64(histogramSamples)) + } +} + +// Clean up dangling user and label set from the tracker as well as metrics. +func (m *labelSetTracker) updateMetrics(userSet map[string]map[uint64]struct{}) { + for i := 0; i < numMetricShards; i++ { + shard := m.shards[i] + shard.Lock() + + for user, userEntry := range shard.userLabelSets { + limits, ok := userSet[user] + if !ok { + // If user is removed, we will delete user metrics in cleanupInactiveUser loop + // so skip deleting metrics here. + delete(shard.userLabelSets, user) + continue + } + for h, lbls := range userEntry { + // This limit no longer exists. + if _, ok := limits[h]; !ok { + delete(userEntry, h) + labelSetStr := lbls.String() + m.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeFloat, labelSetStr) + m.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeHistogram, labelSetStr) + continue + } + } + } + + shard.Unlock() + } +} diff --git a/pkg/distributor/metrics_test.go b/pkg/distributor/metrics_test.go new file mode 100644 index 0000000000..842e4fe6c3 --- /dev/null +++ b/pkg/distributor/metrics_test.go @@ -0,0 +1,103 @@ +package distributor + +import ( + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestLabelSetCounter(t *testing.T) { + metricName := "cortex_distributor_received_samples_per_labelset_total" + reg := prometheus.NewPedanticRegistry() + dummyCounter := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: metricName, + Help: "", + }, []string{"user", "type", "labelset"}) + counter := newLabelSetTracker(dummyCounter) + reg.MustRegister(dummyCounter) + + userID := "1" + userID2 := "2" + userID3 := "3" + + counter.increaseSamplesLabelSet(userID, 0, labels.FromStrings("foo", "bar"), 10, 0) + counter.increaseSamplesLabelSet(userID, 1, labels.FromStrings("foo", "baz"), 0, 5) + counter.increaseSamplesLabelSet(userID, 3, labels.EmptyLabels(), 20, 20) + counter.increaseSamplesLabelSet(userID2, 0, labels.FromStrings("foo", "bar"), 100, 5) + counter.increaseSamplesLabelSet(userID2, 2, labels.FromStrings("cluster", "us-west-2"), 0, 100) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 100 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 20 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 20 + `), metricName)) + + // Increment metrics and add a new user. + counter.increaseSamplesLabelSet(userID, 3, labels.EmptyLabels(), 20, 20) + counter.increaseSamplesLabelSet(userID2, 2, labels.FromStrings("cluster", "us-west-2"), 0, 100) + counter.increaseSamplesLabelSet(userID2, 4, labels.FromStrings("cluster", "us-west-2"), 10, 10) + counter.increaseSamplesLabelSet(userID3, 4, labels.FromStrings("cluster", "us-east-1"), 30, 30) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="float",user="3"} 30 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="histogram",user="3"} 30 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 40 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 40 + `), metricName)) + + // Remove user 2. But metrics for user 2 not cleaned up as it is expected to be cleaned up + // in cleanupInactiveUser loop. It is expected to have 3 minutes delay in this case. + userSet := map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}}, + userID3: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}, 4: struct{}{}}, + } + counter.updateMetrics(userSet) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="float",user="3"} 30 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="histogram",user="3"} 30 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 40 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 40 + `), metricName)) + + // Simulate existing limits removed for each user. + userSet = map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}}, + userID2: {}, + userID3: {}, + } + counter.updateMetrics(userSet) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 + `), metricName)) +} diff --git a/pkg/util/active_user.go b/pkg/util/active_user.go index f735dc2278..fb2165904a 100644 --- a/pkg/util/active_user.go +++ b/pkg/util/active_user.go @@ -94,6 +94,18 @@ func (m *ActiveUsers) PurgeInactiveUsers(deadline int64) []string { return inactive } +func (m *ActiveUsers) ActiveUsers(deadline int64) []string { + active := make([]string, 0, len(m.timestamps)) + m.mu.RLock() + defer m.mu.RUnlock() + for userID, ts := range m.timestamps { + if ts.Load() > deadline { + active = append(active, userID) + } + } + return active +} + // ActiveUsersCleanupService tracks active users, and periodically purges inactive ones while running. type ActiveUsersCleanupService struct { services.Service @@ -129,3 +141,7 @@ func (s *ActiveUsersCleanupService) iteration(_ context.Context) error { } return nil } + +func (s *ActiveUsersCleanupService) ActiveUsers() []string { + return s.activeUsers.ActiveUsers(time.Now().Add(-s.inactiveTimeout).UnixNano()) +} diff --git a/pkg/util/active_user_test.go b/pkg/util/active_user_test.go index f2e50866fc..4db9e7b0cd 100644 --- a/pkg/util/active_user_test.go +++ b/pkg/util/active_user_test.go @@ -3,6 +3,7 @@ package util import ( "fmt" "runtime" + "sort" "strconv" "sync" "testing" @@ -28,6 +29,28 @@ func TestActiveUser(t *testing.T) { require.Equal(t, []string{"test1"}, as.PurgeInactiveUsers(20)) } +func TestActiveUser_ActiveUsers(t *testing.T) { + as := NewActiveUsers() + as.UpdateUserTimestamp("test1", 5) + as.UpdateUserTimestamp("test2", 10) + as.UpdateUserTimestamp("test3", 15) + + users := as.ActiveUsers(0) + sort.Strings(users) + require.Equal(t, []string{"test1", "test2", "test3"}, users) + + users = as.ActiveUsers(5) + sort.Strings(users) + require.Equal(t, []string{"test2", "test3"}, users) + + users = as.ActiveUsers(10) + sort.Strings(users) + require.Equal(t, []string{"test3"}, users) + + users = as.ActiveUsers(15) + require.Equal(t, []string{}, users) +} + func TestActiveUserConcurrentUpdateAndPurge(t *testing.T) { count := 10