Skip to content

Commit

Permalink
(r) LogMessage is extended map[string]interface{}
Browse files Browse the repository at this point in the history
ont committed May 8, 2018
1 parent 6013600 commit d9bdd20
Showing 6 changed files with 74 additions and 72 deletions.
19 changes: 5 additions & 14 deletions src/flux/config.go
Original file line number Diff line number Diff line change
@@ -11,27 +11,18 @@ import (

func NewRootConsumer(app *iris.Application) *RootConsumer {
consumer := &RootConsumer{
BaseConsumer: BaseConsumer{
HostFieldName: GetenvStr("FLUX_HOST_FIELD_NAME", "HOST"), // default value as in syslog message
MessageFieldName: GetenvStr("FLUX_MESSAGE_FIELD_NAME", "MESSAGE"), // ...
},
RouteFieldName: GetenvStr("FLUX_ROUTE_FIELD_NAME", "ROUTE"),
consumers: make(map[string]*Consumer),
consumers: make(map[string]*Consumer),
}

app.Post("/", consumer.Handle)

return consumer
}

func NewConsumer(app *iris.Application, route *Route) (*Consumer, chan *LogMessage) {
queue := make(chan *LogMessage, GetenvInt("FLUX_INTERNAL_BUFFER", 1000))
func NewConsumer(app *iris.Application, route *Route) (*Consumer, chan LogMessage) {
queue := make(chan LogMessage, GetenvInt("FLUX_INTERNAL_BUFFER", 1000))

consumer := &Consumer{
BaseConsumer: BaseConsumer{
HostFieldName: GetenvStr("FLUX_HOST_FIELD_NAME", "HOST"), // default value as in syslog message
MessageFieldName: GetenvStr("FLUX_MESSAGE_FIELD_NAME", "MESSAGE"), // ...
},
queue: queue,
}

@@ -40,7 +31,7 @@ func NewConsumer(app *iris.Application, route *Route) (*Consumer, chan *LogMessa
return consumer, queue
}

func NewWorkers(queue chan *LogMessage, metrics []*Metric) []*Worker {
func NewWorkers(queue chan LogMessage, metrics []*Metric) []*Worker {
cnt := GetenvInt("FlUX_WORKERS", 2)
workers := make([]*Worker, 0, cnt)

@@ -51,7 +42,7 @@ func NewWorkers(queue chan *LogMessage, metrics []*Metric) []*Worker {
return workers
}

func NewWorker(queue chan *LogMessage, metrics []*Metric) *Worker {
func NewWorker(queue chan LogMessage, metrics []*Metric) *Worker {
client, err := influx.NewHTTPClient(influx.HTTPConfig{
Addr: os.Getenv("FLUX_INFLUX_URL"), //"http://localhost:8086"
})
47 changes: 8 additions & 39 deletions src/flux/consumer-base.go
Original file line number Diff line number Diff line change
@@ -8,20 +8,10 @@ import (
log "github.com/sirupsen/logrus"
)

type BaseConsumer struct {
HostFieldName string // name of "host" field in json log message
MessageFieldName string // name of "message" field in json log message
}

type LogMessage struct {
Host string
Message string
type BaseConsumer struct{}

data map[string]interface{}
}

func (c *BaseConsumer) parseJSONs(body io.ReadCloser) []*LogMessage {
messages := make([]*LogMessage, 0) // TODO: change to channel (don't parse all json messages into memory)
func (c *BaseConsumer) parseJSONs(body io.ReadCloser) []LogMessage {
messages := make([]LogMessage, 0) // TODO: change to channel (don't parse all json messages into memory)

reader := bufio.NewReader(body)
for {
@@ -32,40 +22,19 @@ func (c *BaseConsumer) parseJSONs(body io.ReadCloser) []*LogMessage {
break
}

var data map[string]interface{}
var host, message string
var logMessage *LogMessage
var message LogMessage

if err := json.Unmarshal(bytes, &data); err != nil {
if err := json.Unmarshal(bytes, &message); err != nil {
log.WithError(err).WithField("line", string(bytes)).Error("can't parse line from POST body as JSON")
goto eofcheck
}

if value, ok := data[c.HostFieldName].(string); ok {
host = value
} else {
log.WithField("field_name", c.HostFieldName).
WithField("value", data[c.HostFieldName]).
Error("can't find/convert 'host' field from JSON to string")
if !message.Validate() {
log.WithField("line", string(bytes)).Error("skip invalid message")
goto eofcheck
}

if value, ok := data[c.MessageFieldName].(string); ok {
message = value
} else {
log.WithField("field_name", c.MessageFieldName).
WithField("value", data[c.MessageFieldName]).
Error("can't find/convert 'message' field from JSON to string")
goto eofcheck
}

logMessage = &LogMessage{
Host: host,
Message: message,
data: data,
}

messages = append(messages, logMessage)
messages = append(messages, message)

eofcheck:
if errReader == io.EOF {
11 changes: 1 addition & 10 deletions src/flux/consumer-root.go
Original file line number Diff line number Diff line change
@@ -14,16 +14,7 @@ type RootConsumer struct {

func (c *RootConsumer) Handle(ctx iris.Context) {
for _, message := range c.parseJSONs(ctx.Request().Body) {
var route string

if value, ok := message.data[c.RouteFieldName].(string); ok {
route = value
} else {
log.WithField("field_name", c.RouteFieldName).
WithField("value", message.data[c.RouteFieldName]).
Error("can't find/convert 'route' field from JSON to string")
continue
}
route := message.Route()

if consumer, found := c.consumers[route]; found {
log.WithField("message", message).WithField("route", route).Debug("consumer: sending message to queue of route")
2 changes: 1 addition & 1 deletion src/flux/consumer.go
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ import (
)

type Consumer struct {
queue chan *LogMessage
queue chan LogMessage

BaseConsumer
}
51 changes: 51 additions & 0 deletions src/flux/log-message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import log "github.com/sirupsen/logrus"

var (
HostFieldName string // name of "host" field in json log message
MessageFieldName string // name of "message" field in json log message
RouteFieldName string // name of "route" field in json log message
)

func init() {
HostFieldName = GetenvStr("FLUX_HOST_FIELD_NAME", "HOST")
MessageFieldName = GetenvStr("FLUX_MESSAGE_FIELD_NAME", "MESSAGE")
RouteFieldName = GetenvStr("FLUX_ROUTE_FIELD_NAME", "ROUTE")
}

type LogMessage map[string]interface{}

func (l LogMessage) Host() string {
return l.getFieldStr(HostFieldName)
}

func (l LogMessage) Message() string {
return l.getFieldStr(MessageFieldName)
}

func (l LogMessage) Route() string {
return l.getFieldStr(RouteFieldName)
}

func (l LogMessage) Validate() bool {
return l.hasField(HostFieldName) &&
l.hasField(MessageFieldName) &&
l.hasField(RouteFieldName)
}

func (l LogMessage) getFieldStr(name string) string {
if value, ok := l[name].(string); ok {
return value
} else {
log.WithField("field_name", name).
WithField("value", l[name]).
Error("can't find/convert field from JSON to string")
return ""
}
}

func (l LogMessage) hasField(name string) bool {
_, found := l[name]
return found
}
16 changes: 8 additions & 8 deletions src/flux/worker.go
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ type Worker struct {
CommitAmount int
Database string

queue chan *LogMessage
queue chan LogMessage
batch influx.BatchPoints
client influx.Client
metrics []*Metric
@@ -26,13 +26,13 @@ func (w *Worker) Start() {

for {
select {
case line, ok := <-w.queue:
case message, ok := <-w.queue:
if !ok {
w.Flush()
return // queue closed, we can exit
}

w.Process(line)
w.Process(message)

case <-tick:
w.Flush() // send bulk query to influx every tick event
@@ -53,14 +53,14 @@ func (w *Worker) CreateBatch() {
w.batch = batch
}

func (w *Worker) Process(message *LogMessage) {
func (w *Worker) Process(message LogMessage) {
for _, metric := range w.metrics {
matches := metric.re.FindStringSubmatch(message.Message)
matches := metric.re.FindStringSubmatch(message.Message())
if len(matches) > 0 {

log.WithField("matches", matches).Debug("worker: found matches")

tags, values, data, err := w.GetTagsValues(message.Message, metric, matches)
tags, values, data, err := w.GetTagsValues(message.Message(), metric, matches)

log.WithField("tags", tags).
WithField("values", values).
@@ -71,7 +71,7 @@ func (w *Worker) Process(message *LogMessage) {
}

if metric.script != nil {
tags, values, err = w.ProcessScript(metric.script, message.Message, tags, values, data)
tags, values, err = w.ProcessScript(metric.script, message.Message(), tags, values, data)
if err != nil {
break
}
@@ -83,7 +83,7 @@ func (w *Worker) Process(message *LogMessage) {

// add hostname as tag to point
// NOTE: it overwrites any "tag_host" value from regexp and script
tags["host"] = message.Host
tags["host"] = message.Host()

log.WithField("tags", tags).
WithField("values", values).

0 comments on commit d9bdd20

Please sign in to comment.