diff --git a/exporter/lokiexporter/README.md b/exporter/lokiexporter/README.md index 24fe932e4325..d63251a9edcf 100644 --- a/exporter/lokiexporter/README.md +++ b/exporter/lokiexporter/README.md @@ -44,6 +44,8 @@ The following settings can be optionally configured: - `headers` (no default): Name/value pairs added to the HTTP request headers. +- `format` (default = body): Set the log entry line format. This can be set to 'json' (the entire JSON encoded log record) or 'body' (the log record body field as a string). + Example: ```yaml diff --git a/exporter/lokiexporter/config.go b/exporter/lokiexporter/config.go index 9be4c7df3e71..48b33dc5d07f 100644 --- a/exporter/lokiexporter/config.go +++ b/exporter/lokiexporter/config.go @@ -36,6 +36,8 @@ type Config struct { // Labels defines how labels should be applied to log streams sent to Loki. Labels LabelsConfig `mapstructure:"labels"` + // Allows you to choose the entry format in the exporter + Format string `mapstructure:"format"` } func (c *Config) validate() error { diff --git a/exporter/lokiexporter/config_test.go b/exporter/lokiexporter/config_test.go index dfed214e0ee3..3dc7d40069d1 100644 --- a/exporter/lokiexporter/config_test.go +++ b/exporter/lokiexporter/config_test.go @@ -42,7 +42,7 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) - assert.Equal(t, 2, len(cfg.Exporters)) + assert.Equal(t, 3, len(cfg.Exporters)) actualCfg := cfg.Exporters[config.NewComponentIDWithName(typeStr, "allsettings")].(*Config) expectedCfg := Config{ @@ -87,6 +87,59 @@ func TestLoadConfig(t *testing.T) { "severity": "severity", }, }, + Format: "body", + } + require.Equal(t, &expectedCfg, actualCfg) +} + +func TestJSONLoadConfig(t *testing.T) { + factories, err := componenttest.NopFactories() + assert.Nil(t, err) + + factory := NewFactory() + factories.Exporters[config.Type(typeStr)] = factory + cfg, err := configtest.LoadConfig(path.Join(".", "testdata", "config.yaml"), factories) + + require.NoError(t, err) + require.NotNil(t, cfg) + + assert.Equal(t, 3, len(cfg.Exporters)) + + actualCfg := cfg.Exporters[config.NewComponentIDWithName(typeStr, "json")].(*Config) + expectedCfg := Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "json")), + HTTPClientSettings: confighttp.HTTPClientSettings{ + Headers: map[string]string{}, + Endpoint: "https://loki:3100/loki/api/v1/push", + TLSSetting: configtls.TLSClientSetting{ + TLSSetting: configtls.TLSSetting{ + CAFile: "", + CertFile: "", + KeyFile: "", + }, + Insecure: false, + }, + ReadBufferSize: 0, + WriteBufferSize: 524288, + Timeout: time.Second * 30, + }, + RetrySettings: exporterhelper.RetrySettings{ + Enabled: true, + InitialInterval: 5 * time.Second, + MaxInterval: 30 * time.Second, + MaxElapsedTime: 5 * time.Minute, + }, + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 10, + QueueSize: 5000, + }, + TenantID: "example", + Labels: LabelsConfig{ + Attributes: map[string]string{}, + ResourceAttributes: map[string]string{}, + }, + Format: "json", } require.Equal(t, &expectedCfg, actualCfg) } diff --git a/exporter/lokiexporter/encode_json.go b/exporter/lokiexporter/encode_json.go new file mode 100644 index 000000000000..5c09a5e2dee6 --- /dev/null +++ b/exporter/lokiexporter/encode_json.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lokiexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter" + +import ( + "encoding/json" + "fmt" + + "go.opentelemetry.io/collector/model/pdata" +) + +// JSON representation of the LogRecord as described by https://developers.google.com/protocol-buffers/docs/proto3#json + +type lokiEntry struct { + Name string `json:"name,omitempty"` + Body string `json:"body,omitempty"` + TraceID string `json:"traceid,omitempty"` + SpanID string `json:"spanid,omitempty"` + Severity string `json:"severity,omitempty"` + Attributes map[string]interface{} `json:"attributes,omitempty"` + Resources map[string]interface{} `json:"resources,omitempty"` +} + +func serializeBody(body pdata.AttributeValue) (string, error) { + str := "" + var err error + if body.Type() == pdata.AttributeValueTypeString { + str = body.StringVal() + } else { + err = fmt.Errorf("unsuported body type to serialize") + } + return str, err +} + +func encodeJSON(lr pdata.LogRecord, res pdata.Resource) (string, error) { + var logRecord lokiEntry + var jsonRecord []byte + var err error + var body string + + body, err = serializeBody(lr.Body()) + if err != nil { + return "", err + } + logRecord = lokiEntry{ + Name: lr.Name(), + Body: body, + TraceID: lr.TraceID().HexString(), + SpanID: lr.SpanID().HexString(), + Severity: lr.SeverityText(), + Attributes: lr.Attributes().AsRaw(), + Resources: res.Attributes().AsRaw(), + } + lr.Body().Type() + + jsonRecord, err = json.Marshal(logRecord) + if err != nil { + return "", err + } + return string(jsonRecord), nil +} diff --git a/exporter/lokiexporter/encode_json_test.go b/exporter/lokiexporter/encode_json_test.go new file mode 100644 index 000000000000..d54f234bb185 --- /dev/null +++ b/exporter/lokiexporter/encode_json_test.go @@ -0,0 +1,66 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lokiexporter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/model/pdata" +) + +func exampleLog() (pdata.LogRecord, pdata.Resource) { + + buffer := pdata.NewLogRecord() + buffer.Body().SetStringVal("Example log") + buffer.SetName("name") + buffer.SetSeverityText("error") + buffer.Attributes().Insert("attr1", pdata.NewAttributeValueString("1")) + buffer.Attributes().Insert("attr2", pdata.NewAttributeValueString("2")) + buffer.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4})) + buffer.SetSpanID(pdata.NewSpanID([8]byte{5, 6, 7, 8})) + + resource := pdata.NewResource() + resource.Attributes().Insert("host.name", pdata.NewAttributeValueString("something")) + + return buffer, resource +} + +func exampleJSON() string { + jsonExample := `{"name":"name","body":"Example log","traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"},"resources":{"host.name":"something"}}` + return jsonExample +} + +func TestConvertString(t *testing.T) { + in := exampleJSON() + out, err := encodeJSON(exampleLog()) + t.Log(in) + t.Log(out, err) + assert.Equal(t, in, out) +} + +func TestConvertNonString(t *testing.T) { + in := exampleJSON() + log, resource := exampleLog() + mapVal := pdata.NewAttributeValueMap() + mapVal.MapVal().Insert("key1", pdata.NewAttributeValueString("value")) + mapVal.MapVal().Insert("key2", pdata.NewAttributeValueString("value")) + mapVal.CopyTo(log.Body()) + + out, err := encodeJSON(log, resource) + t.Log(in) + t.Log(out, err) + assert.EqualError(t, err, "unsuported body type to serialize") +} diff --git a/exporter/lokiexporter/example/docker-compose.yml b/exporter/lokiexporter/example/docker-compose.yml index cf67afdae94f..115ca13ce59c 100644 --- a/exporter/lokiexporter/example/docker-compose.yml +++ b/exporter/lokiexporter/example/docker-compose.yml @@ -13,8 +13,6 @@ services: container_name: otel command: - "--config=/etc/otel-collector-config.yml" - # Memory Ballast size should be max 1/3 to 1/2 of memory. - - "--mem-ballast-size-mib=683" volumes: - ./otel-collector-config.yml:/etc/otel-collector-config.yml ports: diff --git a/exporter/lokiexporter/example/otel-collector-config.yml b/exporter/lokiexporter/example/otel-collector-config.yml index 63ebda9bca8e..a605485449ec 100644 --- a/exporter/lokiexporter/example/otel-collector-config.yml +++ b/exporter/lokiexporter/example/otel-collector-config.yml @@ -24,15 +24,23 @@ processors: exporters: loki: endpoint: "http://loki:3100/loki/api/v1/push" + # Will encode the whole OTEL message in json + # This format happen after extracting labels + format: json labels: attributes: container_name: "" source: "" + resources: + host.name: "hostname" extensions: health_check: pprof: zpages: + memory_ballast: + # Memory Ballast size should be max 1/3 to 1/2 of memory. + size_mib: 64 service: extensions: [pprof, zpages, health_check] diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index 37bcbfff69f4..ce2eae4aab1e 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -17,6 +17,7 @@ package lokiexporter // import "github.com/open-telemetry/opentelemetry-collecto import ( "bytes" "context" + "errors" "fmt" "io" "io/ioutil" @@ -30,23 +31,31 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/multierr" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter/internal/third_party/loki/logproto" ) type lokiExporter struct { - config *Config - logger *zap.Logger - client *http.Client - wg sync.WaitGroup + config *Config + logger *zap.Logger + client *http.Client + wg sync.WaitGroup + convert func(pdata.LogRecord, pdata.Resource) (*logproto.Entry, error) } func newExporter(config *Config, logger *zap.Logger) *lokiExporter { - return &lokiExporter{ + lokiexporter := &lokiExporter{ config: config, logger: logger, } + if config.Format == "json" { + lokiexporter.convert = convertLogToJSONEntry + } else { + lokiexporter.convert = convertLogBodyToEntry + } + return lokiexporter } func (l *lokiExporter) pushLogData(ctx context.Context, ld pdata.Logs) error { @@ -119,6 +128,8 @@ func (l *lokiExporter) stop(context.Context) (err error) { } func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, numDroppedLogs int) { + var errs error + streams := make(map[string]*logproto.Stream) rls := ld.ResourceLogs() for i := 0; i < rls.Len(); i++ { @@ -135,7 +146,24 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n continue } labels := mergedLabels.String() - entry := convertLogToLokiEntry(log) + var entry *logproto.Entry + var err error + entry, err = l.convert(log, resource) + if err != nil { + // Couldn't convert so dropping log. + numDroppedLogs++ + errs = multierr.Append( + errs, + errors.New( + fmt.Sprint( + "failed to convert, dropping log", + zap.String("format", l.config.Format), + zap.Error(err), + ), + ), + ) + continue + } if stream, ok := streams[labels]; ok { stream.Entries = append(stream.Entries, *entry) @@ -150,6 +178,10 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n } } + if errs != nil { + l.logger.Debug("some logs has been dropped", zap.Error(errs)) + } + pr = &logproto.PushRequest{ Streams: make([]logproto.Stream, len(streams)), } @@ -195,9 +227,20 @@ func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap, return ls } -func convertLogToLokiEntry(lr pdata.LogRecord) *logproto.Entry { +func convertLogBodyToEntry(lr pdata.LogRecord, res pdata.Resource) (*logproto.Entry, error) { return &logproto.Entry{ Timestamp: time.Unix(0, int64(lr.Timestamp())), Line: lr.Body().StringVal(), + }, nil +} + +func convertLogToJSONEntry(lr pdata.LogRecord, res pdata.Resource) (*logproto.Entry, error) { + line, err := encodeJSON(lr, res) + if err != nil { + return nil, err } + return &logproto.Entry{ + Timestamp: time.Unix(0, int64(lr.Timestamp())), + Line: line, + }, nil } diff --git a/exporter/lokiexporter/exporter_test.go b/exporter/lokiexporter/exporter_test.go index fcf0fe8a3b64..2d7cb7218cb5 100644 --- a/exporter/lokiexporter/exporter_test.go +++ b/exporter/lokiexporter/exporter_test.go @@ -472,13 +472,15 @@ func TestExporter_convertAttributesToLabels(t *testing.T) { }) } -func TestExporter_convertLogToLokiEntry(t *testing.T) { +func TestExporter_convertLogBodyToEntry(t *testing.T) { ts := pdata.Timestamp(int64(1) * time.Millisecond.Nanoseconds()) lr := pdata.NewLogRecord() lr.Body().SetStringVal("log message") lr.SetTimestamp(ts) + res := pdata.NewResource() + res.Attributes().Insert("host.name", pdata.NewAttributeValueString("something")) - entry := convertLogToLokiEntry(lr) + entry, _ := convertLogBodyToEntry(lr, res) expEntry := &logproto.Entry{ Timestamp: time.Unix(0, int64(lr.Timestamp())), @@ -576,3 +578,21 @@ func TestExporter_stopAlwaysReturnsNil(t *testing.T) { require.NotNil(t, exp) require.NoError(t, exp.stop(context.Background())) } + +func TestExporter_convertLogtoJSONEntry(t *testing.T) { + ts := pdata.Timestamp(int64(1) * time.Millisecond.Nanoseconds()) + lr := pdata.NewLogRecord() + lr.Body().SetStringVal("log message") + lr.SetTimestamp(ts) + res := pdata.NewResource() + res.Attributes().Insert("host.name", pdata.NewAttributeValueString("something")) + + entry, err := convertLogToJSONEntry(lr, res) + expEntry := &logproto.Entry{ + Timestamp: time.Unix(0, int64(lr.Timestamp())), + Line: `{"body":"log message","resources":{"host.name":"something"}}`, + } + require.Nil(t, err) + require.NotNil(t, entry) + require.Equal(t, expEntry, entry) +} diff --git a/exporter/lokiexporter/factory.go b/exporter/lokiexporter/factory.go index 4ecb536381a3..053259226008 100644 --- a/exporter/lokiexporter/factory.go +++ b/exporter/lokiexporter/factory.go @@ -48,6 +48,7 @@ func createDefaultConfig() config.Exporter { RetrySettings: exporterhelper.DefaultRetrySettings(), QueueSettings: exporterhelper.DefaultQueueSettings(), TenantID: "", + Format: "body", Labels: LabelsConfig{ Attributes: map[string]string{}, ResourceAttributes: map[string]string{}, diff --git a/exporter/lokiexporter/testdata/config.yaml b/exporter/lokiexporter/testdata/config.yaml index f66a79190566..00d57568036f 100644 --- a/exporter/lokiexporter/testdata/config.yaml +++ b/exporter/lokiexporter/testdata/config.yaml @@ -13,6 +13,10 @@ exporters: container.name: "container_name" k8s.cluster.name: "k8s_cluster_name" severity: "severity" + loki/json: + endpoint: "https://loki:3100/loki/api/v1/push" + tenant_id: "example" + format: "json" loki/allsettings: endpoint: "https://loki:3100/loki/api/v1/push" tenant_id: "example" @@ -48,4 +52,4 @@ service: logs: receivers: [ nop ] processors: [ nop ] - exporters: [ loki, loki/allsettings ] + exporters: [ loki, loki/allsettings, loki/json ] \ No newline at end of file