Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[e2ealerting] Support pass through labels #207

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 77 additions & 20 deletions pkg/alerting/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
Expand All @@ -20,16 +21,23 @@ import (
)

const (
namespace = "e2ealerting"
namespace = "e2ealerting"
alertnameLabel = "alertname"
)

// timestampKey identifies a single measured observation. It is the key type
// of the Receiver's timestamps set, so two observations are treated as the
// same entry only when all three fields are equal.
type timestampKey struct {
	// timestamp is the alert timestamp the latency is measured against.
	timestamp float64
	// alertName is the name of the alert; labelValues holds the tracked
	// label values joined into one string.
	alertName, labelValues string
}

// Receiver implements the Alertmanager webhook receiver. It evaluates the received alerts, finds if the alert holds an annotation with a label of "time", and if it does computes now - time for a total duration.
type Receiver struct {
logger log.Logger
cfg ReceiverConfig

mtx sync.Mutex
timestamps map[float64]struct{}
timestamps map[timestampKey]struct{}

quit chan struct{}
wg sync.WaitGroup
Expand All @@ -42,35 +50,51 @@ type Receiver struct {

// ReceiverConfig implements the configuration for the alertmanager webhook receiver
type ReceiverConfig struct {
RoundtripLabel string
PurgeLookback time.Duration
PurgeInterval time.Duration
LabelsToTrack flagext.StringSliceCSV
LabelsToForward flagext.StringSliceCSV
PurgeInterval time.Duration
PurgeLookback time.Duration
RoundtripLabel string
}

// RegisterFlags binds each ReceiverConfig field to its command-line flag on
// the given flag set. Values are populated when f is parsed.
func (cfg *ReceiverConfig) RegisterFlags(f *flag.FlagSet) {
	// Optional 'name=value' label attached to the latency histogram; empty by default.
	f.StringVar(
		&cfg.RoundtripLabel,
		"receiver.e2eduration-label",
		"",
		"Label name and value in the form 'name=value' to add for the Histogram that observes latency.",
	)

	// Purge cadence and retention window for the in-memory timestamp tracker.
	f.DurationVar(
		&cfg.PurgeInterval,
		"receiver.purge-interval",
		15*time.Minute,
		"How often should we purge the in-memory measured timestamps tracker.",
	)
	f.DurationVar(
		&cfg.PurgeLookback,
		"receiver.purge-lookback",
		2*time.Hour,
		"Period at which measured timestamps will remain in-memory.",
	)

	// Comma-separated label lists: which labels de-duplicate seen timestamps,
	// and which labels are forwarded to split the observed alerts.
	f.Var(
		&cfg.LabelsToTrack,
		"receiver.labels-to-track",
		"Additional labels to track seen timestamps by, defaults to labels-to-forward.",
	)
	f.Var(
		&cfg.LabelsToForward,
		"receiver.labels-to-forward",
		"Additional labels to split alerts by.",
	)
}

// NewReceiver returns an alertmanager webhook receiver
func NewReceiver(cfg ReceiverConfig, log log.Logger, reg prometheus.Registerer) (*Receiver, error) {
lbl := make(map[string]string, 1)
constLabels := make(map[string]string, 1)
if cfg.RoundtripLabel != "" {
l := strings.Split(cfg.RoundtripLabel, "=")

if len(l) != 2 {
return nil, fmt.Errorf("the label is not valid, it must have exactly one name and one value: %s has %d parts", l, len(l))
}

lbl[l[0]] = l[1]
constLabels[l[0]] = l[1]
}

if len(cfg.LabelsToTrack) == 0 {
cfg.LabelsToTrack = cfg.LabelsToForward
}

labelNames := make([]string, 1+len(cfg.LabelsToForward))
labelNames[0] = alertnameLabel
if len(cfg.LabelsToForward) > 0 {
for i, nm := range cfg.LabelsToForward {
labelNames[i+1] = nm
}
}

r := &Receiver{
logger: log,
cfg: cfg,
timestamps: map[float64]struct{}{},
timestamps: map[timestampKey]struct{}{},
registry: reg,
}

Expand All @@ -94,8 +118,8 @@ func NewReceiver(cfg ReceiverConfig, log log.Logger, reg prometheus.Registerer)
Name: "end_to_end_duration_seconds",
Help: "Time spent (in seconds) from scraping a metric to receiving an alert.",
Buckets: []float64{5, 15, 30, 60, 90, 120, 240},
ConstLabels: lbl,
}, []string{"alertname"})
ConstLabels: constLabels,
}, labelNames)

r.wg.Add(1)
go r.purgeTimestamps()
Expand Down Expand Up @@ -123,13 +147,21 @@ func (r *Receiver) measureLatency(w http.ResponseWriter, req *http.Request) {

// We only care about firing alerts as part of this analysis.
for _, alert := range data.Alerts.Firing() {
labelsToForward := map[string]string{}
var name string
for k, v := range alert.Labels {
for _, lblName := range r.cfg.LabelsToForward {
if lblName == k {
labelsToForward[k] = v
}
}

if k != model.AlertNameLabel {
continue
}

name = v
labelsToForward[alertnameLabel] = v
}

if name == "" {
Expand All @@ -154,23 +186,48 @@ func (r *Receiver) measureLatency(w http.ResponseWriter, req *http.Request) {
}

if t == 0.0 {
level.Debug(r.logger).Log("msg", "alert does not have a `time` annonnation - we can't measure it", "labels", alert.Labels.Names(), "annonnations", alert.Annotations.Names())
level.Debug(r.logger).Log("msg", "alert does not have a `time` annotation - we can't measure it", "labels", alert.Labels.Names(), "annonnations", alert.Annotations.Names())
continue
}

// Build a unique key from the values of the tracked labels, used to tell
// whether this timestamp was already seen for the same label values.
// Iterate in the configured order so the key is built consistently.
var uniqueLabelKey strings.Builder
for _, k := range r.cfg.LabelsToTrack {
for alertKey, v := range alert.Labels {
if alertKey == k {
uniqueLabelKey.WriteString(v)
}
}
}

// fill in any missing Prom labels
for _, k := range r.cfg.LabelsToForward {
if _, ok := labelsToForward[k]; !ok {
labelsToForward[k] = ""
}
}

key := timestampKey{
timestamp: t,
alertName: name,
labelValues: uniqueLabelKey.String(),
}

latency := now.Unix() - int64(t)
r.mtx.Lock()
if _, exists := r.timestamps[t]; exists {
// We have seen this timestamp before, skip it.
level.Debug(r.logger).Log("msg", "timestamp previously evaluated", "timestamp", t, "alert", name)
if _, exists := r.timestamps[key]; exists {
// We have seen this entry before, skip it.
level.Debug(r.logger).Log("msg", "entry previously evaluated", "timestamp", t, "alert", name, "labelValues", key.labelValues)
r.mtx.Unlock()
continue
}
r.timestamps[t] = struct{}{}
r.timestamps[key] = struct{}{}
r.mtx.Unlock()

r.roundtripDuration.WithLabelValues(name).Observe(float64(latency))
level.Info(r.logger).Log("alert", name, "time", time.Unix(int64(t), 0), "duration_seconds", latency, "status", alert.Status)
r.roundtripDuration.With(labelsToForward).Observe(float64(latency))
level.Info(r.logger).Log("alert", name, "labelValues", key.labelValues, "time", time.Unix(int64(t), 0), "duration_seconds", latency, "status", alert.Status)
r.evalTotal.Inc()
}

Expand All @@ -193,10 +250,10 @@ func (r *Receiver) purgeTimestamps() {

r.mtx.Lock()
var deleted int
for t := range r.timestamps {
for k := range r.timestamps {
// purge entry for the timestamp, when the deadline is after the timestamp t
if deadline.After(time.Unix(int64(t), 0)) {
delete(r.timestamps, t)
if deadline.After(time.Unix(int64(k.timestamp), 0)) {
delete(r.timestamps, k)
deleted++
}
}
Expand Down
Loading