Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LokI Exporter - Adding a feature for loki exporter to encode JSON for log entry #3874

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions exporter/lokiexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ The following settings can be optionally configured:

- `headers` (no default): Name/value pairs added to the HTTP request headers.

- `format` (default = body): Set the log entry line format. This can be set to 'json' (the entire json encoded log record) or 'body' (the log record body field as a string).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it, finaly it's not too intrusive as potential big change !


Example:

```yaml
Expand Down
2 changes: 2 additions & 0 deletions exporter/lokiexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Config struct {

// Labels defines how labels should be applied to log streams sent to Loki.
Labels LabelsConfig `mapstructure:"labels"`
// Allows you to choose the entry format in the exporter
Format string `mapstructure:"format"`
}

func (c *Config) validate() error {
Expand Down
55 changes: 54 additions & 1 deletion exporter/lokiexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, 2, len(cfg.Exporters))
assert.Equal(t, 3, len(cfg.Exporters))

actualCfg := cfg.Exporters[config.NewIDWithName(typeStr, "allsettings")].(*Config)
expectedCfg := Config{
Expand Down Expand Up @@ -87,6 +87,59 @@ func TestLoadConfig(t *testing.T) {
"severity": "severity",
},
},
Format: "body",
}
require.Equal(t, &expectedCfg, actualCfg)
}

func TestJsonLoadConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.Nil(t, err)

factory := NewFactory()
factories.Exporters[config.Type(typeStr)] = factory
cfg, err := configtest.LoadConfig(path.Join(".", "testdata", "config.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, 3, len(cfg.Exporters))

actualCfg := cfg.Exporters[config.NewIDWithName(typeStr, "json")].(*Config)
expectedCfg := Config{
ExporterSettings: config.NewExporterSettings(config.NewIDWithName(typeStr, "json")),
HTTPClientSettings: confighttp.HTTPClientSettings{
Headers: map[string]string{},
Endpoint: "https://loki:3100/loki/api/v1/push",
TLSSetting: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "",
CertFile: "",
KeyFile: "",
},
Insecure: false,
},
ReadBufferSize: 0,
WriteBufferSize: 524288,
Timeout: 30000000000,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 5000000000,
MaxInterval: 30000000000,
MaxElapsedTime: 300000000000,
},
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 10,
QueueSize: 5000,
},
TenantID: "example",
Labels: LabelsConfig{
Attributes: map[string]string{},
ResourceAttributes: map[string]string{},
},
Format: "json",
}
require.Equal(t, &expectedCfg, actualCfg)
}
Expand Down
37 changes: 37 additions & 0 deletions exporter/lokiexporter/encode_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package lokiexporter

import (
"encoding/json"

"go.opentelemetry.io/collector/model/pdata"
)

// JSON representation of the LogRecord as described by https://developers.google.com/protocol-buffers/docs/proto3#json

type lokiEntry struct {
Name string `json:"name,omitempty"`
Body string `json:"body,omitempty"`
TraceID string `json:"traceid,omitempty"`
SpanID string `json:"spanid,omitempty"`
Severity string `json:"severity,omitempty"`
Attributes map[string]interface{} `json:"attributes,omitempty"`
}

func encodeJSON(lr pdata.LogRecord) (string, error) {
var logRecord lokiEntry
var jsonRecord []byte

logRecord = lokiEntry{
Name: lr.Name(),
Body: lr.Body().StringVal(),
TraceID: lr.TraceID().HexString(),
SpanID: lr.SpanID().HexString(),
Severity: lr.SeverityText(),
Attributes: lr.Attributes().AsRaw()}

jsonRecord, err := json.Marshal(logRecord)
if err != nil {
return "", err
}
return string(jsonRecord), nil
}
35 changes: 35 additions & 0 deletions exporter/lokiexporter/encode_json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package lokiexporter

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/model/pdata"
)

func exampleLog() pdata.LogRecord {

buffer := pdata.NewLogRecord()
buffer.Body().SetStringVal("Example log")
buffer.SetName("name")
buffer.SetSeverityText("error")
buffer.Attributes().Insert("attr1", pdata.NewAttributeValueString("1"))
buffer.Attributes().Insert("attr2", pdata.NewAttributeValueString("2"))
buffer.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4}))
buffer.SetSpanID(pdata.NewSpanID([8]byte{5, 6, 7, 8}))

return buffer
}

func exampleJSON() string {
jsonExample := `{"name":"name","body":"Example log","traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"}}`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To follow some logic and to be more explicit on loki side, why not prefix your arbitrary keys (name, body, traceid...) by someting like "otel_collector_lokiexporter_" ?
It can avoid confusion with keys in logs received by another channel than otel collector for exemple.

I would prefer use "otel.collector.lokiexporter." to be more consistent with OTEL but if you apply | json on it, loki replace all points by underscores anyway at the end...

return jsonExample
}

func TestConvert(t *testing.T) {
in := exampleJSON()
out, err := encodeJSON(exampleLog())
t.Log(in)
t.Log(out, err)
assert.Equal(t, in, out)
}
40 changes: 33 additions & 7 deletions exporter/lokiexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,24 @@ import (
)

type lokiExporter struct {
config *Config
logger *zap.Logger
client *http.Client
wg sync.WaitGroup
config *Config
logger *zap.Logger
client *http.Client
wg sync.WaitGroup
convert func(pdata.LogRecord) (*logproto.Entry, error)
}

func newExporter(config *Config, logger *zap.Logger) *lokiExporter {
return &lokiExporter{
lokiexporter := &lokiExporter{
config: config,
logger: logger,
}
if config.Format == "json" {
lokiexporter.convert = convertLogToJSONEntry
} else {
lokiexporter.convert = convertLogToLokiEntry
}
return lokiexporter
}

func (l *lokiExporter) pushLogData(ctx context.Context, ld pdata.Logs) error {
Expand Down Expand Up @@ -135,7 +142,15 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n
continue
}
labels := mergedLabels.String()
entry := convertLogToLokiEntry(log)
var entry *logproto.Entry
var err error
entry, err = l.convert(log)
if err != nil {
// Couldn't convert to JSON so dropping log.
numDroppedLogs++
l.logger.Error("Failed to convert to JSON - Dropping Log", zap.Error(err))
continue
}

if stream, ok := streams[labels]; ok {
stream.Entries = append(stream.Entries, *entry)
Expand Down Expand Up @@ -195,9 +210,20 @@ func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap,
return ls
}

func convertLogToLokiEntry(lr pdata.LogRecord) *logproto.Entry {
func convertLogToLokiEntry(lr pdata.LogRecord) (*logproto.Entry, error) {
return &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: lr.Body().StringVal(),
}, nil
}

func convertLogToJSONEntry(lr pdata.LogRecord) (*logproto.Entry, error) {
line, err := encodeJSON(lr)
if err != nil {
return nil, err
}
return &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: line,
}, nil
}
18 changes: 17 additions & 1 deletion exporter/lokiexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func TestExporter_convertLogToLokiEntry(t *testing.T) {
lr.Body().SetStringVal("log message")
lr.SetTimestamp(ts)

entry := convertLogToLokiEntry(lr)
entry, _ := convertLogToLokiEntry(lr)

expEntry := &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Expand Down Expand Up @@ -575,3 +575,19 @@ func TestExporter_stopAlwaysReturnsNil(t *testing.T) {
require.NotNil(t, exp)
require.NoError(t, exp.stop(context.Background()))
}

func TestExporter_convertLogtoJSONEntry(t *testing.T) {
ts := pdata.Timestamp(int64(1) * time.Millisecond.Nanoseconds())
lr := pdata.NewLogRecord()
lr.Body().SetStringVal("log message")
lr.SetTimestamp(ts)

entry, err := convertLogToJSONEntry(lr)
expEntry := &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: `{"body":"log message"}`,
}
require.Nil(t, err)
require.NotNil(t, entry)
require.Equal(t, expEntry, entry)
}
1 change: 1 addition & 0 deletions exporter/lokiexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func createDefaultConfig() config.Exporter {
RetrySettings: exporterhelper.DefaultRetrySettings(),
QueueSettings: exporterhelper.DefaultQueueSettings(),
TenantID: "",
Format: "body",
Labels: LabelsConfig{
Attributes: map[string]string{},
ResourceAttributes: map[string]string{},
Expand Down
6 changes: 5 additions & 1 deletion exporter/lokiexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ exporters:
container.name: "container_name"
k8s.cluster.name: "k8s_cluster_name"
severity: "severity"
loki/json:
endpoint: "https://loki:3100/loki/api/v1/push"
tenant_id: "example"
format: "json"
loki/allsettings:
endpoint: "https://loki:3100/loki/api/v1/push"
tenant_id: "example"
Expand Down Expand Up @@ -48,4 +52,4 @@ service:
logs:
receivers: [ nop ]
processors: [ nop ]
exporters: [ loki, loki/allsettings ]
exporters: [ loki, loki/allsettings, loki/json ]