From 965825470bb72464a23907cd063b32691fe4d75e Mon Sep 17 00:00:00 2001
From: Callum Jones
Date: Tue, 3 Aug 2021 13:11:35 -0700
Subject: [PATCH 1/3] [e2ealerting] Support pass-through labels

Allow customizing the labels included in the final
end_to_end_duration_seconds histogram, so that multiple probes can be
deployed and the measured latency split by labels forwarded from
Alertmanager.
---
 pkg/alerting/receiver.go      |  94 ++++++++++++++++-----
 pkg/alerting/receiver_test.go | 149 ++++++++++++++++++++++++++++++++--
 2 files changed, 217 insertions(+), 26 deletions(-)

diff --git a/pkg/alerting/receiver.go b/pkg/alerting/receiver.go
index 8b965ecbd..608f86df1 100644
--- a/pkg/alerting/receiver.go
+++ b/pkg/alerting/receiver.go
@@ -10,6 +10,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/cortexproject/cortex/pkg/util/flagext"
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/gorilla/mux"
@@ -20,16 +21,23 @@ import (
 )
 
 const (
-	namespace = "e2ealerting"
+	namespace      = "e2ealerting"
+	alertnameLabel = "alertname"
 )
 
+type timestampKey struct {
+	timestamp   float64
+	alertName   string
+	labelValues string
+}
+
 // Receiver implements the Alertmanager webhook receiver. It evaluates the received alerts, finds if the alert holds an annonnation with a label of "time", and if it does computes now - time for a total duration.
 type Receiver struct {
 	logger log.Logger
 	cfg    ReceiverConfig
 
 	mtx        sync.Mutex
-	timestamps map[float64]struct{}
+	timestamps map[timestampKey]struct{}
 
 	quit chan struct{}
 	wg   sync.WaitGroup
@@ -42,9 +50,11 @@ type Receiver struct {
 
 // ReceiverConfig implements the configuration for the alertmanager webhook receiver
 type ReceiverConfig struct {
-	RoundtripLabel string
-	PurgeLookback  time.Duration
-	PurgeInterval  time.Duration
+	LabelsToTrack   flagext.StringSliceCSV
+	LabelsToForward flagext.StringSliceCSV
+	PurgeInterval   time.Duration
+	PurgeLookback   time.Duration
+	RoundtripLabel  string
 }
 
 // RegisterFlags registers the flags for the alertmanager webhook receiver
@@ -52,11 +62,13 @@ func (cfg *ReceiverConfig) RegisterFlags(f *flag.FlagSet) {
 	f.StringVar(&cfg.RoundtripLabel, "receiver.e2eduration-label", "", "Label name and value in the form 'name=value' to add for the Histogram that observes latency.")
 	f.DurationVar(&cfg.PurgeInterval, "receiver.purge-interval", 15*time.Minute, "How often should we purge the in-memory measured timestamps tracker.")
 	f.DurationVar(&cfg.PurgeLookback, "receiver.purge-lookback", 2*time.Hour, "Period at which measured timestamps will remain in-memory.")
+	f.Var(&cfg.LabelsToTrack, "receiver.labels-to-track", "Additional labels to track seen timestamps by, defaults to labels-to-forward.")
+	f.Var(&cfg.LabelsToForward, "receiver.labels-to-forward", "Additional labels to split alerts by.")
 }
 
 // NewReceiver returns an alertmanager webhook receiver
 func NewReceiver(cfg ReceiverConfig, log log.Logger, reg prometheus.Registerer) (*Receiver, error) {
-	lbl := make(map[string]string, 1)
+	constLabels := make(map[string]string, 1)
 
 	if cfg.RoundtripLabel != "" {
 		l := strings.Split(cfg.RoundtripLabel, "=")
@@ -64,13 +76,25 @@ func NewReceiver(cfg ReceiverConfig, log log.Logger, reg prometheus.Registerer)
 			return nil, fmt.Errorf("the label is not valid, it must have exactly one name and one value: %s has %d parts", l, len(l))
 		}
 
-		lbl[l[0]] = l[1]
+		constLabels[l[0]] = l[1]
+	}
+
+	if len(cfg.LabelsToTrack) == 0 {
+		cfg.LabelsToTrack = cfg.LabelsToForward
+	}
+
+	labelNames := make([]string, 1+len(cfg.LabelsToForward))
+	labelNames[0] = alertnameLabel
+	if 
len(cfg.LabelsToForward) > 0 { + for i, nm := range cfg.LabelsToForward { + labelNames[i+1] = nm + } } r := &Receiver{ logger: log, cfg: cfg, - timestamps: map[float64]struct{}{}, + timestamps: map[timestampKey]struct{}{}, registry: reg, } @@ -94,8 +118,8 @@ func NewReceiver(cfg ReceiverConfig, log log.Logger, reg prometheus.Registerer) Name: "end_to_end_duration_seconds", Help: "Time spent (in seconds) from scraping a metric to receiving an alert.", Buckets: []float64{5, 15, 30, 60, 90, 120, 240}, - ConstLabels: lbl, - }, []string{"alertname"}) + ConstLabels: constLabels, + }, labelNames) r.wg.Add(1) go r.purgeTimestamps() @@ -123,13 +147,21 @@ func (r *Receiver) measureLatency(w http.ResponseWriter, req *http.Request) { // We only care about firing alerts as part of this analysis. for _, alert := range data.Alerts.Firing() { + labels := map[string]string{} var name string for k, v := range alert.Labels { + for _, lblName := range r.cfg.LabelsToForward { + if lblName == k { + labels[k] = v + } + } + if k != model.AlertNameLabel { continue } name = v + labels[alertnameLabel] = v } if name == "" { @@ -154,23 +186,45 @@ func (r *Receiver) measureLatency(w http.ResponseWriter, req *http.Request) { } if t == 0.0 { - level.Debug(r.logger).Log("msg", "alert does not have a `time` annonnation - we can't measure it", "labels", alert.Labels.Names(), "annonnations", alert.Annotations.Names()) + level.Debug(r.logger).Log("msg", "alert does not have a `time` annotation - we can't measure it", "labels", alert.Labels.Names(), "annonnations", alert.Annotations.Names()) continue } + // build a unique key of label values to track + // to track when a timestamp was already seen + var labelValues strings.Builder + for _, k := range r.cfg.LabelsToTrack { + if v, ok := labels[k]; ok { + labelValues.WriteString(v) + } + } + + // fill in any missing Prom labels + for _, k := range r.cfg.LabelsToForward { + if _, ok := labels[k]; !ok { + labels[k] = "" + } + } + + key := timestampKey{ + timestamp: t, + alertName: name, + labelValues: labelValues.String(), + } + latency := now.Unix() - int64(t) r.mtx.Lock() - if _, exists := r.timestamps[t]; exists { - // We have seen this timestamp before, skip it. - level.Debug(r.logger).Log("msg", "timestamp previously evaluated", "timestamp", t, "alert", name) + if _, exists := r.timestamps[key]; exists { + // We have seen this entry before, skip it. 
+ level.Debug(r.logger).Log("msg", "entry previously evaluated", "timestamp", t, "alert", name, "labelValues", key.labelValues) r.mtx.Unlock() continue } - r.timestamps[t] = struct{}{} + r.timestamps[key] = struct{}{} r.mtx.Unlock() - r.roundtripDuration.WithLabelValues(name).Observe(float64(latency)) - level.Info(r.logger).Log("alert", name, "time", time.Unix(int64(t), 0), "duration_seconds", latency, "status", alert.Status) + r.roundtripDuration.With(labels).Observe(float64(latency)) + level.Info(r.logger).Log("alert", name, "labelValues", key.labelValues, "time", time.Unix(int64(t), 0), "duration_seconds", latency, "status", alert.Status) r.evalTotal.Inc() } @@ -193,10 +247,10 @@ func (r *Receiver) purgeTimestamps() { r.mtx.Lock() var deleted int - for t := range r.timestamps { + for k := range r.timestamps { // purge entry for the timestamp, when the deadline is after the timestamp t - if deadline.After(time.Unix(int64(t), 0)) { - delete(r.timestamps, t) + if deadline.After(time.Unix(int64(k.timestamp), 0)) { + delete(r.timestamps, k) deleted++ } } diff --git a/pkg/alerting/receiver_test.go b/pkg/alerting/receiver_test.go index 0798f2f7b..e823aa70e 100644 --- a/pkg/alerting/receiver_test.go +++ b/pkg/alerting/receiver_test.go @@ -22,7 +22,7 @@ func Test_measureLatency(t *testing.T) { name string alerts template.Data err error - tracked []float64 + tracked []timestampKey }{ { name: "with alerts to track", @@ -33,6 +33,18 @@ func Test_measureLatency(t *testing.T) { Annotations: template.KV{"time": "1.604069614e+09"}, Status: string(model.AlertFiring), }, + // duplicate alert, will be ignored + template.Alert{ + Labels: template.KV{model.AlertNameLabel: "e2ealertingAlwaysFiring"}, + Annotations: template.KV{"time": "1.604069614e+09"}, + Status: string(model.AlertFiring), + }, + // different alert at same time + template.Alert{ + Labels: template.KV{model.AlertNameLabel: "ADifferentAlertAtTheSameTime"}, + Annotations: template.KV{"time": "1.604069614e+09"}, + Status: string(model.AlertFiring), + }, template.Alert{ Labels: template.KV{model.AlertNameLabel: "e2ealertingAlwaysFiring"}, Annotations: template.KV{"time": "1.604069615e+09"}, @@ -40,7 +52,23 @@ func Test_measureLatency(t *testing.T) { }, }, }, - tracked: []float64{1604069614.00, 1604069615.00}, + tracked: []timestampKey{ + { + alertName: "e2ealertingAlwaysFiring", + labelValues: "", + timestamp: 1604069614.00, + }, + { + alertName: "ADifferentAlertAtTheSameTime", + labelValues: "", + timestamp: 1604069614.00, + }, + { + alertName: "e2ealertingAlwaysFiring", + labelValues: "", + timestamp: 1604069615.00, + }, + }, }, { name: "with alerts that don't have a time annotation or alertname label it ignores them", @@ -61,7 +89,7 @@ func Test_measureLatency(t *testing.T) { }, }, }, - tracked: []float64{1604069614.00}, + tracked: []timestampKey{{alertName: "e2ealertingAlwaysFiring", labelValues: "", timestamp: 1604069614.00}}, }, } @@ -87,9 +115,118 @@ func Test_measureLatency(t *testing.T) { require.Equal(t, http.StatusOK, w.Code) require.Equal(t, len(tt.tracked), len(r.timestamps)) - for _, timestamp := range tt.tracked { - _, exists := r.timestamps[timestamp] - require.True(t, exists, fmt.Sprintf("time %f is not tracked", timestamp)) + for _, key := range tt.tracked { + _, exists := r.timestamps[key] + require.True(t, exists, fmt.Sprintf("time %f is not tracked", key.timestamp)) + } + }) + } +} + +func Test_measureLatencyWithAdditionalLabels(t *testing.T) { + tc := []struct { + name string + alerts template.Data + err error + tracked 
[]timestampKey + }{ + { + name: "with alerts to track", + alerts: template.Data{ + Alerts: template.Alerts{ + template.Alert{ + Labels: template.KV{model.AlertNameLabel: "e2ealertingAlwaysFiring", "region": "us-east-1"}, + Annotations: template.KV{"time": "1.604069614e+09"}, + Status: string(model.AlertFiring), + }, + // duplicate alert, will be ignored + template.Alert{ + Labels: template.KV{model.AlertNameLabel: "e2ealertingAlwaysFiring", "region": "us-east-1"}, + Annotations: template.KV{"time": "1.604069614e+09"}, + Status: string(model.AlertFiring), + }, + // different alert at same time + template.Alert{ + Labels: template.KV{model.AlertNameLabel: "e2ealertingAlwaysFiring", "region": "us-west-1"}, + Annotations: template.KV{"time": "1.604069614e+09"}, + Status: string(model.AlertFiring), + }, + template.Alert{ + Labels: template.KV{model.AlertNameLabel: "e2ealertingAlwaysFiring", "region": "us-east-1"}, + Annotations: template.KV{"time": "1.604069615e+09"}, + Status: string(model.AlertFiring), + }, + }, + }, + tracked: []timestampKey{ + { + alertName: "e2ealertingAlwaysFiring", + labelValues: "us-east-1", + timestamp: 1604069614.00, + }, + { + alertName: "e2ealertingAlwaysFiring", + labelValues: "us-west-1", + timestamp: 1604069614.00, + }, + { + alertName: "e2ealertingAlwaysFiring", + labelValues: "us-east-1", + timestamp: 1604069615.00, + }, + }, + }, + { + name: "with alerts that don't have a time annotation or alertname label it ignores them", + alerts: template.Data{ + Alerts: template.Alerts{ + template.Alert{ + Labels: template.KV{model.AlertNameLabel: "e2ealertingAlwaysFiring"}, + Annotations: template.KV{"time": "1.604069614e+09"}, + Status: string(model.AlertFiring), + }, + template.Alert{ + Labels: template.KV{model.AlertNameLabel: "e2ealertingAlwaysFiring"}, + Status: string(model.AlertFiring), + }, + template.Alert{ + Annotations: template.KV{"time": "1.604069614e+09"}, + Status: string(model.AlertFiring), + }, + }, + }, + tracked: []timestampKey{{alertName: "e2ealertingAlwaysFiring", labelValues: "", timestamp: 1604069614.00}}, + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + r, err := NewReceiver( + ReceiverConfig{ + PurgeInterval: 1 * time.Hour, + LabelsToForward: []string{"region"}, + }, + log.NewNopLogger(), + prometheus.NewRegistry(), + ) + require.NoError(t, err) + + router := mux.NewRouter() + r.RegisterRoutes(router) + + b, err := json.Marshal(tt.alerts) + require.NoError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/receiver", bytes.NewBuffer(b)) + w := httptest.NewRecorder() + + router.ServeHTTP(w, req) + + require.Equal(t, http.StatusOK, w.Code) + require.Equal(t, len(tt.tracked), len(r.timestamps)) + for _, key := range tt.tracked { + _, exists := r.timestamps[key] + require.True(t, exists, fmt.Sprintf("time %f is not tracked", key.timestamp)) } }) } From 5f35dc88e887853b36eb8503c52b2450ca371af5 Mon Sep 17 00:00:00 2001 From: Callum Jones Date: Mon, 9 Aug 2021 18:05:42 -0700 Subject: [PATCH 2/3] tweaks --- pkg/alerting/receiver.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/alerting/receiver.go b/pkg/alerting/receiver.go index 608f86df1..cebbf7e67 100644 --- a/pkg/alerting/receiver.go +++ b/pkg/alerting/receiver.go @@ -147,12 +147,12 @@ func (r *Receiver) measureLatency(w http.ResponseWriter, req *http.Request) { // We only care about firing alerts as part of this analysis. 
for _, alert := range data.Alerts.Firing() { - labels := map[string]string{} + labelsToForward := map[string]string{} var name string for k, v := range alert.Labels { for _, lblName := range r.cfg.LabelsToForward { if lblName == k { - labels[k] = v + labelsToForward[k] = v } } @@ -161,7 +161,7 @@ func (r *Receiver) measureLatency(w http.ResponseWriter, req *http.Request) { } name = v - labels[alertnameLabel] = v + labelsToForward[alertnameLabel] = v } if name == "" { @@ -192,24 +192,27 @@ func (r *Receiver) measureLatency(w http.ResponseWriter, req *http.Request) { // build a unique key of label values to track // to track when a timestamp was already seen - var labelValues strings.Builder + // preserve order specified to be consistent + var uniqueLabelKey strings.Builder for _, k := range r.cfg.LabelsToTrack { - if v, ok := labels[k]; ok { - labelValues.WriteString(v) + for alertKey, v := range alert.Labels { + if alertKey == k { + uniqueLabelKey.WriteString(v) + } } } // fill in any missing Prom labels for _, k := range r.cfg.LabelsToForward { - if _, ok := labels[k]; !ok { - labels[k] = "" + if _, ok := labelsToForward[k]; !ok { + labelsToForward[k] = "" } } key := timestampKey{ timestamp: t, alertName: name, - labelValues: labelValues.String(), + labelValues: uniqueLabelKey.String(), } latency := now.Unix() - int64(t) @@ -223,7 +226,7 @@ func (r *Receiver) measureLatency(w http.ResponseWriter, req *http.Request) { r.timestamps[key] = struct{}{} r.mtx.Unlock() - r.roundtripDuration.With(labels).Observe(float64(latency)) + r.roundtripDuration.With(labelsToForward).Observe(float64(latency)) level.Info(r.logger).Log("alert", name, "labelValues", key.labelValues, "time", time.Unix(int64(t), 0), "duration_seconds", latency, "status", alert.Status) r.evalTotal.Inc() } From 626c9aa2a2d392187ba76bdbab252c26339eb4fd Mon Sep 17 00:00:00 2001 From: Callum Jones Date: Mon, 9 Aug 2021 18:26:46 -0700 Subject: [PATCH 3/3] new tests --- pkg/alerting/receiver_test.go | 94 +++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/pkg/alerting/receiver_test.go b/pkg/alerting/receiver_test.go index e823aa70e..ffe1a05a7 100644 --- a/pkg/alerting/receiver_test.go +++ b/pkg/alerting/receiver_test.go @@ -231,3 +231,97 @@ func Test_measureLatencyWithAdditionalLabels(t *testing.T) { }) } } + +func Test_measureLatencyWithDifferentLabels(t *testing.T) { + tc := []struct { + name string + alerts template.Data + err error + tracked []timestampKey + }{ + { + name: "with alerts to track", + alerts: template.Data{ + Alerts: template.Alerts{ + template.Alert{ + Labels: template.KV{model.AlertNameLabel: "e2ealertingAlwaysFiring", "region": "us-east-1", "host": "a"}, + Annotations: template.KV{"time": "1.604069614e+09"}, + Status: string(model.AlertFiring), + }, + // different host, will be counted + template.Alert{ + Labels: template.KV{model.AlertNameLabel: "e2ealertingAlwaysFiring", "region": "us-east-1", "host": "b"}, + Annotations: template.KV{"time": "1.604069614e+09"}, + Status: string(model.AlertFiring), + }, + // different alert at same time + template.Alert{ + Labels: template.KV{model.AlertNameLabel: "e2ealertingAlwaysFiring", "region": "us-west-1", "host": "c"}, + Annotations: template.KV{"time": "1.604069614e+09"}, + Status: string(model.AlertFiring), + }, + template.Alert{ + Labels: template.KV{model.AlertNameLabel: "e2ealertingAlwaysFiring", "region": "us-east-1", "host": "d"}, + Annotations: template.KV{"time": "1.604069615e+09"}, + Status: string(model.AlertFiring), + }, + }, + }, 
+ tracked: []timestampKey{ + { + alertName: "e2ealertingAlwaysFiring", + labelValues: "us-east-1a", + timestamp: 1604069614.00, + }, + { + alertName: "e2ealertingAlwaysFiring", + labelValues: "us-east-1b", + timestamp: 1604069614.00, + }, + { + alertName: "e2ealertingAlwaysFiring", + labelValues: "us-west-1c", + timestamp: 1604069614.00, + }, + { + alertName: "e2ealertingAlwaysFiring", + labelValues: "us-east-1d", + timestamp: 1604069615.00, + }, + }, + }, + } + + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + r, err := NewReceiver( + ReceiverConfig{ + PurgeInterval: 1 * time.Hour, + LabelsToTrack: []string{"region", "host"}, + LabelsToForward: []string{"region"}, + }, + log.NewNopLogger(), + prometheus.NewRegistry(), + ) + require.NoError(t, err) + + router := mux.NewRouter() + r.RegisterRoutes(router) + + b, err := json.Marshal(tt.alerts) + require.NoError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/receiver", bytes.NewBuffer(b)) + w := httptest.NewRecorder() + + router.ServeHTTP(w, req) + + require.Equal(t, http.StatusOK, w.Code) + require.Equal(t, len(tt.tracked), len(r.timestamps)) + for _, key := range tt.tracked { + _, exists := r.timestamps[key] + require.True(t, exists, fmt.Sprintf("time %f %s is not tracked", key.timestamp, key.labelValues)) + } + }) + } +}
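
Usage sketch: the example below shows one way the new flags could be wired up, based only on the APIs visible in these patches (ReceiverConfig.RegisterFlags, NewReceiver, RegisterRoutes, and the /api/v1/receiver route exercised by the tests). The import path github.com/grafana/cortex-tools/pkg/alerting, the listen address, and the final metric name e2ealerting_end_to_end_duration_seconds are assumptions rather than details taken from the patches; adjust them to the actual repository.

package main

import (
	"flag"
	"net/http"
	"os"

	"github.com/go-kit/kit/log"
	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	// Assumed import path for the package patched above; adjust to this repo's go.mod.
	"github.com/grafana/cortex-tools/pkg/alerting"
)

func main() {
	// Register the receiver flags, including the two added by PATCH 1/3:
	//   -receiver.labels-to-forward  extra labels copied onto the histogram
	//   -receiver.labels-to-track    extra labels used in the dedup key
	//                                (defaults to labels-to-forward)
	var cfg alerting.ReceiverConfig
	fs := flag.NewFlagSet("e2ealerting", flag.ExitOnError)
	cfg.RegisterFlags(fs)
	if err := fs.Parse([]string{
		"-receiver.labels-to-forward=region",
		"-receiver.labels-to-track=region,host",
	}); err != nil {
		os.Exit(1)
	}

	logger := log.NewLogfmtLogger(os.Stdout)
	reg := prometheus.NewRegistry()

	receiver, err := alerting.NewReceiver(cfg, logger, reg)
	if err != nil {
		_ = logger.Log("msg", "failed to create receiver", "err", err)
		os.Exit(1)
	}

	// Alertmanager should be configured with a webhook receiver pointing at
	// POST /api/v1/receiver (the route used in the tests above).
	router := mux.NewRouter()
	receiver.RegisterRoutes(router)
	router.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))

	// Each firing alert carrying a "time" annotation is then observed once per
	// (alertname, tracked label values, timestamp), producing series such as
	//   e2ealerting_end_to_end_duration_seconds_bucket{alertname="...",region="us-east-1",...}
	// (metric name assumed from the package's namespace constant and histogram name).
	_ = http.ListenAndServe(":8080", router)
}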