Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LokI Exporter - Adding a feature for loki exporter to encode JSON for log entry #3874

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions exporter/lokiexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ The following settings can be optionally configured:

- `headers` (no default): Name/value pairs added to the HTTP request headers.

- `format` (default = body): Set the log entry line format. This can be set to 'json' (the entire json encoded log record) or 'body' (the log record body field as a string).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it, finaly it's not too intrusive as potential big change !


Example:

```yaml
Expand Down
2 changes: 2 additions & 0 deletions exporter/lokiexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Config struct {

// Labels defines how labels should be applied to log streams sent to Loki.
Labels LabelsConfig `mapstructure:"labels"`
// Allows you to choose the entry format in the exporter
Format string `mapstructure:"format"`
}

func (c *Config) validate() error {
Expand Down
55 changes: 54 additions & 1 deletion exporter/lokiexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, 2, len(cfg.Exporters))
assert.Equal(t, 3, len(cfg.Exporters))

actualCfg := cfg.Exporters[config.NewIDWithName(typeStr, "allsettings")].(*Config)
expectedCfg := Config{
Expand Down Expand Up @@ -87,6 +87,59 @@ func TestLoadConfig(t *testing.T) {
"severity": "severity",
},
},
Format: "body",
}
require.Equal(t, &expectedCfg, actualCfg)
}

func TestJsonLoadConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.Nil(t, err)

factory := NewFactory()
factories.Exporters[config.Type(typeStr)] = factory
cfg, err := configtest.LoadConfig(path.Join(".", "testdata", "config.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, 3, len(cfg.Exporters))

actualCfg := cfg.Exporters[config.NewIDWithName(typeStr, "json")].(*Config)
expectedCfg := Config{
ExporterSettings: config.NewExporterSettings(config.NewIDWithName(typeStr, "json")),
HTTPClientSettings: confighttp.HTTPClientSettings{
Headers: map[string]string{},
Endpoint: "https://loki:3100/loki/api/v1/push",
TLSSetting: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "",
CertFile: "",
KeyFile: "",
},
Insecure: false,
},
ReadBufferSize: 0,
WriteBufferSize: 524288,
Timeout: 30000000000,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 5000000000,
MaxInterval: 30000000000,
MaxElapsedTime: 300000000000,
},
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 10,
QueueSize: 5000,
},
TenantID: "example",
Labels: LabelsConfig{
Attributes: map[string]string{},
ResourceAttributes: map[string]string{},
},
Format: "json",
}
require.Equal(t, &expectedCfg, actualCfg)
}
Expand Down
38 changes: 38 additions & 0 deletions exporter/lokiexporter/encode_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package lokiexporter

import (
"encoding/json"

"go.opentelemetry.io/collector/consumer/pdata"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
)

// JSON representation of the LogRecord as described by https://developers.google.com/protocol-buffers/docs/proto3#json

type lokiEntry struct {
Name string `json:"name,omitempty"`
Body string `json:"body,omitempty"`
TraceID string `json:"traceid,omitempty"`
SpanID string `json:"spanid,omitempty"`
Severity string `json:"severity,omitempty"`
Attributes map[string]interface{} `json:"attributes,omitempty"`
}

func encodeJSON(lr pdata.LogRecord) (string, error) {
var logRecord lokiEntry
var jsonRecord []byte

logRecord = lokiEntry{
Name: lr.Name(),
Body: lr.Body().StringVal(),
TraceID: lr.TraceID().HexString(),
SpanID: lr.SpanID().HexString(),
Severity: lr.SeverityText(),
Attributes: tracetranslator.AttributeMapToMap(lr.Attributes())}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you created a full structure, why not add "Resources" ? Maybe conditionaly ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about nested structures on Grafana / Loki ?
In my memories I tested it and it was not working as expected. I'm not sure if loki is able to do something like {} | json | attributes.attr1 == "xxx" and on Grafana log pannel I think nested objects are not correctly rendered.

Does anyone can confirm ?
Do you think attributes could be flatened at json root (maybe with a key prefix) ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Working on logs enriched by a resourcedetector processor, I confirm the Resources should be converted to structured logs data !
In theory some keys could be the same in attributes and resources (even if it doesn't makes sense...) so convertLogToLokiEntry should prefix them by something in my opinion.


jsonRecord, err := json.Marshal(logRecord)
if err != nil {
return "", err
}
return string(jsonRecord), nil
}
35 changes: 35 additions & 0 deletions exporter/lokiexporter/encode_json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package lokiexporter

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/consumer/pdata"
)

func exampleLog() pdata.LogRecord {

buffer := pdata.NewLogRecord()
buffer.Body().SetStringVal("Example log")
buffer.SetName("name")
buffer.SetSeverityText("error")
buffer.Attributes().Insert("attr1", pdata.NewAttributeValueString("1"))
buffer.Attributes().Insert("attr2", pdata.NewAttributeValueString("2"))
buffer.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4}))
buffer.SetSpanID(pdata.NewSpanID([8]byte{5, 6, 7, 8}))

return buffer
}

func exampleJson() string {
jsonExample := `{"name":"name","body":"Example log","traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"}}`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To follow some logic and to be more explicit on loki side, why not prefix your arbitrary keys (name, body, traceid...) by someting like "otel_collector_lokiexporter_" ?
It can avoid confusion with keys in logs received by another channel than otel collector for exemple.

I would prefer use "otel.collector.lokiexporter." to be more consistent with OTEL but if you apply | json on it, loki replace all points by underscores anyway at the end...

return jsonExample
}

func TestConvert(t *testing.T) {
in := exampleJson()
out, err := encodeJSON(exampleLog())
t.Log(in)
t.Log(out, err)
assert.Equal(t, in, out)
}
25 changes: 24 additions & 1 deletion exporter/lokiexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,19 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n
continue
}
labels := mergedLabels.String()
entry := convertLogToLokiEntry(log)
var entry *logproto.Entry
if l.config.Format == "json" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the decision on format is not made at runtime, but rather at config time, what are your thoughts on making this decision early on and mapping the appropriate method ahead of time? This would remove the additional check per record.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gramidt I have just pushed a new commit following your comments.

var err error
entry, err = convertLogToJsonEntry(log)
if err != nil {
// Couldn't convert to JSON so dropping log.
numDroppedLogs++
l.logger.Error("Failed to convert to JSON - Dropping Log", zap.Error(err))
continue
}
} else {
entry = convertLogToLokiEntry(log)
}

if stream, ok := streams[labels]; ok {
stream.Entries = append(stream.Entries, *entry)
Expand Down Expand Up @@ -201,3 +213,14 @@ func convertLogToLokiEntry(lr pdata.LogRecord) *logproto.Entry {
Line: lr.Body().StringVal(),
}
}

func convertLogToJsonEntry(lr pdata.LogRecord) (*logproto.Entry, error) {
line, err := encodeJSON(lr)
if err != nil {
return nil, err
}
return &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: line,
}, nil
}
16 changes: 16 additions & 0 deletions exporter/lokiexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,3 +575,19 @@ func TestExporter_stopAlwaysReturnsNil(t *testing.T) {
require.NotNil(t, exp)
require.NoError(t, exp.stop(context.Background()))
}

func TestExporter_convertLogtoJsonEntry(t *testing.T) {
ts := pdata.Timestamp(int64(1) * time.Millisecond.Nanoseconds())
lr := pdata.NewLogRecord()
lr.Body().SetStringVal("log message")
lr.SetTimestamp(ts)

entry, err := convertLogToJsonEntry(lr)
expEntry := &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: `{"body":"log message"}`,
}
require.Nil(t, err)
require.NotNil(t, entry)
require.Equal(t, expEntry, entry)
}
1 change: 1 addition & 0 deletions exporter/lokiexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func createDefaultConfig() config.Exporter {
RetrySettings: exporterhelper.DefaultRetrySettings(),
QueueSettings: exporterhelper.DefaultQueueSettings(),
TenantID: "",
Format: "body",
Labels: LabelsConfig{
Attributes: map[string]string{},
ResourceAttributes: map[string]string{},
Expand Down
7 changes: 6 additions & 1 deletion exporter/lokiexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ exporters:
container.name: "container_name"
k8s.cluster.name: "k8s_cluster_name"
severity: "severity"
loki/json:
endpoint: "https://loki:3100/loki/api/v1/push"
tenant_id: "example"
format: "json"
loki/allsettings:
endpoint: "https://loki:3100/loki/api/v1/push"
tenant_id: "example"
insecure: true
format: "body"
ca_file: /var/lib/mycert.pem
cert_file: certfile
key_file: keyfile
Expand Down Expand Up @@ -47,4 +52,4 @@ service:
logs:
receivers: [ nop ]
processors: [ nop ]
exporters: [ loki, loki/allsettings ]
exporters: [ loki, loki/allsettings, loki/json ]