Skip to content

Commit

Permalink
Add riak plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
jcoene authored and sparrc committed Feb 19, 2016
1 parent e4e1749 commit 53c130b
Show file tree
Hide file tree
Showing 5 changed files with 550 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ Currently implemented sources:
* raindrops
* redis
* rethinkdb
* riak
* sql server (microsoft)
* twemproxy
* zfs
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/raindrops"
_ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
Expand Down
76 changes: 76 additions & 0 deletions plugins/inputs/riak/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Riak Plugin

The Riak plugin gathers metrics from one or more riak instances.

### Configuration:

```toml
# Description
[[inputs.riak]]
# Specify a list of one or more riak http servers
servers = ["http://localhost:8098"]
```

### Measurements & Fields:

Riak provides one measurement named "riak", with the following fields:

- cpu_avg1
- cpu_avg15
- cpu_avg5
- memory_code
- memory_ets
- memory_processes
- memory_system
- memory_total
- node_get_fsm_objsize_100
- node_get_fsm_objsize_95
- node_get_fsm_objsize_99
- node_get_fsm_objsize_mean
- node_get_fsm_objsize_median
- node_get_fsm_siblings_100
- node_get_fsm_siblings_95
- node_get_fsm_siblings_99
- node_get_fsm_siblings_mean
- node_get_fsm_siblings_median
- node_get_fsm_time_100
- node_get_fsm_time_95
- node_get_fsm_time_99
- node_get_fsm_time_mean
- node_get_fsm_time_median
- node_gets
- node_gets_total
- node_put_fsm_time_100
- node_put_fsm_time_95
- node_put_fsm_time_99
- node_put_fsm_time_mean
- node_put_fsm_time_median
- node_puts
- node_puts_total
- pbc_active
- pbc_connects
- pbc_connects_total
- vnode_gets
- vnode_gets_total
- vnode_index_reads
- vnode_index_reads_total
- vnode_index_writes
- vnode_index_writes_total
- vnode_puts
- vnode_puts_total

Measurements of time (such as node_get_fsm_time_mean) are measured in nanoseconds.

### Tags:

All measurements have the following tags:

- server (the host:port of the given server address, ex. `127.0.0.1:8087`)
- nodename (the internal node name received, ex. `[email protected]` )

### Example Output:

```
$ ./telegraf -config telegraf.conf -input-filter riak -test
> riak,[email protected],server=localhost:8098 cpu_avg1=31i,cpu_avg15=69i,cpu_avg5=51i,memory_code=11563738i,memory_ets=5925872i,memory_processes=30236069i,memory_system=93074971i,memory_total=123311040i,node_get_fsm_objsize_100=0i,node_get_fsm_objsize_95=0i,node_get_fsm_objsize_99=0i,node_get_fsm_objsize_mean=0i,node_get_fsm_objsize_median=0i,node_get_fsm_siblings_100=0i,node_get_fsm_siblings_95=0i,node_get_fsm_siblings_99=0i,node_get_fsm_siblings_mean=0i,node_get_fsm_siblings_median=0i,node_get_fsm_time_100=0i,node_get_fsm_time_95=0i,node_get_fsm_time_99=0i,node_get_fsm_time_mean=0i,node_get_fsm_time_median=0i,node_gets=0i,node_gets_total=19i,node_put_fsm_time_100=0i,node_put_fsm_time_95=0i,node_put_fsm_time_99=0i,node_put_fsm_time_mean=0i,node_put_fsm_time_median=0i,node_puts=0i,node_puts_total=0i,pbc_active=0i,pbc_connects=0i,pbc_connects_total=20i,vnode_gets=0i,vnode_gets_total=57i,vnode_index_reads=0i,vnode_index_reads_total=0i,vnode_index_writes=0i,vnode_index_writes_total=0i,vnode_puts=0i,vnode_puts_total=0i 1455913392622482332
```gt
196 changes: 196 additions & 0 deletions plugins/inputs/riak/riak.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package riak

import (
"encoding/json"
"fmt"
"net/http"
"net/url"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)

// Type Riak gathers statistics from one or more Riak instances
type Riak struct {
// Servers is a slice of servers as http addresses (ex. http://127.0.0.1:8098)
Servers []string

client *http.Client
}

// NewRiak return a new instance of Riak with a default http client
func NewRiak() *Riak {
return &Riak{client: http.DefaultClient}
}

// Type riakStats represents the data that is received from Riak
type riakStats struct {
CpuAvg1 int64 `json:"cpu_avg1"`
CpuAvg15 int64 `json:"cpu_avg15"`
CpuAvg5 int64 `json:"cpu_avg5"`
MemoryCode int64 `json:"memory_code"`
MemoryEts int64 `json:"memory_ets"`
MemoryProcesses int64 `json:"memory_processes"`
MemorySystem int64 `json:"memory_system"`
MemoryTotal int64 `json:"memory_total"`
NodeGetFsmObjsize100 int64 `json:"node_get_fsm_objsize_100"`
NodeGetFsmObjsize95 int64 `json:"node_get_fsm_objsize_95"`
NodeGetFsmObjsize99 int64 `json:"node_get_fsm_objsize_99"`
NodeGetFsmObjsizeMean int64 `json:"node_get_fsm_objsize_mean"`
NodeGetFsmObjsizeMedian int64 `json:"node_get_fsm_objsize_median"`
NodeGetFsmSiblings100 int64 `json:"node_get_fsm_siblings_100"`
NodeGetFsmSiblings95 int64 `json:"node_get_fsm_siblings_95"`
NodeGetFsmSiblings99 int64 `json:"node_get_fsm_siblings_99"`
NodeGetFsmSiblingsMean int64 `json:"node_get_fsm_siblings_mean"`
NodeGetFsmSiblingsMedian int64 `json:"node_get_fsm_siblings_median"`
NodeGetFsmTime100 int64 `json:"node_get_fsm_time_100"`
NodeGetFsmTime95 int64 `json:"node_get_fsm_time_95"`
NodeGetFsmTime99 int64 `json:"node_get_fsm_time_99"`
NodeGetFsmTimeMean int64 `json:"node_get_fsm_time_mean"`
NodeGetFsmTimeMedian int64 `json:"node_get_fsm_time_median"`
NodeGets int64 `json:"node_gets"`
NodeGetsTotal int64 `json:"node_gets_total"`
Nodename string `json:"nodename"`
NodePutFsmTime100 int64 `json:"node_put_fsm_time_100"`
NodePutFsmTime95 int64 `json:"node_put_fsm_time_95"`
NodePutFsmTime99 int64 `json:"node_put_fsm_time_99"`
NodePutFsmTimeMean int64 `json:"node_put_fsm_time_mean"`
NodePutFsmTimeMedian int64 `json:"node_put_fsm_time_median"`
NodePuts int64 `json:"node_puts"`
NodePutsTotal int64 `json:"node_puts_total"`
PbcActive int64 `json:"pbc_active"`
PbcConnects int64 `json:"pbc_connects"`
PbcConnectsTotal int64 `json:"pbc_connects_total"`
VnodeGets int64 `json:"vnode_gets"`
VnodeGetsTotal int64 `json:"vnode_gets_total"`
VnodeIndexReads int64 `json:"vnode_index_reads"`
VnodeIndexReadsTotal int64 `json:"vnode_index_reads_total"`
VnodeIndexWrites int64 `json:"vnode_index_writes"`
VnodeIndexWritesTotal int64 `json:"vnode_index_writes_total"`
VnodePuts int64 `json:"vnode_puts"`
VnodePutsTotal int64 `json:"vnode_puts_total"`
}

// A sample configuration to only gather stats from localhost, default port.
const sampleConfig = `
# Specify a list of one or more riak http servers
servers = ["http://localhost:8098"]
`

// Returns a sample configuration for the plugin
func (r *Riak) SampleConfig() string {
return sampleConfig
}

// Returns a description of the plugin
func (r *Riak) Description() string {
return "Read metrics one or many Riak servers"
}

// Reads stats from all configured servers.
func (r *Riak) Gather(acc telegraf.Accumulator) error {
// Default to a single server at localhost (default port) if none specified
if len(r.Servers) == 0 {
r.Servers = []string{"http://127.0.0.1:8098"}
}

// Range over all servers, gathering stats. Returns early in case of any error.
for _, s := range r.Servers {
if err := r.gatherServer(s, acc); err != nil {
return err
}
}

return nil
}

// Gathers stats from a single server, adding them to the accumulator
func (r *Riak) gatherServer(s string, acc telegraf.Accumulator) error {
// Parse the given URL to extract the server tag
u, err := url.Parse(s)
if err != nil {
return fmt.Errorf("riak unable to parse given server url %s: %s", s, err)
}

// Perform the GET request to the riak /stats endpoint
resp, err := r.client.Get(s + "/stats")
if err != nil {
return err
}
defer resp.Body.Close()

// Successful responses will always return status code 200
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("riak responded with unexepcted status code %d", resp.StatusCode)
}

// Decode the response JSON into a new stats struct
stats := &riakStats{}
if err := json.NewDecoder(resp.Body).Decode(stats); err != nil {
return fmt.Errorf("unable to decode riak response: %s", err)
}

// Build a map of tags
tags := map[string]string{
"nodename": stats.Nodename,
"server": u.Host,
}

// Build a map of field values
fields := map[string]interface{}{
"cpu_avg1": stats.CpuAvg1,
"cpu_avg15": stats.CpuAvg15,
"cpu_avg5": stats.CpuAvg5,
"memory_code": stats.MemoryCode,
"memory_ets": stats.MemoryEts,
"memory_processes": stats.MemoryProcesses,
"memory_system": stats.MemorySystem,
"memory_total": stats.MemoryTotal,
"node_get_fsm_objsize_100": stats.NodeGetFsmObjsize100,
"node_get_fsm_objsize_95": stats.NodeGetFsmObjsize95,
"node_get_fsm_objsize_99": stats.NodeGetFsmObjsize99,
"node_get_fsm_objsize_mean": stats.NodeGetFsmObjsizeMean,
"node_get_fsm_objsize_median": stats.NodeGetFsmObjsizeMedian,
"node_get_fsm_siblings_100": stats.NodeGetFsmSiblings100,
"node_get_fsm_siblings_95": stats.NodeGetFsmSiblings95,
"node_get_fsm_siblings_99": stats.NodeGetFsmSiblings99,
"node_get_fsm_siblings_mean": stats.NodeGetFsmSiblingsMean,
"node_get_fsm_siblings_median": stats.NodeGetFsmSiblingsMedian,
"node_get_fsm_time_100": stats.NodeGetFsmTime100,
"node_get_fsm_time_95": stats.NodeGetFsmTime95,
"node_get_fsm_time_99": stats.NodeGetFsmTime99,
"node_get_fsm_time_mean": stats.NodeGetFsmTimeMean,
"node_get_fsm_time_median": stats.NodeGetFsmTimeMedian,
"node_gets": stats.NodeGets,
"node_gets_total": stats.NodeGetsTotal,
"node_put_fsm_time_100": stats.NodePutFsmTime100,
"node_put_fsm_time_95": stats.NodePutFsmTime95,
"node_put_fsm_time_99": stats.NodePutFsmTime99,
"node_put_fsm_time_mean": stats.NodePutFsmTimeMean,
"node_put_fsm_time_median": stats.NodePutFsmTimeMedian,
"node_puts": stats.NodePuts,
"node_puts_total": stats.NodePutsTotal,
"pbc_active": stats.PbcActive,
"pbc_connects": stats.PbcConnects,
"pbc_connects_total": stats.PbcConnectsTotal,
"vnode_gets": stats.VnodeGets,
"vnode_gets_total": stats.VnodeGetsTotal,
"vnode_index_reads": stats.VnodeIndexReads,
"vnode_index_reads_total": stats.VnodeIndexReadsTotal,
"vnode_index_writes": stats.VnodeIndexWrites,
"vnode_index_writes_total": stats.VnodeIndexWritesTotal,
"vnode_puts": stats.VnodePuts,
"vnode_puts_total": stats.VnodePutsTotal,
}

// Accumulate the tags and values
acc.AddFields("riak", fields, tags)

return nil
}

func init() {
inputs.Add("riak", func() telegraf.Input {
return NewRiak()
})
}
Loading

0 comments on commit 53c130b

Please sign in to comment.