Skip to content

Commit

Permalink
Switch to line-protocol-sender library
Browse files Browse the repository at this point in the history
  • Loading branch information
itzg committed Dec 28, 2019
1 parent 140548f commit ef01509
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 107 deletions.
2 changes: 2 additions & 0 deletions examples/mc-monitor-telegraf/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ version: '3'
services:
telegraf:
image: telegraf:1.13
ports:
- 8094:8094
volumes:
- ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
monitor:
Expand Down
28 changes: 22 additions & 6 deletions gather_telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/google/subcommands"
"github.com/itzg/go-flagsfiller"
lpsender "github.com/itzg/line-protocol-sender"
"go.uber.org/zap"
"log"
"os"
Expand Down Expand Up @@ -41,7 +42,7 @@ func (c *gatherTelegrafCmd) SetFlags(flags *flag.FlagSet) {
}
}

func (c *gatherTelegrafCmd) Execute(ctx context.Context, f *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {
func (c *gatherTelegrafCmd) Execute(ctx context.Context, _ *flag.FlagSet, args ...interface{}) subcommands.ExitStatus {

if len(c.Servers) == 0 {
_, _ = fmt.Fprintln(os.Stderr, "requires at least one server")
Expand All @@ -62,7 +63,11 @@ func (c *gatherTelegrafCmd) Execute(ctx context.Context, f *flag.FlagSet, args .

ticker := time.NewTicker(c.Interval)

gatherers := c.createGatherers()
gatherers, err := c.createGatherers()
if err != nil {
c.logger.Error("failed to setup gatherers", zap.Error(err))
return subcommands.ExitFailure
}

for {
select {
Expand All @@ -77,22 +82,33 @@ func (c *gatherTelegrafCmd) Execute(ctx context.Context, f *flag.FlagSet, args .
}
}

func (c *gatherTelegrafCmd) createGatherers() []*TelegrafGatherer {
func (c *gatherTelegrafCmd) createGatherers() ([]*TelegrafGatherer, error) {
gatherers := make([]*TelegrafGatherer, 0, len(c.Servers))

lpClient, err := lpsender.NewClient(context.Background(), lpsender.Config{
Endpoint: c.TelegrafAddress,
BatchSize: len(c.Servers),
ErrorListener: func(err error) {
c.logger.Error("failed to send metrics", zap.Error(err))
},
})
if err != nil {
return nil, err
}

for _, addr := range c.Servers {
parts := strings.SplitN(addr, ":", 2)
if len(parts) == 2 {
port, err := strconv.Atoi(parts[1])
if err != nil {
log.Printf("WARN: unable to process %s: %s\n", addr, err)
} else {
gatherers = append(gatherers, NewTelegrafGatherer(parts[0], port, c.TelegrafAddress, c.logger))
gatherers = append(gatherers, NewTelegrafGatherer(parts[0], port, lpClient, c.logger))
}
} else {
gatherers = append(gatherers, NewTelegrafGatherer(parts[0], DefaultPort, c.TelegrafAddress, c.logger))
gatherers = append(gatherers, NewTelegrafGatherer(parts[0], DefaultPort, lpClient, c.logger))
}
}

return gatherers
return gatherers, nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/google/subcommands v1.0.1
github.com/influxdata/line-protocol v0.0.0-20190509173118-5712a8124a9a
github.com/itzg/go-flagsfiller v1.4.1
github.com/itzg/line-protocol-sender v0.1.0
github.com/itzg/zapconfigs v0.1.0
go.uber.org/zap v1.13.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/itzg/go-flagsfiller v1.4.0 h1:iFi1Xd2yTOP9J+FMJ9QFxM9KdtMMeGMnhgDSWCE
github.com/itzg/go-flagsfiller v1.4.0/go.mod h1:mfQgTahSs4OHn8PYev2Wwi1LJXUiYiGuZVCpBLxzbYs=
github.com/itzg/go-flagsfiller v1.4.1 h1:h/t5g+WkvsOR449bz1ngU8UGosKNm4Sr3iMNNgOqHfo=
github.com/itzg/go-flagsfiller v1.4.1/go.mod h1:mfQgTahSs4OHn8PYev2Wwi1LJXUiYiGuZVCpBLxzbYs=
github.com/itzg/line-protocol-sender v0.1.0 h1:1egfi0GVXn4uTHlvW5uW32Rw9YPUmeMpXxu6nl3kMG4=
github.com/itzg/line-protocol-sender v0.1.0/go.mod h1:Cd948iZ7YibnGcLt5D/11RfKmteh8lQyXpGUbY97WBw=
github.com/itzg/zapconfigs v0.1.0 h1:Gokocm8VaTNnZjvIiVA5NEhzZ1v7lEyXY/AbeBmq6YQ=
github.com/itzg/zapconfigs v0.1.0/go.mod h1:y4dArgRUOFbGRkUNJ8XSSw98FGn03wtkvMPy+OSA5Rc=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
Expand Down
117 changes: 16 additions & 101 deletions telegraf.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,29 @@
package main

import (
"bytes"
mcpinger "github.com/Raqbit/mc-pinger"
protocol "github.com/influxdata/line-protocol"
lpsender "github.com/itzg/line-protocol-sender"
"go.uber.org/zap"
"log"
"net"
"strconv"
"time"
)

type TelegrafGatherer struct {
host string
port string
telegrafEndpoint string
pinger mcpinger.Pinger
logger *zap.Logger
host string
port string
pinger mcpinger.Pinger
logger *zap.Logger
lpClient lpsender.Client
}

func NewTelegrafGatherer(host string, port int, telegrafEndpoint string, logger *zap.Logger) *TelegrafGatherer {
func NewTelegrafGatherer(host string, port int, lpClient lpsender.Client, logger *zap.Logger) *TelegrafGatherer {
return &TelegrafGatherer{
host: host,
port: strconv.FormatInt(int64(port), 10),
pinger: mcpinger.New(host, uint16(port)),
telegrafEndpoint: telegrafEndpoint,
logger: logger,
host: host,
port: strconv.FormatInt(int64(port), 10),
pinger: mcpinger.New(host, uint16(port)),
lpClient: lpClient,
logger: logger,
}
}

Expand All @@ -49,7 +47,7 @@ func (g *TelegrafGatherer) Gather() {
}

func (g *TelegrafGatherer) sendInfoMetrics(info *mcpinger.ServerInfo, elapsed time.Duration) error {
m := NewSimpleMetric(MetricName)
m := lpsender.NewSimpleMetric(MetricName)

m.AddTag(TagHost, g.host)
m.AddTag(TagPort, g.port)
Expand All @@ -59,23 +57,13 @@ func (g *TelegrafGatherer) sendInfoMetrics(info *mcpinger.ServerInfo, elapsed ti
m.AddField(FieldOnline, uint64(info.Players.Online))
m.AddField(FieldMax, uint64(info.Players.Max))

var buf bytes.Buffer
encoder := protocol.NewEncoder(&buf)
_, err := encoder.Encode(m)
if err != nil {
return err
}

err = g.sendLine(buf.Bytes())
if err != nil {
return nil
}
g.lpClient.Send(m)

return nil
}

func (g *TelegrafGatherer) sendFailedMetrics(err error, elapsed time.Duration) error {
m := NewSimpleMetric(MetricName)
m := lpsender.NewSimpleMetric(MetricName)

m.AddTag(TagHost, g.host)
m.AddTag(TagPort, g.port)
Expand All @@ -84,80 +72,7 @@ func (g *TelegrafGatherer) sendFailedMetrics(err error, elapsed time.Duration) e
m.AddField(FieldError, err.Error())
m.AddField(FieldResponseTime, elapsed.Seconds())

var buf bytes.Buffer
encoder := protocol.NewEncoder(&buf)
_, err = encoder.Encode(m)
if err != nil {
return err
}

err = g.sendLine(buf.Bytes())
if err != nil {
return err
}

return nil
}

func (g *TelegrafGatherer) sendLine(lineBytes []byte) error {
g.logger.Debug("sending metrics",
zap.String("endpoint", g.telegrafEndpoint),
zap.ByteString("line", lineBytes))
conn, err := net.Dial("tcp", g.telegrafEndpoint)
if err != nil {
return err
}
defer func() {
closeErr := conn.Close()
if closeErr != nil {
log.Printf("failed to close line protocol connection: %s", closeErr)
}
}()

_, err = conn.Write(lineBytes)
if err != nil {
return err
}
g.lpClient.Send(m)

return nil
}

type SimpleMetric struct {
name string
tags []*protocol.Tag
fields []*protocol.Field
}

func NewSimpleMetric(name string) *SimpleMetric {
return &SimpleMetric{name: name}
}

func (m *SimpleMetric) Time() time.Time {
return time.Now()
}

func (m *SimpleMetric) Name() string {
return m.name
}

func (m *SimpleMetric) TagList() []*protocol.Tag {
return m.tags
}

func (m *SimpleMetric) FieldList() []*protocol.Field {
return m.fields
}

func (m *SimpleMetric) AddTag(key, value string) {
m.tags = append(m.tags, &protocol.Tag{
Key: key,
Value: value,
})
}

func (m *SimpleMetric) AddField(key string, value interface{}) {
m.fields = append(m.fields, &protocol.Field{
Key: key,
Value: value,
})
}

0 comments on commit ef01509

Please sign in to comment.