diff --git a/src/flux/consumer-base.go b/src/flux/consumer-base.go index c42735d..bf75c41 100644 --- a/src/flux/consumer-base.go +++ b/src/flux/consumer-base.go @@ -33,21 +33,21 @@ func (c *BaseConsumer) parseJSONs(body io.ReadCloser) []*LogMessage { } var data map[string]interface{} + var host, message string + var logMessage *LogMessage if err := json.Unmarshal(bytes, &data); err != nil { log.WithError(err).WithField("line", string(bytes)).Error("can't parse line from POST body as JSON") - break + goto eofcheck } - var host, message string - if value, ok := data[c.HostFieldName].(string); ok { host = value } else { log.WithField("field_name", c.HostFieldName). WithField("value", data[c.HostFieldName]). Error("can't find/convert 'host' field from JSON to string") - break + goto eofcheck } if value, ok := data[c.MessageFieldName].(string); ok { @@ -56,10 +56,10 @@ func (c *BaseConsumer) parseJSONs(body io.ReadCloser) []*LogMessage { log.WithField("field_name", c.MessageFieldName). WithField("value", data[c.MessageFieldName]). Error("can't find/convert 'message' field from JSON to string") - break + goto eofcheck } - logMessage := &LogMessage{ + logMessage = &LogMessage{ Host: host, Message: message, data: data, @@ -67,6 +67,7 @@ func (c *BaseConsumer) parseJSONs(body io.ReadCloser) []*LogMessage { messages = append(messages, logMessage) + eofcheck: if errReader == io.EOF { break }