Skip to content

Commit

Permalink
inputs.nfsclient: use uint64, also update error handling (influxdata#…
Browse files Browse the repository at this point in the history
…9067)

* Use uint64
Fix error handling

* update comment

* More detail to error
  • Loading branch information
sspaink authored Mar 31, 2021
1 parent 66c6396 commit 071fef7
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 90 deletions.
43 changes: 28 additions & 15 deletions plugins/inputs/nfsclient/nfsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package nfsclient

import (
"bufio"
"log"
"fmt"
"os"
"regexp"
"strconv"
Expand Down Expand Up @@ -61,45 +61,48 @@ func (n *NFSClient) Description() string {
return "Read per-mount NFS client metrics from /proc/self/mountstats"
}

func convertToInt64(line []string) []int64 {
func convertToUint64(line []string) ([]uint64, error) {
/* A "line" of input data (a pre-split array of strings) is
processed one field at a time. Each field is converted to
an int64 value, and appened to an array of return values.
On an error, check for ErrRange, and throw a fatal error
an uint64 value, and appened to an array of return values.
On an error, check for ErrRange, and returns an error
if found. This situation indicates a pretty major issue in
the /proc/self/mountstats file, and returning faulty data
is worse than no data. Other errors are ignored, and append
whatever we got in the first place (probably 0).
Yes, this is ugly. */

var nline []int64
var nline []uint64

if len(line) < 2 {
return nline
return nline, nil
}

// Skip the first field; it's handled specially as the "first" variable
for _, l := range line[1:] {
val, err := strconv.ParseInt(l, 10, 64)
val, err := strconv.ParseUint(l, 10, 64)
if err != nil {
if numError, ok := err.(*strconv.NumError); ok {
if numError.Err == strconv.ErrRange {
log.Fatalf("ErrRange: line:[%v] raw:[%v] -> parsed:[%v]\n", line, l, val)
return nil, fmt.Errorf("errrange: line:[%v] raw:[%v] -> parsed:[%v]", line, l, val)
}
}
}
nline = append(nline, val)
}
return nline
return nline, nil
}

func (n *NFSClient) parseStat(mountpoint string, export string, version string, line []string, acc telegraf.Accumulator) {
func (n *NFSClient) parseStat(mountpoint string, export string, version string, line []string, acc telegraf.Accumulator) error {
tags := map[string]string{"mountpoint": mountpoint, "serverexport": export}
nline := convertToInt64(line)
nline, err := convertToUint64(line)
if err != nil {
return err
}

if len(nline) == 0 {
n.Log.Warnf("Parsing Stat line with one field: %s\n", line)
return
return nil
}

first := strings.Replace(line[0], ":", "", 1)
Expand Down Expand Up @@ -240,9 +243,11 @@ func (n *NFSClient) parseStat(mountpoint string, export string, version string,
}
}
}

return nil
}

func (n *NFSClient) processText(scanner *bufio.Scanner, acc telegraf.Accumulator) {
func (n *NFSClient) processText(scanner *bufio.Scanner, acc telegraf.Accumulator) error {
var mount string
var version string
var export string
Expand Down Expand Up @@ -293,9 +298,14 @@ func (n *NFSClient) processText(scanner *bufio.Scanner, acc telegraf.Accumulator
}

if !skip {
n.parseStat(mount, export, version, line, acc)
err := n.parseStat(mount, export, version, line, acc)
if err != nil {
return fmt.Errorf("could not parseStat: %w", err)
}
}
}

return nil
}

func (n *NFSClient) getMountStatsPath() string {
Expand All @@ -316,7 +326,10 @@ func (n *NFSClient) Gather(acc telegraf.Accumulator) error {
defer file.Close()

scanner := bufio.NewScanner(file)
n.processText(scanner, acc)
err = n.processText(scanner, acc)
if err != nil {
return err
}

if err := scanner.Err(); err != nil {
n.Log.Errorf("%s", err)
Expand Down
179 changes: 104 additions & 75 deletions plugins/inputs/nfsclient/nfsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package nfsclient

import (
"bufio"
"github.com/influxdata/telegraf/testutil"
"os"
"strings"
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func getMountStatsPath() string {
Expand All @@ -24,17 +26,18 @@ func TestNFSClientParsev3(t *testing.T) {
nfsclient.nfs3Ops = map[string]bool{"READLINK": true, "GETATTR": false}
nfsclient.nfs4Ops = map[string]bool{"READLINK": true, "GETATTR": false}
data := strings.Fields(" READLINK: 500 501 502 503 504 505 506 507")
nfsclient.parseStat("1.2.3.4:/storage/NFS", "/A", "3", data, &acc)
err := nfsclient.parseStat("1.2.3.4:/storage/NFS", "/A", "3", data, &acc)
require.NoError(t, err)

fieldsOps := map[string]interface{}{
"ops": int64(500),
"trans": int64(501),
"timeouts": int64(502),
"bytes_sent": int64(503),
"bytes_recv": int64(504),
"queue_time": int64(505),
"response_time": int64(506),
"total_time": int64(507),
"ops": uint64(500),
"trans": uint64(501),
"timeouts": uint64(502),
"bytes_sent": uint64(503),
"bytes_recv": uint64(504),
"queue_time": uint64(505),
"response_time": uint64(506),
"total_time": uint64(507),
}
acc.AssertContainsFields(t, "nfs_ops", fieldsOps)
}
Expand All @@ -46,17 +49,41 @@ func TestNFSClientParsev4(t *testing.T) {
nfsclient.nfs3Ops = map[string]bool{"DESTROY_SESSION": true, "GETATTR": false}
nfsclient.nfs4Ops = map[string]bool{"DESTROY_SESSION": true, "GETATTR": false}
data := strings.Fields(" DESTROY_SESSION: 500 501 502 503 504 505 506 507")
nfsclient.parseStat("2.2.2.2:/nfsdata/", "/B", "4", data, &acc)
err := nfsclient.parseStat("2.2.2.2:/nfsdata/", "/B", "4", data, &acc)
require.NoError(t, err)

fieldsOps := map[string]interface{}{
"ops": uint64(500),
"trans": uint64(501),
"timeouts": uint64(502),
"bytes_sent": uint64(503),
"bytes_recv": uint64(504),
"queue_time": uint64(505),
"response_time": uint64(506),
"total_time": uint64(507),
}
acc.AssertContainsFields(t, "nfs_ops", fieldsOps)
}

func TestNFSClientParseLargeValue(t *testing.T) {
var acc testutil.Accumulator

nfsclient := NFSClient{Fullstat: true}
nfsclient.nfs3Ops = map[string]bool{"SETCLIENTID": true, "GETATTR": false}
nfsclient.nfs4Ops = map[string]bool{"SETCLIENTID": true, "GETATTR": false}
data := strings.Fields(" SETCLIENTID: 218 216 0 53568 12960 18446744073709531008 134 197")
err := nfsclient.parseStat("2.2.2.2:/nfsdata/", "/B", "4", data, &acc)
require.NoError(t, err)

fieldsOps := map[string]interface{}{
"ops": int64(500),
"trans": int64(501),
"timeouts": int64(502),
"bytes_sent": int64(503),
"bytes_recv": int64(504),
"queue_time": int64(505),
"response_time": int64(506),
"total_time": int64(507),
"ops": uint64(218),
"trans": uint64(216),
"timeouts": uint64(0),
"bytes_sent": uint64(53568),
"bytes_recv": uint64(12960),
"queue_time": uint64(18446744073709531008),
"response_time": uint64(134),
"total_time": uint64(197),
}
acc.AssertContainsFields(t, "nfs_ops", fieldsOps)
}
Expand All @@ -72,14 +99,15 @@ func TestNFSClientProcessStat(t *testing.T) {

scanner := bufio.NewScanner(file)

nfsclient.processText(scanner, &acc)
err := nfsclient.processText(scanner, &acc)
require.NoError(t, err)

fieldsReadstat := map[string]interface{}{
"ops": int64(600),
"retrans": int64(1),
"bytes": int64(1207),
"rtt": int64(606),
"exe": int64(607),
"ops": uint64(600),
"retrans": uint64(1),
"bytes": uint64(1207),
"rtt": uint64(606),
"exe": uint64(607),
}

readTags := map[string]string{
Expand All @@ -91,11 +119,11 @@ func TestNFSClientProcessStat(t *testing.T) {
acc.AssertContainsTaggedFields(t, "nfsstat", fieldsReadstat, readTags)

fieldsWritestat := map[string]interface{}{
"ops": int64(700),
"retrans": int64(1),
"bytes": int64(1407),
"rtt": int64(706),
"exe": int64(707),
"ops": uint64(700),
"retrans": uint64(1),
"bytes": uint64(1407),
"rtt": uint64(706),
"exe": uint64(707),
}

writeTags := map[string]string{
Expand All @@ -117,57 +145,58 @@ func TestNFSClientProcessFull(t *testing.T) {

scanner := bufio.NewScanner(file)

nfsclient.processText(scanner, &acc)
err := nfsclient.processText(scanner, &acc)
require.NoError(t, err)

fieldsEvents := map[string]interface{}{
"inoderevalidates": int64(301736),
"dentryrevalidates": int64(22838),
"datainvalidates": int64(410979),
"attrinvalidates": int64(26188427),
"vfsopen": int64(27525),
"vfslookup": int64(9140),
"vfsaccess": int64(114420),
"vfsupdatepage": int64(30785253),
"vfsreadpage": int64(5308856),
"vfsreadpages": int64(5364858),
"vfswritepage": int64(30784819),
"vfswritepages": int64(79832668),
"vfsgetdents": int64(170),
"vfssetattr": int64(64),
"vfsflush": int64(18194),
"vfsfsync": int64(29294718),
"vfslock": int64(0),
"vfsrelease": int64(18279),
"congestionwait": int64(0),
"setattrtrunc": int64(2),
"extendwrite": int64(785551),
"sillyrenames": int64(0),
"shortreads": int64(0),
"shortwrites": int64(0),
"delay": int64(0),
"pnfsreads": int64(0),
"pnfswrites": int64(0),
"inoderevalidates": uint64(301736),
"dentryrevalidates": uint64(22838),
"datainvalidates": uint64(410979),
"attrinvalidates": uint64(26188427),
"vfsopen": uint64(27525),
"vfslookup": uint64(9140),
"vfsaccess": uint64(114420),
"vfsupdatepage": uint64(30785253),
"vfsreadpage": uint64(5308856),
"vfsreadpages": uint64(5364858),
"vfswritepage": uint64(30784819),
"vfswritepages": uint64(79832668),
"vfsgetdents": uint64(170),
"vfssetattr": uint64(64),
"vfsflush": uint64(18194),
"vfsfsync": uint64(29294718),
"vfslock": uint64(0),
"vfsrelease": uint64(18279),
"congestionwait": uint64(0),
"setattrtrunc": uint64(2),
"extendwrite": uint64(785551),
"sillyrenames": uint64(0),
"shortreads": uint64(0),
"shortwrites": uint64(0),
"delay": uint64(0),
"pnfsreads": uint64(0),
"pnfswrites": uint64(0),
}
fieldsBytes := map[string]interface{}{
"normalreadbytes": int64(204440464584),
"normalwritebytes": int64(110857586443),
"directreadbytes": int64(783170354688),
"directwritebytes": int64(296174954496),
"serverreadbytes": int64(1134399088816),
"serverwritebytes": int64(407107155723),
"readpages": int64(85749323),
"writepages": int64(30784819),
"normalreadbytes": uint64(204440464584),
"normalwritebytes": uint64(110857586443),
"directreadbytes": uint64(783170354688),
"directwritebytes": uint64(296174954496),
"serverreadbytes": uint64(1134399088816),
"serverwritebytes": uint64(407107155723),
"readpages": uint64(85749323),
"writepages": uint64(30784819),
}
fieldsXprtTCP := map[string]interface{}{
"bind_count": int64(1),
"connect_count": int64(1),
"connect_time": int64(0),
"idle_time": int64(0),
"rpcsends": int64(96172963),
"rpcreceives": int64(96172963),
"badxids": int64(0),
"inflightsends": int64(620878754),
"backlogutil": int64(0),
"bind_count": uint64(1),
"connect_count": uint64(1),
"connect_time": uint64(0),
"idle_time": uint64(0),
"rpcsends": uint64(96172963),
"rpcreceives": uint64(96172963),
"badxids": uint64(0),
"inflightsends": uint64(620878754),
"backlogutil": uint64(0),
}

acc.AssertContainsFields(t, "nfs_events", fieldsEvents)
Expand Down

0 comments on commit 071fef7

Please sign in to comment.