diff --git a/src/flux/config.go b/src/flux/config.go index a03473d..8cd73c5 100644 --- a/src/flux/config.go +++ b/src/flux/config.go @@ -11,12 +11,7 @@ import ( func NewRootConsumer(app *iris.Application) *RootConsumer { consumer := &RootConsumer{ - BaseConsumer: BaseConsumer{ - HostFieldName: GetenvStr("FLUX_HOST_FIELD_NAME", "HOST"), // default value as in syslog message - MessageFieldName: GetenvStr("FLUX_MESSAGE_FIELD_NAME", "MESSAGE"), // ... - }, - RouteFieldName: GetenvStr("FLUX_ROUTE_FIELD_NAME", "ROUTE"), - consumers: make(map[string]*Consumer), + consumers: make(map[string]*Consumer), } app.Post("/", consumer.Handle) @@ -24,14 +19,10 @@ func NewRootConsumer(app *iris.Application) *RootConsumer { return consumer } -func NewConsumer(app *iris.Application, route *Route) (*Consumer, chan *LogMessage) { - queue := make(chan *LogMessage, GetenvInt("FLUX_INTERNAL_BUFFER", 1000)) +func NewConsumer(app *iris.Application, route *Route) (*Consumer, chan LogMessage) { + queue := make(chan LogMessage, GetenvInt("FLUX_INTERNAL_BUFFER", 1000)) consumer := &Consumer{ - BaseConsumer: BaseConsumer{ - HostFieldName: GetenvStr("FLUX_HOST_FIELD_NAME", "HOST"), // default value as in syslog message - MessageFieldName: GetenvStr("FLUX_MESSAGE_FIELD_NAME", "MESSAGE"), // ... - }, queue: queue, } @@ -40,7 +31,7 @@ func NewConsumer(app *iris.Application, route *Route) (*Consumer, chan *LogMessa return consumer, queue } -func NewWorkers(queue chan *LogMessage, metrics []*Metric) []*Worker { +func NewWorkers(queue chan LogMessage, metrics []*Metric) []*Worker { cnt := GetenvInt("FlUX_WORKERS", 2) workers := make([]*Worker, 0, cnt) @@ -51,7 +42,7 @@ func NewWorkers(queue chan *LogMessage, metrics []*Metric) []*Worker { return workers } -func NewWorker(queue chan *LogMessage, metrics []*Metric) *Worker { +func NewWorker(queue chan LogMessage, metrics []*Metric) *Worker { client, err := influx.NewHTTPClient(influx.HTTPConfig{ Addr: os.Getenv("FLUX_INFLUX_URL"), //"http://localhost:8086" }) diff --git a/src/flux/consumer-base.go b/src/flux/consumer-base.go index bf75c41..753ac61 100644 --- a/src/flux/consumer-base.go +++ b/src/flux/consumer-base.go @@ -8,20 +8,10 @@ import ( log "github.com/sirupsen/logrus" ) -type BaseConsumer struct { - HostFieldName string // name of "host" field in json log message - MessageFieldName string // name of "message" field in json log message -} - -type LogMessage struct { - Host string - Message string +type BaseConsumer struct{} - data map[string]interface{} -} - -func (c *BaseConsumer) parseJSONs(body io.ReadCloser) []*LogMessage { - messages := make([]*LogMessage, 0) // TODO: change to channel (don't parse all json messages into memory) +func (c *BaseConsumer) parseJSONs(body io.ReadCloser) []LogMessage { + messages := make([]LogMessage, 0) // TODO: change to channel (don't parse all json messages into memory) reader := bufio.NewReader(body) for { @@ -32,40 +22,19 @@ func (c *BaseConsumer) parseJSONs(body io.ReadCloser) []*LogMessage { break } - var data map[string]interface{} - var host, message string - var logMessage *LogMessage + var message LogMessage - if err := json.Unmarshal(bytes, &data); err != nil { + if err := json.Unmarshal(bytes, &message); err != nil { log.WithError(err).WithField("line", string(bytes)).Error("can't parse line from POST body as JSON") goto eofcheck } - if value, ok := data[c.HostFieldName].(string); ok { - host = value - } else { - log.WithField("field_name", c.HostFieldName). - WithField("value", data[c.HostFieldName]). - Error("can't find/convert 'host' field from JSON to string") + if !message.Validate() { + log.WithField("line", string(bytes)).Error("skip invalid message") goto eofcheck } - if value, ok := data[c.MessageFieldName].(string); ok { - message = value - } else { - log.WithField("field_name", c.MessageFieldName). - WithField("value", data[c.MessageFieldName]). - Error("can't find/convert 'message' field from JSON to string") - goto eofcheck - } - - logMessage = &LogMessage{ - Host: host, - Message: message, - data: data, - } - - messages = append(messages, logMessage) + messages = append(messages, message) eofcheck: if errReader == io.EOF { diff --git a/src/flux/consumer-root.go b/src/flux/consumer-root.go index c360b27..99ccfcf 100644 --- a/src/flux/consumer-root.go +++ b/src/flux/consumer-root.go @@ -14,16 +14,7 @@ type RootConsumer struct { func (c *RootConsumer) Handle(ctx iris.Context) { for _, message := range c.parseJSONs(ctx.Request().Body) { - var route string - - if value, ok := message.data[c.RouteFieldName].(string); ok { - route = value - } else { - log.WithField("field_name", c.RouteFieldName). - WithField("value", message.data[c.RouteFieldName]). - Error("can't find/convert 'route' field from JSON to string") - continue - } + route := message.Route() if consumer, found := c.consumers[route]; found { log.WithField("message", message).WithField("route", route).Debug("consumer: sending message to queue of route") diff --git a/src/flux/consumer.go b/src/flux/consumer.go index c3e4ff2..42bf87c 100644 --- a/src/flux/consumer.go +++ b/src/flux/consumer.go @@ -6,7 +6,7 @@ import ( ) type Consumer struct { - queue chan *LogMessage + queue chan LogMessage BaseConsumer } diff --git a/src/flux/log-message.go b/src/flux/log-message.go new file mode 100644 index 0000000..f871280 --- /dev/null +++ b/src/flux/log-message.go @@ -0,0 +1,51 @@ +package main + +import log "github.com/sirupsen/logrus" + +var ( + HostFieldName string // name of "host" field in json log message + MessageFieldName string // name of "message" field in json log message + RouteFieldName string // name of "route" field in json log message +) + +func init() { + HostFieldName = GetenvStr("FLUX_HOST_FIELD_NAME", "HOST") + MessageFieldName = GetenvStr("FLUX_MESSAGE_FIELD_NAME", "MESSAGE") + RouteFieldName = GetenvStr("FLUX_ROUTE_FIELD_NAME", "ROUTE") +} + +type LogMessage map[string]interface{} + +func (l LogMessage) Host() string { + return l.getFieldStr(HostFieldName) +} + +func (l LogMessage) Message() string { + return l.getFieldStr(MessageFieldName) +} + +func (l LogMessage) Route() string { + return l.getFieldStr(RouteFieldName) +} + +func (l LogMessage) Validate() bool { + return l.hasField(HostFieldName) && + l.hasField(MessageFieldName) && + l.hasField(RouteFieldName) +} + +func (l LogMessage) getFieldStr(name string) string { + if value, ok := l[name].(string); ok { + return value + } else { + log.WithField("field_name", name). + WithField("value", l[name]). + Error("can't find/convert field from JSON to string") + return "" + } +} + +func (l LogMessage) hasField(name string) bool { + _, found := l[name] + return found +} diff --git a/src/flux/worker.go b/src/flux/worker.go index 5802e44..ccf372a 100644 --- a/src/flux/worker.go +++ b/src/flux/worker.go @@ -14,7 +14,7 @@ type Worker struct { CommitAmount int Database string - queue chan *LogMessage + queue chan LogMessage batch influx.BatchPoints client influx.Client metrics []*Metric @@ -26,13 +26,13 @@ func (w *Worker) Start() { for { select { - case line, ok := <-w.queue: + case message, ok := <-w.queue: if !ok { w.Flush() return // queue closed, we can exit } - w.Process(line) + w.Process(message) case <-tick: w.Flush() // send bulk query to influx every tick event @@ -53,14 +53,14 @@ func (w *Worker) CreateBatch() { w.batch = batch } -func (w *Worker) Process(message *LogMessage) { +func (w *Worker) Process(message LogMessage) { for _, metric := range w.metrics { - matches := metric.re.FindStringSubmatch(message.Message) + matches := metric.re.FindStringSubmatch(message.Message()) if len(matches) > 0 { log.WithField("matches", matches).Debug("worker: found matches") - tags, values, data, err := w.GetTagsValues(message.Message, metric, matches) + tags, values, data, err := w.GetTagsValues(message.Message(), metric, matches) log.WithField("tags", tags). WithField("values", values). @@ -71,7 +71,7 @@ func (w *Worker) Process(message *LogMessage) { } if metric.script != nil { - tags, values, err = w.ProcessScript(metric.script, message.Message, tags, values, data) + tags, values, err = w.ProcessScript(metric.script, message.Message(), tags, values, data) if err != nil { break } @@ -83,7 +83,7 @@ func (w *Worker) Process(message *LogMessage) { // add hostname as tag to point // NOTE: it overwrites any "tag_host" value from regexp and script - tags["host"] = message.Host + tags["host"] = message.Host() log.WithField("tags", tags). WithField("values", values).