Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki Exporter - Adding a feature for loki exporter to encode JSON for log entry #5846

Merged
merged 28 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
491e6e1
initial commit
crearys May 21, 2021
18d1808
revert un-needed changes
crearys May 21, 2021
403b6a1
test json encode as entry
crearys May 21, 2021
4c695b7
add encoding files
crearys May 21, 2021
caa4507
merge loki encoding with the resource atttribute branch
crearys May 24, 2021
071a13c
add format option to loki exporter config
crearys May 25, 2021
bb37818
add test for loki/json test data
crearys May 25, 2021
728f4bf
improve variable names and comments
crearys Jun 4, 2021
bf30cc5
pull from upstream main and update loki exporter files to fix tests
crearys Jun 23, 2021
bb253b3
Create feature for loki exporter to encode json
crearys Jun 24, 2021
11d5439
mapping convert functions
crearys Jul 22, 2021
fcdb38b
Fix linting errors
crearys Jul 29, 2021
e01ed0f
Merge remote-tracking branch 'upstream/main' into lokiencoding-pr
crearys Jul 29, 2021
ebdcccb
update pdata package
crearys Jul 29, 2021
45a52eb
pull upstream
crearys Sep 23, 2021
e80de58
update to use otel 0.36
crearys Sep 23, 2021
3d9ef4f
Merge branch 'lokiencoding-pr' of https://github.com/crearys/opentele…
gillg Oct 13, 2021
fc0c481
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
gillg Oct 20, 2021
3139761
Add resources in encoded json structure
gillg Oct 20, 2021
7fd92f5
Add missing licence
gillg Oct 21, 2021
a855093
Fix review
gillg Oct 25, 2021
30edcbe
Group similar errors
gillg Oct 26, 2021
0edc8fb
Fix serialization
gillg Dec 4, 2021
42d664c
Add a test for unsuported body type
gillg Dec 4, 2021
f2f50d6
Fix tests
gillg Dec 4, 2021
7e7fa2f
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
gillg Dec 4, 2021
3d11fb3
Fix lint
gillg Dec 6, 2021
119a6a6
Change error level to debug for dropped logs to avoid flood, keeping …
gillg Dec 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions exporter/lokiexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ The following settings can be optionally configured:

- `headers` (no default): Name/value pairs added to the HTTP request headers.

- `format` (default = body): Set the log entry line format. This can be set to 'json' (the entire JSON encoded log record) or 'body' (the log record body field as a string).

Example:

```yaml
Expand Down
2 changes: 2 additions & 0 deletions exporter/lokiexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Config struct {

// Labels defines how labels should be applied to log streams sent to Loki.
Labels LabelsConfig `mapstructure:"labels"`
// Allows you to choose the entry format in the exporter
Format string `mapstructure:"format"`
}

func (c *Config) validate() error {
Expand Down
55 changes: 54 additions & 1 deletion exporter/lokiexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, 2, len(cfg.Exporters))
assert.Equal(t, 3, len(cfg.Exporters))

actualCfg := cfg.Exporters[config.NewComponentIDWithName(typeStr, "allsettings")].(*Config)
expectedCfg := Config{
Expand Down Expand Up @@ -87,6 +87,59 @@ func TestLoadConfig(t *testing.T) {
"severity": "severity",
},
},
Format: "body",
}
require.Equal(t, &expectedCfg, actualCfg)
}

func TestJSONLoadConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.Nil(t, err)

factory := NewFactory()
factories.Exporters[config.Type(typeStr)] = factory
cfg, err := configtest.LoadConfig(path.Join(".", "testdata", "config.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, 3, len(cfg.Exporters))

actualCfg := cfg.Exporters[config.NewComponentIDWithName(typeStr, "json")].(*Config)
expectedCfg := Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "json")),
HTTPClientSettings: confighttp.HTTPClientSettings{
Headers: map[string]string{},
Endpoint: "https://loki:3100/loki/api/v1/push",
TLSSetting: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "",
CertFile: "",
KeyFile: "",
},
Insecure: false,
},
ReadBufferSize: 0,
WriteBufferSize: 524288,
Timeout: time.Second * 10,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 10,
QueueSize: 5000,
},
TenantID: "example",
Labels: LabelsConfig{
Attributes: map[string]string{},
ResourceAttributes: map[string]string{},
},
Format: "json",
}
require.Equal(t, &expectedCfg, actualCfg)
}
Expand Down
54 changes: 54 additions & 0 deletions exporter/lokiexporter/encode_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lokiexporter

import (
"encoding/json"

"go.opentelemetry.io/collector/model/pdata"
)

// JSON representation of the LogRecord as described by https://developers.google.com/protocol-buffers/docs/proto3#json

type lokiEntry struct {
Name string `json:"name,omitempty"`
Body string `json:"body,omitempty"`
TraceID string `json:"traceid,omitempty"`
SpanID string `json:"spanid,omitempty"`
Severity string `json:"severity,omitempty"`
Attributes map[string]interface{} `json:"attributes,omitempty"`
Resources map[string]interface{} `json:"resources,omitempty"`
}

func encodeJSON(lr pdata.LogRecord, res pdata.Resource) (string, error) {
var logRecord lokiEntry
var jsonRecord []byte

logRecord = lokiEntry{
Name: lr.Name(),
Body: lr.Body().StringVal(),
TraceID: lr.TraceID().HexString(),
SpanID: lr.SpanID().HexString(),
Severity: lr.SeverityText(),
Attributes: lr.Attributes().AsRaw(),
Resources: res.Attributes().AsRaw(),
}

jsonRecord, err := json.Marshal(logRecord)
if err != nil {
return "", err
}
return string(jsonRecord), nil
}
52 changes: 52 additions & 0 deletions exporter/lokiexporter/encode_json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lokiexporter

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/model/pdata"
)

func exampleLog() (pdata.LogRecord, pdata.Resource) {

buffer := pdata.NewLogRecord()
buffer.Body().SetStringVal("Example log")
buffer.SetName("name")
buffer.SetSeverityText("error")
buffer.Attributes().Insert("attr1", pdata.NewAttributeValueString("1"))
buffer.Attributes().Insert("attr2", pdata.NewAttributeValueString("2"))
buffer.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4}))
buffer.SetSpanID(pdata.NewSpanID([8]byte{5, 6, 7, 8}))

resource := pdata.NewResource()
resource.Attributes().Insert("host.name", pdata.NewAttributeValueString("something"))

return buffer, resource
}

func exampleJSON() string {
jsonExample := `{"name":"name","body":"Example log","traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"},"resources":{"host.name":"something"}}`
return jsonExample
}

func TestConvert(t *testing.T) {
in := exampleJSON()
out, err := encodeJSON(exampleLog())
t.Log(in)
t.Log(out, err)
assert.Equal(t, in, out)
}
2 changes: 0 additions & 2 deletions exporter/lokiexporter/example/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ services:
container_name: otel
command:
- "--config=/etc/otel-collector-config.yml"
# Memory Ballast size should be max 1/3 to 1/2 of memory.
- "--mem-ballast-size-mib=683"
volumes:
- ./otel-collector-config.yml:/etc/otel-collector-config.yml
ports:
Expand Down
8 changes: 8 additions & 0 deletions exporter/lokiexporter/example/otel-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,23 @@ processors:
exporters:
loki:
endpoint: "http://loki:3100/loki/api/v1/push"
# Will encode the whole OTEL message in json
# This format happen after extracting labels
format: json
labels:
attributes:
container_name: ""
source: ""
resources:
host.name: "hostname"

extensions:
health_check:
pprof:
zpages:
memory_ballast:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you actually need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but was to be consistent with the previous exemple. We can remove it if you think it's better.
Does memory ballast shouldn't always be used when we receive pushed data like logs ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We recommend that people always use things like the batch processor and memory ballast, but I find it confusing to add something not relevant to what is being demonstrated.

# Memory Ballast size should be max 1/3 to 1/2 of memory.
size_mib: 64

service:
extensions: [pprof, zpages, health_check]
Expand Down
55 changes: 48 additions & 7 deletions exporter/lokiexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package lokiexporter
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -30,23 +31,31 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter/internal/third_party/loki/logproto"
)

type lokiExporter struct {
config *Config
logger *zap.Logger
client *http.Client
wg sync.WaitGroup
config *Config
logger *zap.Logger
client *http.Client
wg sync.WaitGroup
convert func(pdata.LogRecord, pdata.Resource) (*logproto.Entry, error)
}

func newExporter(config *Config, logger *zap.Logger) *lokiExporter {
return &lokiExporter{
lokiexporter := &lokiExporter{
config: config,
logger: logger,
}
if config.Format == "json" {
lokiexporter.convert = convertLogToJSONEntry
} else {
lokiexporter.convert = convertLogBodyToEntry
}
return lokiexporter
}

func (l *lokiExporter) pushLogData(ctx context.Context, ld pdata.Logs) error {
Expand Down Expand Up @@ -119,6 +128,8 @@ func (l *lokiExporter) stop(context.Context) (err error) {
}

func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, numDroppedLogs int) {
var errs error

streams := make(map[string]*logproto.Stream)
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
Expand All @@ -135,7 +146,24 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n
continue
}
labels := mergedLabels.String()
entry := convertLogToLokiEntry(log)
var entry *logproto.Entry
var err error
entry, err = l.convert(log, resource)
if err != nil {
// Couldn't convert so dropping log.
numDroppedLogs++
errs = multierr.Append(
errs,
errors.New(
fmt.Sprint(
"failed to convert, dropping log",
zap.String("format", l.config.Format),
zap.Error(err),
),
),
)
continue
}

if stream, ok := streams[labels]; ok {
stream.Entries = append(stream.Entries, *entry)
Expand All @@ -150,6 +178,8 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n
}
}

l.logger.Error("some logs has been dropped", zap.Error(errs))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be lost here. Shouldn't you wrap it in a conditional?


pr = &logproto.PushRequest{
Streams: make([]logproto.Stream, len(streams)),
}
Expand Down Expand Up @@ -195,9 +225,20 @@ func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap,
return ls
}

func convertLogToLokiEntry(lr pdata.LogRecord) *logproto.Entry {
func convertLogBodyToEntry(lr pdata.LogRecord, res pdata.Resource) (*logproto.Entry, error) {
return &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: lr.Body().StringVal(),
}, nil
}

func convertLogToJSONEntry(lr pdata.LogRecord, res pdata.Resource) (*logproto.Entry, error) {
line, err := encodeJSON(lr, res)
if err != nil {
return nil, err
}
return &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious about what is the behavior when the timestamp is bigger than the upper boundary for int64, given that lr.Timestamp is a uint64, assuming that lr.Timestamp is user input. Not for this PR, as this pattern has been used before in this codebase already.

Line: line,
}, nil
}
24 changes: 22 additions & 2 deletions exporter/lokiexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,13 +472,15 @@ func TestExporter_convertAttributesToLabels(t *testing.T) {
})
}

func TestExporter_convertLogToLokiEntry(t *testing.T) {
func TestExporter_convertLogBodyToEntry(t *testing.T) {
ts := pdata.Timestamp(int64(1) * time.Millisecond.Nanoseconds())
lr := pdata.NewLogRecord()
lr.Body().SetStringVal("log message")
lr.SetTimestamp(ts)
res := pdata.NewResource()
res.Attributes().Insert("host.name", pdata.NewAttributeValueString("something"))

entry := convertLogToLokiEntry(lr)
entry, _ := convertLogBodyToEntry(lr, res)

expEntry := &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Expand Down Expand Up @@ -576,3 +578,21 @@ func TestExporter_stopAlwaysReturnsNil(t *testing.T) {
require.NotNil(t, exp)
require.NoError(t, exp.stop(context.Background()))
}

func TestExporter_convertLogtoJSONEntry(t *testing.T) {
ts := pdata.Timestamp(int64(1) * time.Millisecond.Nanoseconds())
lr := pdata.NewLogRecord()
lr.Body().SetStringVal("log message")
lr.SetTimestamp(ts)
res := pdata.NewResource()
res.Attributes().Insert("host.name", pdata.NewAttributeValueString("something"))

entry, err := convertLogToJSONEntry(lr, res)
expEntry := &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: `{"body":"log message","resources":{"host.name":"something"}}`,
}
require.Nil(t, err)
require.NotNil(t, entry)
require.Equal(t, expEntry, entry)
}
1 change: 1 addition & 0 deletions exporter/lokiexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func createDefaultConfig() config.Exporter {
RetrySettings: exporterhelper.DefaultRetrySettings(),
QueueSettings: exporterhelper.DefaultQueueSettings(),
TenantID: "",
Format: "body",
Labels: LabelsConfig{
Attributes: map[string]string{},
ResourceAttributes: map[string]string{},
Expand Down
Loading