Skip to content

Commit

Permalink
(+) support for json log format
Browse files Browse the repository at this point in the history
  • Loading branch information
ont committed Sep 22, 2020
1 parent 91525d8 commit 2153345
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 36 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/sergi/go-diff v1.1.0 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/sirupsen/logrus v1.6.0
github.com/square/go-jose/v3 v3.0.0-20200630053402-0a67ce9b0693
github.com/stretchr/testify v1.6.1 // indirect
github.com/valyala/fasthttp v1.16.0 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tL
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/square/go-jose v1.1.2 h1:YCLGvrhulDcHE8AyoqKUKMaY7fQL9tg0c3Uh4QWvQzQ=
github.com/square/go-jose v2.5.1+incompatible h1:FC+BwI9FzJZWpKaE0yUhFNbp/CyFHndARzuGVME/LGk=
github.com/square/go-jose/v3 v3.0.0-20200630053402-0a67ce9b0693 h1:wD1IWQwAhdWclCwaf6DdzgCAe9Bfz1M+4AHRd7N786Y=
github.com/square/go-jose/v3 v3.0.0-20200630053402-0a67ce9b0693/go.mod h1:6hSY48PjDm4UObWmGLyJE9DxYVKTgR9kbCspXXJEhcU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down Expand Up @@ -377,6 +381,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
39 changes: 33 additions & 6 deletions pkg/server/grammar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"io"
"regexp"
"strings"

"github.com/alecthomas/participle"
"github.com/mohae/deepcopy"
Expand All @@ -22,6 +23,7 @@ type Metric struct {
Name string `"metric" @String "{"`
Params []*Param `{ @@ } "}"`

parseJSON bool
re *regexp.Regexp
script *Script
eventName string // name of event, i.e. name of special column in influx which will contain "1" value
Expand All @@ -43,17 +45,32 @@ func NewGrammar(reader io.Reader) *Grammar {

func (m *Metric) unpackParams() {
reStr := m.Get("regexp")
if reStr == "" {
log.Fatal("empty or missed regexp for metric")
}

m.re = regexp.MustCompile(reStr)
format := m.Get("format")

m.eventName = m.Get("event")
if reStr == "" && format == "" {
log.Fatal("empty or missed 'regexp' or 'format' fields in metric")
}

if script := m.Get("script"); script != "" {
m.script = NewScript(script)
}

if reStr != "" {
m.re = regexp.MustCompile(reStr)

m.eventName = m.Get("event")

} else {
if format != "json" {
log.Fatalf("unsupported format: %s", format)
}

if m.script == nil {
log.Fatal("missing 'script' section; 'script' must be used for specifying tags and values from json")
}

m.parseJSON = true
}
}

func (m *Metric) Clone() *Metric {
Expand All @@ -76,3 +93,13 @@ func (m *Metric) Get(param string) string {

return ""
}

func (m *Metric) GetBool(param string) bool {
for _, pobj := range m.Params {
if pobj.Key == param {
return strings.TrimSpace(strings.ToLower(pobj.Value)) == "true"
}
}

return false
}
4 changes: 4 additions & 0 deletions pkg/server/log-message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (l LogMessage) Message() string {
return l.getFieldStr(MessageFieldName)
}

func (l LogMessage) MessageBytes() []byte {
return []byte(l.getFieldStr(MessageFieldName))
}

func (l LogMessage) Route() string {
return l.getFieldStr(RouteFieldName)
}
Expand Down
126 changes: 96 additions & 30 deletions pkg/server/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

influx "github.com/influxdata/influxdb/client/v2"
log "github.com/sirupsen/logrus"
"github.com/square/go-jose/v3/json"
)

type Worker struct {
Expand Down Expand Up @@ -55,50 +56,115 @@ func (w *Worker) CreateBatch() {

func (w *Worker) Process(message LogMessage) {
for _, metric := range w.metrics {
matches := metric.re.FindStringSubmatch(message.Message())
if len(matches) > 0 {
var matched bool
var err error

// TODO: decompose processing logic into two structs:
// MetricJson
// MetricRegex
// ....
// TODO: AST must returns basic union type with
// MetricAST.Parse() --> MetricInterface
// ....
// MetricInterface.Process(message) --> bool, err
if metric.parseJSON {
matched, err = w.processJSON(metric, message)
} else {
matched, err = w.processRegex(metric, message)
}

log.WithField("matches", matches).Debug("worker: found matches")
if err != nil {
log.WithError(err).
WithField("message", message.Message()).
Error("error during parsing")
}

tags, values, data, err := w.GetTagsValues(message.Message(), metric, matches)
if matched {
break // stop on first matched metric
}
}

log.WithField("tags", tags).
WithField("values", values).
WithField("data", data).Debug("worker: parsed tags, values and data")
if len(w.batch.Points()) >= w.CommitAmount {
w.Flush()
}
}

if err != nil {
break
}
func (w *Worker) processJSON(metric *Metric, message LogMessage) (bool, error) {
log.Debug("worker: processing as json")

if metric.script != nil {
tags, values, err = w.ProcessScript(metric.script, message.Message(), tags, values, data)
if err != nil {
break
}
var data map[string]interface{}
err := json.Unmarshal(message.MessageBytes(), &data)
if err != nil {
return false, err
}

log.WithField("tags", tags).
WithField("values", values).
Debug("worker: parsed tags and values after script")
}
tags, values, err := w.ProcessScript(metric.script, message.Message(), PointTags{}, PointValues{}, data)
if err != nil {
return false, err
}

log.WithField("tags", tags).
WithField("values", values).
Debug("worker: parsed tags and values after script")

// add hostname and program as tag to influx point
// NOTE: it overwrites any "tag_host" and "tag_program" value from regexp and script
tags["host"] = message.Host()
tags["program"] = message.Program()

log.WithField("tags", tags).
WithField("values", values).
Debug("worker: final tags and values for point")

// add hostname and program as tag to influx point
// NOTE: it overwrites any "tag_host" and "tag_program" value from regexp and script
tags["host"] = message.Host()
tags["program"] = message.Program()
w.AddPoint(metric, tags, values)

return true, nil
}

func (w *Worker) processRegex(metric *Metric, message LogMessage) (bool, error) {
log.Debug("worker: processing as regex pattern")

matches := metric.re.FindStringSubmatch(message.Message())
if len(matches) > 0 {

log.WithField("matches", matches).Debug("worker: found matches")

tags, values, data, err := w.GetTagsValues(message.Message(), metric, matches)

log.WithField("tags", tags).
WithField("values", values).
WithField("data", data).Debug("worker: parsed tags, values and data")

if err != nil {
return false, err
}

if metric.script != nil {
tags, values, err = w.ProcessScript(metric.script, message.Message(), tags, values, data)
if err != nil {
return false, err
}

log.WithField("tags", tags).
WithField("values", values).
Debug("worker: final tags and values for point")
Debug("worker: parsed tags and values after script")
}

w.AddPoint(metric, tags, values)
// add hostname and program as tag to influx point
// NOTE: it overwrites any "tag_host" and "tag_program" value from regexp and script
tags["host"] = message.Host()
tags["program"] = message.Program()

break // ignore any other metrics from config
}
}
log.WithField("tags", tags).
WithField("values", values).
Debug("worker: final tags and values for point")

if len(w.batch.Points()) >= w.CommitAmount {
w.Flush()
w.AddPoint(metric, tags, values)

return true, nil
}

return false, nil
}

// GetTagsValues extracts tags, values and additional data from regexp match result
Expand Down

0 comments on commit 2153345

Please sign in to comment.