diff --git a/.chloggen/receiver-otlpjson-token-attr.yaml b/.chloggen/receiver-otlpjson-token-attr.yaml new file mode 100644 index 000000000000..fb44f06e8959 --- /dev/null +++ b/.chloggen/receiver-otlpjson-token-attr.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otlpjsonfilereceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Parse token.attributes and append it to the log record + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36641] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/otlpjsonfilereceiver/file.go b/receiver/otlpjsonfilereceiver/file.go index c5a049dd271a..5a88e56f6248 100644 --- a/receiver/otlpjsonfilereceiver/file.go +++ b/receiver/otlpjsonfilereceiver/file.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pprofile" @@ -87,6 +88,17 @@ func createLogsReceiver(_ context.Context, settings receiver.Settings, configura ctx = obsrecv.StartLogsOp(ctx) var l plog.Logs l, err = logsUnmarshaler.UnmarshalLogs(token.Body) + // Appends token.Attributes + for i := 0; i < l.ResourceLogs().Len(); i++ { + resourceLog := l.ResourceLogs().At(i) + for j := 0; j < resourceLog.ScopeLogs().Len(); j++ { + scopeLog := resourceLog.ScopeLogs().At(j) + for k := 0; k < scopeLog.LogRecords().Len(); k++ { + LogRecords := scopeLog.LogRecords().At(k) + appendToMap(token, LogRecords.Attributes()) + } + } + } if err != nil { obsrecv.EndLogsOp(ctx, metadata.Type.String(), 0, err) } else { @@ -124,6 +136,17 @@ func createMetricsReceiver(_ context.Context, settings receiver.Settings, config ctx = obsrecv.StartMetricsOp(ctx) var m pmetric.Metrics m, err = metricsUnmarshaler.UnmarshalMetrics(token.Body) + // Appends token.Attributes + for i := 0; i < m.ResourceMetrics().Len(); i++ { + resourceMetric := m.ResourceMetrics().At(i) + for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ { + ScopeMetric := resourceMetric.ScopeMetrics().At(j) + for k := 0; k < ScopeMetric.Metrics().Len(); k++ { + metric := ScopeMetric.Metrics().At(k) + appendToMap(token, metric.Metadata()) + } + } + } if err != nil { obsrecv.EndMetricsOp(ctx, metadata.Type.String(), 0, err) } else { @@ -160,6 +183,17 @@ func createTracesReceiver(_ context.Context, settings receiver.Settings, configu ctx = obsrecv.StartTracesOp(ctx) var t ptrace.Traces t, err = tracesUnmarshaler.UnmarshalTraces(token.Body) + // Appends token.Attributes + for i := 0; i < t.ResourceSpans().Len(); i++ { + resourceSpan := t.ResourceSpans().At(i) + for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ { + scopeSpan := resourceSpan.ScopeSpans().At(j) + for k := 0; k < scopeSpan.Spans().Len(); k++ { + spans := scopeSpan.Spans().At(k) + appendToMap(token, spans.Attributes()) + } + } + } if err != nil { obsrecv.EndTracesOp(ctx, metadata.Type.String(), 0, err) } else { @@ -186,6 +220,17 @@ func createProfilesReceiver(_ context.Context, settings receiver.Settings, confi } input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error { p, _ := profilesUnmarshaler.UnmarshalProfiles(token.Body) + // Appends token.Attributes + for i := 0; i < p.ResourceProfiles().Len(); i++ { + resourceProfile := p.ResourceProfiles().At(i) + for j := 0; j < resourceProfile.ScopeProfiles().Len(); j++ { + scopeProfile := resourceProfile.ScopeProfiles().At(j) + for k := 0; k < scopeProfile.Profiles().Len(); k++ { + profile := scopeProfile.Profiles().At(k) + appendToMap(token, profile.Attributes()) + } + } + } if p.ResourceProfiles().Len() != 0 { _ = profiles.ConsumeProfiles(ctx, p) } @@ -197,3 +242,18 @@ func createProfilesReceiver(_ context.Context, settings receiver.Settings, confi return &otlpjsonfilereceiver{input: input, id: settings.ID, storageID: cfg.StorageID}, nil } + +func appendToMap(token emit.Token, attr pcommon.Map) { + for key, value := range token.Attributes { + switch v := value.(type) { + case string: + attr.PutStr(key, v) + case int: + attr.PutInt(key, int64(v)) + case float64: + attr.PutDouble(key, float64(v)) + case bool: + attr.PutBool(key, v) + } + } +} diff --git a/receiver/otlpjsonfilereceiver/file_test.go b/receiver/otlpjsonfilereceiver/file_test.go index 582c1dcb1c8e..3870632f38db 100644 --- a/receiver/otlpjsonfilereceiver/file_test.go +++ b/receiver/otlpjsonfilereceiver/file_test.go @@ -49,7 +49,7 @@ func TestFileProfilesReceiver(t *testing.T) { err = receiver.Start(context.Background(), nil) require.NoError(t, err) - pd := testdata.GenerateProfiles(5) + pd := testdata.GenerateProfiles(1) marshaler := &pprofile.JSONMarshaler{} b, err := marshaler.MarshalProfiles(pd) assert.NoError(t, err) @@ -58,6 +58,9 @@ func TestFileProfilesReceiver(t *testing.T) { assert.NoError(t, err) time.Sleep(1 * time.Second) + // include_file_name is true by default + pd.ResourceProfiles().At(0).ScopeProfiles().At(0).Profiles().At(0).Attributes().PutStr("log.file.name", "profiles.json") + require.Len(t, sink.AllProfiles(), 1) assert.EqualValues(t, pd, sink.AllProfiles()[0]) err = receiver.Shutdown(context.Background()) @@ -76,7 +79,7 @@ func TestFileTracesReceiver(t *testing.T) { err = receiver.Start(context.Background(), nil) require.NoError(t, err) - td := testdata.GenerateTraces(2) + td := testdata.GenerateTraces(1) marshaler := &ptrace.JSONMarshaler{} b, err := marshaler.MarshalTraces(td) assert.NoError(t, err) @@ -85,6 +88,9 @@ func TestFileTracesReceiver(t *testing.T) { assert.NoError(t, err) time.Sleep(1 * time.Second) + // include_file_name is true by default + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("log.file.name", "traces.json") + require.Len(t, sink.AllTraces(), 1) assert.EqualValues(t, td, sink.AllTraces()[0]) err = receiver.Shutdown(context.Background()) @@ -103,7 +109,7 @@ func TestFileMetricsReceiver(t *testing.T) { err = receiver.Start(context.Background(), nil) assert.NoError(t, err) - md := testdata.GenerateMetrics(5) + md := testdata.GenerateMetrics(1) marshaler := &pmetric.JSONMarshaler{} b, err := marshaler.MarshalMetrics(md) assert.NoError(t, err) @@ -112,6 +118,9 @@ func TestFileMetricsReceiver(t *testing.T) { assert.NoError(t, err) time.Sleep(1 * time.Second) + // include_file_name is true by default + md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Metadata().PutStr("log.file.name", "metrics.json") + require.Len(t, sink.AllMetrics(), 1) assert.EqualValues(t, md, sink.AllMetrics()[0]) err = receiver.Shutdown(context.Background()) @@ -126,6 +135,7 @@ func TestFileMetricsReceiverWithReplay(t *testing.T) { cfg.Config.StartAt = "beginning" cfg.ReplayFile = true cfg.Config.PollInterval = 5 * time.Second + cfg.IncludeFileName = false sink := new(consumertest.MetricsSink) receiver, err := factory.CreateMetrics(context.Background(), receivertest.NewNopSettings(), cfg, sink) @@ -168,7 +178,7 @@ func TestFileLogsReceiver(t *testing.T) { err = receiver.Start(context.Background(), nil) assert.NoError(t, err) - ld := testdata.GenerateLogs(5) + ld := testdata.GenerateLogs(1) marshaler := &plog.JSONMarshaler{} b, err := marshaler.MarshalLogs(ld) assert.NoError(t, err) @@ -177,6 +187,9 @@ func TestFileLogsReceiver(t *testing.T) { assert.NoError(t, err) time.Sleep(1 * time.Second) + // include_file_name is true by default + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("log.file.name", "logs.json") + require.Len(t, sink.AllLogs(), 1) assert.EqualValues(t, ld, sink.AllLogs()[0]) err = receiver.Shutdown(context.Background()) @@ -226,6 +239,7 @@ func TestFileMixedSignals(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Config.Include = []string{filepath.Join(tempFolder, "*")} cfg.Config.StartAt = "beginning" + cfg.IncludeFileName = false cs := receivertest.NewNopSettings() ms := new(consumertest.MetricsSink) mr, err := factory.CreateMetrics(context.Background(), cs, cfg, ms)