Cleaning up stale ingester metrics #5930

Merged: 2 commits, May 7, 2024

CHANGELOG.md (1 change: 1 addition, 0 deletions)
@@ -5,6 +5,7 @@
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
* [ENHANCEMENT] Ingester: Allow configuring the `-blocks-storage.tsdb.head-compaction-interval` flag up to 30 min and add a jitter on the first head compaction. #5919
* [ENHANCEMENT] Distributor: Added `max_inflight_push_requests` config to the ingester client to protect the distributor from being OOMKilled. #5917
* [ENHANCEMENT] Distributor/Querier: Clean stale per-ingester metrics after ingester restarts. #5930
* [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920

pkg/distributor/distributor.go (43 changes: 43 additions, 0 deletions)
@@ -65,6 +65,8 @@ const (

instanceIngestionRateTickInterval = time.Second

clearStaleIngesterMetricsInterval = time.Minute

// mergeSlicesParallelism is the number of goroutines we should use to merge slices; the value
// was chosen based on empirical observation: see BenchmarkMergeSlicesParallel.
mergeSlicesParallelism = 8
@@ -398,6 +400,9 @@ func (d *Distributor) running(ctx context.Context) error {
ingestionRateTicker := time.NewTicker(instanceIngestionRateTickInterval)
defer ingestionRateTicker.Stop()

staleIngesterMetricTicker := time.NewTicker(clearStaleIngesterMetricsInterval)
defer staleIngesterMetricTicker.Stop()

for {
select {
case <-ctx.Done():
@@ -406,6 +411,9 @@
case <-ingestionRateTicker.C:
d.ingestionRate.Tick()

case <-staleIngesterMetricTicker.C:
d.cleanStaleIngesterMetrics()

case err := <-d.subservicesWatcher.Chan():
return errors.Wrap(err, "distributor subservice failed")
}
@@ -701,6 +709,41 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return &cortexpb.WriteResponse{}, firstPartialErr
}

func (d *Distributor) cleanStaleIngesterMetrics() {
healthy, unhealthy, err := d.ingestersRing.GetAllInstanceDescs(ring.WriteNoExtend)
if err != nil {
level.Warn(d.log).Log("msg", "error cleaning metrics: GetAllInstanceDescs", "err", err)
return
}

ipsMap := map[string]struct{}{}

for _, ing := range append(healthy, unhealthy...) {
ipsMap[ing.Addr] = struct{}{}
}

ingesterMetrics := []*prometheus.CounterVec{d.ingesterAppends, d.ingesterAppendFailures, d.ingesterQueries, d.ingesterQueryFailures}

for _, m := range ingesterMetrics {
metrics, err := util.GetLabels(m, make(map[string]string))

if err != nil {
level.Warn(d.log).Log("msg", "error cleaning metrics: GetLabels", "err", err)
return
}

for _, lbls := range metrics {
if _, ok := ipsMap[lbls.Get("ingester")]; !ok {
err := util.DeleteMatchingLabels(m, map[string]string{"ingester": lbls.Get("ingester")})
if err != nil {
level.Warn(d.log).Log("msg", "error cleaning metrics: DeleteMatchingLabels", "err", err)
return
}
}
}
}
}

func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, subRing ring.ReadRing, keys []uint32, initialMetadataIndex int, validatedMetadata []*cortexpb.MetricMetadata, validatedTimeseries []cortexpb.PreallocTimeseries, userID string) error {
span, _ := opentracing.StartSpanFromContext(ctx, "doBatch")
defer span.Finish()
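The new `cleanStaleIngesterMetrics` method walks the write ring, collects the addresses of every registered ingester (healthy or unhealthy), and deletes any per-ingester series whose `ingester` label no longer matches a ring member. The sketch below illustrates the same pattern with plain client_golang primitives; `cleanStale`, the `live` set, and `knownAddrs` are illustrative stand-ins rather than the PR's `util.GetLabels`/`util.DeleteMatchingLabels` helpers, and `DeletePartialMatch` is assumed to behave like `util.DeleteMatchingLabels` for a single-label filter.

```go
// Minimal sketch of the stale-series cleanup pattern, using plain
// client_golang primitives instead of Cortex's util helpers.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// cleanStale drops every series in vec whose "ingester" label value is not in
// the live set. knownAddrs stands in for the label values that util.GetLabels
// enumerates from the metric itself in the actual PR.
func cleanStale(vec *prometheus.CounterVec, live map[string]struct{}, knownAddrs []string) {
	for _, addr := range knownAddrs {
		if _, ok := live[addr]; !ok {
			// Removes all series with ingester=<addr>, regardless of other labels.
			vec.DeletePartialMatch(prometheus.Labels{"ingester": addr})
		}
	}
}

func main() {
	appends := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_ingester_appends_total",
		Help: "Appends per ingester.",
	}, []string{"ingester", "type"})

	appends.WithLabelValues("10.0.0.1:9095", "samples").Inc()
	appends.WithLabelValues("10.0.0.2:9095", "samples").Inc()

	// Pretend 10.0.0.2 left the ring; only 10.0.0.1 is still a member.
	live := map[string]struct{}{"10.0.0.1:9095": {}}
	cleanStale(appends, live, []string{"10.0.0.1:9095", "10.0.0.2:9095"})

	fmt.Println("series labeled ingester=10.0.0.2:9095 removed")
}
```

Because the real cleanup enumerates label values from the metric vectors via `util.GetLabels`, it needs no external list of known addresses; the ticker added to `running` simply re-runs the check every `clearStaleIngesterMetricsInterval` (one minute).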
pkg/distributor/distributor_test.go (62 changes: 61 additions, 1 deletion)
@@ -318,8 +318,10 @@ func TestDistributor_Push(t *testing.T) {

func TestDistributor_MetricsCleanup(t *testing.T) {
t.Parallel()
dists, _, regs, _ := prepare(t, prepConfig{
dists, _, regs, r := prepare(t, prepConfig{
numDistributors: 1,
numIngesters: 2,
happyIngesters: 2,
})
d := dists[0]
reg := regs[0]
@@ -334,6 +336,10 @@
"cortex_distributor_metadata_in_total",
"cortex_distributor_non_ha_samples_received_total",
"cortex_distributor_latest_seen_sample_timestamp_seconds",
"cortex_distributor_ingester_append_failures_total",
"cortex_distributor_ingester_appends_total",
"cortex_distributor_ingester_query_failures_total",
"cortex_distributor_ingester_queries_total",
}

d.receivedSamples.WithLabelValues("userA").Add(5)
@@ -349,6 +355,16 @@
d.dedupedSamples.WithLabelValues("userA", "cluster1").Inc() // We cannot clean this metric
d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111)

h, _, _ := r.GetAllInstanceDescs(ring.WriteNoExtend)
d.ingesterAppends.WithLabelValues(h[0].Addr, typeMetadata).Inc()
d.ingesterAppendFailures.WithLabelValues(h[0].Addr, typeMetadata, "2xx").Inc()
d.ingesterAppends.WithLabelValues(h[1].Addr, typeMetadata).Inc()
d.ingesterAppendFailures.WithLabelValues(h[1].Addr, typeMetadata, "2xx").Inc()
d.ingesterQueries.WithLabelValues(h[0].Addr).Inc()
d.ingesterQueries.WithLabelValues(h[1].Addr).Inc()
d.ingesterQueryFailures.WithLabelValues(h[0].Addr).Inc()
d.ingesterQueryFailures.WithLabelValues(h[1].Addr).Inc()

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples.
# TYPE cortex_distributor_deduped_samples_total counter
@@ -388,10 +404,41 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
# TYPE cortex_distributor_exemplars_in_total counter
cortex_distributor_exemplars_in_total{user="userA"} 5

# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="0",status="2xx",type="metadata"} 1
cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="0",type="metadata"} 1
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
# HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters.
# TYPE cortex_distributor_ingester_queries_total counter
cortex_distributor_ingester_queries_total{ingester="0"} 1
cortex_distributor_ingester_queries_total{ingester="1"} 1
# HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters.
# TYPE cortex_distributor_ingester_query_failures_total counter
cortex_distributor_ingester_query_failures_total{ingester="0"} 1
cortex_distributor_ingester_query_failures_total{ingester="1"} 1
`), metrics...))

d.cleanupInactiveUser("userA")

err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) {
r := in.(*ring.Desc)
delete(r.Ingesters, "0")
return in, true, nil
})

test.Poll(t, time.Second, true, func() interface{} {
ings, _, _ := r.GetAllInstanceDescs(ring.Write)
return len(ings) == 1
})

require.NoError(t, err)
d.cleanStaleIngesterMetrics()

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples.
# TYPE cortex_distributor_deduped_samples_total counter
@@ -422,6 +469,19 @@ func TestDistributor_MetricsCleanup(t *testing.T) {

# HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
# TYPE cortex_distributor_exemplars_in_total counter

# HELP cortex_distributor_ingester_append_failures_total The total number of failed batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_append_failures_total counter
cortex_distributor_ingester_append_failures_total{ingester="1",status="2xx",type="metadata"} 1
# HELP cortex_distributor_ingester_appends_total The total number of batch appends sent to ingesters.
# TYPE cortex_distributor_ingester_appends_total counter
cortex_distributor_ingester_appends_total{ingester="1",type="metadata"} 1
# HELP cortex_distributor_ingester_queries_total The total number of queries sent to ingesters.
# TYPE cortex_distributor_ingester_queries_total counter
cortex_distributor_ingester_queries_total{ingester="1"} 1
# HELP cortex_distributor_ingester_query_failures_total The total number of failed queries sent to ingesters.
# TYPE cortex_distributor_ingester_query_failures_total counter
cortex_distributor_ingester_query_failures_total{ingester="1"} 1
`), metrics...))
}
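The updated test removes ingester "0" from the ring through `KVClient.CAS`, polls until the ring reflects the removal, and then checks that only the series for ingester "1" remain after `cleanStaleIngesterMetrics`. For a quick, self-contained illustration of asserting stale-series removal outside the Cortex test harness, something like the following could be used (the metric name and the direct `DeletePartialMatch` call are illustrative, not part of the PR):

```go
// Self-contained sketch: assert that series for a departed ingester are gone.
package example

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestStaleSeriesRemoved(t *testing.T) {
	appends := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_ingester_appends_total",
		Help: "Appends per ingester.",
	}, []string{"ingester"})

	appends.WithLabelValues("ingester-0").Inc()
	appends.WithLabelValues("ingester-1").Inc()

	// Simulate ingester-0 leaving the ring by dropping its series.
	appends.DeletePartialMatch(prometheus.Labels{"ingester": "ingester-0"})

	// Exactly one series (ingester-1) should remain.
	if got := testutil.CollectAndCount(appends); got != 1 {
		t.Fatalf("expected 1 remaining series, got %d", got)
	}
}
```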
