From 3ab1f96b3b83fe380b86d4ddc3447914bb2faeee Mon Sep 17 00:00:00 2001 From: Jason Coene Date: Fri, 23 Jan 2015 14:50:50 -0600 Subject: [PATCH] The big one --- .gitignore | 2 + Dockerfile | 7 + Makefile | 39 ++++-- librato.go | 158 ++++++++++++++++++++++ librato_test.go | 237 +++++++++++++++++++++++++++++++++ main.go | 342 ++++++++---------------------------------------- metric.go | 41 ++++++ metric_test.go | 60 +++++++++ network.go | 105 +++++++++++++++ parse.go | 53 ++++++++ parse_test.go | 64 +++++++++ proxy.go | 63 +++++++++ proxy_test.go | 51 ++++++++ 13 files changed, 919 insertions(+), 303 deletions(-) create mode 100644 Dockerfile create mode 100644 librato.go create mode 100644 librato_test.go create mode 100644 metric.go create mode 100644 metric_test.go create mode 100644 network.go create mode 100644 parse.go create mode 100644 parse_test.go create mode 100644 proxy.go create mode 100644 proxy_test.go diff --git a/.gitignore b/.gitignore index 1521c8b..1156d79 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ dist +statsd +statsd_linux_amd64 \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d364f73 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,7 @@ +FROM debian:wheezy + +ADD statsd_linux_amd64 /usr/bin/statsd + +EXPOSE 8125 + +ENTRYPOINT ["statsd"] diff --git a/Makefile b/Makefile index 51df698..03691a5 100644 --- a/Makefile +++ b/Makefile @@ -1,15 +1,26 @@ -VERSION=0.1.4 +name := statsd +version := $(shell cat main.go | grep VERSION | sed -e 's/\"//g' | head -n1 |cut -d' ' -f4) -default: fmt run +default: fmt build fmt: go fmt *.go -debug: - go run main.go -debug -flush=5 -percentiles=90,95,99 - build: - go build + go build -o $(name) + +build-linux-amd64: + GOOS=linux GOARCH=amd64 go build -o $(name)_linux_amd64 + +docker-build: + GOOS=linux GOARCH=amd64 go build -o $(name)_linux_amd64 + docker build -t jcoene/statsd-librato:latest . + +docker-release: docker-build + docker push jcoene/statsd-librato:latest + +run: build + ./statsd -debug -flush=5 -percentiles=90,95,99 test: go test -cover @@ -17,12 +28,12 @@ test: release: mkdir -p dist - mkdir -p statsd-${VERSION}.darwin-amd64/bin - GOOS=darwin GOARCH=amd64 go build -o statsd-${VERSION}.darwin-amd64/bin/statsd - tar zcvf dist/statsd-${VERSION}.darwin-amd64.tar.gz statsd-${VERSION}.darwin-amd64 - rm -rf statsd-${VERSION}.darwin-amd64 + mkdir -p statsd-$(version).darwin-amd64/bin + GOOS=darwin GOARCH=amd64 go build -o statsd-$(version).darwin-amd64/bin/statsd + tar zcvf dist/statsd-$(version).darwin-amd64.tar.gz statsd-$(version).darwin-amd64 + rm -rf statsd-$(version).darwin-amd64 - mkdir -p statsd-${VERSION}.linux-amd64/bin - GOOS=linux GOARCH=amd64 go build -o statsd-${VERSION}.linux-amd64/bin/statsd - tar zcvf dist/statsd-${VERSION}.linux-amd64.tar.gz statsd-${VERSION}.linux-amd64 - rm -rf statsd-${VERSION}.linux-amd64 + mkdir -p statsd-$(version).linux-amd64/bin + GOOS=linux GOARCH=amd64 go build -o statsd-$(version).linux-amd64/bin/statsd + tar zcvf dist/statsd-$(version).linux-amd64.tar.gz statsd-$(version).linux-amd64 + rm -rf statsd-$(version).linux-amd64 diff --git a/librato.go b/librato.go new file mode 100644 index 0000000..a664fcb --- /dev/null +++ b/librato.go @@ -0,0 +1,158 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "log" + "math" + "net/http" + "sort" +) + +type Measurement struct { + Counters []*Counter `json:"counters"` + Gauges []interface{} `json:"gauges"` + Source string `json:"source,omitempty"` +} + +func (m *Measurement) Count() int { + return (len(m.Counters) + len(m.Gauges)) +} + +type Counter struct { + Name string `json:"name"` + Source string `json:"source,omitempty"` + Value float64 `json:"value"` +} + +type Gauge struct { + Name string `json:"name"` + Source string `json:"source,omitempty"` + Value float64 `json:"value"` +} + +type ComplexGauge struct { + Name string `json:"name"` + Source string `json:"source,omitempty"` + Count int `json:"count"` + Sum float64 `json:"sum"` + Min float64 `json:"min"` + Max float64 `json:"max"` + SumSquares float64 `json:"sum_squares"` +} + +func submitLibrato() (err error) { + m := buildMeasurement() + + if m.Count() == 0 { + return + } + + payload, err := json.MarshalIndent(m, "", " ") + if err != nil { + return + } + + if *debug { + log.Printf("sending payload:\n%s\n", string(payload)) + } + + req, err := http.NewRequest("POST", "https://metrics-api.librato.com/v1/metrics", bytes.NewBuffer(payload)) + if err != nil { + return + } + + req.Header.Add("Content-Type", "application/json") + req.Header.Set("User-Agent", "statsd/1.0") + req.SetBasicAuth(*libratoUser, *libratoToken) + req.Close = true + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + raw, _ := ioutil.ReadAll(resp.Body) + return fmt.Errorf("%s: %s", resp.Status, string(raw)) + } + + log.Printf("%d measurements sent to librato\n", m.Count()) + + resetTimers() + + return +} + +func buildMeasurement() (m *Measurement) { + m = &Measurement{} + if libratoSource != nil { + m.Source = *libratoSource + } + + m.Counters = make([]*Counter, len(counters)) + m.Gauges = make([]interface{}, len(gauges)) + + n := 0 + for k, v := range counters { + c := &Counter{} + c.Name, c.Source = parseSource(k) + c.Value = v + m.Counters[n] = c + n++ + } + + n = 0 + for k, v := range gauges { + g := &Gauge{} + g.Name, g.Source = parseSource(k) + g.Value = v + m.Gauges[n] = g + n++ + } + + for k, t := range timers { + for _, pct := range tiles { + if g := buildComplexGauge(k, t, pct); g != nil { + m.Gauges = append(m.Gauges, g) + } + } + } + + return +} + +func buildComplexGauge(k string, t []float64, pct float64) *ComplexGauge { + threshold := ((100.0 - pct) / 100.0) * float64(len(t)) + threshold = math.Floor(threshold + 0.5) + + count := len(t) - int(threshold) + if count <= 0 { + return nil + } + + g := &ComplexGauge{} + g.Name, g.Source = parseSource(k) + if pct != 100.0 { + if float64(int(pct)) != pct { + rem := int(math.Ceil((pct - float64(int(pct))) * 10)) + g.Name += fmt.Sprintf(".%d_%d", int(pct), rem) + } else { + g.Name += fmt.Sprintf(".%d", int(pct)) + } + } + g.Count = count + + sort.Float64s(t) + g.Min = t[0] + g.Max = t[count-1] + for i := 0; i < count; i++ { + g.Sum += t[i] + g.SumSquares += (t[i] * t[i]) + } + + return g +} diff --git a/librato_test.go b/librato_test.go new file mode 100644 index 0000000..c137a4b --- /dev/null +++ b/librato_test.go @@ -0,0 +1,237 @@ +package main + +import ( + "reflect" + "testing" +) + +func TestBuildMeasurements(t *testing.T) { + counters = make(map[string]float64) + gauges = make(map[string]float64) + timers = make(map[string][]float64) + + readPacket(packet{name: "a", bucket: "c", value: 15}) + readPacket(packet{name: "a", bucket: "c", value: 25}) + + readPacket(packet{name: "b", bucket: "g", value: 15.1}) + readPacket(packet{name: "b", bucket: "g", value: 25.1}) + + readPacket(packet{name: "c", bucket: "ms", value: 15}) + readPacket(packet{name: "c", bucket: "ms", value: 25}) + + libratoSource = nil + m := buildMeasurement() + + if m.Source != "" { + t.Errorf("got '%s', exepcted no source", m.Source) + } + + if m.Count() != 3 { + t.Errorf("got %d count, expected 3", m.Count()) + } + + if !reflect.DeepEqual(m.Counters[0], &Counter{Name: "a", Value: 40}) { + t.Errorf("unexpected value for counter 0: %+v", m.Counters[0]) + } + + if !reflect.DeepEqual(m.Gauges[0], &Gauge{Name: "b", Value: 25.1}) { + t.Errorf("unexpected value for gauge 0: %+v", m.Gauges[0]) + } + + if !reflect.DeepEqual(m.Gauges[1], &ComplexGauge{Name: "c", Count: 2, Sum: 40, Min: 15, Max: 25, SumSquares: (15 * 15) + (25 * 25)}) { + t.Errorf("unexpected value for gauge 0: %+v", m.Gauges[2]) + } + + s := "app01" + libratoSource = &s + + m = buildMeasurement() + + if m.Source != "app01" { + t.Errorf("got '%s', exepcted 'app01'", m.Source) + } + +} + +func TestComplexGaugeNoData(t *testing.T) { + got := buildComplexGauge("name", []float64{}, 100.0) + if got != nil { + t.Errorf("got '%+v', expected nil", got) + } +} + +func TestComplexGaugeOnePoint(t *testing.T) { + got := buildComplexGauge("name", []float64{30}, 100.0) + expect := &ComplexGauge{ + Name: "name", + Count: 1, + Sum: 30, + Min: 30, + Max: 30, + SumSquares: 30 * 30, + } + + if !reflect.DeepEqual(got, expect) { + t.Errorf("got '%+v', expected '%+v'", got, expect) + } +} + +func TestComplexGaugeTwoPoints(t *testing.T) { + got := buildComplexGauge("name", []float64{30, 60}, 100.0) + expect := &ComplexGauge{ + Name: "name", + Count: 2, + Sum: (30 + 60), + Min: 30, + Max: 60, + SumSquares: (30 * 30) + (60 * 60), + } + + if !reflect.DeepEqual(got, expect) { + t.Errorf("got '%+v', expected '%+v'", got, expect) + } +} + +func TestComplexGaugeThreePoints(t *testing.T) { + got := buildComplexGauge("name", []float64{30, 60, 90}, 100.0) + expect := &ComplexGauge{ + Name: "name", + Count: 3, + Sum: (30 + 60 + 90), + Min: 30, + Max: 90, + SumSquares: (30 * 30) + (60 * 60) + (90 * 90), + } + + if !reflect.DeepEqual(got, expect) { + t.Errorf("got '%+v', expected '%+v'", got, expect) + } +} + +func TestComplexGaugeThreePoints50th(t *testing.T) { + got := buildComplexGauge("name", []float64{30, 60, 90}, 50.0) + expect := &ComplexGauge{ + Name: "name.50", + Count: 1, + Sum: 30, + Min: 30, + Max: 30, + SumSquares: (30 * 30), + } + + if !reflect.DeepEqual(got, expect) { + t.Errorf("got '%+v', expected '%+v'", got, expect) + } +} + +func TestComplexGaugeThreePoints67th(t *testing.T) { + got := buildComplexGauge("name", []float64{30, 60, 90}, 67.0) + expect := &ComplexGauge{ + Name: "name.67", + Count: 2, + Sum: (30 + 60), + Min: 30, + Max: 60, + SumSquares: (30 * 30) + (60 * 60), + } + + if !reflect.DeepEqual(got, expect) { + t.Errorf("got '%+v', expected '%+v'", got, expect) + } +} + +func TestComplexGaugeFourPoints50th(t *testing.T) { + got := buildComplexGauge("name", []float64{10, 20, 30, 40}, 50.0) + expect := &ComplexGauge{ + Name: "name.50", + Count: 2, + Sum: (10 + 20), + Min: 10, + Max: 20, + SumSquares: (10 * 10) + (20 * 20), + } + + if !reflect.DeepEqual(got, expect) { + t.Errorf("got '%+v', expected '%+v'", got, expect) + } +} + +func TestComplexGaugeFourPoints75th(t *testing.T) { + got := buildComplexGauge("name", []float64{10, 20, 30, 40}, 75.0) + expect := &ComplexGauge{ + Name: "name.75", + Count: 3, + Sum: (10 + 20 + 30), + Min: 10, + Max: 30, + SumSquares: (10 * 10) + (20 * 20) + (30 * 30), + } + + if !reflect.DeepEqual(got, expect) { + t.Errorf("got '%+v', expected '%+v'", got, expect) + } +} + +func TestComplexGaugeFourPoints99th(t *testing.T) { + got := buildComplexGauge("name", []float64{10, 20, 30, 40}, 99.0) + expect := &ComplexGauge{ + Name: "name.99", + Count: 4, + Sum: (10 + 20 + 30 + 40), + Min: 10, + Max: 40, + SumSquares: (10 * 10) + (20 * 20) + (30 * 30) + (40 * 40), + } + + if !reflect.DeepEqual(got, expect) { + t.Errorf("got '%+v', expected '%+v'", got, expect) + } +} + +func TestComplexGaugeFourPoints75p3th(t *testing.T) { + got := buildComplexGauge("name", []float64{10, 20, 30, 40}, 75.3) + expect := &ComplexGauge{ + Name: "name.75_3", + Count: 3, + Sum: (10 + 20 + 30), + Min: 10, + Max: 30, + SumSquares: (10 * 10) + (20 * 20) + (30 * 30), + } + + if !reflect.DeepEqual(got, expect) { + t.Errorf("got '%+v', expected '%+v'", got, expect) + } +} + +func TestComplexGaugeFourPoints75p8th(t *testing.T) { + got := buildComplexGauge("name", []float64{10, 20, 30, 40}, 75.8) + expect := &ComplexGauge{ + Name: "name.75_8", + Count: 3, + Sum: (10 + 20 + 30), + Min: 10, + Max: 30, + SumSquares: (10 * 10) + (20 * 20) + (30 * 30), + } + + if !reflect.DeepEqual(got, expect) { + t.Errorf("got '%+v', expected '%+v'", got, expect) + } +} + +func TestComplexGaugeFourPoints99p5th(t *testing.T) { + got := buildComplexGauge("name", []float64{10, 20, 30, 40}, 99.5) + expect := &ComplexGauge{ + Name: "name.99_5", + Count: 4, + Sum: (10 + 20 + 30 + 40), + Min: 10, + Max: 40, + SumSquares: (10 * 10) + (20 * 20) + (30 * 30) + (40 * 40), + } + + if !reflect.DeepEqual(got, expect) { + t.Errorf("got '%+v', expected '%+v'", got, expect) + } +} diff --git a/main.go b/main.go index b780556..18a0c2f 100644 --- a/main.go +++ b/main.go @@ -1,340 +1,104 @@ package main import ( - "bytes" - "encoding/json" - "errors" "flag" "fmt" - "github.com/jcoene/gologger" - "io/ioutil" - "math" - "net" - "net/http" + "log" "os" - "regexp" - "sort" - "strconv" "strings" "time" ) -const VERSION = "0.1.4" - -type Packet struct { - Bucket string - Value string - Modifier string - Sampling float32 -} - -var log *logger.Logger -var sanitizeRegexp = regexp.MustCompile("[^a-zA-Z0-9\\-_\\.,:\\|@]") -var packetRegexp = regexp.MustCompile("([a-zA-Z0-9_\\.,]+):(\\-?[0-9\\.]+)\\|(c|ms|g)(\\|@([0-9\\.]+))?") +const VERSION = "1.0.0" var ( - serviceAddress = flag.String("address", "0.0.0.0:8125", "udp listen address") - libratoUser = flag.String("user", "", "librato api username (LIBRATO_USER)") - libratoToken = flag.String("token", "", "librato api token (LIBRATO_TOKEN)") - libratoSource = flag.String("source", "", "librato api source (LIBRATO_SOURCE)") - flushInterval = flag.Int64("flush", 60, "interval at which data is sent to librato (in seconds)") - percentiles = flag.String("percentiles", "", "comma separated list of percentiles to calculate for timers (eg. \"95,99.5\")") - debug = flag.Bool("debug", false, "enable logging of inputs and submissions") - version = flag.Bool("version", false, "print version and exit") + address = flag.String("address", "0.0.0.0:8125", "udp listen address") + libratoUser = flag.String("user", "", "librato api username (LIBRATO_USER)") + libratoToken = flag.String("token", "", "librato api token (LIBRATO_TOKEN)") + libratoSource = flag.String("source", "", "librato api source (LIBRATO_SOURCE)") + interval = flag.Int64("flush", 60, "interval at which data is sent to librato (in seconds)") + percentiles = flag.String("percentiles", "", "comma separated list of percentiles to calculate for timers (eg. \"95,99.5\")") + proxy = flag.String("proxy", "", "send metrics to a proxy rather than directly to librato") + debug = flag.Bool("debug", false, "enable logging of inputs and submissions") + version = flag.Bool("version", false, "print version and exit") ) -var ( - In = make(chan Packet, 10000) - counters = make(map[string]int64) - timers = make(map[string][]float64) - gauges = make(map[string]float64) -) - -type Measurement struct { - Counters []Counter `json:"counters"` - Gauges []interface{} `json:"gauges"` - Source *string `json:"source,omitempty"` -} - -func (m *Measurement) Count() int { - return (len(m.Counters) + len(m.Gauges)) -} - -type Counter struct { - Name string `json:"name"` - Source *string `json:"source,omitempty"` - Value int64 `json:"value"` -} - -type SimpleGauge struct { - Name string `json:"name"` - Source *string `json:"source,omitempty"` - Value float64 `json:"value"` -} - -type ComplexGauge struct { - Name string `json:"name"` - Source *string `json:"source,omitempty"` - Count int `json:"count"` - Sum float64 `json:"sum"` - Min float64 `json:"min"` - Max float64 `json:"max"` - SumSquares float64 `json:"sum_squares"` - Median float64 `json:"median"` -} - func monitor() { - t := time.NewTicker(time.Duration(*flushInterval) * time.Second) + var err error + + t := time.NewTicker(time.Duration(*interval) * time.Second) for { select { case <-t.C: - if err := submit(); err != nil { - log.Error("submit: %s", err) - } - case s := <-In: - if s.Modifier == "ms" { - _, ok := timers[s.Bucket] - if !ok { - var t []float64 - timers[s.Bucket] = t - } - floatValue, _ := strconv.ParseFloat(s.Value, 64) - timers[s.Bucket] = append(timers[s.Bucket], floatValue) - } else if s.Modifier == "g" { - _, ok := gauges[s.Bucket] - if !ok { - gauges[s.Bucket] = float64(0) + if *proxy != "" { + if err = submitProxy(); err != nil { + log.Printf("unable to submit to proxy at %s: %s\n", *proxy, err) } - floatValue, _ := strconv.ParseFloat(s.Value, 64) - gauges[s.Bucket] = floatValue } else { - _, ok := counters[s.Bucket] - if !ok { - counters[s.Bucket] = 0 + if err := submitLibrato(); err != nil { + log.Printf("unable to submit measurements: %s\n", err) } - floatValue, _ := strconv.ParseFloat(s.Value, 32) - counters[s.Bucket] += int64(float32(floatValue) * (1 / s.Sampling)) } - } - } -} -func parseBucket(bucket string) (string, *string) { - if strings.Contains(bucket, ",") { - ss := strings.SplitN(bucket, ",", 2) - return ss[1], &ss[0] + case p := <-packets: + readPacket(p) + } } - - return bucket, nil } -func submit() (err error) { - m := new(Measurement) - if *libratoSource != "" { - m.Source = libratoSource - } - m.Counters = make([]Counter, 0) - m.Gauges = make([]interface{}, 0) +func main() { + flag.Parse() - for k, v := range counters { - c := new(Counter) - c.Name, c.Source = parseBucket(k) - c.Value = v - m.Counters = append(m.Counters, *c) + if *version { + fmt.Printf("statsd-librato v%s\n", VERSION) + return } - for k, v := range gauges { - g := new(SimpleGauge) - g.Name, g.Source = parseBucket(k) - g.Value = v - m.Gauges = append(m.Gauges, *g) + if *proxy == "" { + getEnv(proxy, "PROXY") } - for k, t := range timers { - g := gaugePercentile(k, t, 100.0, "") - m.Gauges = append(m.Gauges, *g) - - if *percentiles != "" { - pcts := strings.Split(*percentiles, ",") - for _, pct := range pcts { - pctf, err := strconv.ParseFloat(pct, 64) - if err != nil { - log.Warn("error parsing '%s' as float: %s", pct, err) - continue - } - - if g = gaugePercentile(k, t, pctf, pct); g != nil { - m.Gauges = append(m.Gauges, *g) - } + if *proxy != "" { + log.Printf("sending metrics to proxy at %s\n", *proxy) + } else { + if *libratoUser == "" { + if !getEnv(libratoUser, "LIBRATO_USER") { + log.Fatal("specify a librato user with -user or the LIBRATO_USER environment variable") } } - } - if m.Count() == 0 { - log.Info("no new measurements in the last %d seconds", *flushInterval) - return - } - - payload, err := json.MarshalIndent(m, "", " ") - if err != nil { - return - } - - log.Debug("sending payload:\n%s", payload) - - req, err := http.NewRequest("POST", "https://metrics-api.librato.com/v1/metrics", bytes.NewBuffer(payload)) - - req.Close = true - - if err != nil { - return - } - - req.Header.Add("Content-Type", "application/json") - req.Header.Set("User-Agent", "statsd/1.0") - req.SetBasicAuth(*libratoUser, *libratoToken) - resp, err := http.DefaultClient.Do(req) - if err == nil && resp.StatusCode != 200 { - if err == nil { - raw, _ := ioutil.ReadAll(resp.Body) - err = errors.New(fmt.Sprintf("%s: %s", resp.Status, string(raw))) + if *libratoToken == "" { + if !getEnv(libratoToken, "LIBRATO_TOKEN") { + log.Fatal("specify a librato token with -token or the LIBRATO_TOKEN environment variable") + } } - log.Warn("error sending %d measurements: %s", m.Count(), err) - return - } - - log.Info("%d measurements sent", m.Count()) - - for k, _ := range timers { - delete(timers, k) - } - - return -} - -func gaugePercentile(k string, t []float64, pct float64, suffix string) *ComplexGauge { - thresholdIdx := ((100.0 - pct) / 100.0) * float64(len(t)) - thresholdIdx = math.Floor(thresholdIdx + 0.5) - numInPct := len(t) - int(thresholdIdx) - if numInPct <= 0 { - return nil - } - - g := new(ComplexGauge) - g.Name, g.Source = parseBucket(k) - if suffix != "" { - g.Name += "." + suffix - } - g.Count = numInPct - - if g.Count > 0 { - sort.Float64s(t) - g.Min = t[0] - g.Max = t[numInPct-1] - for i := 0; i < numInPct; i++ { - v := t[i] - g.Sum += v - g.SumSquares += (v * v) + if *libratoSource == "" { + getEnv(libratoSource, "LIBRATO_SOURCE") } - mid := g.Count / 2 - g.Median = t[mid] - - if g.Count > 2 && g.Count%2 == 0 { - g.Median += t[mid-1] + if *percentiles == "" { + getEnv(percentiles, "PERCENTILES") } - } - return g -} - -func handle(conn *net.UDPConn, remaddr net.Addr, buf *bytes.Buffer) { - var packet Packet - var value string - s := sanitizeRegexp.ReplaceAllString(buf.String(), "") - - for _, item := range packetRegexp.FindAllStringSubmatch(s, -1) { - value = item[2] - if item[3] == "ms" { - _, err := strconv.ParseFloat(item[2], 32) - if err != nil { - value = "0" + if *percentiles != "" { + for _, s := range strings.Split(*percentiles, ",") { + if f := parseFloat(s); f > 0.0 && f < 100.0 { + tiles = append(tiles, f) + log.Printf("including percentile %f for timers\n", f) + } } } - sampleRate, err := strconv.ParseFloat(item[5], 32) - if err != nil { - sampleRate = 1 - } - - packet.Bucket = item[1] - packet.Value = value - packet.Modifier = item[3] - packet.Sampling = float32(sampleRate) - - In <- packet - } -} - -func listen() { - address, _ := net.ResolveUDPAddr("udp", *serviceAddress) - listener, err := net.ListenUDP("udp", address) - defer listener.Close() - if err != nil { - log.Fatal("unable to listen: %s", err) - os.Exit(1) - } - - log.Info("listening for events...") - - for { - message := make([]byte, 512) - n, remaddr, error := listener.ReadFrom(message) - - if error != nil { - continue - } - - log.Debug("received metric: %s", message) - - buf := bytes.NewBuffer(message[0:n]) - go handle(listener, remaddr, buf) + log.Printf("sending metrics to librato\n") } -} -func main() { - flag.Parse() + log.Printf("flushing metrics every %d seconds\n", *interval) - if *version { - fmt.Printf("statsd-librato v%s\n", VERSION) - return - } - - if *debug { - log = logger.NewLogger(logger.LOG_LEVEL_DEBUG, "statsd") - } else { - log = logger.NewLogger(logger.LOG_LEVEL_INFO, "statsd") - } - - if *libratoUser == "" { - if !getEnv(libratoUser, "LIBRATO_USER") { - log.Fatal("specify a librato user with -user or the LIBRATO_USER environment variable") - } - } - - if *libratoToken == "" { - if !getEnv(libratoToken, "LIBRATO_TOKEN") { - log.Fatal("specify a librato token with -token or the LIBRATO_TOKEN environment variable") - } - } - - if *libratoSource == "" { - getEnv(libratoSource, "LIBRATO_SOURCE") - } + go listenUdp() + go listenTcp() - go listen() monitor() } diff --git a/metric.go b/metric.go new file mode 100644 index 0000000..c84eacd --- /dev/null +++ b/metric.go @@ -0,0 +1,41 @@ +package main + +var ( + counters = make(map[string]float64) + gauges = make(map[string]float64) + timers = make(map[string][]float64) + tiles = make([]float64, 0) +) + +func init() { + tiles = append(tiles, 100.0) +} + +func readPacket(p packet) { + switch p.bucket { + case "c": + if _, f := counters[p.name]; !f { + counters[p.name] = 0.0 + } + counters[p.name] += p.value + + case "g": + gauges[p.name] = p.value + + case "ms": + if _, f := timers[p.name]; !f { + timers[p.name] = make([]float64, 0) + } + timers[p.name] = append(timers[p.name], p.value) + } +} + +func resetTimers() { + timers = make(map[string][]float64) +} + +func resetAll() { + counters = make(map[string]float64) + gauges = make(map[string]float64) + timers = make(map[string][]float64) +} diff --git a/metric_test.go b/metric_test.go new file mode 100644 index 0000000..3280c62 --- /dev/null +++ b/metric_test.go @@ -0,0 +1,60 @@ +package main + +import ( + "reflect" + "testing" +) + +func TestReadPackets(t *testing.T) { + counters = make(map[string]float64) + gauges = make(map[string]float64) + timers = make(map[string][]float64) + + readPacket(packet{name: "a", bucket: "c", value: 15}) + readPacket(packet{name: "a", bucket: "c", value: 25}) + readPacket(packet{name: "b", bucket: "c", value: 90}) + + if len(counters) != 2 { + t.Errorf("got %d counters, expected 2", len(counters)) + } + + if counters["a"] != 40 { + t.Errorf("got %d for counter a, expected 40", counters["a"]) + } + + if counters["b"] != 90 { + t.Errorf("got %d for counter b, expected 90", counters["b"]) + } + + readPacket(packet{name: "a", bucket: "g", value: 15.1}) + readPacket(packet{name: "a", bucket: "g", value: 25.1}) + readPacket(packet{name: "b", bucket: "g", value: 90.1}) + + if len(gauges) != 2 { + t.Errorf("got %d gauges, expected 2", len(gauges)) + } + + if gauges["a"] != 25.1 { + t.Errorf("got %f for gauge a, expected 25.1", gauges["a"]) + } + + if gauges["b"] != 90.1 { + t.Errorf("got %f for gauge b, expected 90.1", gauges["b"]) + } + + readPacket(packet{name: "c", bucket: "ms", value: 15.3}) + readPacket(packet{name: "c", bucket: "ms", value: 25.3}) + readPacket(packet{name: "d", bucket: "ms", value: 90.3}) + + if len(timers) != 2 { + t.Errorf("got %d timers, expected 2", len(timers)) + } + + if !reflect.DeepEqual(timers["c"], []float64{15.3, 25.3}) { + t.Errorf("got %+v for timer c, expected {15.3, 25.3}", timers["c"]) + } + + if !reflect.DeepEqual(timers["d"], []float64{90.3}) { + t.Errorf("got %+v for timer d, expected {90.3}", timers["d"]) + } +} diff --git a/network.go b/network.go new file mode 100644 index 0000000..8f0d036 --- /dev/null +++ b/network.go @@ -0,0 +1,105 @@ +package main + +import ( + "io" + "log" + "net" +) + +type packet struct { + name string + bucket string + value float64 +} + +var packets = make(chan packet, 10000) + +func listenTcp() { + listener, err := net.Listen("tcp", *address) + if err != nil { + log.Fatal("unable to listen on tcp %s: %s", *address, err) + } + + log.Printf("listening for events at tcp %s...\n", *address) + + for { + conn, err := listener.Accept() + if err != nil { + continue + } + + log.Printf("new connection from tcp %s", conn.RemoteAddr()) + + go handleTcpConn(conn) + } +} + +func handleTcpConn(conn net.Conn) { + defer conn.Close() + + msg := make([]byte, 0) + tmp := make([]byte, 8192) + + for { + n, err := conn.Read(tmp) + if err != nil { + if err == io.EOF { + break + } + + log.Printf("unable to read from tcp: %s\n", err) + return + } + msg = append(msg, tmp[0:n]...) + } + + if *debug { + log.Printf("received %d bytes from tcp %s:\n%s\n", len(msg), conn.RemoteAddr(), string(msg)) + } + + handle(string(msg)) +} + +func listenUdp() { + addr, err := net.ResolveUDPAddr("udp", *address) + if err != nil { + log.Fatal("unable to resolve service address: %s", err) + } + + listener, err := net.ListenUDP("udp", addr) + if err != nil { + log.Fatal("unable to listen on udp %s: %s", *address, err) + } + defer listener.Close() + + log.Printf("listening for events at udp %s...\n", *address) + + msg := make([]byte, 512) + + for { + n, _, err := listener.ReadFrom(msg) + if err != nil { + if err == io.EOF { + continue + } + + log.Printf("listener: unable to read: %s\n", err) + continue + } + + if *debug { + log.Printf("received metric: %s\n", string(msg[0:n])) + } + + handle(string(msg[0:n])) + } +} + +func handle(msg string) { + for _, p := range parsePacket(msg) { + if *debug { + log.Printf("received packet: %+v\n", p) + } + packets <- p + } +} diff --git a/parse.go b/parse.go new file mode 100644 index 0000000..df13364 --- /dev/null +++ b/parse.go @@ -0,0 +1,53 @@ +package main + +import ( + "math" + "regexp" + "strconv" + "strings" +) + +var re = regexp.MustCompile("([a-zA-Z0-9_\\.,]+):(\\-?[0-9\\.]+)\\|(c|g|ms)(\\|@([0-9\\.]+))?") + +func parsePacket(msg string) (packets []packet) { + packets = make([]packet, 0) + matches := re.FindAllStringSubmatch(msg, -1) + + for _, match := range matches { + p := packet{ + name: match[1], + bucket: match[3], + value: parseFloat(match[2]), + } + + if len(match) >= 5 { + sample := parseFloat(match[5]) + if sample > 0.0 && sample < 1.0 && p.bucket == "c" { + p.value = math.Floor(p.value * (1.0 / sample)) + } + } + + packets = append(packets, p) + } + + return +} + +// Extracts a key into a name and source, if present. +// "my_key" => "my_key", "" +// "my_source,my_key" => "my_key", "my_source" +func parseSource(s string) (name string, source string) { + ss := strings.SplitN(s, ",", 2) + if len(ss) == 2 { + return ss[1], ss[0] + } + + return s, "" +} + +// Converts a string to a float, ignoring any errors. +// In case of error, the float will be empty(0.0) +func parseFloat(s string) (n float64) { + n, _ = strconv.ParseFloat(s, 64) + return +} diff --git a/parse_test.go b/parse_test.go new file mode 100644 index 0000000..50ab561 --- /dev/null +++ b/parse_test.go @@ -0,0 +1,64 @@ +package main + +import ( + "testing" +) + +var parsePacketTests = []struct { + msg string + length int + name string + bucket string + value float64 +}{ + {"invalid", 0, "", "", 0}, + {"wrong.type:1|z", 0, "", "", 0}, + {"prefix.type:2|cg", 1, "prefix.type", "c", 2}, + {"my.name:1|c", 1, "my.name", "c", 1}, + {"first.name:6|c\nlast.name:7|c", 2, "first.name", "c", 6}, + {"some.gauge:234.6|g", 1, "some.gauge", "g", 234.6}, + {"sampled.counter:4|c|@0.5", 1, "sampled.counter", "c", 8}, + {"sampled.counter:4|c|@0.33", 1, "sampled.counter", "c", 12}, + {"first.timer:123.4567|ms\nsecond.timer:456.7890|ms", 2, "first.timer", "ms", 123.4567}, +} + +func TestParsePacket(t *testing.T) { + for _, s := range parsePacketTests { + ps := parsePacket(s.msg) + if len(ps) != s.length { + t.Errorf("%s: got %d packets, expected %d", s.msg, len(ps), s.length) + } + if len(ps) > 0 { + if ps[0].name != s.name { + t.Errorf("%s: got name '%s', expected '%s'", s.msg, ps[0].name, s.name) + } + if ps[0].bucket != s.bucket { + t.Errorf("%s: got bucket '%s', expected '%s'", s.msg, ps[0].bucket, s.bucket) + } + if ps[0].value != s.value { + t.Errorf("%s: got value '%f', expected '%f'", s.msg, ps[0].value, s.value) + } + } + } +} + +var parseSourceTests = []struct { + in string + name string + source string +}{ + {"some_metric", "some_metric", ""}, + {"some_source,some_metric", "some_metric", "some_source"}, +} + +func TestParseSource(t *testing.T) { + for _, s := range parseSourceTests { + name, source := parseSource(s.in) + if name != s.name { + t.Errorf("%s: got '%s', expected '%s'", s.in, name, s.name) + } + if source != s.source { + t.Errorf("%s: got '%s', expected '%s'", s.in, source, s.source) + } + } +} diff --git a/proxy.go b/proxy.go new file mode 100644 index 0000000..265165f --- /dev/null +++ b/proxy.go @@ -0,0 +1,63 @@ +package main + +import ( + "fmt" + "log" + "net" +) + +func submitProxy() (err error) { + msg, num := buildPayload() + + if num == 0 { + return + } + + conn, err := net.Dial("tcp", *proxy) + if err != nil { + return + } + + defer conn.Close() + + n, err := conn.Write(msg) + if err != nil { + return + } + + if n != len(msg) { + return fmt.Errorf("wrote %d of %d bytes", n, len(msg)) + } + + log.Printf("%d measurements sent to proxy\n", num) + + resetAll() + + return +} + +func buildPayload() ([]byte, int) { + result := "" + + for k, v := range counters { + result += buildMetric(k, "c", v) + } + + for k, v := range gauges { + result += buildMetric(k, "g", v) + } + + n := len(counters) + len(gauges) + for k, vs := range timers { + n += len(vs) + for _, v := range vs { + result += buildMetric(k, "ms", v) + } + } + + return []byte(result), n +} + +func buildMetric(name string, bucket string, value float64) string { + return fmt.Sprintf("%s:%f|%s\n", name, value, bucket) +} diff --git a/proxy_test.go b/proxy_test.go new file mode 100644 index 0000000..39e7fd8 --- /dev/null +++ b/proxy_test.go @@ -0,0 +1,51 @@ +package main + +import ( + "sort" + "strings" + "testing" +) + +func TestBuildPayload(t *testing.T) { + counters = make(map[string]float64) + gauges = make(map[string]float64) + timers = make(map[string][]float64) + + readPacket(packet{name: "a", bucket: "c", value: 15}) + readPacket(packet{name: "a", bucket: "c", value: 25}) + readPacket(packet{name: "b", bucket: "c", value: 90}) + + readPacket(packet{name: "a", bucket: "g", value: 15.1}) + readPacket(packet{name: "a", bucket: "g", value: 25.1}) + readPacket(packet{name: "b", bucket: "g", value: 90.1}) + + readPacket(packet{name: "c", bucket: "ms", value: 15.3}) + readPacket(packet{name: "c", bucket: "ms", value: 25.3}) + readPacket(packet{name: "d", bucket: "ms", value: 90.3}) + + expect := sortLines( + "a:40.000000|c\n" + + "b:90.000000|c\n" + + "a:25.100000|g\n" + + "b:90.100000|g\n" + + "c:15.300000|ms\n" + + "c:25.300000|ms\n" + + "d:90.300000|ms\n") + + buf, num := buildPayload() + got := sortLines(string(buf)) + + if expect != string(got) { + t.Errorf("got '%s', expected '%s'", string(got), expect) + } + + if num != 7 { + t.Errorf("got %d measurements, expected 7", num) + } +} + +func sortLines(s string) string { + ss := strings.Split(s, "\n") + sort.Strings(ss) + return strings.Join(ss, "\n") +}