-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathencoder.go
172 lines (152 loc) · 4.58 KB
/
encoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package fluxline
// DCSO fluxline
// Copyright (c) 2017, 2018, DCSO GmbH
import (
"fmt"
"io"
"reflect"
"sort"
"strings"
"time"
"github.com/Showmax/go-fqdn"
)
// Encoder represents a component that encapsulates a target environment for
// measurement submissions, as given by hostname and receiving writer.
type Encoder struct {
// host is emitted as the value of the "host" tag on every formatted line.
host string
// Writer is the destination every encoded line is written to.
Writer io.Writer
}
// escapeSpecialChars returns in with every comma, equals sign and space
// prefixed by a backslash. All other bytes, including backslashes that are
// already present, are passed through unchanged.
func escapeSpecialChars(in string) string {
	escaper := strings.NewReplacer(
		",", `\,`,
		"=", `\=`,
		" ", `\ `,
	)
	return escaper.Replace(in)
}
// toInfluxRepr renders val as the textual value of a line protocol field.
//
// Supported types and their output:
//   - string: quoted with the %q verb; rejected with an error when len(val)
//     exceeds 64000
//   - built-in signed and unsigned integers: decimal digits, followed by an
//     "i" suffix unless nostatictypes is true
//   - float32, float64: formatted with the %g verb
//   - bool: "true" or "false"
//   - time.Time: UnixNano() converted to uint64, in decimal
//
// Every other type results in an error. tag is only used as the prefix of
// error messages. On error the returned string is empty.
func toInfluxRepr(tag string, val interface{}, nostatictypes bool) (string, error) {
	switch x := val.(type) {
	case bool:
		return fmt.Sprint(x), nil
	case float32, float64:
		return fmt.Sprintf("%g", x), nil
	case time.Time:
		return fmt.Sprint(uint64(x.UnixNano())), nil
	case string:
		if n := len(x); n > 64000 {
			return "", fmt.Errorf("%s: string too long (%d characters, max. 64K)", tag, n)
		}
		return fmt.Sprintf("%q", x), nil
	case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
		digits := fmt.Sprintf("%d", x)
		if nostatictypes {
			return digits, nil
		}
		// The "i" suffix marks the value as an integer.
		return digits + "i", nil
	}
	return "", fmt.Errorf("%s: unsupported type for Influx Line Protocol", tag)
}
// recordFields converts every struct field of val that carries a non-empty
// `influx` tag into its line protocol representation (via toInfluxRepr) and
// stores it in fieldSet under the tag's value. Fields without such a tag are
// ignored.
//
// val may be a struct or a (possibly nested) pointer to a struct. A nil
// fieldSet is replaced by a freshly allocated map. nostatictypes is passed
// through to toInfluxRepr.
//
// It returns the populated map, or nil and an error when val is nil, a nil
// pointer or not a struct, when a tagged field is unexported, or when a
// field value cannot be represented.
func recordFields(val interface{},
	fieldSet map[string]string, nostatictypes bool) (map[string]string, error) {
	v := reflect.ValueOf(val)
	// Follow pointers so that both T and *T are accepted instead of
	// panicking in NumField.
	for v.Kind() == reflect.Ptr {
		if v.IsNil() {
			return nil, fmt.Errorf("cannot record fields of nil %s", v.Type())
		}
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return nil, fmt.Errorf("cannot record fields of %T: not a struct", val)
	}
	if fieldSet == nil {
		// Writing to a nil map would panic.
		fieldSet = make(map[string]string)
	}

	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		tag := field.Tag.Get("influx")
		if tag == "" {
			continue
		}
		// Value.Interface panics for unexported fields; a non-empty PkgPath
		// identifies them.
		if field.PkgPath != "" {
			return nil, fmt.Errorf("%s: struct field %s is unexported", tag, field.Name)
		}
		repr, err := toInfluxRepr(tag, v.Field(i).Interface(), nostatictypes)
		if err != nil {
			return nil, err
		}
		fieldSet[tag] = repr
	}
	return fieldSet, nil
}
// formatLineProtocol builds a single line of the form
//
//	<prefix>,host=<host>[,<tag>=<value>...] <field>=<value>[,...] <timestamp>\n
//
// where the timestamp is the current time in nanoseconds since the Unix
// epoch. Tags and fields are emitted sorted by key to obtain a stable output
// order. The host, tag keys, tag values and field keys are escaped with
// escapeSpecialChars; prefix and the field values are written as given, so
// field values must already be in their line protocol representation.
//
// An empty string is returned when fieldSet contains no entries.
func (a *Encoder) formatLineProtocol(prefix string,
	tags map[string]string, fieldSet map[string]string) string {
	// Without at least one field there is nothing to emit.
	if len(fieldSet) == 0 {
		return ""
	}

	sortedKeys := func(m map[string]string) []string {
		keys := make([]string, 0, len(m))
		for k := range m {
			keys = append(keys, k)
		}
		sort.Strings(keys)
		return keys
	}

	var line strings.Builder
	line.WriteString(prefix)

	// The host is a tag value like any other and therefore needs the same
	// escaping as the user supplied tags.
	line.WriteString(",host=")
	line.WriteString(escapeSpecialChars(a.host))

	// serialize tags
	for _, k := range sortedKeys(tags) {
		line.WriteByte(',')
		line.WriteString(escapeSpecialChars(k))
		line.WriteByte('=')
		line.WriteString(escapeSpecialChars(tags[k]))
	}

	// serialize fields
	line.WriteByte(' ')
	for i, k := range sortedKeys(fieldSet) {
		if i > 0 {
			line.WriteByte(',')
		}
		line.WriteString(escapeSpecialChars(k))
		line.WriteByte('=')
		line.WriteString(fieldSet[k])
	}

	// timestamp and line terminator
	fmt.Fprintf(&line, " %d\n", uint64(time.Now().UnixNano()))
	return line.String()
}
// encodeGeneric is the shared implementation behind Encode and
// EncodeWithoutTypes. It collects the fields of val with recordFields,
// formats them together with prefix and tags using formatLineProtocol and
// writes the resulting line to a.Writer. nostatictypes is handed to
// recordFields unchanged.
//
// The returned error is the one reported by recordFields, in which case
// nothing is written, or otherwise the one reported by the write.
func (a *Encoder) encodeGeneric(prefix string, val interface{},
	tags map[string]string, nostatictypes bool) error {
	fields, err := recordFields(val, make(map[string]string), nostatictypes)
	if err != nil {
		return err
	}
	line := a.formatLineProtocol(prefix, tags, fields)
	_, err = a.Writer.Write([]byte(line))
	return err
}
// Encode writes the line protocol representation of one measurement to the
// encoder's Writer. prefix is the measurement name, val the data struct
// providing the fields and tags the tag map. Integer field values are written
// with a type suffix; use EncodeWithoutTypes to omit it.
func (a *Encoder) Encode(prefix string, val interface{},
	tags map[string]string) error {
	const nostatictypes = false
	return a.encodeGeneric(prefix, val, tags, nostatictypes)
}
// EncodeWithoutTypes writes the line protocol representation of one
// measurement to the encoder's Writer. prefix is the measurement name, val
// the data struct providing the fields and tags the tag map.
// Unlike Encode, it never appends type suffixes to values.
func (a *Encoder) EncodeWithoutTypes(prefix string, val interface{},
	tags map[string]string) error {
	const nostatictypes = true
	return a.encodeGeneric(prefix, val, tags, nostatictypes)
}
// EncodeMap writes the line protocol representation of one measurement to
// the encoder's Writer. prefix is the measurement name, val maps field names
// to field values and tags is the tag map. The values in val are written
// verbatim, so they must already be in their line protocol representation.
func (a *Encoder) EncodeMap(prefix string, val map[string]string,
	tags map[string]string) error {
	line := a.formatLineProtocol(prefix, tags, val)
	_, err := a.Writer.Write([]byte(line))
	return err
}
// NewEncoder returns an Encoder that writes to w. Its hostname is the value
// returned by fqdn.Get() (presumably the local machine's fully qualified
// domain name; confirm against the go-fqdn package).
func NewEncoder(w io.Writer) *Encoder {
	return &Encoder{host: fqdn.Get(), Writer: w}
}
// NewEncoderWithHostname returns an Encoder that writes to w and uses host,
// rather than an automatically determined name, as its hostname.
func NewEncoderWithHostname(w io.Writer, host string) *Encoder {
	return &Encoder{host: host, Writer: w}
}