diff --git a/src/flux/config.go b/src/flux/config.go index 4482f5a..b4d1985 100644 --- a/src/flux/config.go +++ b/src/flux/config.go @@ -9,18 +9,35 @@ import ( log "github.com/sirupsen/logrus" ) -func NewConsumer(app *iris.Application, route *Route) chan *LogMessage { +func NewRootConsumer(app *iris.Application) *RootConsumer { + consumer := &RootConsumer{ + BaseConsumer: BaseConsumer{ + HostFieldName: GetenvStr("FLUX_HOST_FIELD_NAME", "HOST"), // default value as in syslog message + MessageFieldName: GetenvStr("FLUX_MESSAGE_FIELD_NAME", "MESSAGE"), // ... + }, + RouteFieldName: GetenvStr("FLUX_ROUTE_FIELD_NAME", "ROUTE"), + consumers: make(map[string]*Consumer), + } + + app.Post("/", consumer.Handle) + + return consumer +} + +func NewConsumer(app *iris.Application, route *Route) (*Consumer, chan *LogMessage) { queue := make(chan *LogMessage, GetenvInt("FLUX_INTERNAL_BUFFER", 1000)) consumer := &Consumer{ - HostFieldName: GetenvStr("FLUX_HOST_FIELD_NAME", "HOST"), // default value as in syslog message - MessageFieldName: GetenvStr("FLUX_MESSAGE_FIELD_NAME", "MESSAGE"), // ... - queue: queue, + BaseConsumer: BaseConsumer{ + HostFieldName: GetenvStr("FLUX_HOST_FIELD_NAME", "HOST"), // default value as in syslog message + MessageFieldName: GetenvStr("FLUX_MESSAGE_FIELD_NAME", "MESSAGE"), // ... + }, + queue: queue, } app.Post(route.Name, consumer.Handle) - return queue + return consumer, queue } func NewWorkers(queue chan *LogMessage, metrics []*Metric) []*Worker { diff --git a/src/flux/consumer-base.go b/src/flux/consumer-base.go new file mode 100644 index 0000000..c42735d --- /dev/null +++ b/src/flux/consumer-base.go @@ -0,0 +1,76 @@ +package main + +import ( + "bufio" + "encoding/json" + "io" + + log "github.com/sirupsen/logrus" +) + +type BaseConsumer struct { + HostFieldName string // name of "host" field in json log message + MessageFieldName string // name of "message" field in json log message +} + +type LogMessage struct { + Host string + Message string + + data map[string]interface{} +} + +func (c *BaseConsumer) parseJSONs(body io.ReadCloser) []*LogMessage { + messages := make([]*LogMessage, 0) // TODO: change to channel (don't parse all json messages into memory) + + reader := bufio.NewReader(body) + for { + bytes, errReader := reader.ReadBytes('\n') + + if errReader != nil && errReader != io.EOF { + log.WithError(errReader).Error("can't read line from POST body") + break + } + + var data map[string]interface{} + + if err := json.Unmarshal(bytes, &data); err != nil { + log.WithError(err).WithField("line", string(bytes)).Error("can't parse line from POST body as JSON") + break + } + + var host, message string + + if value, ok := data[c.HostFieldName].(string); ok { + host = value + } else { + log.WithField("field_name", c.HostFieldName). + WithField("value", data[c.HostFieldName]). + Error("can't find/convert 'host' field from JSON to string") + break + } + + if value, ok := data[c.MessageFieldName].(string); ok { + message = value + } else { + log.WithField("field_name", c.MessageFieldName). + WithField("value", data[c.MessageFieldName]). + Error("can't find/convert 'message' field from JSON to string") + break + } + + logMessage := &LogMessage{ + Host: host, + Message: message, + data: data, + } + + messages = append(messages, logMessage) + + if errReader == io.EOF { + break + } + } + + return messages +} diff --git a/src/flux/consumer-root.go b/src/flux/consumer-root.go new file mode 100644 index 0000000..c360b27 --- /dev/null +++ b/src/flux/consumer-root.go @@ -0,0 +1,39 @@ +package main + +import ( + "github.com/kataras/iris" + log "github.com/sirupsen/logrus" +) + +type RootConsumer struct { + RouteFieldName string + consumers map[string]*Consumer // route name to consumer map + + BaseConsumer +} + +func (c *RootConsumer) Handle(ctx iris.Context) { + for _, message := range c.parseJSONs(ctx.Request().Body) { + var route string + + if value, ok := message.data[c.RouteFieldName].(string); ok { + route = value + } else { + log.WithField("field_name", c.RouteFieldName). + WithField("value", message.data[c.RouteFieldName]). + Error("can't find/convert 'route' field from JSON to string") + continue + } + + if consumer, found := c.consumers[route]; found { + log.WithField("message", message).WithField("route", route).Debug("consumer: sending message to queue of route") + consumer.queue <- message + } else { + log.WithField("message", message).WithField("route", route).Error("consumer: can't found consumer for route") + } + } +} + +func (c *RootConsumer) AddConsumer(route string, consumer *Consumer) { + c.consumers[route] = consumer +} diff --git a/src/flux/consumer.go b/src/flux/consumer.go index 6cb89d3..c3e4ff2 100644 --- a/src/flux/consumer.go +++ b/src/flux/consumer.go @@ -1,72 +1,19 @@ package main import ( - "bufio" - "encoding/json" - "io" - "github.com/kataras/iris/context" log "github.com/sirupsen/logrus" ) -type LogMessage struct { - Host string - Message string -} - type Consumer struct { - HostFieldName string // name of "host" field in json log message - MessageFieldName string // name of "message" field in json log message - queue chan *LogMessage + queue chan *LogMessage + + BaseConsumer } func (c *Consumer) Handle(ctx context.Context) { - reader := bufio.NewReader(ctx.Request().Body) - for { - bytes, errReader := reader.ReadBytes('\n') - - if errReader != nil && errReader != io.EOF { - log.WithError(errReader).Error("can't read line from POST body") - return - } - - var data map[string]interface{} - - if err := json.Unmarshal(bytes, &data); err != nil { - log.WithError(err).WithField("line", string(bytes)).Error("can't parse line from POST body as JSON") - return - } - - var host, message string - - if value, ok := data[c.HostFieldName].(string); ok { - host = value - } else { - log.WithField("field_name", c.HostFieldName). - WithField("value", data[c.HostFieldName]). - Error("can't find/convert 'host' field from JSON to string") - return - } - - if value, ok := data[c.MessageFieldName].(string); ok { - message = value - } else { - log.WithField("field_name", c.MessageFieldName). - WithField("value", data[c.MessageFieldName]). - Error("can't find/convert 'message' field from JSON to string") - return - } - - logMessage := &LogMessage{ - Host: host, - Message: message, - } - + for _, message := range c.parseJSONs(ctx.Request().Body) { log.WithField("message", message).Debug("consumer: sending message to queue") - c.queue <- logMessage - - if errReader == io.EOF { - return - } + c.queue <- message } } diff --git a/src/flux/main.go b/src/flux/main.go index af6da2c..01538b0 100644 --- a/src/flux/main.go +++ b/src/flux/main.go @@ -25,8 +25,11 @@ func main() { PrintConfig(grammar, *debugConfig, *verbose) + rootConsumer := NewRootConsumer(app) + for _, route := range grammar.Routes { - queue := NewConsumer(app, route) + consumer, queue := NewConsumer(app, route) + rootConsumer.AddConsumer(route.Name, consumer) workers := NewWorkers(queue, route.Metrics) for _, worker := range workers {