diff --git a/.chloggen/add-tracing-information-to-all-components.yaml b/.chloggen/add-tracing-information-to-all-components.yaml new file mode 100644 index 00000000000..42281b19b8e --- /dev/null +++ b/.chloggen/add-tracing-information-to-all-components.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: consumer + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add tracing information to all the components" + +# One or more tracking issues or pull requests related to the change +issues: [8804] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/consumer/consumer.go b/consumer/consumer.go index b1b588a85c0..8e45a12af77 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -12,6 +12,9 @@ import ( // Capabilities describes the capabilities of a Processor. type Capabilities = internal.Capabilities +// ObsReport describes the observability report of a consumer. +type ObsReport = internal.ObsReport + var errNilFunc = errors.New("nil consumer func") // Option to construct new consumers. @@ -24,3 +27,9 @@ func WithCapabilities(capabilities Capabilities) Option { o.Cap = capabilities }) } + +func WithObsReport(report ObsReport) Option { + return func(o *internal.BaseImpl) { + o.ObsReport = report + } +} diff --git a/consumer/internal/consumer.go b/consumer/internal/consumer.go index 45c90dd4341..febfc8bd9ae 100644 --- a/consumer/internal/consumer.go +++ b/consumer/internal/consumer.go @@ -18,7 +18,8 @@ type BaseConsumer interface { } type BaseImpl struct { - Cap Capabilities + Cap Capabilities + ObsReport ObsReport } // Option to construct new consumers. @@ -39,7 +40,8 @@ func (bs BaseImpl) Capabilities() Capabilities { func NewBaseImpl(options ...Option) *BaseImpl { bs := &BaseImpl{ - Cap: Capabilities{MutatesData: false}, + Cap: Capabilities{MutatesData: false}, + ObsReport: noopObsReport, } for _, op := range options { diff --git a/consumer/internal/obsreport.go b/consumer/internal/obsreport.go new file mode 100644 index 00000000000..ab7d9927dfb --- /dev/null +++ b/consumer/internal/obsreport.go @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/consumer/internal" + +import "context" + +// ObsReport contains information required to make an implementor +// of Consumer observable. +type ObsReport interface { + StartTracesOp(context.Context) context.Context + EndTracesOp(context.Context, int, error) +} + +type baseObsReport struct{} + +func (bor baseObsReport) StartTracesOp(ctx context.Context) context.Context { + return ctx +} + +func (bor baseObsReport) EndTracesOp(_ context.Context, _ int, _ error) {} + +var noopObsReport = baseObsReport{} diff --git a/consumer/logs.go b/consumer/logs.go index 15166ef1196..80e1c969432 100644 --- a/consumer/logs.go +++ b/consumer/logs.go @@ -36,8 +36,17 @@ func NewLogs(consume ConsumeLogsFunc, options ...Option) (Logs, error) { if consume == nil { return nil, errNilFunc } + baseImpl := internal.NewBaseImpl(options...) + fn := func(ctx context.Context, ld plog.Logs) error { + baseImpl.ObsReport.StartTracesOp(ctx) + logRecordCount := ld.LogRecordCount() + err := consume(ctx, ld) + baseImpl.ObsReport.EndTracesOp(ctx, logRecordCount, err) + return err + } + return &baseLogs{ - BaseImpl: internal.NewBaseImpl(options...), - ConsumeLogsFunc: consume, + BaseImpl: baseImpl, + ConsumeLogsFunc: fn, }, nil } diff --git a/consumer/logs_test.go b/consumer/logs_test.go index a6ca2eb6f95..eae48301425 100644 --- a/consumer/logs_test.go +++ b/consumer/logs_test.go @@ -14,6 +14,20 @@ import ( "go.opentelemetry.io/collector/pdata/plog" ) +type mockObsReport struct { + StartTracesOpCalled int + EndTracesOpCalled int +} + +func (m *mockObsReport) StartTracesOp(ctx context.Context) context.Context { + m.StartTracesOpCalled++ + return ctx +} + +func (m *mockObsReport) EndTracesOp(_ context.Context, _ int, _ error) { + m.EndTracesOpCalled++ +} + func TestDefaultLogs(t *testing.T) { cp, err := NewLogs(func(context.Context, plog.Logs) error { return nil }) assert.NoError(t, err) @@ -35,6 +49,17 @@ func TestWithCapabilitiesLogs(t *testing.T) { assert.Equal(t, Capabilities{MutatesData: true}, cp.Capabilities()) } +func TestWithObsReportLogs(t *testing.T) { + obsr := &mockObsReport{} + cp, err := NewLogs( + func(context.Context, plog.Logs) error { return nil }, + WithObsReport(obsr)) + assert.NoError(t, err) + assert.NoError(t, cp.ConsumeLogs(context.Background(), plog.NewLogs())) + assert.Equal(t, 1, obsr.StartTracesOpCalled) + assert.Equal(t, 1, obsr.EndTracesOpCalled) +} + func TestConsumeLogs(t *testing.T) { consumeCalled := false cp, err := NewLogs(func(context.Context, plog.Logs) error { consumeCalled = true; return nil }) diff --git a/consumer/metrics.go b/consumer/metrics.go index 47897f9363a..fbf28bc4615 100644 --- a/consumer/metrics.go +++ b/consumer/metrics.go @@ -36,8 +36,17 @@ func NewMetrics(consume ConsumeMetricsFunc, options ...Option) (Metrics, error) if consume == nil { return nil, errNilFunc } + baseImpl := internal.NewBaseImpl(options...) + fn := func(ctx context.Context, ld pmetric.Metrics) error { + ctx = baseImpl.ObsReport.StartTracesOp(ctx) + dataPointCount := ld.DataPointCount() + err := consume(ctx, ld) + baseImpl.ObsReport.EndTracesOp(ctx, dataPointCount, err) + return err + } + return &baseMetrics{ - BaseImpl: internal.NewBaseImpl(options...), - ConsumeMetricsFunc: consume, + BaseImpl: baseImpl, + ConsumeMetricsFunc: fn, }, nil } diff --git a/consumer/metrics_test.go b/consumer/metrics_test.go index 914e96a1529..bed37afad92 100644 --- a/consumer/metrics_test.go +++ b/consumer/metrics_test.go @@ -35,6 +35,17 @@ func TestWithCapabilitiesMetrics(t *testing.T) { assert.Equal(t, Capabilities{MutatesData: true}, cp.Capabilities()) } +func TestWithObsReportMetrics(t *testing.T) { + obsr := &mockObsReport{} + cp, err := NewMetrics( + func(context.Context, pmetric.Metrics) error { return nil }, + WithObsReport(obsr)) + assert.NoError(t, err) + assert.NoError(t, cp.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) + assert.Equal(t, 1, obsr.StartTracesOpCalled) + assert.Equal(t, 1, obsr.EndTracesOpCalled) +} + func TestConsumeMetrics(t *testing.T) { consumeCalled := false cp, err := NewMetrics(func(context.Context, pmetric.Metrics) error { consumeCalled = true; return nil }) diff --git a/consumer/traces.go b/consumer/traces.go index 60df2d04536..27e5fc842dc 100644 --- a/consumer/traces.go +++ b/consumer/traces.go @@ -36,8 +36,18 @@ func NewTraces(consume ConsumeTracesFunc, options ...Option) (Traces, error) { if consume == nil { return nil, errNilFunc } + + baseImpl := internal.NewBaseImpl(options...) + fn := func(ctx context.Context, td ptrace.Traces) error { + ctx = baseImpl.ObsReport.StartTracesOp(ctx) + spanCount := td.SpanCount() + err := consume(ctx, td) + baseImpl.ObsReport.EndTracesOp(ctx, spanCount, err) + return err + } + return &baseTraces{ - BaseImpl: internal.NewBaseImpl(options...), - ConsumeTracesFunc: consume, + BaseImpl: baseImpl, + ConsumeTracesFunc: fn, }, nil } diff --git a/consumer/traces_test.go b/consumer/traces_test.go index 168db58cef4..f9a346c862a 100644 --- a/consumer/traces_test.go +++ b/consumer/traces_test.go @@ -35,6 +35,17 @@ func TestWithCapabilitiesTraces(t *testing.T) { assert.Equal(t, Capabilities{MutatesData: true}, cp.Capabilities()) } +func TestWithWithObsReportTraces(t *testing.T) { + obsr := &mockObsReport{} + cp, err := NewTraces( + func(context.Context, ptrace.Traces) error { return nil }, + WithObsReport(obsr)) + assert.NoError(t, err) + assert.NoError(t, cp.ConsumeTraces(context.Background(), ptrace.NewTraces())) + assert.Equal(t, 1, obsr.StartTracesOpCalled) + assert.Equal(t, 1, obsr.EndTracesOpCalled) +} + func TestConsumeTraces(t *testing.T) { consumeCalled := false cp, err := NewTraces(func(context.Context, ptrace.Traces) error { consumeCalled = true; return nil })