Skip to content

Commit

Permalink
(+) root consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
ont committed Mar 28, 2018
1 parent ff2f895 commit 0b5248a
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 64 deletions.
27 changes: 22 additions & 5 deletions src/flux/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,35 @@ import (
log "github.com/sirupsen/logrus"
)

func NewConsumer(app *iris.Application, route *Route) chan *LogMessage {
func NewRootConsumer(app *iris.Application) *RootConsumer {
consumer := &RootConsumer{
BaseConsumer: BaseConsumer{
HostFieldName: GetenvStr("FLUX_HOST_FIELD_NAME", "HOST"), // default value as in syslog message
MessageFieldName: GetenvStr("FLUX_MESSAGE_FIELD_NAME", "MESSAGE"), // ...
},
RouteFieldName: GetenvStr("FLUX_ROUTE_FIELD_NAME", "ROUTE"),
consumers: make(map[string]*Consumer),
}

app.Post("/", consumer.Handle)

return consumer
}

func NewConsumer(app *iris.Application, route *Route) (*Consumer, chan *LogMessage) {
queue := make(chan *LogMessage, GetenvInt("FLUX_INTERNAL_BUFFER", 1000))

consumer := &Consumer{
HostFieldName: GetenvStr("FLUX_HOST_FIELD_NAME", "HOST"), // default value as in syslog message
MessageFieldName: GetenvStr("FLUX_MESSAGE_FIELD_NAME", "MESSAGE"), // ...
queue: queue,
BaseConsumer: BaseConsumer{
HostFieldName: GetenvStr("FLUX_HOST_FIELD_NAME", "HOST"), // default value as in syslog message
MessageFieldName: GetenvStr("FLUX_MESSAGE_FIELD_NAME", "MESSAGE"), // ...
},
queue: queue,
}

app.Post(route.Name, consumer.Handle)

return queue
return consumer, queue
}

func NewWorkers(queue chan *LogMessage, metrics []*Metric) []*Worker {
Expand Down
76 changes: 76 additions & 0 deletions src/flux/consumer-base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"bufio"
"encoding/json"
"io"

log "github.com/sirupsen/logrus"
)

type BaseConsumer struct {
HostFieldName string // name of "host" field in json log message
MessageFieldName string // name of "message" field in json log message
}

type LogMessage struct {
Host string
Message string

data map[string]interface{}
}

func (c *BaseConsumer) parseJSONs(body io.ReadCloser) []*LogMessage {
messages := make([]*LogMessage, 0) // TODO: change to channel (don't parse all json messages into memory)

reader := bufio.NewReader(body)
for {
bytes, errReader := reader.ReadBytes('\n')

if errReader != nil && errReader != io.EOF {
log.WithError(errReader).Error("can't read line from POST body")
break
}

var data map[string]interface{}

if err := json.Unmarshal(bytes, &data); err != nil {
log.WithError(err).WithField("line", string(bytes)).Error("can't parse line from POST body as JSON")
break
}

var host, message string

if value, ok := data[c.HostFieldName].(string); ok {
host = value
} else {
log.WithField("field_name", c.HostFieldName).
WithField("value", data[c.HostFieldName]).
Error("can't find/convert 'host' field from JSON to string")
break
}

if value, ok := data[c.MessageFieldName].(string); ok {
message = value
} else {
log.WithField("field_name", c.MessageFieldName).
WithField("value", data[c.MessageFieldName]).
Error("can't find/convert 'message' field from JSON to string")
break
}

logMessage := &LogMessage{
Host: host,
Message: message,
data: data,
}

messages = append(messages, logMessage)

if errReader == io.EOF {
break
}
}

return messages
}
39 changes: 39 additions & 0 deletions src/flux/consumer-root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"github.com/kataras/iris"
log "github.com/sirupsen/logrus"
)

type RootConsumer struct {
RouteFieldName string
consumers map[string]*Consumer // route name to consumer map

BaseConsumer
}

func (c *RootConsumer) Handle(ctx iris.Context) {
for _, message := range c.parseJSONs(ctx.Request().Body) {
var route string

if value, ok := message.data[c.RouteFieldName].(string); ok {
route = value
} else {
log.WithField("field_name", c.RouteFieldName).
WithField("value", message.data[c.RouteFieldName]).
Error("can't find/convert 'route' field from JSON to string")
continue
}

if consumer, found := c.consumers[route]; found {
log.WithField("message", message).WithField("route", route).Debug("consumer: sending message to queue of route")
consumer.queue <- message
} else {
log.WithField("message", message).WithField("route", route).Error("consumer: can't found consumer for route")
}
}
}

func (c *RootConsumer) AddConsumer(route string, consumer *Consumer) {
c.consumers[route] = consumer
}
63 changes: 5 additions & 58 deletions src/flux/consumer.go
Original file line number Diff line number Diff line change
@@ -1,72 +1,19 @@
package main

import (
"bufio"
"encoding/json"
"io"

"github.com/kataras/iris/context"
log "github.com/sirupsen/logrus"
)

type LogMessage struct {
Host string
Message string
}

type Consumer struct {
HostFieldName string // name of "host" field in json log message
MessageFieldName string // name of "message" field in json log message
queue chan *LogMessage
queue chan *LogMessage

BaseConsumer
}

func (c *Consumer) Handle(ctx context.Context) {
reader := bufio.NewReader(ctx.Request().Body)
for {
bytes, errReader := reader.ReadBytes('\n')

if errReader != nil && errReader != io.EOF {
log.WithError(errReader).Error("can't read line from POST body")
return
}

var data map[string]interface{}

if err := json.Unmarshal(bytes, &data); err != nil {
log.WithError(err).WithField("line", string(bytes)).Error("can't parse line from POST body as JSON")
return
}

var host, message string

if value, ok := data[c.HostFieldName].(string); ok {
host = value
} else {
log.WithField("field_name", c.HostFieldName).
WithField("value", data[c.HostFieldName]).
Error("can't find/convert 'host' field from JSON to string")
return
}

if value, ok := data[c.MessageFieldName].(string); ok {
message = value
} else {
log.WithField("field_name", c.MessageFieldName).
WithField("value", data[c.MessageFieldName]).
Error("can't find/convert 'message' field from JSON to string")
return
}

logMessage := &LogMessage{
Host: host,
Message: message,
}

for _, message := range c.parseJSONs(ctx.Request().Body) {
log.WithField("message", message).Debug("consumer: sending message to queue")
c.queue <- logMessage

if errReader == io.EOF {
return
}
c.queue <- message
}
}
5 changes: 4 additions & 1 deletion src/flux/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ func main() {

PrintConfig(grammar, *debugConfig, *verbose)

rootConsumer := NewRootConsumer(app)

for _, route := range grammar.Routes {
queue := NewConsumer(app, route)
consumer, queue := NewConsumer(app, route)
rootConsumer.AddConsumer(route.Name, consumer)

workers := NewWorkers(queue, route.Metrics)
for _, worker := range workers {
Expand Down

0 comments on commit 0b5248a

Please sign in to comment.