From 491e6e1861523bd8089f7780005c60f46bb46a42 Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Fri, 21 May 2021 14:59:59 +0100 Subject: [PATCH 01/22] initial commit --- exporter/lokiexporter/config.go | 34 ++++++++++++ exporter/lokiexporter/exporter.go | 63 +++++++++++++++++----- exporter/lokiexporter/exporter_test.go | 41 ++++++++------ exporter/lokiexporter/testdata/config.yaml | 2 + 4 files changed, 111 insertions(+), 29 deletions(-) diff --git a/exporter/lokiexporter/config.go b/exporter/lokiexporter/config.go index 42a2f96b77e7..552a54757b19 100644 --- a/exporter/lokiexporter/config.go +++ b/exporter/lokiexporter/config.go @@ -50,6 +50,8 @@ func (c *Config) validate() error { type LabelsConfig struct { // Attributes are the attributes that are allowed to be added as labels on a log stream. Attributes map[string]string `mapstructure:"attributes"` + // ResourceAttributes are the resource attributes that are allowed to be added as labels on a log stream. + ResourceAttributes map[string]string `mapstructure:"resource"` } func (c *LabelsConfig) validate() error { @@ -84,3 +86,35 @@ func (c *LabelsConfig) getAttributes() map[string]model.LabelName { return attributes } + +// getLogRecordAttributes creates a lookup of allowed attributes to valid Loki label names. +func (c *LabelsConfig) getLogRecordAttributes() map[string]model.LabelName { + attributes := map[string]model.LabelName{} + + for attrName, lblName := range c.Attributes { + if len(lblName) > 0 { + attributes[attrName] = model.LabelName(lblName) + continue + } + + attributes[attrName] = model.LabelName(attrName) + } + + return attributes +} + +// getResourceAttributes creates a lookup of allowed attributes to valid Loki label names. +func (c *LabelsConfig) getResourceAttributes() map[string]model.LabelName { + attributes := map[string]model.LabelName{} + + for attrName, lblName := range c.ResourceAttributes { + if len(lblName) > 0 { + attributes[attrName] = model.LabelName(lblName) + continue + } + + attributes[attrName] = model.LabelName(attrName) + } + + return attributes +} diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index 33cf864920df..7bfb5854d13b 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -36,11 +36,12 @@ import ( ) type lokiExporter struct { - config *Config - logger *zap.Logger - client *http.Client - attributesToLabels map[string]model.LabelName - wg sync.WaitGroup + config *Config + logger *zap.Logger + client *http.Client + attribLogsToLabels map[string]model.LabelName + attribResoucesToLabels map[string]model.LabelName + wg sync.WaitGroup } func newExporter(config *Config, logger *zap.Logger) (*lokiExporter, error) { @@ -110,7 +111,8 @@ func encode(pb proto.Message) ([]byte, error) { } func (l *lokiExporter) start(context.Context, component.Host) (err error) { - l.attributesToLabels = l.config.Labels.getAttributes() + l.attribLogsToLabels = l.config.Labels.getLogRecordAttributes() + l.attribResoucesToLabels = l.config.Labels.getResourceAttributes() return nil } @@ -124,19 +126,20 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n rls := ld.ResourceLogs() for i := 0; i < rls.Len(); i++ { ills := rls.At(i).InstrumentationLibraryLogs() + resource := rls.At(i).Resource() for j := 0; j < ills.Len(); j++ { logs := ills.At(j).Logs() for k := 0; k < logs.Len(); k++ { log := logs.At(k) - attribLabels, ok := l.convertAttributesToLabels(log.Attributes()) + attribLabels, ok := l.convertAttributesAndMerge(log.Attributes(), resource.Attributes()) if !ok { numDroppedLogs++ continue } labels := attribLabels.String() - - entry := convertLogToLokiEntry(log) + // entry := convertLogToLokiEntry(log) + entry := convertLogToJsonEntry(log) if stream, ok := streams[labels]; ok { stream.Entries = append(stream.Entries, *entry) @@ -164,10 +167,21 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n return pr, numDroppedLogs } -func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap) (model.LabelSet, bool) { +func (l *lokiExporter) convertAttributesAndMerge(logAttrs pdata.AttributeMap, resourceAttrs pdata.AttributeMap) (mergedLabels model.LabelSet, dropped bool) { + attribLabels := l.convertLogAttributesToLabels(logAttrs) + attribResources := l.convertResourceAttributesToLabels(resourceAttrs) + mergedLabels = attribResources.Merge(attribLabels) + // TODO: decide on ordering and deduplication + if len(mergedLabels) == 0 { + return nil, true + } + return mergedLabels, false +} + +func (l *lokiExporter) convertLogAttributesToLabels(attributes pdata.AttributeMap) model.LabelSet { ls := model.LabelSet{} - for attr, attrLabelName := range l.attributesToLabels { + for attr, attrLabelName := range l.attribLogsToLabels { av, ok := attributes.Get(attr) if ok { if av.Type() != pdata.AttributeValueTypeString { @@ -178,11 +192,24 @@ func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap) } } - if len(ls) == 0 { - return nil, false + return ls +} + +func (l *lokiExporter) convertResourceAttributesToLabels(attributes pdata.AttributeMap) model.LabelSet { + ls := model.LabelSet{} + + for attr, attrLabelName := range l.attribResoucesToLabels { + av, ok := attributes.Get(attr) + if ok { + if av.Type() != pdata.AttributeValueTypeString { + l.logger.Debug("Failed to convert attribute value to Loki label value, value is not a string", zap.String("attribute", attr)) + continue + } + ls[attrLabelName] = model.LabelValue(av.StringVal()) + } } - return ls, true + return ls } func convertLogToLokiEntry(lr pdata.LogRecord) *logproto.Entry { @@ -191,3 +218,11 @@ func convertLogToLokiEntry(lr pdata.LogRecord) *logproto.Entry { Line: lr.Body().StringVal(), } } + +func convertLogToJsonEntry(lr pdata.LogRecord) *logproto.Entry { + line := encodeJSON(lr) + return &logproto.Entry{ + Timestamp: time.Unix(0, int64(lr.Timestamp())), + Line: line, + } +} diff --git a/exporter/lokiexporter/exporter_test.go b/exporter/lokiexporter/exporter_test.go index 929fe0f79df3..59ba295c383b 100644 --- a/exporter/lokiexporter/exporter_test.go +++ b/exporter/lokiexporter/exporter_test.go @@ -387,50 +387,45 @@ func TestExporter_convertAttributesToLabels(t *testing.T) { am.InsertString(conventions.AttributeContainerName, "mycontainer") am.InsertString(conventions.AttributeK8sCluster, "mycluster") am.InsertString("severity", "debug") - - ls, ok := exp.convertAttributesToLabels(am) + ram := pdata.NewAttributeMap() + ls, _ := exp.convertAttributesAndMerge(am, ram) expLs := model.LabelSet{ model.LabelName("container_name"): model.LabelValue("mycontainer"), model.LabelName("k8s_cluster_name"): model.LabelValue("mycluster"), model.LabelName("severity"): model.LabelValue("debug"), } - require.True(t, ok) require.Equal(t, expLs, ls) }) t.Run("with attribute matches and the value is a boolean", func(t *testing.T) { am := pdata.NewAttributeMap() am.InsertBool("severity", false) - - ls, ok := exp.convertAttributesToLabels(am) - require.False(t, ok) + ram := pdata.NewAttributeMap() + ls, _ := exp.convertAttributesAndMerge(am, ram) require.Nil(t, ls) }) t.Run("with attribute that matches and the value is a double", func(t *testing.T) { am := pdata.NewAttributeMap() am.InsertDouble("severity", float64(0)) - - ls, ok := exp.convertAttributesToLabels(am) - require.False(t, ok) + ram := pdata.NewAttributeMap() + ls, _ := exp.convertAttributesAndMerge(am, ram) require.Nil(t, ls) }) t.Run("with attribute that matches and the value is an int", func(t *testing.T) { am := pdata.NewAttributeMap() am.InsertInt("severity", 0) - - ls, ok := exp.convertAttributesToLabels(am) - require.False(t, ok) + ram := pdata.NewAttributeMap() + ls, _ := exp.convertAttributesAndMerge(am, ram) require.Nil(t, ls) }) t.Run("with attribute that matches and the value is null", func(t *testing.T) { am := pdata.NewAttributeMap() am.InsertNull("severity") - - ls, ok := exp.convertAttributesToLabels(am) - require.False(t, ok) + ram := pdata.NewAttributeMap() + ls, _ := exp.convertAttributesAndMerge(am, ram) require.Nil(t, ls) }) } @@ -523,3 +518,19 @@ func TestExporter_stopAlwaysReturnsNil(t *testing.T) { require.NoError(t, err) require.NoError(t, e.stop(context.Background())) } + +func TestExporter_convertLogtoJsonEntry(t *testing.T) { + ts := pdata.Timestamp(int64(1) * time.Millisecond.Nanoseconds()) + lr := pdata.NewLogRecord() + lr.Body().SetStringVal("log message") + lr.SetTimestamp(ts) + + entry := convertLogToJsonEntry(lr) + + expEntry := &logproto.Entry{ + Timestamp: time.Unix(0, int64(lr.Timestamp())), + Line: `{"body":"log message"}`, + } + require.NotNil(t, entry) + require.Equal(t, expEntry, entry) +} diff --git a/exporter/lokiexporter/testdata/config.yaml b/exporter/lokiexporter/testdata/config.yaml index 4a522e58562f..5281035b9fb3 100644 --- a/exporter/lokiexporter/testdata/config.yaml +++ b/exporter/lokiexporter/testdata/config.yaml @@ -5,6 +5,8 @@ processors: nop: exporters: + loki/test: + json: enabled loki: endpoint: "https://loki:3100/loki/api/v1/push" tenant_id: "example" From 18d1808ab028a018d1c6fd5ba9cc5d3159f2bced Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Fri, 21 May 2021 15:05:25 +0100 Subject: [PATCH 02/22] revert un-needed changes --- exporter/lokiexporter/config.go | 34 ------------- exporter/lokiexporter/exporter.go | 55 ++++++---------------- exporter/lokiexporter/exporter_test.go | 25 ++++++---- exporter/lokiexporter/testdata/config.yaml | 2 - 4 files changed, 29 insertions(+), 87 deletions(-) diff --git a/exporter/lokiexporter/config.go b/exporter/lokiexporter/config.go index 552a54757b19..42a2f96b77e7 100644 --- a/exporter/lokiexporter/config.go +++ b/exporter/lokiexporter/config.go @@ -50,8 +50,6 @@ func (c *Config) validate() error { type LabelsConfig struct { // Attributes are the attributes that are allowed to be added as labels on a log stream. Attributes map[string]string `mapstructure:"attributes"` - // ResourceAttributes are the resource attributes that are allowed to be added as labels on a log stream. - ResourceAttributes map[string]string `mapstructure:"resource"` } func (c *LabelsConfig) validate() error { @@ -86,35 +84,3 @@ func (c *LabelsConfig) getAttributes() map[string]model.LabelName { return attributes } - -// getLogRecordAttributes creates a lookup of allowed attributes to valid Loki label names. -func (c *LabelsConfig) getLogRecordAttributes() map[string]model.LabelName { - attributes := map[string]model.LabelName{} - - for attrName, lblName := range c.Attributes { - if len(lblName) > 0 { - attributes[attrName] = model.LabelName(lblName) - continue - } - - attributes[attrName] = model.LabelName(attrName) - } - - return attributes -} - -// getResourceAttributes creates a lookup of allowed attributes to valid Loki label names. -func (c *LabelsConfig) getResourceAttributes() map[string]model.LabelName { - attributes := map[string]model.LabelName{} - - for attrName, lblName := range c.ResourceAttributes { - if len(lblName) > 0 { - attributes[attrName] = model.LabelName(lblName) - continue - } - - attributes[attrName] = model.LabelName(attrName) - } - - return attributes -} diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index 7bfb5854d13b..6f088674e6bc 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -36,12 +36,11 @@ import ( ) type lokiExporter struct { - config *Config - logger *zap.Logger - client *http.Client - attribLogsToLabels map[string]model.LabelName - attribResoucesToLabels map[string]model.LabelName - wg sync.WaitGroup + config *Config + logger *zap.Logger + client *http.Client + attributesToLabels map[string]model.LabelName + wg sync.WaitGroup } func newExporter(config *Config, logger *zap.Logger) (*lokiExporter, error) { @@ -111,8 +110,7 @@ func encode(pb proto.Message) ([]byte, error) { } func (l *lokiExporter) start(context.Context, component.Host) (err error) { - l.attribLogsToLabels = l.config.Labels.getLogRecordAttributes() - l.attribResoucesToLabels = l.config.Labels.getResourceAttributes() + l.attributesToLabels = l.config.Labels.getAttributes() return nil } @@ -126,20 +124,19 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n rls := ld.ResourceLogs() for i := 0; i < rls.Len(); i++ { ills := rls.At(i).InstrumentationLibraryLogs() - resource := rls.At(i).Resource() for j := 0; j < ills.Len(); j++ { logs := ills.At(j).Logs() for k := 0; k < logs.Len(); k++ { log := logs.At(k) - attribLabels, ok := l.convertAttributesAndMerge(log.Attributes(), resource.Attributes()) + attribLabels, ok := l.convertAttributesToLabels(log.Attributes()) if !ok { numDroppedLogs++ continue } labels := attribLabels.String() - // entry := convertLogToLokiEntry(log) - entry := convertLogToJsonEntry(log) + + entry := convertLogToLokiEntry(log) if stream, ok := streams[labels]; ok { stream.Entries = append(stream.Entries, *entry) @@ -167,21 +164,10 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n return pr, numDroppedLogs } -func (l *lokiExporter) convertAttributesAndMerge(logAttrs pdata.AttributeMap, resourceAttrs pdata.AttributeMap) (mergedLabels model.LabelSet, dropped bool) { - attribLabels := l.convertLogAttributesToLabels(logAttrs) - attribResources := l.convertResourceAttributesToLabels(resourceAttrs) - mergedLabels = attribResources.Merge(attribLabels) - // TODO: decide on ordering and deduplication - if len(mergedLabels) == 0 { - return nil, true - } - return mergedLabels, false -} - -func (l *lokiExporter) convertLogAttributesToLabels(attributes pdata.AttributeMap) model.LabelSet { +func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap) (model.LabelSet, bool) { ls := model.LabelSet{} - for attr, attrLabelName := range l.attribLogsToLabels { + for attr, attrLabelName := range l.attributesToLabels { av, ok := attributes.Get(attr) if ok { if av.Type() != pdata.AttributeValueTypeString { @@ -192,24 +178,11 @@ func (l *lokiExporter) convertLogAttributesToLabels(attributes pdata.AttributeMa } } - return ls -} - -func (l *lokiExporter) convertResourceAttributesToLabels(attributes pdata.AttributeMap) model.LabelSet { - ls := model.LabelSet{} - - for attr, attrLabelName := range l.attribResoucesToLabels { - av, ok := attributes.Get(attr) - if ok { - if av.Type() != pdata.AttributeValueTypeString { - l.logger.Debug("Failed to convert attribute value to Loki label value, value is not a string", zap.String("attribute", attr)) - continue - } - ls[attrLabelName] = model.LabelValue(av.StringVal()) - } + if len(ls) == 0 { + return nil, false } - return ls + return ls, true } func convertLogToLokiEntry(lr pdata.LogRecord) *logproto.Entry { diff --git a/exporter/lokiexporter/exporter_test.go b/exporter/lokiexporter/exporter_test.go index 59ba295c383b..cc4537b2f974 100644 --- a/exporter/lokiexporter/exporter_test.go +++ b/exporter/lokiexporter/exporter_test.go @@ -387,45 +387,50 @@ func TestExporter_convertAttributesToLabels(t *testing.T) { am.InsertString(conventions.AttributeContainerName, "mycontainer") am.InsertString(conventions.AttributeK8sCluster, "mycluster") am.InsertString("severity", "debug") - ram := pdata.NewAttributeMap() - ls, _ := exp.convertAttributesAndMerge(am, ram) + + ls, ok := exp.convertAttributesToLabels(am) expLs := model.LabelSet{ model.LabelName("container_name"): model.LabelValue("mycontainer"), model.LabelName("k8s_cluster_name"): model.LabelValue("mycluster"), model.LabelName("severity"): model.LabelValue("debug"), } + require.True(t, ok) require.Equal(t, expLs, ls) }) t.Run("with attribute matches and the value is a boolean", func(t *testing.T) { am := pdata.NewAttributeMap() am.InsertBool("severity", false) - ram := pdata.NewAttributeMap() - ls, _ := exp.convertAttributesAndMerge(am, ram) + + ls, ok := exp.convertAttributesToLabels(am) + require.False(t, ok) require.Nil(t, ls) }) t.Run("with attribute that matches and the value is a double", func(t *testing.T) { am := pdata.NewAttributeMap() am.InsertDouble("severity", float64(0)) - ram := pdata.NewAttributeMap() - ls, _ := exp.convertAttributesAndMerge(am, ram) + + ls, ok := exp.convertAttributesToLabels(am) + require.False(t, ok) require.Nil(t, ls) }) t.Run("with attribute that matches and the value is an int", func(t *testing.T) { am := pdata.NewAttributeMap() am.InsertInt("severity", 0) - ram := pdata.NewAttributeMap() - ls, _ := exp.convertAttributesAndMerge(am, ram) + + ls, ok := exp.convertAttributesToLabels(am) + require.False(t, ok) require.Nil(t, ls) }) t.Run("with attribute that matches and the value is null", func(t *testing.T) { am := pdata.NewAttributeMap() am.InsertNull("severity") - ram := pdata.NewAttributeMap() - ls, _ := exp.convertAttributesAndMerge(am, ram) + + ls, ok := exp.convertAttributesToLabels(am) + require.False(t, ok) require.Nil(t, ls) }) } diff --git a/exporter/lokiexporter/testdata/config.yaml b/exporter/lokiexporter/testdata/config.yaml index 5281035b9fb3..4a522e58562f 100644 --- a/exporter/lokiexporter/testdata/config.yaml +++ b/exporter/lokiexporter/testdata/config.yaml @@ -5,8 +5,6 @@ processors: nop: exporters: - loki/test: - json: enabled loki: endpoint: "https://loki:3100/loki/api/v1/push" tenant_id: "example" From 403b6a1ac24f6fb7b788d32470508cbc84fbde0c Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Fri, 21 May 2021 15:06:56 +0100 Subject: [PATCH 03/22] test json encode as entry --- exporter/lokiexporter/exporter.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index 6f088674e6bc..335223d66190 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -136,7 +136,8 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n labels := attribLabels.String() - entry := convertLogToLokiEntry(log) + // entry := convertLogToLokiEntry(log) + entry := convertLogToJsonEntry(log) if stream, ok := streams[labels]; ok { stream.Entries = append(stream.Entries, *entry) From 4c695b7d8be23778e4d29c44df3a9d1a4a7ab9c9 Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Fri, 21 May 2021 15:56:03 +0100 Subject: [PATCH 04/22] add encoding files --- exporter/lokiexporter/encode_json.go | 35 +++++++++++++++++++++++ exporter/lokiexporter/encode_json_test.go | 35 +++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 exporter/lokiexporter/encode_json.go create mode 100644 exporter/lokiexporter/encode_json_test.go diff --git a/exporter/lokiexporter/encode_json.go b/exporter/lokiexporter/encode_json.go new file mode 100644 index 000000000000..97762eed609d --- /dev/null +++ b/exporter/lokiexporter/encode_json.go @@ -0,0 +1,35 @@ +package lokiexporter + +import ( + "encoding/json" + "fmt" + + "go.opentelemetry.io/collector/consumer/pdata" + tracetranslator "go.opentelemetry.io/collector/translator/trace" +) + +type jsonRecord struct { + Name string `json:"name,omitempty"` + Body string `json:"body,omitempty"` + TraceID string `json:"traceid,omitempty"` + SpanID string `json:"spanid,omitempty"` + Severity string `json:"severity,omitempty"` + Attributes map[string]interface{} `json:"attributes,omitempty"` +} + +func encodeJSON(lr pdata.LogRecord) string { + // json1 := jsonRecord{lr.Name(), lr.Body().StringVal(), lr.TraceID(), lr.SpanID(), lr.SeverityText()} + var json1 jsonRecord + var json2 []byte + + json1 = jsonRecord{Name: lr.Name(), Body: lr.Body().StringVal(), TraceID: lr.TraceID().HexString(), SpanID: lr.SpanID().HexString(), Severity: lr.SeverityText(), Attributes: tracetranslator.AttributeMapToMap(lr.Attributes())} + // json = {name: lr.Name(), body: lr.Body().StringVal(), traceid: lr.TraceID().StringVal(), spanid: lr.SpanID().StringVal(), severity: lr.Severity().StringVal(), attributes: {lr.AttributeMap().Get()}} + fmt.Printf("JSON 1: %v", json1) + + json2, err := json.Marshal(json1) + if err != nil { + fmt.Println("error:", err) + } + fmt.Println("JSON 2" + string(json2)) + return string(json2) +} diff --git a/exporter/lokiexporter/encode_json_test.go b/exporter/lokiexporter/encode_json_test.go new file mode 100644 index 000000000000..fe0c9d151800 --- /dev/null +++ b/exporter/lokiexporter/encode_json_test.go @@ -0,0 +1,35 @@ +package lokiexporter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/consumer/pdata" +) + +func exampleLog() pdata.LogRecord { + + buffer := pdata.NewLogRecord() + buffer.Body().SetStringVal("Example log") + buffer.SetName("name") + buffer.SetSeverityText("error") + buffer.Attributes().Insert("attr1", pdata.NewAttributeValueString("1")) + buffer.Attributes().Insert("attr2", pdata.NewAttributeValueString("2")) + buffer.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4})) + buffer.SetSpanID(pdata.NewSpanID([8]byte{5, 6, 7, 8})) + + return buffer +} + +func exampleJson() string { + jsonExample := `{"name":"name","body":"Example log","traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"}}` + return jsonExample +} + +func TestConvert(t *testing.T) { + in := exampleJson() + out := encodeJSON(exampleLog()) + t.Log(in) + t.Log(out) + assert.Equal(t, in, out) +} From caa45075186ed2af7da3cd888177ec2e8ea61ecd Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Mon, 24 May 2021 15:49:56 +0100 Subject: [PATCH 05/22] merge loki encoding with the resource atttribute branch --- exporter/lokiexporter/config.go | 44 ++++++-- exporter/lokiexporter/config_test.go | 114 ++++++++++++++++++--- exporter/lokiexporter/exporter.go | 64 ++++++++---- exporter/lokiexporter/exporter_test.go | 91 ++++++++++++---- exporter/lokiexporter/factory_test.go | 3 +- exporter/lokiexporter/testdata/config.yaml | 5 +- 6 files changed, 259 insertions(+), 62 deletions(-) diff --git a/exporter/lokiexporter/config.go b/exporter/lokiexporter/config.go index 42a2f96b77e7..3adba8ffdf37 100644 --- a/exporter/lokiexporter/config.go +++ b/exporter/lokiexporter/config.go @@ -48,29 +48,41 @@ func (c *Config) validate() error { // LabelsConfig defines the labels-related configuration type LabelsConfig struct { - // Attributes are the attributes that are allowed to be added as labels on a log stream. + // Attributes are the log record attributes that are allowed to be added as labels on a log stream. Attributes map[string]string `mapstructure:"attributes"` + + // ResourceAttributes are the resource attributes that are allowed to be added as labels on a log stream. + ResourceAttributes map[string]string `mapstructure:"resource"` } func (c *LabelsConfig) validate() error { - if len(c.Attributes) == 0 { - return fmt.Errorf("\"labels.attributes\" must be configured with at least one attribute") + if len(c.Attributes) == 0 && len(c.ResourceAttributes) == 0 { + return fmt.Errorf("\"labels.attributes\" or \"labels.resource\" must be configured with at least one attribute") } - labelNameInvalidErr := "the label `%s` in \"labels.attributes\" is not a valid label name. Label names must match " + model.LabelNameRE.String() + logRecordNameInvalidErr := "the label `%s` in \"labels.attributes\" is not a valid label name. Label names must match " + model.LabelNameRE.String() for l, v := range c.Attributes { if len(v) > 0 && !model.LabelName(v).IsValid() { - return fmt.Errorf(labelNameInvalidErr, v) + return fmt.Errorf(logRecordNameInvalidErr, v) + } else if len(v) == 0 && !model.LabelName(l).IsValid() { + return fmt.Errorf(logRecordNameInvalidErr, l) + } + } + + resourceNameInvalidErr := "the label `%s` in \"labels.resource\" is not a valid label name. Label names must match " + model.LabelNameRE.String() + for l, v := range c.ResourceAttributes { + if len(v) > 0 && !model.LabelName(v).IsValid() { + return fmt.Errorf(resourceNameInvalidErr, v) } else if len(v) == 0 && !model.LabelName(l).IsValid() { - return fmt.Errorf(labelNameInvalidErr, l) + return fmt.Errorf(resourceNameInvalidErr, l) } } return nil } -// getAttributes creates a lookup of allowed attributes to valid Loki label names. -func (c *LabelsConfig) getAttributes() map[string]model.LabelName { +// getLogRecordAttributes creates a lookup of allowed attributes to valid Loki label names. +func (c *LabelsConfig) getLogRecordAttributes() map[string]model.LabelName { attributes := map[string]model.LabelName{} for attrName, lblName := range c.Attributes { @@ -84,3 +96,19 @@ func (c *LabelsConfig) getAttributes() map[string]model.LabelName { return attributes } + +// getResourceAttributes creates a lookup of allowed attributes to valid Loki label names. +func (c *LabelsConfig) getResourceAttributes() map[string]model.LabelName { + attributes := map[string]model.LabelName{} + + for attrName, lblName := range c.ResourceAttributes { + if len(lblName) > 0 { + attributes[attrName] = model.LabelName(lblName) + continue + } + + attributes[attrName] = model.LabelName(attrName) + } + + return attributes +} diff --git a/exporter/lokiexporter/config_test.go b/exporter/lokiexporter/config_test.go index 9568d2ae13f6..681a39b08a0d 100644 --- a/exporter/lokiexporter/config_test.go +++ b/exporter/lokiexporter/config_test.go @@ -82,6 +82,10 @@ func TestLoadConfig(t *testing.T) { conventions.AttributeK8sCluster: "k8s_cluster_name", "severity": "severity", }, + ResourceAttributes: map[string]string{ + "resource.name": "resource_name", + "severity": "severity", + }, }, } require.Equal(t, &expectedCfg, actualCfg) @@ -90,9 +94,12 @@ func TestLoadConfig(t *testing.T) { func TestConfig_validate(t *testing.T) { const validEndpoint = "https://validendpoint.local" - validLabelsConfig := LabelsConfig{ + validAttribLabelsConfig := LabelsConfig{ Attributes: testValidAttributesWithMapping, } + validResourceLabelsConfig := LabelsConfig{ + ResourceAttributes: testValidResourceWithMapping, + } type fields struct { Endpoint string @@ -111,7 +118,7 @@ func TestConfig_validate(t *testing.T) { name: "with valid endpoint", fields: fields{ Endpoint: validEndpoint, - Labels: validLabelsConfig, + Labels: validAttribLabelsConfig, }, shouldError: false, }, @@ -119,7 +126,7 @@ func TestConfig_validate(t *testing.T) { name: "with missing endpoint", fields: fields{ Endpoint: "", - Labels: validLabelsConfig, + Labels: validAttribLabelsConfig, }, errorMessage: "\"endpoint\" must be a valid URL", shouldError: true, @@ -128,37 +135,36 @@ func TestConfig_validate(t *testing.T) { name: "with invalid endpoint", fields: fields{ Endpoint: "this://is:an:invalid:endpoint.com", - Labels: validLabelsConfig, + Labels: validAttribLabelsConfig, }, errorMessage: "\"endpoint\" must be a valid URL", shouldError: true, }, { - name: "with missing `labels.attributes`", + name: "with missing `labels.attributes` and missing `labels.resource`", fields: fields{ Endpoint: validEndpoint, Labels: LabelsConfig{ - Attributes: nil, + Attributes: nil, + ResourceAttributes: nil, }, }, - errorMessage: "\"labels.attributes\" must be configured with at least one attribute", + errorMessage: "\"labels.attributes\" or \"labels.resource\" must be configured with at least one attribute", shouldError: true, }, { - name: "with `labels.attributes` set", + name: "with missing `labels.attributes`", fields: fields{ Endpoint: validEndpoint, - Labels: LabelsConfig{ - Attributes: testValidAttributesWithMapping, - }, + Labels: validResourceLabelsConfig, }, shouldError: false, }, { - name: "with valid `labels` config", + name: "with missing `labels.resource`", fields: fields{ Endpoint: validEndpoint, - Labels: validLabelsConfig, + Labels: validAttribLabelsConfig, }, shouldError: false, }, @@ -198,9 +204,10 @@ func TestLabelsConfig_validate(t *testing.T) { { name: "with no attributes", labels: LabelsConfig{ - Attributes: map[string]string{}, + Attributes: map[string]string{}, + ResourceAttributes: map[string]string{}, }, - errorMessage: "\"labels.attributes\" must be configured with at least one attribute", + errorMessage: "\"labels.attributes\" or \"labels.resource\" must be configured with at least one attribute", shouldError: true, }, { @@ -212,6 +219,15 @@ func TestLabelsConfig_validate(t *testing.T) { }, shouldError: false, }, + { + name: "with valid resource label map", + labels: LabelsConfig{ + ResourceAttributes: map[string]string{ + "other.attribute": "other", + }, + }, + shouldError: false, + }, { name: "with invalid attribute label map", labels: LabelsConfig{ @@ -222,6 +238,16 @@ func TestLabelsConfig_validate(t *testing.T) { errorMessage: "the label `invalid.label.name` in \"labels.attributes\" is not a valid label name. Label names must match " + model.LabelNameRE.String(), shouldError: true, }, + { + name: "with invalid resource label map", + labels: LabelsConfig{ + ResourceAttributes: map[string]string{ + "other.attribute": "invalid.label.name", + }, + }, + errorMessage: "the label `invalid.label.name` in \"labels.resource\" is not a valid label name. Label names must match " + model.LabelNameRE.String(), + shouldError: true, + }, { name: "with attribute having an invalid label name and no map configured", labels: LabelsConfig{ @@ -301,7 +327,63 @@ func TestLabelsConfig_getAttributes(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - mapping := tt.labels.getAttributes() + mapping := tt.labels.getLogRecordAttributes() + + assert.Equal(t, tt.expectedMapping, mapping) + }) + } +} + +func TestResourcesConfig_getAttributes(t *testing.T) { + tests := []struct { + name string + labels LabelsConfig + expectedMapping map[string]model.LabelName + }{ + { + name: "with attributes without label mapping", + labels: LabelsConfig{ + ResourceAttributes: map[string]string{ + "attribute_1": "", + "attribute_2": "", + }, + }, + expectedMapping: map[string]model.LabelName{ + "attribute_1": model.LabelName("attribute_1"), + "attribute_2": model.LabelName("attribute_2"), + }, + }, + { + name: "with attributes and label mapping", + labels: LabelsConfig{ + ResourceAttributes: map[string]string{ + "attribute.1": "attribute_1", + "attribute.2": "attribute_2", + }, + }, + expectedMapping: map[string]model.LabelName{ + "attribute.1": model.LabelName("attribute_1"), + "attribute.2": model.LabelName("attribute_2"), + }, + }, + { + name: "with attributes and without label mapping", + labels: LabelsConfig{ + ResourceAttributes: map[string]string{ + "attribute.1": "attribute_1", + "attribute2": "", + }, + }, + expectedMapping: map[string]model.LabelName{ + "attribute.1": model.LabelName("attribute_1"), + "attribute2": model.LabelName("attribute2"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mapping := tt.labels.getResourceAttributes() assert.Equal(t, tt.expectedMapping, mapping) }) diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index 335223d66190..0c41dc3fce80 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -36,11 +36,12 @@ import ( ) type lokiExporter struct { - config *Config - logger *zap.Logger - client *http.Client - attributesToLabels map[string]model.LabelName - wg sync.WaitGroup + config *Config + logger *zap.Logger + client *http.Client + attribLogsToLabels map[string]model.LabelName + attribResoucesToLabels map[string]model.LabelName + wg sync.WaitGroup } func newExporter(config *Config, logger *zap.Logger) (*lokiExporter, error) { @@ -110,7 +111,8 @@ func encode(pb proto.Message) ([]byte, error) { } func (l *lokiExporter) start(context.Context, component.Host) (err error) { - l.attributesToLabels = l.config.Labels.getAttributes() + l.attribLogsToLabels = l.config.Labels.getLogRecordAttributes() + l.attribResoucesToLabels = l.config.Labels.getResourceAttributes() return nil } @@ -124,20 +126,20 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n rls := ld.ResourceLogs() for i := 0; i < rls.Len(); i++ { ills := rls.At(i).InstrumentationLibraryLogs() + resource := rls.At(i).Resource() for j := 0; j < ills.Len(); j++ { logs := ills.At(j).Logs() for k := 0; k < logs.Len(); k++ { log := logs.At(k) - attribLabels, ok := l.convertAttributesToLabels(log.Attributes()) - if !ok { + + mergedLabels, dropped := l.convertAttributesAndMerge(log.Attributes(), resource.Attributes()) + if dropped { numDroppedLogs++ continue } - - labels := attribLabels.String() - - // entry := convertLogToLokiEntry(log) - entry := convertLogToJsonEntry(log) + labels := mergedLabels.String() + entry := convertLogToLokiEntry(log) + // entry := convertLogToJsonEntry(log) if stream, ok := streams[labels]; ok { stream.Entries = append(stream.Entries, *entry) @@ -165,10 +167,23 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n return pr, numDroppedLogs } -func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap) (model.LabelSet, bool) { +func (l *lokiExporter) convertAttributesAndMerge(logAttrs pdata.AttributeMap, resourceAttrs pdata.AttributeMap) (mergedAttributes model.LabelSet, dropped bool) { + logRecordAttributes := l.convertLogAttributesToLabels(logAttrs) + resourceAttributes := l.convertResourceAttributesToLabels(resourceAttrs) + + // This prometheus model.labelset Merge function overwrites the logRecordAttributes with resourceAttributes + mergedAttributes = logRecordAttributes.Merge(resourceAttributes) + + if len(mergedAttributes) == 0 { + return nil, true + } + return mergedAttributes, false +} + +func (l *lokiExporter) convertLogAttributesToLabels(attributes pdata.AttributeMap) model.LabelSet { ls := model.LabelSet{} - for attr, attrLabelName := range l.attributesToLabels { + for attr, attrLabelName := range l.attribLogsToLabels { av, ok := attributes.Get(attr) if ok { if av.Type() != pdata.AttributeValueTypeString { @@ -179,11 +194,24 @@ func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap) } } - if len(ls) == 0 { - return nil, false + return ls +} + +func (l *lokiExporter) convertResourceAttributesToLabels(attributes pdata.AttributeMap) model.LabelSet { + ls := model.LabelSet{} + + for attr, attrLabelName := range l.attribResoucesToLabels { + av, ok := attributes.Get(attr) + if ok { + if av.Type() != pdata.AttributeValueTypeString { + l.logger.Debug("Failed to convert attribute value to Loki label value, value is not a string", zap.String("attribute", attr)) + continue + } + ls[attrLabelName] = model.LabelValue(av.StringVal()) + } } - return ls, true + return ls } func convertLogToLokiEntry(lr pdata.LogRecord) *logproto.Entry { diff --git a/exporter/lokiexporter/exporter_test.go b/exporter/lokiexporter/exporter_test.go index cc4537b2f974..cd69d75c698c 100644 --- a/exporter/lokiexporter/exporter_test.go +++ b/exporter/lokiexporter/exporter_test.go @@ -48,6 +48,10 @@ var ( conventions.AttributeK8sCluster: "k8s_cluster_name", "severity": "severity", } + testValidResourceWithMapping = map[string]string{ + "resource.name": "resource_name", + "severity": "severity", + } ) func createLogData(numberOfLogs int, attributes pdata.AttributeMap) pdata.Logs { @@ -75,7 +79,8 @@ func TestExporter_new(t *testing.T) { Endpoint: validEndpoint, }, Labels: LabelsConfig{ - Attributes: testValidAttributesWithMapping, + Attributes: testValidAttributesWithMapping, + ResourceAttributes: testValidResourceWithMapping, }, } exp, err := newExporter(config, zap.NewNop()) @@ -119,6 +124,7 @@ func TestExporter_pushLogData(t *testing.T) { pdata.NewAttributeMap().InitFromMap(map[string]pdata.AttributeValue{ conventions.AttributeContainerName: pdata.NewAttributeValueString("api"), conventions.AttributeK8sCluster: pdata.NewAttributeValueString("local"), + "resource.name": pdata.NewAttributeValueString("myresource"), "severity": pdata.NewAttributeValueString("debug"), })) } @@ -137,6 +143,9 @@ func TestExporter_pushLogData(t *testing.T) { conventions.AttributeK8sCluster: "k8s_cluster_name", "severity": "severity", }, + ResourceAttributes: map[string]string{ + "resource.name": "resource_name", + }, }, } @@ -272,6 +281,9 @@ func TestExporter_logDataToLoki(t *testing.T) { conventions.AttributeK8sCluster: "k8s_cluster_name", "severity": "severity", }, + ResourceAttributes: map[string]string{ + "resource.name": "resource_name", + }, }, } exp, err := newExporter(config, zap.NewNop()) @@ -361,6 +373,43 @@ func TestExporter_logDataToLoki(t *testing.T) { require.Len(t, pr.Streams[0].Entries, 1) require.Len(t, pr.Streams[1].Entries, 1) }) + + t.Run("with attributes and resource attributes that match config", func(t *testing.T) { + logs := pdata.NewLogs() + ts := pdata.Timestamp(int64(1) * time.Millisecond.Nanoseconds()) + lr := logs.ResourceLogs().AppendEmpty() + lr.Resource().Attributes().InsertString("not.in.config", "not allowed") + + lri := lr.InstrumentationLibraryLogs().AppendEmpty().Logs().AppendEmpty() + lri.Body().SetStringVal("log message") + lri.Attributes().InsertString("not.in.config", "not allowed") + lri.SetTimestamp(ts) + + pr, numDroppedLogs := exp.logDataToLoki(logs) + expectedPr := &logproto.PushRequest{Streams: make([]logproto.Stream, 0)} + require.Equal(t, 1, numDroppedLogs) + require.Equal(t, expectedPr, pr) + }) + + t.Run("with attributes and resource attributes", func(t *testing.T) { + logs := pdata.NewLogs() + ts := pdata.Timestamp(int64(1) * time.Millisecond.Nanoseconds()) + lr := logs.ResourceLogs().AppendEmpty() + lr.Resource().Attributes().InsertString("resource.name", "myresource") + + lri := lr.InstrumentationLibraryLogs().AppendEmpty().Logs().AppendEmpty() + lri.Body().SetStringVal("log message") + lri.Attributes().InsertString(conventions.AttributeContainerName, "mycontainer") + lri.Attributes().InsertString("severity", "info") + lri.Attributes().InsertString("random.attribute", "random") + lri.SetTimestamp(ts) + + pr, numDroppedLogs := exp.logDataToLoki(logs) + require.Equal(t, 0, numDroppedLogs) + require.NotNil(t, pr) + require.Len(t, pr.Streams, 1) + }) + } func TestExporter_convertAttributesToLabels(t *testing.T) { @@ -374,6 +423,10 @@ func TestExporter_convertAttributesToLabels(t *testing.T) { conventions.AttributeK8sCluster: "k8s_cluster_name", "severity": "severity", }, + ResourceAttributes: map[string]string{ + "resource.name": "resource_name", + "severity": "severity", + }, }, } exp, err := newExporter(config, zap.NewNop()) @@ -387,50 +440,50 @@ func TestExporter_convertAttributesToLabels(t *testing.T) { am.InsertString(conventions.AttributeContainerName, "mycontainer") am.InsertString(conventions.AttributeK8sCluster, "mycluster") am.InsertString("severity", "debug") + ram := pdata.NewAttributeMap() + ram.InsertString("resource.name", "myresource") + // this should overwrite log attribute of the same name + ram.InsertString("severity", "info") - ls, ok := exp.convertAttributesToLabels(am) + ls, _ := exp.convertAttributesAndMerge(am, ram) expLs := model.LabelSet{ model.LabelName("container_name"): model.LabelValue("mycontainer"), model.LabelName("k8s_cluster_name"): model.LabelValue("mycluster"), - model.LabelName("severity"): model.LabelValue("debug"), + model.LabelName("severity"): model.LabelValue("info"), + model.LabelName("resource_name"): model.LabelValue("myresource"), } - require.True(t, ok) require.Equal(t, expLs, ls) }) t.Run("with attribute matches and the value is a boolean", func(t *testing.T) { am := pdata.NewAttributeMap() am.InsertBool("severity", false) - - ls, ok := exp.convertAttributesToLabels(am) - require.False(t, ok) + ram := pdata.NewAttributeMap() + ls, _ := exp.convertAttributesAndMerge(am, ram) require.Nil(t, ls) }) t.Run("with attribute that matches and the value is a double", func(t *testing.T) { am := pdata.NewAttributeMap() am.InsertDouble("severity", float64(0)) - - ls, ok := exp.convertAttributesToLabels(am) - require.False(t, ok) + ram := pdata.NewAttributeMap() + ls, _ := exp.convertAttributesAndMerge(am, ram) require.Nil(t, ls) }) t.Run("with attribute that matches and the value is an int", func(t *testing.T) { am := pdata.NewAttributeMap() am.InsertInt("severity", 0) - - ls, ok := exp.convertAttributesToLabels(am) - require.False(t, ok) + ram := pdata.NewAttributeMap() + ls, _ := exp.convertAttributesAndMerge(am, ram) require.Nil(t, ls) }) t.Run("with attribute that matches and the value is null", func(t *testing.T) { am := pdata.NewAttributeMap() am.InsertNull("severity") - - ls, ok := exp.convertAttributesToLabels(am) - require.False(t, ok) + ram := pdata.NewAttributeMap() + ls, _ := exp.convertAttributesAndMerge(am, ram) require.Nil(t, ls) }) } @@ -502,7 +555,8 @@ func TestExporter_startAlwaysReturnsNil(t *testing.T) { Endpoint: validEndpoint, }, Labels: LabelsConfig{ - Attributes: testValidAttributesWithMapping, + Attributes: testValidAttributesWithMapping, + ResourceAttributes: testValidResourceWithMapping, }, } e, err := newExporter(config, zap.NewNop()) @@ -516,7 +570,8 @@ func TestExporter_stopAlwaysReturnsNil(t *testing.T) { Endpoint: validEndpoint, }, Labels: LabelsConfig{ - Attributes: testValidAttributesWithMapping, + Attributes: testValidAttributesWithMapping, + ResourceAttributes: testValidResourceWithMapping, }, } e, err := newExporter(config, zap.NewNop()) diff --git a/exporter/lokiexporter/factory_test.go b/exporter/lokiexporter/factory_test.go index b1f768f46a50..2261271541ad 100644 --- a/exporter/lokiexporter/factory_test.go +++ b/exporter/lokiexporter/factory_test.go @@ -63,7 +63,8 @@ func TestFactory_CreateLogExporter(t *testing.T) { Endpoint: "http://" + testutil.GetAvailableLocalAddress(t), }, Labels: LabelsConfig{ - Attributes: testValidAttributesWithMapping, + Attributes: testValidAttributesWithMapping, + ResourceAttributes: testValidResourceWithMapping, }, }, shouldError: false, diff --git a/exporter/lokiexporter/testdata/config.yaml b/exporter/lokiexporter/testdata/config.yaml index 4a522e58562f..5c42018ad39b 100644 --- a/exporter/lokiexporter/testdata/config.yaml +++ b/exporter/lokiexporter/testdata/config.yaml @@ -39,9 +39,12 @@ exporters: container.name: "container_name" k8s.cluster.name: "k8s_cluster_name" severity: "severity" + resource: + resource.name: "resource_name" + severity: "severity" service: pipelines: logs: receivers: [ nop ] processors: [ nop ] - exporters: [ loki, loki/allsettings ] + exporters: [ loki, loki/allsettings ] \ No newline at end of file From 071a13c0aeb954043f2ce6ad5d15a386d583e103 Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Tue, 25 May 2021 11:09:18 +0100 Subject: [PATCH 06/22] add format option to loki exporter config --- exporter/lokiexporter/config.go | 2 ++ exporter/lokiexporter/config_test.go | 3 ++- exporter/lokiexporter/exporter.go | 7 ++++++- exporter/lokiexporter/factory.go | 1 + exporter/lokiexporter/testdata/config.yaml | 11 ++++++++++- 5 files changed, 21 insertions(+), 3 deletions(-) diff --git a/exporter/lokiexporter/config.go b/exporter/lokiexporter/config.go index 3adba8ffdf37..aee52591029c 100644 --- a/exporter/lokiexporter/config.go +++ b/exporter/lokiexporter/config.go @@ -36,6 +36,8 @@ type Config struct { // Labels defines how labels should be applied to log streams sent to Loki. Labels LabelsConfig `mapstructure:"labels"` + //Allows you to choose the entry format in the exporter + Format string `mapstructure:"format"` } func (c *Config) validate() error { diff --git a/exporter/lokiexporter/config_test.go b/exporter/lokiexporter/config_test.go index 681a39b08a0d..faa203658302 100644 --- a/exporter/lokiexporter/config_test.go +++ b/exporter/lokiexporter/config_test.go @@ -42,7 +42,7 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) - assert.Equal(t, 2, len(cfg.Exporters)) + assert.Equal(t, 3, len(cfg.Exporters)) actualCfg := cfg.Exporters[config.NewIDWithName(typeStr, "allsettings")].(*Config) expectedCfg := Config{ @@ -87,6 +87,7 @@ func TestLoadConfig(t *testing.T) { "severity": "severity", }, }, + Format: "loki", } require.Equal(t, &expectedCfg, actualCfg) } diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index 0c41dc3fce80..2d68c29966c2 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -138,7 +138,12 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n continue } labels := mergedLabels.String() - entry := convertLogToLokiEntry(log) + var entry *logproto.Entry + if l.config.Format == "json" { + entry = convertLogToJsonEntry(log) + } else { + entry = convertLogToLokiEntry(log) + } // entry := convertLogToJsonEntry(log) if stream, ok := streams[labels]; ok { diff --git a/exporter/lokiexporter/factory.go b/exporter/lokiexporter/factory.go index b8a0219fcec2..2e40d40cd9ea 100644 --- a/exporter/lokiexporter/factory.go +++ b/exporter/lokiexporter/factory.go @@ -48,6 +48,7 @@ func createDefaultConfig() config.Exporter { RetrySettings: exporterhelper.DefaultRetrySettings(), QueueSettings: exporterhelper.DefaultQueueSettings(), TenantID: "", + Format: "loki", Labels: LabelsConfig{ Attributes: map[string]string{}, }, diff --git a/exporter/lokiexporter/testdata/config.yaml b/exporter/lokiexporter/testdata/config.yaml index 5c42018ad39b..93cbfe8dd278 100644 --- a/exporter/lokiexporter/testdata/config.yaml +++ b/exporter/lokiexporter/testdata/config.yaml @@ -13,6 +13,15 @@ exporters: container.name: "container_name" k8s.cluster.name: "k8s_cluster_name" severity: "severity" + loki/json: + endpoint: "https://loki:3100/loki/api/v1/push" + tenant_id: "example" + format: "json" + labels: + attributes: + container.name: "container_name" + k8s.cluster.name: "k8s_cluster_name" + severity: "severity" loki/allsettings: endpoint: "https://loki:3100/loki/api/v1/push" tenant_id: "example" @@ -47,4 +56,4 @@ service: logs: receivers: [ nop ] processors: [ nop ] - exporters: [ loki, loki/allsettings ] \ No newline at end of file + exporters: [ loki, loki/allsettings, loki/json ] \ No newline at end of file From bb378184d9ffe1f3c12b9642f399642815fb492d Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Tue, 25 May 2021 11:45:45 +0100 Subject: [PATCH 07/22] add test for loki/json test data --- exporter/lokiexporter/config_test.go | 56 ++++++++++++++++++++++ exporter/lokiexporter/testdata/config.yaml | 4 +- 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/exporter/lokiexporter/config_test.go b/exporter/lokiexporter/config_test.go index faa203658302..e2211292fd56 100644 --- a/exporter/lokiexporter/config_test.go +++ b/exporter/lokiexporter/config_test.go @@ -92,6 +92,62 @@ func TestLoadConfig(t *testing.T) { require.Equal(t, &expectedCfg, actualCfg) } +func TestJsonLoadConfig(t *testing.T) { + factories, err := componenttest.NopFactories() + assert.Nil(t, err) + + factory := NewFactory() + factories.Exporters[config.Type(typeStr)] = factory + cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) + + require.NoError(t, err) + require.NotNil(t, cfg) + + assert.Equal(t, 3, len(cfg.Exporters)) + + actualCfg := cfg.Exporters[config.NewIDWithName(typeStr, "json")].(*Config) + expectedCfg := Config{ + ExporterSettings: config.NewExporterSettings(config.NewIDWithName(typeStr, "json")), + HTTPClientSettings: confighttp.HTTPClientSettings{ + Headers: map[string]string{}, + Endpoint: "https://loki:3100/loki/api/v1/push", + TLSSetting: configtls.TLSClientSetting{ + TLSSetting: configtls.TLSSetting{ + CAFile: "", + CertFile: "", + KeyFile: "", + }, + Insecure: false, + }, + ReadBufferSize: 0, + WriteBufferSize: 524288, + Timeout: 30000000000, + }, + RetrySettings: exporterhelper.RetrySettings{ + Enabled: true, + InitialInterval: 5000000000, + MaxInterval: 30000000000, + MaxElapsedTime: 300000000000, + }, + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 10, + QueueSize: 5000, + }, + TenantID: "example", + Labels: LabelsConfig{ + Attributes: map[string]string{ + conventions.AttributeContainerName: "container_name", + }, + ResourceAttributes: map[string]string{ + "resource.name": "resource_name", + }, + }, + Format: "json", + } + require.Equal(t, &expectedCfg, actualCfg) +} + func TestConfig_validate(t *testing.T) { const validEndpoint = "https://validendpoint.local" diff --git a/exporter/lokiexporter/testdata/config.yaml b/exporter/lokiexporter/testdata/config.yaml index 93cbfe8dd278..430c97013277 100644 --- a/exporter/lokiexporter/testdata/config.yaml +++ b/exporter/lokiexporter/testdata/config.yaml @@ -20,8 +20,8 @@ exporters: labels: attributes: container.name: "container_name" - k8s.cluster.name: "k8s_cluster_name" - severity: "severity" + resource: + resource.name: "resource_name" loki/allsettings: endpoint: "https://loki:3100/loki/api/v1/push" tenant_id: "example" From 728f4bf6dc801a83ddc4ade129b1014a1ead9aa4 Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Fri, 4 Jun 2021 14:34:30 +0100 Subject: [PATCH 08/22] improve variable names and comments --- exporter/lokiexporter/config.go | 2 +- exporter/lokiexporter/encode_json.go | 18 ++++++------------ 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/exporter/lokiexporter/config.go b/exporter/lokiexporter/config.go index aee52591029c..3593e91b9ee0 100644 --- a/exporter/lokiexporter/config.go +++ b/exporter/lokiexporter/config.go @@ -36,7 +36,7 @@ type Config struct { // Labels defines how labels should be applied to log streams sent to Loki. Labels LabelsConfig `mapstructure:"labels"` - //Allows you to choose the entry format in the exporter + // Allows you to choose the entry format in the exporter Format string `mapstructure:"format"` } diff --git a/exporter/lokiexporter/encode_json.go b/exporter/lokiexporter/encode_json.go index 97762eed609d..693f02b1503e 100644 --- a/exporter/lokiexporter/encode_json.go +++ b/exporter/lokiexporter/encode_json.go @@ -2,13 +2,12 @@ package lokiexporter import ( "encoding/json" - "fmt" "go.opentelemetry.io/collector/consumer/pdata" tracetranslator "go.opentelemetry.io/collector/translator/trace" ) -type jsonRecord struct { +type lokiEntry struct { Name string `json:"name,omitempty"` Body string `json:"body,omitempty"` TraceID string `json:"traceid,omitempty"` @@ -18,18 +17,13 @@ type jsonRecord struct { } func encodeJSON(lr pdata.LogRecord) string { - // json1 := jsonRecord{lr.Name(), lr.Body().StringVal(), lr.TraceID(), lr.SpanID(), lr.SeverityText()} - var json1 jsonRecord - var json2 []byte + var logRecord lokiEntry + var jsonRecord []byte - json1 = jsonRecord{Name: lr.Name(), Body: lr.Body().StringVal(), TraceID: lr.TraceID().HexString(), SpanID: lr.SpanID().HexString(), Severity: lr.SeverityText(), Attributes: tracetranslator.AttributeMapToMap(lr.Attributes())} - // json = {name: lr.Name(), body: lr.Body().StringVal(), traceid: lr.TraceID().StringVal(), spanid: lr.SpanID().StringVal(), severity: lr.Severity().StringVal(), attributes: {lr.AttributeMap().Get()}} - fmt.Printf("JSON 1: %v", json1) + logRecord = lokiEntry{Name: lr.Name(), Body: lr.Body().StringVal(), TraceID: lr.TraceID().HexString(), SpanID: lr.SpanID().HexString(), Severity: lr.SeverityText(), Attributes: tracetranslator.AttributeMapToMap(lr.Attributes())} - json2, err := json.Marshal(json1) + jsonRecord, err := json.Marshal(logRecord) if err != nil { - fmt.Println("error:", err) } - fmt.Println("JSON 2" + string(json2)) - return string(json2) + return string(jsonRecord) } From bb253b391d7550f232ddbf12e47a4ba551a8b567 Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Thu, 24 Jun 2021 13:48:46 +0100 Subject: [PATCH 09/22] Create feature for loki exporter to encode json --- exporter/lokiexporter/README.md | 2 ++ exporter/lokiexporter/config.go | 16 ---------------- exporter/lokiexporter/config_test.go | 10 +++------- exporter/lokiexporter/encode_json.go | 15 ++++++++++++--- exporter/lokiexporter/encode_json_test.go | 4 ++-- exporter/lokiexporter/exporter.go | 19 ++++++++++++++----- exporter/lokiexporter/exporter_test.go | 4 ++-- exporter/lokiexporter/factory.go | 2 +- exporter/lokiexporter/testdata/config.yaml | 6 +----- 9 files changed, 37 insertions(+), 41 deletions(-) diff --git a/exporter/lokiexporter/README.md b/exporter/lokiexporter/README.md index af4845835f17..749580ee1697 100644 --- a/exporter/lokiexporter/README.md +++ b/exporter/lokiexporter/README.md @@ -44,6 +44,8 @@ The following settings can be optionally configured: - `headers` (no default): Name/value pairs added to the HTTP request headers. +- `format` (default = body): Set the log entry line format. This can be set to 'json' (the entire json encoded log record) or 'body' (the log record body field as a string). + Example: ```yaml diff --git a/exporter/lokiexporter/config.go b/exporter/lokiexporter/config.go index 8df6937e17e0..59d110ded841 100644 --- a/exporter/lokiexporter/config.go +++ b/exporter/lokiexporter/config.go @@ -99,19 +99,3 @@ func (c *LabelsConfig) getAttributes(labels map[string]string) map[string]model. return attributes } - -// getResourceAttributes creates a lookup of allowed attributes to valid Loki label names. -func (c *LabelsConfig) getResourceAttributes() map[string]model.LabelName { - attributes := map[string]model.LabelName{} - - for attrName, lblName := range c.ResourceAttributes { - if len(lblName) > 0 { - attributes[attrName] = model.LabelName(lblName) - continue - } - - attributes[attrName] = model.LabelName(attrName) - } - - return attributes -} diff --git a/exporter/lokiexporter/config_test.go b/exporter/lokiexporter/config_test.go index f981d1b5ee44..fee8106a3a3a 100644 --- a/exporter/lokiexporter/config_test.go +++ b/exporter/lokiexporter/config_test.go @@ -87,7 +87,7 @@ func TestLoadConfig(t *testing.T) { "severity": "severity", }, }, - Format: "loki", + Format: "body", } require.Equal(t, &expectedCfg, actualCfg) } @@ -136,12 +136,8 @@ func TestJsonLoadConfig(t *testing.T) { }, TenantID: "example", Labels: LabelsConfig{ - Attributes: map[string]string{ - conventions.AttributeContainerName: "container_name", - }, - ResourceAttributes: map[string]string{ - "resource.name": "resource_name", - }, + Attributes: map[string]string{}, + ResourceAttributes: map[string]string{}, }, Format: "json", } diff --git a/exporter/lokiexporter/encode_json.go b/exporter/lokiexporter/encode_json.go index 693f02b1503e..76785d46509c 100644 --- a/exporter/lokiexporter/encode_json.go +++ b/exporter/lokiexporter/encode_json.go @@ -7,6 +7,8 @@ import ( tracetranslator "go.opentelemetry.io/collector/translator/trace" ) +// JSON representation of the LogRecord as described by https://developers.google.com/protocol-buffers/docs/proto3#json + type lokiEntry struct { Name string `json:"name,omitempty"` Body string `json:"body,omitempty"` @@ -16,14 +18,21 @@ type lokiEntry struct { Attributes map[string]interface{} `json:"attributes,omitempty"` } -func encodeJSON(lr pdata.LogRecord) string { +func encodeJSON(lr pdata.LogRecord) (string, error) { var logRecord lokiEntry var jsonRecord []byte - logRecord = lokiEntry{Name: lr.Name(), Body: lr.Body().StringVal(), TraceID: lr.TraceID().HexString(), SpanID: lr.SpanID().HexString(), Severity: lr.SeverityText(), Attributes: tracetranslator.AttributeMapToMap(lr.Attributes())} + logRecord = lokiEntry{ + Name: lr.Name(), + Body: lr.Body().StringVal(), + TraceID: lr.TraceID().HexString(), + SpanID: lr.SpanID().HexString(), + Severity: lr.SeverityText(), + Attributes: tracetranslator.AttributeMapToMap(lr.Attributes())} jsonRecord, err := json.Marshal(logRecord) if err != nil { + return "", err } - return string(jsonRecord) + return string(jsonRecord), nil } diff --git a/exporter/lokiexporter/encode_json_test.go b/exporter/lokiexporter/encode_json_test.go index fe0c9d151800..756915312e1c 100644 --- a/exporter/lokiexporter/encode_json_test.go +++ b/exporter/lokiexporter/encode_json_test.go @@ -28,8 +28,8 @@ func exampleJson() string { func TestConvert(t *testing.T) { in := exampleJson() - out := encodeJSON(exampleLog()) + out, err := encodeJSON(exampleLog()) t.Log(in) - t.Log(out) + t.Log(out, err) assert.Equal(t, in, out) } diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index deaf1dc67d6a..e340f6fe8917 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -137,11 +137,17 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n labels := mergedLabels.String() var entry *logproto.Entry if l.config.Format == "json" { - entry = convertLogToJsonEntry(log) + var err error + entry, err = convertLogToJsonEntry(log) + if err != nil { + // Couldn't convert to JSON so dropping log. + numDroppedLogs++ + l.logger.Error("Failed to convert to JSON - Dropping Log", zap.Error(err)) + continue + } } else { entry = convertLogToLokiEntry(log) } - // entry := convertLogToJsonEntry(log) if stream, ok := streams[labels]; ok { stream.Entries = append(stream.Entries, *entry) @@ -208,10 +214,13 @@ func convertLogToLokiEntry(lr pdata.LogRecord) *logproto.Entry { } } -func convertLogToJsonEntry(lr pdata.LogRecord) *logproto.Entry { - line := encodeJSON(lr) +func convertLogToJsonEntry(lr pdata.LogRecord) (*logproto.Entry, error) { + line, err := encodeJSON(lr) + if err != nil { + return nil, err + } return &logproto.Entry{ Timestamp: time.Unix(0, int64(lr.Timestamp())), Line: line, - } + }, nil } diff --git a/exporter/lokiexporter/exporter_test.go b/exporter/lokiexporter/exporter_test.go index dcd904ed3788..7bc23cfed38b 100644 --- a/exporter/lokiexporter/exporter_test.go +++ b/exporter/lokiexporter/exporter_test.go @@ -582,12 +582,12 @@ func TestExporter_convertLogtoJsonEntry(t *testing.T) { lr.Body().SetStringVal("log message") lr.SetTimestamp(ts) - entry := convertLogToJsonEntry(lr) - + entry, err := convertLogToJsonEntry(lr) expEntry := &logproto.Entry{ Timestamp: time.Unix(0, int64(lr.Timestamp())), Line: `{"body":"log message"}`, } + require.Nil(t, err) require.NotNil(t, entry) require.Equal(t, expEntry, entry) } diff --git a/exporter/lokiexporter/factory.go b/exporter/lokiexporter/factory.go index b25980f7515f..676a77bb1b1c 100644 --- a/exporter/lokiexporter/factory.go +++ b/exporter/lokiexporter/factory.go @@ -48,7 +48,7 @@ func createDefaultConfig() config.Exporter { RetrySettings: exporterhelper.DefaultRetrySettings(), QueueSettings: exporterhelper.DefaultQueueSettings(), TenantID: "", - Format: "loki", + Format: "body", Labels: LabelsConfig{ Attributes: map[string]string{}, ResourceAttributes: map[string]string{}, diff --git a/exporter/lokiexporter/testdata/config.yaml b/exporter/lokiexporter/testdata/config.yaml index 430c97013277..95884e671fe1 100644 --- a/exporter/lokiexporter/testdata/config.yaml +++ b/exporter/lokiexporter/testdata/config.yaml @@ -17,15 +17,11 @@ exporters: endpoint: "https://loki:3100/loki/api/v1/push" tenant_id: "example" format: "json" - labels: - attributes: - container.name: "container_name" - resource: - resource.name: "resource_name" loki/allsettings: endpoint: "https://loki:3100/loki/api/v1/push" tenant_id: "example" insecure: true + format: "body" ca_file: /var/lib/mycert.pem cert_file: certfile key_file: keyfile From 11d5439eabab353ac46b803879a75b82cc789b27 Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Thu, 22 Jul 2021 16:35:22 +0100 Subject: [PATCH 10/22] mapping convert functions --- exporter/lokiexporter/exporter.go | 39 ++++++++++++++------------ exporter/lokiexporter/exporter_test.go | 2 +- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index e340f6fe8917..1d749f8627bc 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -36,17 +36,24 @@ import ( ) type lokiExporter struct { - config *Config - logger *zap.Logger - client *http.Client - wg sync.WaitGroup + config *Config + logger *zap.Logger + client *http.Client + wg sync.WaitGroup + convert func(pdata.LogRecord) (*logproto.Entry, error) } func newExporter(config *Config, logger *zap.Logger) *lokiExporter { - return &lokiExporter{ + lokiexporter := &lokiExporter{ config: config, logger: logger, } + if config.Format == "json" { + lokiexporter.convert = convertLogToJsonEntry + } else { + lokiexporter.convert = convertLogToLokiEntry + } + return lokiexporter } func (l *lokiExporter) pushLogData(ctx context.Context, ld pdata.Logs) error { @@ -136,17 +143,13 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n } labels := mergedLabels.String() var entry *logproto.Entry - if l.config.Format == "json" { - var err error - entry, err = convertLogToJsonEntry(log) - if err != nil { - // Couldn't convert to JSON so dropping log. - numDroppedLogs++ - l.logger.Error("Failed to convert to JSON - Dropping Log", zap.Error(err)) - continue - } - } else { - entry = convertLogToLokiEntry(log) + var err error + entry, err = l.convert(log) + if err != nil { + // Couldn't convert to JSON so dropping log. + numDroppedLogs++ + l.logger.Error("Failed to convert to JSON - Dropping Log", zap.Error(err)) + continue } if stream, ok := streams[labels]; ok { @@ -207,11 +210,11 @@ func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap, return ls } -func convertLogToLokiEntry(lr pdata.LogRecord) *logproto.Entry { +func convertLogToLokiEntry(lr pdata.LogRecord) (*logproto.Entry, error) { return &logproto.Entry{ Timestamp: time.Unix(0, int64(lr.Timestamp())), Line: lr.Body().StringVal(), - } + }, nil } func convertLogToJsonEntry(lr pdata.LogRecord) (*logproto.Entry, error) { diff --git a/exporter/lokiexporter/exporter_test.go b/exporter/lokiexporter/exporter_test.go index 7bc23cfed38b..e215035de12d 100644 --- a/exporter/lokiexporter/exporter_test.go +++ b/exporter/lokiexporter/exporter_test.go @@ -477,7 +477,7 @@ func TestExporter_convertLogToLokiEntry(t *testing.T) { lr.Body().SetStringVal("log message") lr.SetTimestamp(ts) - entry := convertLogToLokiEntry(lr) + entry, _ := convertLogToLokiEntry(lr) expEntry := &logproto.Entry{ Timestamp: time.Unix(0, int64(lr.Timestamp())), From fcdb38b5db85d65f9af88ddd3389dc088eb6c538 Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Thu, 29 Jul 2021 11:30:27 +0100 Subject: [PATCH 11/22] Fix linting errors --- exporter/lokiexporter/encode_json_test.go | 4 ++-- exporter/lokiexporter/exporter.go | 4 ++-- exporter/lokiexporter/exporter_test.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/exporter/lokiexporter/encode_json_test.go b/exporter/lokiexporter/encode_json_test.go index 756915312e1c..f5147ca5db9b 100644 --- a/exporter/lokiexporter/encode_json_test.go +++ b/exporter/lokiexporter/encode_json_test.go @@ -21,13 +21,13 @@ func exampleLog() pdata.LogRecord { return buffer } -func exampleJson() string { +func exampleJSON() string { jsonExample := `{"name":"name","body":"Example log","traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"}}` return jsonExample } func TestConvert(t *testing.T) { - in := exampleJson() + in := exampleJSON() out, err := encodeJSON(exampleLog()) t.Log(in) t.Log(out, err) diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index 1d749f8627bc..f9148deb6448 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -49,7 +49,7 @@ func newExporter(config *Config, logger *zap.Logger) *lokiExporter { logger: logger, } if config.Format == "json" { - lokiexporter.convert = convertLogToJsonEntry + lokiexporter.convert = convertLogToJSONEntry } else { lokiexporter.convert = convertLogToLokiEntry } @@ -217,7 +217,7 @@ func convertLogToLokiEntry(lr pdata.LogRecord) (*logproto.Entry, error) { }, nil } -func convertLogToJsonEntry(lr pdata.LogRecord) (*logproto.Entry, error) { +func convertLogToJSONEntry(lr pdata.LogRecord) (*logproto.Entry, error) { line, err := encodeJSON(lr) if err != nil { return nil, err diff --git a/exporter/lokiexporter/exporter_test.go b/exporter/lokiexporter/exporter_test.go index e215035de12d..21376e75616d 100644 --- a/exporter/lokiexporter/exporter_test.go +++ b/exporter/lokiexporter/exporter_test.go @@ -576,13 +576,13 @@ func TestExporter_stopAlwaysReturnsNil(t *testing.T) { require.NoError(t, exp.stop(context.Background())) } -func TestExporter_convertLogtoJsonEntry(t *testing.T) { +func TestExporter_convertLogtoJSONEntry(t *testing.T) { ts := pdata.Timestamp(int64(1) * time.Millisecond.Nanoseconds()) lr := pdata.NewLogRecord() lr.Body().SetStringVal("log message") lr.SetTimestamp(ts) - entry, err := convertLogToJsonEntry(lr) + entry, err := convertLogToJSONEntry(lr) expEntry := &logproto.Entry{ Timestamp: time.Unix(0, int64(lr.Timestamp())), Line: `{"body":"log message"}`, From ebdcccb873a6ed380a8d40c41b5933f730d199c3 Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Thu, 29 Jul 2021 12:52:40 +0100 Subject: [PATCH 12/22] update pdata package --- exporter/lokiexporter/encode_json.go | 2 +- exporter/lokiexporter/encode_json_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/lokiexporter/encode_json.go b/exporter/lokiexporter/encode_json.go index 76785d46509c..a7cd7c6ef362 100644 --- a/exporter/lokiexporter/encode_json.go +++ b/exporter/lokiexporter/encode_json.go @@ -3,7 +3,7 @@ package lokiexporter import ( "encoding/json" - "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/model/pdata" tracetranslator "go.opentelemetry.io/collector/translator/trace" ) diff --git a/exporter/lokiexporter/encode_json_test.go b/exporter/lokiexporter/encode_json_test.go index f5147ca5db9b..c93105453cbb 100644 --- a/exporter/lokiexporter/encode_json_test.go +++ b/exporter/lokiexporter/encode_json_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/model/pdata" ) func exampleLog() pdata.LogRecord { From e80de58da6f20e13325133b5e94795db7debbc00 Mon Sep 17 00:00:00 2001 From: Shaun Creary Date: Thu, 23 Sep 2021 15:27:29 +0100 Subject: [PATCH 13/22] update to use otel 0.36 --- exporter/lokiexporter/encode_json.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/exporter/lokiexporter/encode_json.go b/exporter/lokiexporter/encode_json.go index a7cd7c6ef362..9bd9ce2411d3 100644 --- a/exporter/lokiexporter/encode_json.go +++ b/exporter/lokiexporter/encode_json.go @@ -4,7 +4,6 @@ import ( "encoding/json" "go.opentelemetry.io/collector/model/pdata" - tracetranslator "go.opentelemetry.io/collector/translator/trace" ) // JSON representation of the LogRecord as described by https://developers.google.com/protocol-buffers/docs/proto3#json @@ -28,7 +27,7 @@ func encodeJSON(lr pdata.LogRecord) (string, error) { TraceID: lr.TraceID().HexString(), SpanID: lr.SpanID().HexString(), Severity: lr.SeverityText(), - Attributes: tracetranslator.AttributeMapToMap(lr.Attributes())} + Attributes: lr.Attributes().AsRaw()} jsonRecord, err := json.Marshal(logRecord) if err != nil { From 3139761094214eb285070b1fd6d04f6f6d91aa46 Mon Sep 17 00:00:00 2001 From: Guillaume Gill Date: Thu, 21 Oct 2021 00:38:41 +0200 Subject: [PATCH 14/22] Add resources in encoded json structure --- exporter/lokiexporter/config_test.go | 4 ++-- exporter/lokiexporter/encode_json.go | 7 +++++-- exporter/lokiexporter/encode_json_test.go | 9 ++++++--- exporter/lokiexporter/example/docker-compose.yml | 2 -- .../lokiexporter/example/otel-collector-config.yml | 8 ++++++++ exporter/lokiexporter/exporter.go | 10 +++++----- exporter/lokiexporter/exporter_test.go | 10 +++++++--- 7 files changed, 33 insertions(+), 17 deletions(-) diff --git a/exporter/lokiexporter/config_test.go b/exporter/lokiexporter/config_test.go index f5c892b0f365..a5ae10b680c1 100644 --- a/exporter/lokiexporter/config_test.go +++ b/exporter/lokiexporter/config_test.go @@ -105,9 +105,9 @@ func TestJsonLoadConfig(t *testing.T) { assert.Equal(t, 3, len(cfg.Exporters)) - actualCfg := cfg.Exporters[config.NewIDWithName(typeStr, "json")].(*Config) + actualCfg := cfg.Exporters[config.NewComponentIDWithName(typeStr, "json")].(*Config) expectedCfg := Config{ - ExporterSettings: config.NewExporterSettings(config.NewIDWithName(typeStr, "json")), + ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "json")), HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: map[string]string{}, Endpoint: "https://loki:3100/loki/api/v1/push", diff --git a/exporter/lokiexporter/encode_json.go b/exporter/lokiexporter/encode_json.go index 9bd9ce2411d3..2fcda794f734 100644 --- a/exporter/lokiexporter/encode_json.go +++ b/exporter/lokiexporter/encode_json.go @@ -15,9 +15,10 @@ type lokiEntry struct { SpanID string `json:"spanid,omitempty"` Severity string `json:"severity,omitempty"` Attributes map[string]interface{} `json:"attributes,omitempty"` + Resources map[string]interface{} `json:"resources,omitempty"` } -func encodeJSON(lr pdata.LogRecord) (string, error) { +func encodeJSON(lr pdata.LogRecord, res pdata.Resource) (string, error) { var logRecord lokiEntry var jsonRecord []byte @@ -27,7 +28,9 @@ func encodeJSON(lr pdata.LogRecord) (string, error) { TraceID: lr.TraceID().HexString(), SpanID: lr.SpanID().HexString(), Severity: lr.SeverityText(), - Attributes: lr.Attributes().AsRaw()} + Attributes: lr.Attributes().AsRaw(), + Resources: res.Attributes().AsRaw(), + } jsonRecord, err := json.Marshal(logRecord) if err != nil { diff --git a/exporter/lokiexporter/encode_json_test.go b/exporter/lokiexporter/encode_json_test.go index c93105453cbb..742b6a9ef7d6 100644 --- a/exporter/lokiexporter/encode_json_test.go +++ b/exporter/lokiexporter/encode_json_test.go @@ -7,7 +7,7 @@ import ( "go.opentelemetry.io/collector/model/pdata" ) -func exampleLog() pdata.LogRecord { +func exampleLog() (pdata.LogRecord, pdata.Resource) { buffer := pdata.NewLogRecord() buffer.Body().SetStringVal("Example log") @@ -18,11 +18,14 @@ func exampleLog() pdata.LogRecord { buffer.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4})) buffer.SetSpanID(pdata.NewSpanID([8]byte{5, 6, 7, 8})) - return buffer + resource := pdata.NewResource() + resource.Attributes().Insert("host.name", pdata.NewAttributeValueString("something")) + + return buffer, resource } func exampleJSON() string { - jsonExample := `{"name":"name","body":"Example log","traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"}}` + jsonExample := `{"name":"name","body":"Example log","traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"},"resources":{"host.name":"something"}}` return jsonExample } diff --git a/exporter/lokiexporter/example/docker-compose.yml b/exporter/lokiexporter/example/docker-compose.yml index cf67afdae94f..115ca13ce59c 100644 --- a/exporter/lokiexporter/example/docker-compose.yml +++ b/exporter/lokiexporter/example/docker-compose.yml @@ -13,8 +13,6 @@ services: container_name: otel command: - "--config=/etc/otel-collector-config.yml" - # Memory Ballast size should be max 1/3 to 1/2 of memory. - - "--mem-ballast-size-mib=683" volumes: - ./otel-collector-config.yml:/etc/otel-collector-config.yml ports: diff --git a/exporter/lokiexporter/example/otel-collector-config.yml b/exporter/lokiexporter/example/otel-collector-config.yml index 63ebda9bca8e..a605485449ec 100644 --- a/exporter/lokiexporter/example/otel-collector-config.yml +++ b/exporter/lokiexporter/example/otel-collector-config.yml @@ -24,15 +24,23 @@ processors: exporters: loki: endpoint: "http://loki:3100/loki/api/v1/push" + # Will encode the whole OTEL message in json + # This format happen after extracting labels + format: json labels: attributes: container_name: "" source: "" + resources: + host.name: "hostname" extensions: health_check: pprof: zpages: + memory_ballast: + # Memory Ballast size should be max 1/3 to 1/2 of memory. + size_mib: 64 service: extensions: [pprof, zpages, health_check] diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index 9f65e722fe0f..b031e8fdf9a8 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -40,7 +40,7 @@ type lokiExporter struct { logger *zap.Logger client *http.Client wg sync.WaitGroup - convert func(pdata.LogRecord) (*logproto.Entry, error) + convert func(pdata.LogRecord, pdata.Resource) (*logproto.Entry, error) } func newExporter(config *Config, logger *zap.Logger) *lokiExporter { @@ -144,7 +144,7 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n labels := mergedLabels.String() var entry *logproto.Entry var err error - entry, err = l.convert(log) + entry, err = l.convert(log, resource) if err != nil { // Couldn't convert to JSON so dropping log. numDroppedLogs++ @@ -210,15 +210,15 @@ func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap, return ls } -func convertLogToLokiEntry(lr pdata.LogRecord) (*logproto.Entry, error) { +func convertLogToLokiEntry(lr pdata.LogRecord, res pdata.Resource) (*logproto.Entry, error) { return &logproto.Entry{ Timestamp: time.Unix(0, int64(lr.Timestamp())), Line: lr.Body().StringVal(), }, nil } -func convertLogToJSONEntry(lr pdata.LogRecord) (*logproto.Entry, error) { - line, err := encodeJSON(lr) +func convertLogToJSONEntry(lr pdata.LogRecord, res pdata.Resource) (*logproto.Entry, error) { + line, err := encodeJSON(lr, res) if err != nil { return nil, err } diff --git a/exporter/lokiexporter/exporter_test.go b/exporter/lokiexporter/exporter_test.go index 77bc03f75cba..5ad8183981d8 100644 --- a/exporter/lokiexporter/exporter_test.go +++ b/exporter/lokiexporter/exporter_test.go @@ -477,8 +477,10 @@ func TestExporter_convertLogToLokiEntry(t *testing.T) { lr := pdata.NewLogRecord() lr.Body().SetStringVal("log message") lr.SetTimestamp(ts) + res := pdata.NewResource() + res.Attributes().Insert("host.name", pdata.NewAttributeValueString("something")) - entry, _ := convertLogToLokiEntry(lr) + entry, _ := convertLogToLokiEntry(lr, res) expEntry := &logproto.Entry{ Timestamp: time.Unix(0, int64(lr.Timestamp())), @@ -582,11 +584,13 @@ func TestExporter_convertLogtoJSONEntry(t *testing.T) { lr := pdata.NewLogRecord() lr.Body().SetStringVal("log message") lr.SetTimestamp(ts) + res := pdata.NewResource() + res.Attributes().Insert("host.name", pdata.NewAttributeValueString("something")) - entry, err := convertLogToJSONEntry(lr) + entry, err := convertLogToJSONEntry(lr, res) expEntry := &logproto.Entry{ Timestamp: time.Unix(0, int64(lr.Timestamp())), - Line: `{"body":"log message"}`, + Line: `{"body":"log message","resources":{"host.name":"something"}}`, } require.Nil(t, err) require.NotNil(t, entry) From 7fd92f5eba23f074c9236b44db5252d4dbbefe40 Mon Sep 17 00:00:00 2001 From: Guillaume Gill Date: Thu, 21 Oct 2021 14:06:33 +0200 Subject: [PATCH 15/22] Add missing licence --- exporter/lokiexporter/encode_json.go | 14 ++++++++++++++ exporter/lokiexporter/encode_json_test.go | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/exporter/lokiexporter/encode_json.go b/exporter/lokiexporter/encode_json.go index 2fcda794f734..a946d61bbae1 100644 --- a/exporter/lokiexporter/encode_json.go +++ b/exporter/lokiexporter/encode_json.go @@ -1,3 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package lokiexporter import ( diff --git a/exporter/lokiexporter/encode_json_test.go b/exporter/lokiexporter/encode_json_test.go index 742b6a9ef7d6..82a0fea93d5e 100644 --- a/exporter/lokiexporter/encode_json_test.go +++ b/exporter/lokiexporter/encode_json_test.go @@ -1,3 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package lokiexporter import ( From a8550932c32479214b76ead74d86c5873aa05cca Mon Sep 17 00:00:00 2001 From: Guillaume GILL Date: Tue, 26 Oct 2021 00:15:08 +0200 Subject: [PATCH 16/22] Fix review --- exporter/lokiexporter/README.md | 2 +- exporter/lokiexporter/config_test.go | 10 +++++----- exporter/lokiexporter/exporter.go | 6 +++--- exporter/lokiexporter/exporter_test.go | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/exporter/lokiexporter/README.md b/exporter/lokiexporter/README.md index 7c531d78059a..d63251a9edcf 100644 --- a/exporter/lokiexporter/README.md +++ b/exporter/lokiexporter/README.md @@ -44,7 +44,7 @@ The following settings can be optionally configured: - `headers` (no default): Name/value pairs added to the HTTP request headers. -- `format` (default = body): Set the log entry line format. This can be set to 'json' (the entire json encoded log record) or 'body' (the log record body field as a string). +- `format` (default = body): Set the log entry line format. This can be set to 'json' (the entire JSON encoded log record) or 'body' (the log record body field as a string). Example: diff --git a/exporter/lokiexporter/config_test.go b/exporter/lokiexporter/config_test.go index a5ae10b680c1..57746e19c64f 100644 --- a/exporter/lokiexporter/config_test.go +++ b/exporter/lokiexporter/config_test.go @@ -92,7 +92,7 @@ func TestLoadConfig(t *testing.T) { require.Equal(t, &expectedCfg, actualCfg) } -func TestJsonLoadConfig(t *testing.T) { +func TestJSONLoadConfig(t *testing.T) { factories, err := componenttest.NopFactories() assert.Nil(t, err) @@ -121,13 +121,13 @@ func TestJsonLoadConfig(t *testing.T) { }, ReadBufferSize: 0, WriteBufferSize: 524288, - Timeout: 30000000000, + Timeout: time.Second * 10, }, RetrySettings: exporterhelper.RetrySettings{ Enabled: true, - InitialInterval: 5000000000, - MaxInterval: 30000000000, - MaxElapsedTime: 300000000000, + InitialInterval: 10 * time.Second, + MaxInterval: 1 * time.Minute, + MaxElapsedTime: 10 * time.Minute, }, QueueSettings: exporterhelper.QueueSettings{ Enabled: true, diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index b031e8fdf9a8..581febb52db9 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -51,7 +51,7 @@ func newExporter(config *Config, logger *zap.Logger) *lokiExporter { if config.Format == "json" { lokiexporter.convert = convertLogToJSONEntry } else { - lokiexporter.convert = convertLogToLokiEntry + lokiexporter.convert = convertLogBodyToEntry } return lokiexporter } @@ -148,7 +148,7 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n if err != nil { // Couldn't convert to JSON so dropping log. numDroppedLogs++ - l.logger.Error("Failed to convert to JSON - Dropping Log", zap.Error(err)) + l.logger.Error("failed to convert, dropping log", zap.String("format", l.config.Format), zap.Error(err)) continue } @@ -210,7 +210,7 @@ func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap, return ls } -func convertLogToLokiEntry(lr pdata.LogRecord, res pdata.Resource) (*logproto.Entry, error) { +func convertLogBodyToEntry(lr pdata.LogRecord, res pdata.Resource) (*logproto.Entry, error) { return &logproto.Entry{ Timestamp: time.Unix(0, int64(lr.Timestamp())), Line: lr.Body().StringVal(), diff --git a/exporter/lokiexporter/exporter_test.go b/exporter/lokiexporter/exporter_test.go index 5ad8183981d8..2d7cb7218cb5 100644 --- a/exporter/lokiexporter/exporter_test.go +++ b/exporter/lokiexporter/exporter_test.go @@ -472,7 +472,7 @@ func TestExporter_convertAttributesToLabels(t *testing.T) { }) } -func TestExporter_convertLogToLokiEntry(t *testing.T) { +func TestExporter_convertLogBodyToEntry(t *testing.T) { ts := pdata.Timestamp(int64(1) * time.Millisecond.Nanoseconds()) lr := pdata.NewLogRecord() lr.Body().SetStringVal("log message") @@ -480,7 +480,7 @@ func TestExporter_convertLogToLokiEntry(t *testing.T) { res := pdata.NewResource() res.Attributes().Insert("host.name", pdata.NewAttributeValueString("something")) - entry, _ := convertLogToLokiEntry(lr, res) + entry, _ := convertLogBodyToEntry(lr, res) expEntry := &logproto.Entry{ Timestamp: time.Unix(0, int64(lr.Timestamp())), From 30edcbed4fe842b498fca5a697a492c4811168f3 Mon Sep 17 00:00:00 2001 From: Guillaume GILL Date: Tue, 26 Oct 2021 22:47:24 +0200 Subject: [PATCH 17/22] Group similar errors --- exporter/lokiexporter/exporter.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index 581febb52db9..d21149cb221d 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -17,6 +17,7 @@ package lokiexporter import ( "bytes" "context" + "errors" "fmt" "io" "io/ioutil" @@ -30,6 +31,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/multierr" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter/internal/third_party/loki/logproto" @@ -126,6 +128,8 @@ func (l *lokiExporter) stop(context.Context) (err error) { } func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, numDroppedLogs int) { + var errs error + streams := make(map[string]*logproto.Stream) rls := ld.ResourceLogs() for i := 0; i < rls.Len(); i++ { @@ -146,9 +150,18 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n var err error entry, err = l.convert(log, resource) if err != nil { - // Couldn't convert to JSON so dropping log. + // Couldn't convert so dropping log. numDroppedLogs++ - l.logger.Error("failed to convert, dropping log", zap.String("format", l.config.Format), zap.Error(err)) + errs = multierr.Append( + errs, + errors.New( + fmt.Sprint( + "failed to convert, dropping log", + zap.String("format", l.config.Format), + zap.Error(err), + ), + ), + ) continue } @@ -165,6 +178,8 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n } } + l.logger.Error("some logs has been dropped", zap.Error(errs)) + pr = &logproto.PushRequest{ Streams: make([]logproto.Stream, len(streams)), } From 0edc8fb3a5f5142d24daa9e444fd87e57cdfd305 Mon Sep 17 00:00:00 2001 From: Guillaume GILL Date: Sat, 4 Dec 2021 15:10:40 +0100 Subject: [PATCH 18/22] Fix serialization --- exporter/lokiexporter/encode_json.go | 25 +++++++++++++++++++++++-- exporter/lokiexporter/exporter.go | 4 +++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/exporter/lokiexporter/encode_json.go b/exporter/lokiexporter/encode_json.go index a946d61bbae1..2b014c030a77 100644 --- a/exporter/lokiexporter/encode_json.go +++ b/exporter/lokiexporter/encode_json.go @@ -16,6 +16,7 @@ package lokiexporter import ( "encoding/json" + "fmt" "go.opentelemetry.io/collector/model/pdata" ) @@ -32,21 +33,41 @@ type lokiEntry struct { Resources map[string]interface{} `json:"resources,omitempty"` } +func serializeBody(body pdata.AttributeValue) (string, error) { + str := "" + var err error + switch body.Type() { + case pdata.AttributeValueTypeString: + str = body.StringVal() + case pdata.AttributeValueTypeMap: + str = "" + err = fmt.Errorf("Unsuported body type to serialize") + } + return str, err +} + func encodeJSON(lr pdata.LogRecord, res pdata.Resource) (string, error) { var logRecord lokiEntry var jsonRecord []byte + var err error + var body string + body, err = serializeBody(lr.Body()) + if err != nil { + return "", err + } logRecord = lokiEntry{ Name: lr.Name(), - Body: lr.Body().StringVal(), + Body: body, TraceID: lr.TraceID().HexString(), SpanID: lr.SpanID().HexString(), Severity: lr.SeverityText(), Attributes: lr.Attributes().AsRaw(), Resources: res.Attributes().AsRaw(), } + lr.Body().Type() - jsonRecord, err := json.Marshal(logRecord) + jsonRecord, err = json.Marshal(logRecord) if err != nil { return "", err } diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index d21149cb221d..cb42b8d9e98f 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -178,7 +178,9 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n } } - l.logger.Error("some logs has been dropped", zap.Error(errs)) + if errs != nil { + l.logger.Error("some logs has been dropped", zap.Error(errs)) + } pr = &logproto.PushRequest{ Streams: make([]logproto.Stream, len(streams)), From 42d664c9b97567d501a0b1a52b5352d6f1015fd9 Mon Sep 17 00:00:00 2001 From: Guillaume GILL Date: Sat, 4 Dec 2021 15:49:11 +0100 Subject: [PATCH 19/22] Add a test for unsuported body type --- exporter/lokiexporter/encode_json.go | 8 +++----- exporter/lokiexporter/encode_json_test.go | 16 +++++++++++++++- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/exporter/lokiexporter/encode_json.go b/exporter/lokiexporter/encode_json.go index 2b014c030a77..94fc2b7fb24d 100644 --- a/exporter/lokiexporter/encode_json.go +++ b/exporter/lokiexporter/encode_json.go @@ -36,12 +36,10 @@ type lokiEntry struct { func serializeBody(body pdata.AttributeValue) (string, error) { str := "" var err error - switch body.Type() { - case pdata.AttributeValueTypeString: + if body.Type() == pdata.AttributeValueTypeString { str = body.StringVal() - case pdata.AttributeValueTypeMap: - str = "" - err = fmt.Errorf("Unsuported body type to serialize") + } else { + err = fmt.Errorf("unsuported body type to serialize") } return str, err } diff --git a/exporter/lokiexporter/encode_json_test.go b/exporter/lokiexporter/encode_json_test.go index 82a0fea93d5e..d54f234bb185 100644 --- a/exporter/lokiexporter/encode_json_test.go +++ b/exporter/lokiexporter/encode_json_test.go @@ -43,10 +43,24 @@ func exampleJSON() string { return jsonExample } -func TestConvert(t *testing.T) { +func TestConvertString(t *testing.T) { in := exampleJSON() out, err := encodeJSON(exampleLog()) t.Log(in) t.Log(out, err) assert.Equal(t, in, out) } + +func TestConvertNonString(t *testing.T) { + in := exampleJSON() + log, resource := exampleLog() + mapVal := pdata.NewAttributeValueMap() + mapVal.MapVal().Insert("key1", pdata.NewAttributeValueString("value")) + mapVal.MapVal().Insert("key2", pdata.NewAttributeValueString("value")) + mapVal.CopyTo(log.Body()) + + out, err := encodeJSON(log, resource) + t.Log(in) + t.Log(out, err) + assert.EqualError(t, err, "unsuported body type to serialize") +} From f2f50d64ffe9f28fd39ce2ffef7521a7c7422d53 Mon Sep 17 00:00:00 2001 From: Guillaume GILL Date: Sat, 4 Dec 2021 16:18:56 +0100 Subject: [PATCH 20/22] Fix tests --- exporter/lokiexporter/config_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/exporter/lokiexporter/config_test.go b/exporter/lokiexporter/config_test.go index 57746e19c64f..3dc7d40069d1 100644 --- a/exporter/lokiexporter/config_test.go +++ b/exporter/lokiexporter/config_test.go @@ -121,13 +121,13 @@ func TestJSONLoadConfig(t *testing.T) { }, ReadBufferSize: 0, WriteBufferSize: 524288, - Timeout: time.Second * 10, + Timeout: time.Second * 30, }, RetrySettings: exporterhelper.RetrySettings{ Enabled: true, - InitialInterval: 10 * time.Second, - MaxInterval: 1 * time.Minute, - MaxElapsedTime: 10 * time.Minute, + InitialInterval: 5 * time.Second, + MaxInterval: 30 * time.Second, + MaxElapsedTime: 5 * time.Minute, }, QueueSettings: exporterhelper.QueueSettings{ Enabled: true, From 3d11fb394b70828eab5124ef87cd348624a7bcd4 Mon Sep 17 00:00:00 2001 From: Guillaume GILL Date: Mon, 6 Dec 2021 13:57:15 +0100 Subject: [PATCH 21/22] Fix lint --- exporter/lokiexporter/encode_json.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/lokiexporter/encode_json.go b/exporter/lokiexporter/encode_json.go index 94fc2b7fb24d..5c09a5e2dee6 100644 --- a/exporter/lokiexporter/encode_json.go +++ b/exporter/lokiexporter/encode_json.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package lokiexporter +package lokiexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter" import ( "encoding/json" From 119a6a69645694397d106349d4cd6d1f5459f718 Mon Sep 17 00:00:00 2001 From: Guillaume GILL Date: Mon, 6 Dec 2021 18:05:49 +0100 Subject: [PATCH 22/22] Change error level to debug for dropped logs to avoid flood, keeping a trace of the reason --- exporter/lokiexporter/exporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/lokiexporter/exporter.go b/exporter/lokiexporter/exporter.go index e67b77130bfb..ce2eae4aab1e 100644 --- a/exporter/lokiexporter/exporter.go +++ b/exporter/lokiexporter/exporter.go @@ -179,7 +179,7 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n } if errs != nil { - l.logger.Error("some logs has been dropped", zap.Error(errs)) + l.logger.Debug("some logs has been dropped", zap.Error(errs)) } pr = &logproto.PushRequest{