diff --git a/pkg/formats/prom/init.go b/pkg/formats/prom/init.go new file mode 100644 index 00000000..c39e4c1a --- /dev/null +++ b/pkg/formats/prom/init.go @@ -0,0 +1,16 @@ +package prom + +import ( + "github.com/json-iterator/go" +) + +// fast JSON encoding +var json = jsoniter.ConfigFastest + +// jsonSorted is fast, but still sorts keys +var jsonSorted = jsoniter.Config{ + IndentionStep: 4, + EscapeHTML: false, + SortMapKeys: true, + ValidateJsonRawMessage: false, +}.Froze() diff --git a/pkg/formats/prom/prom.go b/pkg/formats/prom/prom.go index 34ee4d9a..6304f605 100644 --- a/pkg/formats/prom/prom.go +++ b/pkg/formats/prom/prom.go @@ -3,6 +3,7 @@ package prom import ( "flag" "fmt" + "strconv" "strings" "sync" @@ -23,7 +24,7 @@ var ( func init() { flag.BoolVar(&doCollectorStats, "info_collector", false, "Also send stats about this collector") - flag.IntVar(&seenNeeded, "prom_seen", 10, "Number of flows needed inbound before we start writting to the collector") + flag.IntVar(&seenNeeded, "prom_seen", 4, "Number of flows needed inbound before we start writting to the collector") } @@ -67,11 +68,11 @@ type tagVec map[string]map[string]int type PromFormat struct { logger.ContextL - vecs map[string]*prometheus.CounterVec + vecs map[string]*prometheus.GaugeVec invalids map[string]bool lastMetadata map[string]*kt.LastMetadata vecTags tagVec - seen int + seen map[string]int config *ktranslate.PrometheusFormatConfig mux sync.RWMutex @@ -83,11 +84,12 @@ func NewFormat(log logger.Underlying, compression kt.Compression, cfg *ktranslat } jf := &PromFormat{ ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "promFormat"}, log), - vecs: make(map[string]*prometheus.CounterVec), + vecs: make(map[string]*prometheus.GaugeVec), invalids: map[string]bool{}, lastMetadata: map[string]*kt.LastMetadata{}, vecTags: map[string]map[string]int{}, config: cfg, + seen: map[string]int{}, } if cfg.EnableCollectorStats { @@ -97,12 +99,6 @@ func NewFormat(log logger.Underlying, compression kt.Compression, cfg *ktranslat return jf, nil } -func (f *PromFormat) addLabels(res []PromData) { - for _, m := range res { - m.AddTagLabels(f.vecTags) - } -} - func (f *PromFormat) toLabels(name string) []string { res := make([]string, len(f.vecTags[name])) for k, v := range f.vecTags[name] { @@ -124,22 +120,22 @@ func (f *PromFormat) To(msgs []*kt.JCHF, serBuf []byte) (*kt.Output, error) { f.mux.Lock() defer f.mux.Unlock() - if f.seen < f.config.FlowsNeeded { - f.addLabels(res) - f.seen++ - if f.seen == f.config.FlowsNeeded { - f.Infof("Seen enough!") - } else { - f.Infof("Seen %d", f.seen) + for _, m := range res { + if f.seen[m.Name] < f.config.FlowsNeeded { + m.AddTagLabels(f.vecTags) + f.seen[m.Name]++ + if f.seen[m.Name] == f.config.FlowsNeeded { + f.Infof("Seen enough %s!", m.Name) + } else { + f.Infof("Seen %s -> %d", m.Name, f.seen[m.Name]) + } + continue } - return nil, nil - } - for _, m := range res { if _, ok := f.vecs[m.Name]; !ok { labels := f.toLabels(m.Name) - cv := prometheus.NewCounterVec( - prometheus.CounterOpts{ + cv := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ Name: m.Name, }, labels, @@ -148,6 +144,7 @@ func (f *PromFormat) To(msgs []*kt.JCHF, serBuf []byte) (*kt.Output, error) { f.vecs[m.Name] = cv f.Infof("Adding %s %v", m.Name, labels) } + //f.Infof("%s, %v, %v %v", m.Name, m.Tags, f.vecTags[m.Name], f.toLabels(m.Name)) f.vecs[m.Name].WithLabelValues(m.GetTagValues(f.vecTags)...).Add(m.Value) } @@ -166,8 +163,8 @@ func (f *PromFormat) Rollup(rolls []rollup.Rollup) (*kt.Output, error) { continue } if _, ok := f.vecs[roll.EventType]; !ok { - f.vecs[roll.EventType] = prometheus.NewCounterVec( - prometheus.CounterOpts{ + f.vecs[roll.EventType] = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ Name: strings.ReplaceAll(roll.Name, ".", ":"), }, roll.GetDims(), @@ -190,6 +187,8 @@ func (f *PromFormat) toPromMetric(in *kt.JCHF) []PromData { return f.fromSnmpInterfaceMetric(in) case kt.KENTIK_EVENT_SYNTH: return f.fromKSynth(in) + case kt.KENTIK_EVENT_SYNTH_GEST: + return f.fromKSyngest(in) case kt.KENTIK_EVENT_SNMP_METADATA: return f.fromSnmpMetadata(in) default: @@ -204,35 +203,130 @@ func (f *PromFormat) toPromMetric(in *kt.JCHF) []PromData { return nil } +var ( + synthWLAttr = map[string]bool{ + "agent_id": true, + "agent_name": true, + "dst_addr": true, + "dst_cdn_int": true, + "dst_geo": true, + "provider": true, + "src_addr": true, + "src_cdn_int": true, + "src_as_name": true, + "src_geo": true, + "test_id": true, + "test_name": true, + "test_type": true, + "test_url": true, + "src_host": true, + "dst_host": true, + "src_cloud_region": true, + "src_cloud_provider": true, + "src_site": true, + "dst_cloud_region": true, + "dst_cloud_provider": true, + "dst_site": true, + "statusMessage": true, + "statusEncoding": true, + "https_validity": true, + "https_expiry_timestamp": true, + "dest_ip": true, + } + + synthAttrKeys = []string{ + "statusMessage", + "statusEncoding", + "https_validity", + "https_expiry_timestamp", + } +) + +func (f *PromFormat) fromKSyngest(in *kt.JCHF) []PromData { + metrics := util.GetSyngestMetricNameSet() + attr := map[string]interface{}{} + f.mux.RLock() + util.SetAttr(attr, in, metrics, f.lastMetadata[in.DeviceName], false) + f.mux.RUnlock() + ms := make([]PromData, 0, len(metrics)) + + for k, v := range attr { // White list only a few attributes here. + if !synthWLAttr[k] { + delete(attr, k) + } + if k == "test_id" { // Force this to be a string. + if vi, ok := v.(int); ok { + attr[k] = strconv.Itoa(vi) + } + } + } + + for m, name := range metrics { + if in.CustomInt[m] > 0 { + ms = append(ms, PromData{ + Name: "kentik:syngest:" + name.Name, + Value: float64(in.CustomInt[m]), + Tags: attr, + }) + } + } + + return ms +} + func (f *PromFormat) fromKSynth(in *kt.JCHF) []PromData { + if in.CustomInt["result_type"] <= 1 { + return nil // Don't worry about timeouts and errors for now. + } + + rawStr := in.CustomStr["error_cause/trace_route"] // Pull this out early. metrics := util.GetSynMetricNameSet(in.CustomInt["result_type"]) attr := map[string]interface{}{} f.mux.RLock() util.SetAttr(attr, in, metrics, f.lastMetadata[in.DeviceName], false) f.mux.RUnlock() - ms := map[string]int64{} + ms := make([]PromData, 0, len(metrics)) + + // If there's str00 data, try to unserialize and pass in useful bits. + if rawStr != "" { + strData := []interface{}{} + if err := json.Unmarshal([]byte(rawStr), &strData); err == nil { + if len(strData) > 0 { + switch sd := strData[0].(type) { + case map[string]interface{}: + for _, key := range synthAttrKeys { + if val, ok := sd[key]; ok { + attr[key] = val + } + } + } + } + } + } - for m, name := range metrics { - switch m { - case "error", "timeout": - ms[name.Name] = 1 - default: - if in.CustomInt["result_type"] > 1 { - ms[name.Name] = int64(in.CustomInt[m]) + for k, v := range attr { // White list only a few attributes here. + if !synthWLAttr[k] { + delete(attr, k) + } + if k == "test_id" { // Force this to be a string. + if vi, ok := v.(int); ok { + attr[k] = strconv.Itoa(vi) } } } - res := []PromData{} - for k, v := range ms { - res = append(res, PromData{ - Name: "kentik:synth:" + k, - Value: float64(v), - Tags: attr, - }) + for m, name := range metrics { + switch name.Name { + case "avg_rtt", "jit_rtt", "time", "code", "port", "status", "ttlb", "size", "trx_time", "validation", "lost", "sent": + ms = append(ms, PromData{ + Name: "kentik:synth:" + name.Name, + Value: float64(in.CustomInt[m]), + Tags: attr, + }) + } } - return res + return ms } func (f *PromFormat) fromKflow(in *kt.JCHF) []PromData { diff --git a/pkg/sinks/prom/prom.go b/pkg/sinks/prom/prom.go index 3bf098f7..a7fc69af 100644 --- a/pkg/sinks/prom/prom.go +++ b/pkg/sinks/prom/prom.go @@ -27,7 +27,7 @@ var ( ) func init() { - flag.StringVar(&listen, "prom_listen", ":8082", "Bind to listen for prometheus requests on.") + flag.StringVar(&listen, "prom_listen", "127.0.0.1:8083", "Bind to listen for prometheus requests on.") flag.StringVar(&remoteUrl, "prom_remote_write", "", "Pass on remote write to this address.") } @@ -91,6 +91,10 @@ func (s *PromSink) Init(ctx context.Context, format formats.Format, compression return fmt.Errorf("You must set the -prom_remote_write flag to make this work.") } + if compression != kt.CompressionSnappy { + return fmt.Errorf("You used the %s unsupported compression format. Use snappy only.", compression) + } + s.Infof("Sending to remote_write endpoint %s", remoteUrl) default: return fmt.Errorf("Prometheus only supports %s and %s formats, not %s", formats.FORMAT_PROM, formats.FORMAT_PROM_REMOTE, format) @@ -99,10 +103,6 @@ func (s *PromSink) Init(ctx context.Context, format formats.Format, compression s.remoteUrl = remoteUrl s.compression = compression - if s.compression != kt.CompressionSnappy { - return fmt.Errorf("You used the %s unsupported compression format. Use snappy only.", s.compression) - } - return nil }