forked from bshuster-repo/logrus-logstash-hook
-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathlogstash.go
345 lines (280 loc) · 11.1 KB
/
logstash.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
package logrustash
import (
"fmt"
"math"
"net"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
)
const defaultAsyncBufferSize = 8192
// Hook represents a connection to a Logstash instance
type Hook struct {
sync.RWMutex
conn net.Conn
protocol string
address string
appName string
alwaysSentFields logrus.Fields
hookOnlyPrefix string
TimeFormat string
fireChannel chan *logrus.Entry
AsyncBufferSize int
WaitUntilBufferFrees bool
Timeout time.Duration // Timeout for sending message.
MaxSendRetries int // Declares how many times we will try to resend message.
ReconnectBaseDelay time.Duration // First reconnect delay.
ReconnectDelayMultiplier float64 // Base multiplier for delay before reconnect.
MaxReconnectRetries int // Declares how many times we will try to reconnect.
}
// NewHook creates a new hook to a Logstash instance, which listens on
// `protocol`://`address`.
func NewHook(protocol, address, appName string) (*Hook, error) {
return NewHookWithFields(protocol, address, appName, make(logrus.Fields))
}
// NewAsyncHook creates a new hook to a Logstash instance, which listens on
// `protocol`://`address`.
// Logs will be sent asynchronously.
func NewAsyncHook(protocol, address, appName string) (*Hook, error) {
return NewAsyncHookWithFields(protocol, address, appName, make(logrus.Fields))
}
// NewHookWithConn creates a new hook to a Logstash instance, using the supplied connection.
func NewHookWithConn(conn net.Conn, appName string) (*Hook, error) {
return NewHookWithFieldsAndConn(conn, appName, make(logrus.Fields))
}
// NewAsyncHookWithConn creates a new hook to a Logstash instance, using the supplied connection.
// Logs will be sent asynchronously.
func NewAsyncHookWithConn(conn net.Conn, appName string) (*Hook, error) {
return NewAsyncHookWithFieldsAndConn(conn, appName, make(logrus.Fields))
}
// NewHookWithFields creates a new hook to a Logstash instance, which listens on
// `protocol`://`address`. alwaysSentFields will be sent with every log entry.
func NewHookWithFields(protocol, address, appName string, alwaysSentFields logrus.Fields) (*Hook, error) {
return NewHookWithFieldsAndPrefix(protocol, address, appName, alwaysSentFields, "")
}
// NewAsyncHookWithFields creates a new hook to a Logstash instance, which listens on
// `protocol`://`address`. alwaysSentFields will be sent with every log entry.
// Logs will be sent asynchronously.
func NewAsyncHookWithFields(protocol, address, appName string, alwaysSentFields logrus.Fields) (*Hook, error) {
return NewAsyncHookWithFieldsAndPrefix(protocol, address, appName, alwaysSentFields, "")
}
// NewHookWithFieldsAndPrefix creates a new hook to a Logstash instance, which listens on
// `protocol`://`address`. alwaysSentFields will be sent with every log entry. prefix is used to select fields to filter.
func NewHookWithFieldsAndPrefix(protocol, address, appName string, alwaysSentFields logrus.Fields, prefix string) (*Hook, error) {
conn, err := net.Dial(protocol, address)
if err != nil {
return nil, err
}
hook, err := NewHookWithFieldsAndConnAndPrefix(conn, appName, alwaysSentFields, prefix)
hook.protocol = protocol
hook.address = address
return hook, err
}
// NewAsyncHookWithFieldsAndPrefix creates a new hook to a Logstash instance, which listens on
// `protocol`://`address`. alwaysSentFields will be sent with every log entry. prefix is used to select fields to filter.
// Logs will be sent asynchronously.
func NewAsyncHookWithFieldsAndPrefix(protocol, address, appName string, alwaysSentFields logrus.Fields, prefix string) (*Hook, error) {
hook, err := NewHookWithFieldsAndPrefix(protocol, address, appName, alwaysSentFields, prefix)
if err != nil {
return nil, err
}
hook.makeAsync()
return hook, err
}
// NewHookWithFieldsAndConn creates a new hook to a Logstash instance using the supplied connection.
func NewHookWithFieldsAndConn(conn net.Conn, appName string, alwaysSentFields logrus.Fields) (*Hook, error) {
return NewHookWithFieldsAndConnAndPrefix(conn, appName, alwaysSentFields, "")
}
// NewAsyncHookWithFieldsAndConn creates a new hook to a Logstash instance using the supplied connection.
// Logs will be sent asynchronously.
func NewAsyncHookWithFieldsAndConn(conn net.Conn, appName string, alwaysSentFields logrus.Fields) (*Hook, error) {
return NewAsyncHookWithFieldsAndConnAndPrefix(conn, appName, alwaysSentFields, "")
}
//NewHookWithFieldsAndConnAndPrefix creates a new hook to a Logstash instance using the suppolied connection and prefix.
func NewHookWithFieldsAndConnAndPrefix(conn net.Conn, appName string, alwaysSentFields logrus.Fields, prefix string) (*Hook, error) {
return &Hook{conn: conn, appName: appName, alwaysSentFields: alwaysSentFields, hookOnlyPrefix: prefix}, nil
}
// NewAsyncHookWithFieldsAndConnAndPrefix creates a new hook to a Logstash instance using the suppolied connection and prefix.
// Logs will be sent asynchronously.
func NewAsyncHookWithFieldsAndConnAndPrefix(conn net.Conn, appName string, alwaysSentFields logrus.Fields, prefix string) (*Hook, error) {
hook := &Hook{conn: conn, appName: appName, alwaysSentFields: alwaysSentFields, hookOnlyPrefix: prefix}
hook.makeAsync()
return hook, nil
}
// NewFilterHook makes a new hook which does not forward to logstash, but simply enforces the prefix rules.
func NewFilterHook() *Hook {
return NewFilterHookWithPrefix("")
}
// NewAsyncFilterHook makes a new hook which does not forward to logstash, but simply enforces the prefix rules.
// Logs will be sent asynchronously.
func NewAsyncFilterHook() *Hook {
return NewAsyncFilterHookWithPrefix("")
}
// NewFilterHookWithPrefix make a new hook which does not forward to logstash, but simply enforces the specified prefix.
func NewFilterHookWithPrefix(prefix string) *Hook {
return &Hook{conn: nil, appName: "", alwaysSentFields: make(logrus.Fields), hookOnlyPrefix: prefix}
}
// NewAsyncFilterHookWithPrefix make a new hook which does not forward to logstash, but simply enforces the specified prefix.
// Logs will be sent asynchronously.
func NewAsyncFilterHookWithPrefix(prefix string) *Hook {
hook := NewFilterHookWithPrefix(prefix)
hook.makeAsync()
return hook
}
func (h *Hook) makeAsync() {
if h.AsyncBufferSize == 0 {
h.AsyncBufferSize = defaultAsyncBufferSize
}
h.fireChannel = make(chan *logrus.Entry, h.AsyncBufferSize)
go func() {
for entry := range h.fireChannel {
if err := h.sendMessage(entry); err != nil {
fmt.Println("Error during sending message to logstash:", err)
}
}
}()
}
func (h *Hook) filterHookOnly(entry *logrus.Entry) {
if h.hookOnlyPrefix != "" {
for key := range entry.Data {
if strings.HasPrefix(key, h.hookOnlyPrefix) {
delete(entry.Data, key)
}
}
}
}
//WithPrefix sets a prefix filter to use in all subsequent logging
func (h *Hook) WithPrefix(prefix string) {
h.hookOnlyPrefix = prefix
}
//WithField add field with value that will be sent with each message
func (h *Hook) WithField(key string, value interface{}) {
h.alwaysSentFields[key] = value
}
// WithFields add fields with values that will be sent with each message
func (h *Hook) WithFields(fields logrus.Fields) {
// Add all the new fields to the 'alwaysSentFields', possibly overwriting existing fields
for key, value := range fields {
h.alwaysSentFields[key] = value
}
}
// Fire send message to logstash.
// In async mode log message will be dropped if message buffer is full.
// If you want wait until message buffer frees – set WaitUntilBufferFrees to true.
func (h *Hook) Fire(entry *logrus.Entry) error {
if h.fireChannel != nil { // Async mode.
select {
case h.fireChannel <- entry:
default:
if h.WaitUntilBufferFrees {
h.fireChannel <- entry // Blocks the goroutine because buffer is full.
return nil
}
// Drop message by default.
}
return nil
}
return h.sendMessage(entry)
}
func (h *Hook) sendMessage(entry *logrus.Entry) error {
// Make sure we always clear the hook only fields from the entry
defer h.filterHookOnly(entry)
// Add in the alwaysSentFields. We don't override fields that are already set.
for k, v := range h.alwaysSentFields {
if _, inMap := entry.Data[k]; !inMap {
entry.Data[k] = v
}
}
// For a filteringHook, stop here
h.RLock()
if h.conn == nil {
h.RUnlock()
return nil
}
h.RUnlock()
formatter := LogstashFormatter{Type: h.appName}
if h.TimeFormat != "" {
formatter.TimestampFormat = h.TimeFormat
}
dataBytes, err := formatter.FormatWithPrefix(entry, h.hookOnlyPrefix)
if err != nil {
return err
}
return h.performSend(dataBytes, 0)
}
// performSend tries to send data recursively.
// sendRetries is the actual number of attempts to resend message.
func (h *Hook) performSend(data []byte, sendRetries int) error {
if h.Timeout > 0 {
h.Lock()
h.conn.SetWriteDeadline(time.Now().Add(h.Timeout))
h.Unlock()
}
h.Lock()
_, err := h.conn.Write(data)
h.Unlock()
if err != nil {
return h.processSendError(err, data, sendRetries)
}
return nil
}
func (h *Hook) processSendError(err error, data []byte, sendRetries int) error {
netErr, ok := err.(net.Error)
if !ok {
return err
}
if h.isNeedToResendMessage(netErr, sendRetries) {
return h.performSend(data, sendRetries+1)
}
if !netErr.Temporary() && h.MaxReconnectRetries > 0 {
if err := h.reconnect(0); err != nil {
return fmt.Errorf("Couldn't reconnect to logstash: %s. The reason of reconnect: %s", err, netErr)
}
return h.performSend(data, 0)
}
return err
}
// TODO Check reconnect for NOT ASYNC mode.
// The hook will reconnect to Logstash several times with increasing sleep duration between each reconnect attempt.
// Sleep duration calculated as product of ReconnectBaseDelay by ReconnectDelayMultiplier to the power of reconnectRetries.
// reconnectRetries is the actual number of attempts to reconnect.
func (h *Hook) reconnect(reconnectRetries int) error {
if h.protocol == "" || h.address == "" {
return fmt.Errorf("Can't reconnect because current configuration doesn't support it")
}
// Sleep before reconnect.
delay := float64(h.ReconnectBaseDelay) * math.Pow(h.ReconnectDelayMultiplier, float64(reconnectRetries))
time.Sleep(time.Duration(delay))
conn, err := net.Dial(h.protocol, h.address)
// Oops. Can't connect. No problem. Let's try again.
if err != nil {
if !h.isNeedToReconnect(reconnectRetries) {
// We have reached limit of re-connections.
return err
}
return h.reconnect(reconnectRetries + 1)
}
h.Lock()
h.conn = conn
h.Unlock()
return nil
}
func (h *Hook) isNeedToResendMessage(err net.Error, sendRetries int) bool {
return (err.Temporary() || err.Timeout()) && sendRetries < h.MaxSendRetries
}
func (h *Hook) isNeedToReconnect(reconnectRetries int) bool {
return reconnectRetries < h.MaxReconnectRetries
}
// Levels specifies "active" log levels.
// Log messages with this levels will be sent to logstash.
func (h *Hook) Levels() []logrus.Level {
return []logrus.Level{
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
logrus.InfoLevel,
logrus.DebugLevel,
}
}