Cleaning up stale ingester metrics (#5930)
* Cleaning up stale ingester metrics

Signed-off-by: alanprot <[email protected]>

* changelog

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
alanprot authored May 7, 2024
1 parent c11dc24 commit 73dfd1a
Showing 3 changed files with 105 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
 * [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
 * [ENHANCEMENT] Ingester: Allowing to configure `-blocks-storage.tsdb.head-compaction-interval` flag up to 30 min and add a jitter on the first head compaction. #5919 #5928
 * [ENHANCEMENT] Distributor: Added `max_inflight_push_requests` config to ingester client to protect distributor from OOMKilled. #5917
+* [ENHANCEMENT] Distributor/Querier: Clean stale per-ingester metrics after ingester restarts. #5930
 * [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920

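Background for the #5930 changelog entry above: client_golang label vectors keep every child series until it is deleted explicitly, so after an ingester restarts under a new address, the distributor's per-ingester counters keep exporting series for the old address indefinitely. A minimal sketch of the problem; the metric name and addresses are hypothetical:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	queries := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_ingester_queries_total",
		Help: "Queries sent to ingesters.",
	}, []string{"ingester"})

	// An ingester restarts and re-registers under a new address. The vector
	// now holds a child series for both addresses, and client_golang keeps
	// children until they are deleted explicitly.
	queries.WithLabelValues("10.0.0.7:9095").Inc() // pre-restart address, now stale
	queries.WithLabelValues("10.0.0.9:9095").Inc() // post-restart address

	fmt.Println(testutil.CollectAndCount(queries)) // prints 2: one series is stale
}
```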
43 changes: 43 additions & 0 deletions pkg/distributor/distributor.go
@@ -65,6 +65,8 @@ const (
 
 	instanceIngestionRateTickInterval = time.Second
 
+	clearStaleIngesterMetricsInterval = time.Minute
+
 	// mergeSlicesParallelism is a constant of how many goroutines we should use to merge slices, and
 	// it was based on empirical observation: See BenchmarkMergeSlicesParallel
 	mergeSlicesParallelism = 8
@@ -398,6 +400,9 @@ func (d *Distributor) running(ctx context.Context) error {
 	ingestionRateTicker := time.NewTicker(instanceIngestionRateTickInterval)
 	defer ingestionRateTicker.Stop()
 
+	staleIngesterMetricTicker := time.NewTicker(clearStaleIngesterMetricsInterval)
+	defer staleIngesterMetricTicker.Stop()
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -406,6 +411,9 @@
 		case <-ingestionRateTicker.C:
 			d.ingestionRate.Tick()
 
+		case <-staleIngesterMetricTicker.C:
+			d.cleanStaleIngesterMetrics()
+
 		case err := <-d.subservicesWatcher.Chan():
 			return errors.Wrap(err, "distributor subservice failed")
 		}
@@ -701,6 +709,41 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
 	return &cortexpb.WriteResponse{}, firstPartialErr
 }
 
+func (d *Distributor) cleanStaleIngesterMetrics() {
+	healthy, unhealthy, err := d.ingestersRing.GetAllInstanceDescs(ring.WriteNoExtend)
+	if err != nil {
+		level.Warn(d.log).Log("msg", "error cleaning metrics: GetAllInstanceDescs", "err", err)
+		return
+	}
+
+	ipsMap := map[string]struct{}{}
+
+	for _, ing := range append(healthy, unhealthy...) {
+		ipsMap[ing.Addr] = struct{}{}
+	}
+
+	ingesterMetrics := []*prometheus.CounterVec{d.ingesterAppends, d.ingesterAppendFailures, d.ingesterQueries, d.ingesterQueryFailures}
+
+	for _, m := range ingesterMetrics {
+		metrics, err := util.GetLabels(m, make(map[string]string))
+
+		if err != nil {
+			level.Warn(d.log).Log("msg", "error cleaning metrics: GetLabels", "err", err)
+			return
+		}
+
+		for _, lbls := range metrics {
+			if _, ok := ipsMap[lbls.Get("ingester")]; !ok {
+				err := util.DeleteMatchingLabels(m, map[string]string{"ingester": lbls.Get("ingester")})
+				if err != nil {
+					level.Warn(d.log).Log("msg", "error cleaning metrics: DeleteMatchingLabels", "err", err)
+					return
+				}
+			}
+		}
+	}
+}
+
 func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, subRing ring.ReadRing, keys []uint32, initialMetadataIndex int, validatedMetadata []*cortexpb.MetricMetadata, validatedTimeseries []cortexpb.PreallocTimeseries, userID string) error {
 	span, _ := opentracing.StartSpanFromContext(ctx, "doBatch")
 	defer span.Finish()
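`util.GetLabels` and `util.DeleteMatchingLabels` above are Cortex-internal helpers. The same sweep can be sketched with only the stock client_golang API: gather the vector through a throwaway registry to enumerate its `ingester` label values, then delete every series whose address has left the ring. The function, metric, and addresses below are made up for illustration, and `DeletePartialMatch` assumes client_golang v1.13 or newer:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// staleIngesterAddrs returns every "ingester" label value held by vec that is
// not present in live. It gathers through a one-off registry, which is roughly
// what the Cortex util.GetLabels helper does; the name is illustrative.
func staleIngesterAddrs(vec *prometheus.CounterVec, live map[string]struct{}) ([]string, error) {
	reg := prometheus.NewRegistry()
	if err := reg.Register(vec); err != nil {
		return nil, err
	}
	families, err := reg.Gather()
	if err != nil {
		return nil, err
	}
	var stale []string
	for _, fam := range families {
		for _, m := range fam.GetMetric() {
			for _, lp := range m.GetLabel() {
				if lp.GetName() != "ingester" {
					continue
				}
				if _, ok := live[lp.GetValue()]; !ok {
					stale = append(stale, lp.GetValue())
				}
			}
		}
	}
	return stale, nil
}

func main() {
	appends := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_ingester_appends_total",
		Help: "Batch appends sent to ingesters.",
	}, []string{"ingester"})

	appends.WithLabelValues("10.0.0.1:9095").Inc() // instance that later leaves the ring
	appends.WithLabelValues("10.0.0.2:9095").Inc() // instance still registered

	// Addresses currently in the ring (healthy and unhealthy alike).
	live := map[string]struct{}{"10.0.0.2:9095": {}}

	stale, err := staleIngesterAddrs(appends, live)
	if err != nil {
		panic(err)
	}
	for _, addr := range stale {
		// Drop every series carrying the departed instance's address.
		appends.DeletePartialMatch(prometheus.Labels{"ingester": addr})
	}
	fmt.Println("removed stale series for:", stale)
}
```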
62 changes: 61 additions & 1 deletion pkg/distributor/distributor_test.go
@@ -318,8 +318,10 @@ func TestDistributor_Push(t *testing.T) {
 
 func TestDistributor_MetricsCleanup(t *testing.T) {
 	t.Parallel()
-	dists, _, regs, _ := prepare(t, prepConfig{
+	dists, _, regs, r := prepare(t, prepConfig{
 		numDistributors: 1,
+		numIngesters:    2,
+		happyIngesters:  2,
 	})
 	d := dists[0]
 	reg := regs[0]
@@ -334,6 +336,10 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
 		"cortex_distributor_metadata_in_total",
 		"cortex_distributor_non_ha_samples_received_total",
 		"cortex_distributor_latest_seen_sample_timestamp_seconds",
+		"cortex_distributor_ingester_append_failures_total",
+		"cortex_distributor_ingester_appends_total",
+		"cortex_distributor_ingester_query_failures_total",
+		"cortex_distributor_ingester_queries_total",
 	}
 
 	d.receivedSamples.WithLabelValues("userA").Add(5)
@@ -349,6 +355,16 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
 	d.dedupedSamples.WithLabelValues("userA", "cluster1").Inc() // We cannot clean this metric
 	d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111)
 
+	h, _, _ := r.GetAllInstanceDescs(ring.WriteNoExtend)
+	d.ingesterAppends.WithLabelValues(h[0].Addr, typeMetadata).Inc()
+	d.ingesterAppendFailures.WithLabelValues(h[0].Addr, typeMetadata, "2xx").Inc()
+	d.ingesterAppends.WithLabelValues(h[1].Addr, typeMetadata).Inc()
+	d.ingesterAppendFailures.WithLabelValues(h[1].Addr, typeMetadata, "2xx").Inc()
+	d.ingesterQueries.WithLabelValues(h[0].Addr).Inc()
+	d.ingesterQueries.WithLabelValues(h[1].Addr).Inc()
+	d.ingesterQueryFailures.WithLabelValues(h[0].Addr).Inc()
+	d.ingesterQueryFailures.WithLabelValues(h[1].Addr).Inc()
+
 	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
 # HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples.
 # TYPE cortex_distributor_deduped_samples_total counter
@@ -388,10 +404,41 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
 # HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
 # TYPE cortex_distributor_exemplars_in_total counter
 cortex_distributor_exemplars_in_total{user="userA"} 5
+# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
+# TYPE cortex_distributor_ingester_append_failures_total counter
+cortex_distributor_ingester_append_failures_total{ingester="0",status="2xx",type="metadata"} 1
+cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1
+# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
+# TYPE cortex_distributor_ingester_appends_total counter
+cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
+cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
+# HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters.
+# TYPE cortex_distributor_ingester_queries_total counter
+cortex_distributor_ingester_queries_total{ingester="0"} 1
+cortex_distributor_ingester_queries_total{ingester="1"} 1
+# HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters.
+# TYPE cortex_distributor_ingester_query_failures_total counter
+cortex_distributor_ingester_query_failures_total{ingester="0"} 1
+cortex_distributor_ingester_query_failures_total{ingester="1"} 1
 `), metrics...))
 
 	d.cleanupInactiveUser("userA")
 
+	err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) {
+		r := in.(*ring.Desc)
+		delete(r.Ingesters, "0")
+		return in, true, nil
+	})
+
+	test.Poll(t, time.Second, true, func() interface{} {
+		ings, _, _ := r.GetAllInstanceDescs(ring.Write)
+		return len(ings) == 1
+	})
+
+	require.NoError(t, err)
+	d.cleanStaleIngesterMetrics()
+
 	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
 # HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples.
 # TYPE cortex_distributor_deduped_samples_total counter
@@ -422,6 +469,19 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
 # HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
 # TYPE cortex_distributor_exemplars_in_total counter
+# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
+# TYPE cortex_distributor_ingester_append_failures_total counter
+cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1
+# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
+# TYPE cortex_distributor_ingester_appends_total counter
+cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
+# HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters.
+# TYPE cortex_distributor_ingester_queries_total counter
+cortex_distributor_ingester_queries_total{ingester="1"} 1
+# HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters.
+# TYPE cortex_distributor_ingester_query_failures_total counter
+cortex_distributor_ingester_query_failures_total{ingester="1"} 1
 `), metrics...))
 }

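The test drives the cleanup end to end: it seeds per-ingester series for both ring members, removes instance "0" from the ring with a KV CAS, polls until the ring reflects the removal, calls cleanStaleIngesterMetrics, and asserts that only the ingester="1" series survive. The final assertion pattern also works standalone; a small sketch with a hypothetical metric, using testutil.GatherAndCompare to check that a deleted series is gone:

```go
package distributor_test

import (
	"strings"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"
)

func TestStaleSeriesRemoved(t *testing.T) {
	reg := prometheus.NewPedanticRegistry()
	vec := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_ingester_appends_total",
		Help: "Batch appends sent to ingesters.",
	}, []string{"ingester"})
	reg.MustRegister(vec)

	vec.WithLabelValues("stale:9095").Inc() // instance that left the ring
	vec.WithLabelValues("live:9095").Inc()  // instance still registered

	// Stand-in for the cleanup sweep: drop the departed instance's series.
	vec.DeletePartialMatch(prometheus.Labels{"ingester": "stale:9095"})

	// Only the live instance's series should be exported afterwards.
	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP example_ingester_appends_total Batch appends sent to ingesters.
# TYPE example_ingester_appends_total counter
example_ingester_appends_total{ingester="live:9095"} 1
`), "example_ingester_appends_total"))
}
```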
