From 2153345210e500a44b51405caf68a788980a0305 Mon Sep 17 00:00:00 2001 From: ont Date: Tue, 22 Sep 2020 15:28:05 +0700 Subject: [PATCH] (+) support for json log format --- go.mod | 1 + go.sum | 5 ++ pkg/server/grammar.go | 39 ++++++++++-- pkg/server/log-message.go | 4 ++ pkg/server/worker.go | 126 +++++++++++++++++++++++++++++--------- 5 files changed, 139 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 36398d0..c5d6a9c 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/sergi/go-diff v1.1.0 // indirect github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect github.com/sirupsen/logrus v1.6.0 + github.com/square/go-jose/v3 v3.0.0-20200630053402-0a67ce9b0693 github.com/stretchr/testify v1.6.1 // indirect github.com/valyala/fasthttp v1.16.0 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect diff --git a/go.sum b/go.sum index 2ec182a..c6f2c45 100644 --- a/go.sum +++ b/go.sum @@ -333,6 +333,10 @@ github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tL github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/square/go-jose v1.1.2 h1:YCLGvrhulDcHE8AyoqKUKMaY7fQL9tg0c3Uh4QWvQzQ= +github.com/square/go-jose v2.5.1+incompatible h1:FC+BwI9FzJZWpKaE0yUhFNbp/CyFHndARzuGVME/LGk= +github.com/square/go-jose/v3 v3.0.0-20200630053402-0a67ce9b0693 h1:wD1IWQwAhdWclCwaf6DdzgCAe9Bfz1M+4AHRd7N786Y= +github.com/square/go-jose/v3 v3.0.0-20200630053402-0a67ce9b0693/go.mod h1:6hSY48PjDm4UObWmGLyJE9DxYVKTgR9kbCspXXJEhcU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -377,6 +381,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/pkg/server/grammar.go b/pkg/server/grammar.go index 19da3b3..adb1e56 100644 --- a/pkg/server/grammar.go +++ b/pkg/server/grammar.go @@ -3,6 +3,7 @@ package server import ( "io" "regexp" + "strings" "github.com/alecthomas/participle" "github.com/mohae/deepcopy" @@ -22,6 +23,7 @@ type Metric struct { Name string `"metric" @String "{"` Params []*Param `{ @@ } "}"` + parseJSON bool re *regexp.Regexp script *Script eventName string // name of event, i.e. name of special column in influx which will contain "1" value @@ -43,17 +45,32 @@ func NewGrammar(reader io.Reader) *Grammar { func (m *Metric) unpackParams() { reStr := m.Get("regexp") - if reStr == "" { - log.Fatal("empty or missed regexp for metric") - } - - m.re = regexp.MustCompile(reStr) + format := m.Get("format") - m.eventName = m.Get("event") + if reStr == "" && format == "" { + log.Fatal("empty or missed 'regexp' or 'format' fields in metric") + } if script := m.Get("script"); script != "" { m.script = NewScript(script) } + + if reStr != "" { + m.re = regexp.MustCompile(reStr) + + m.eventName = m.Get("event") + + } else { + if format != "json" { + log.Fatalf("unsupported format: %s", format) + } + + if m.script == nil { + log.Fatal("missing 'script' section; 'script' must be used for specifying tags and values from json") + } + + m.parseJSON = true + } } func (m *Metric) Clone() *Metric { @@ -76,3 +93,13 @@ func (m *Metric) Get(param string) string { return "" } + +func (m *Metric) GetBool(param string) bool { + for _, pobj := range m.Params { + if pobj.Key == param { + return strings.TrimSpace(strings.ToLower(pobj.Value)) == "true" + } + } + + return false +} diff --git a/pkg/server/log-message.go b/pkg/server/log-message.go index 20c320b..e646cbf 100644 --- a/pkg/server/log-message.go +++ b/pkg/server/log-message.go @@ -30,6 +30,10 @@ func (l LogMessage) Message() string { return l.getFieldStr(MessageFieldName) } +func (l LogMessage) MessageBytes() []byte { + return []byte(l.getFieldStr(MessageFieldName)) +} + func (l LogMessage) Route() string { return l.getFieldStr(RouteFieldName) } diff --git a/pkg/server/worker.go b/pkg/server/worker.go index 6d4a9bf..40a945f 100644 --- a/pkg/server/worker.go +++ b/pkg/server/worker.go @@ -7,6 +7,7 @@ import ( influx "github.com/influxdata/influxdb/client/v2" log "github.com/sirupsen/logrus" + "github.com/square/go-jose/v3/json" ) type Worker struct { @@ -55,50 +56,115 @@ func (w *Worker) CreateBatch() { func (w *Worker) Process(message LogMessage) { for _, metric := range w.metrics { - matches := metric.re.FindStringSubmatch(message.Message()) - if len(matches) > 0 { + var matched bool + var err error + + // TODO: decompose processing logic into two structs: + // MetricJson + // MetricRegex + // .... + // TODO: AST must returns basic union type with + // MetricAST.Parse() --> MetricInterface + // .... + // MetricInterface.Process(message) --> bool, err + if metric.parseJSON { + matched, err = w.processJSON(metric, message) + } else { + matched, err = w.processRegex(metric, message) + } - log.WithField("matches", matches).Debug("worker: found matches") + if err != nil { + log.WithError(err). + WithField("message", message.Message()). + Error("error during parsing") + } - tags, values, data, err := w.GetTagsValues(message.Message(), metric, matches) + if matched { + break // stop on first matched metric + } + } - log.WithField("tags", tags). - WithField("values", values). - WithField("data", data).Debug("worker: parsed tags, values and data") + if len(w.batch.Points()) >= w.CommitAmount { + w.Flush() + } +} - if err != nil { - break - } +func (w *Worker) processJSON(metric *Metric, message LogMessage) (bool, error) { + log.Debug("worker: processing as json") - if metric.script != nil { - tags, values, err = w.ProcessScript(metric.script, message.Message(), tags, values, data) - if err != nil { - break - } + var data map[string]interface{} + err := json.Unmarshal(message.MessageBytes(), &data) + if err != nil { + return false, err + } - log.WithField("tags", tags). - WithField("values", values). - Debug("worker: parsed tags and values after script") - } + tags, values, err := w.ProcessScript(metric.script, message.Message(), PointTags{}, PointValues{}, data) + if err != nil { + return false, err + } + + log.WithField("tags", tags). + WithField("values", values). + Debug("worker: parsed tags and values after script") + + // add hostname and program as tag to influx point + // NOTE: it overwrites any "tag_host" and "tag_program" value from regexp and script + tags["host"] = message.Host() + tags["program"] = message.Program() + + log.WithField("tags", tags). + WithField("values", values). + Debug("worker: final tags and values for point") - // add hostname and program as tag to influx point - // NOTE: it overwrites any "tag_host" and "tag_program" value from regexp and script - tags["host"] = message.Host() - tags["program"] = message.Program() + w.AddPoint(metric, tags, values) + + return true, nil +} + +func (w *Worker) processRegex(metric *Metric, message LogMessage) (bool, error) { + log.Debug("worker: processing as regex pattern") + + matches := metric.re.FindStringSubmatch(message.Message()) + if len(matches) > 0 { + + log.WithField("matches", matches).Debug("worker: found matches") + + tags, values, data, err := w.GetTagsValues(message.Message(), metric, matches) + + log.WithField("tags", tags). + WithField("values", values). + WithField("data", data).Debug("worker: parsed tags, values and data") + + if err != nil { + return false, err + } + + if metric.script != nil { + tags, values, err = w.ProcessScript(metric.script, message.Message(), tags, values, data) + if err != nil { + return false, err + } log.WithField("tags", tags). WithField("values", values). - Debug("worker: final tags and values for point") + Debug("worker: parsed tags and values after script") + } - w.AddPoint(metric, tags, values) + // add hostname and program as tag to influx point + // NOTE: it overwrites any "tag_host" and "tag_program" value from regexp and script + tags["host"] = message.Host() + tags["program"] = message.Program() - break // ignore any other metrics from config - } - } + log.WithField("tags", tags). + WithField("values", values). + Debug("worker: final tags and values for point") - if len(w.batch.Points()) >= w.CommitAmount { - w.Flush() + w.AddPoint(metric, tags, values) + + return true, nil } + + return false, nil } // GetTagsValues extracts tags, values and additional data from regexp match result