diff --git a/.chloggen/codeboten_obsreport-scraper.yaml b/.chloggen/codeboten_obsreport-scraper.yaml new file mode 100755 index 00000000000..33dff252304 --- /dev/null +++ b/.chloggen/codeboten_obsreport-scraper.yaml @@ -0,0 +1,32 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: obsreport + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Deprecating obsreport scraper and receiver API" + +# One or more tracking issues or pull requests related to the change +issues: [8492] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + These deprecated methods/structs have been moved to receiverhelper and scraperhelper: + - `obsreport.Receiver` -> `receiverhelper.ObsReport` + - `obsreport.ReceiverSettings` -> `receiverhelper.ObsReportSettings` + - `obsreport.NewReceiver` -> `receiverhelper.NewObsReport` + - `obsreport.Scraper` -> `scraperhelper.ObsReport` + - `obsreport.ScraperSettings` -> `scraperhelper.ObsReportSettings` + - `obsreport.NewScraper` -> `scraperhelper.NewObsReport` + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] \ No newline at end of file diff --git a/exporter/otlphttpexporter/go.mod b/exporter/otlphttpexporter/go.mod index 7702bbb2e0c..0429d675cd3 100644 --- a/exporter/otlphttpexporter/go.mod +++ b/exporter/otlphttpexporter/go.mod @@ -54,7 +54,6 @@ require ( go.opentelemetry.io/collector/extension v0.85.0 // indirect go.opentelemetry.io/collector/extension/auth v0.85.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 // indirect - go.opentelemetry.io/collector/processor v0.85.0 // indirect go.opentelemetry.io/collector/service v0.0.0-20230915215502-07938f20fcc7 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.44.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 // indirect diff --git a/go.mod b/go.mod index c3e10316e0b..8be7b8a441d 100644 --- a/go.mod +++ b/go.mod @@ -22,12 +22,9 @@ require ( go.opentelemetry.io/collector/service v0.0.0-20230915215502-07938f20fcc7 go.opentelemetry.io/otel v1.18.0 go.opentelemetry.io/otel/exporters/prometheus v0.41.0 - go.opentelemetry.io/otel/metric v1.18.0 go.opentelemetry.io/otel/sdk v1.18.0 go.opentelemetry.io/otel/sdk/metric v0.41.0 - go.opentelemetry.io/otel/trace v1.18.0 go.uber.org/multierr v1.11.0 - go.uber.org/zap v1.26.0 ) require ( @@ -63,7 +60,10 @@ require ( github.com/yusufpapurcu/wmi v1.2.3 // indirect go.opentelemetry.io/collector/confmap v0.85.0 // indirect go.opentelemetry.io/collector/extension v0.85.0 // indirect + go.opentelemetry.io/otel/metric v1.18.0 // indirect + go.opentelemetry.io/otel/trace v1.18.0 // indirect go.uber.org/goleak v1.2.1 // indirect + go.uber.org/zap v1.26.0 // indirect golang.org/x/net v0.15.0 // indirect golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.13.0 // indirect diff --git a/obsreport/obsreport.go b/obsreport/obsreport.go deleted file mode 100644 index c22493b31f4..00000000000 --- a/obsreport/obsreport.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package obsreport // import "go.opentelemetry.io/collector/obsreport" - -import ( - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" -) - -const ( - scopeName = "go.opentelemetry.io/collector/obsreport" - - nameSep = "/" -) - -func recordError(span trace.Span, err error) { - if err != nil { - span.SetStatus(codes.Error, err.Error()) - } -} diff --git a/obsreport/obsreport_receiver.go b/obsreport/obsreport_receiver.go index 2d8a621f3a6..e816434d895 100644 --- a/obsreport/obsreport_receiver.go +++ b/obsreport/obsreport_receiver.go @@ -3,323 +3,21 @@ package obsreport // import "go.opentelemetry.io/collector/obsreport" -import ( - "context" - "errors" - - "go.opencensus.io/stats" - "go.opencensus.io/tag" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/trace" - "go.uber.org/multierr" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/internal/obsreportconfig" - "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" - "go.opentelemetry.io/collector/receiver" -) - -const ( - receiverName = "receiver" - - receiverScope = scopeName + nameSep + receiverName -) +import "go.opentelemetry.io/collector/receiver/receiverhelper" // Receiver is a helper to add observability to a receiver. -type Receiver struct { - level configtelemetry.Level - spanNamePrefix string - transport string - longLivedCtx bool - mutators []tag.Mutator - tracer trace.Tracer - meter metric.Meter - logger *zap.Logger - - useOtelForMetrics bool - otelAttrs []attribute.KeyValue - - acceptedSpansCounter metric.Int64Counter - refusedSpansCounter metric.Int64Counter - acceptedMetricPointsCounter metric.Int64Counter - refusedMetricPointsCounter metric.Int64Counter - acceptedLogRecordsCounter metric.Int64Counter - refusedLogRecordsCounter metric.Int64Counter -} +// +// Deprecated: [0.85.0] Use receiverhelper.ObsReport instead. +type Receiver = receiverhelper.ObsReport // ReceiverSettings are settings for creating an Receiver. -type ReceiverSettings struct { - ReceiverID component.ID - Transport string - // LongLivedCtx when true indicates that the context passed in the call - // outlives the individual receive operation. - // Typically the long lived context is associated to a connection, - // eg.: a gRPC stream, for which many batches of data are received in individual - // operations without a corresponding new context per operation. - LongLivedCtx bool - ReceiverCreateSettings receiver.CreateSettings -} +// +// Deprecated: [0.85.0] Use receiverhelper.ObsReportSettings instead. +type ReceiverSettings = receiverhelper.ObsReportSettings // NewReceiver creates a new Receiver. +// +// Deprecated: [0.85.0] Use receiverhelper.NewObsReport instead. func NewReceiver(cfg ReceiverSettings) (*Receiver, error) { - return newReceiver(cfg, obsreportconfig.UseOtelForInternalMetricsfeatureGate.IsEnabled()) -} - -func newReceiver(cfg ReceiverSettings, useOtel bool) (*Receiver, error) { - rec := &Receiver{ - level: cfg.ReceiverCreateSettings.TelemetrySettings.MetricsLevel, - spanNamePrefix: obsmetrics.ReceiverPrefix + cfg.ReceiverID.String(), - transport: cfg.Transport, - longLivedCtx: cfg.LongLivedCtx, - mutators: []tag.Mutator{ - tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)), - tag.Upsert(obsmetrics.TagKeyTransport, cfg.Transport, tag.WithTTL(tag.TTLNoPropagation)), - }, - tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.ReceiverID.String()), - meter: cfg.ReceiverCreateSettings.MeterProvider.Meter(receiverScope), - logger: cfg.ReceiverCreateSettings.Logger, - - useOtelForMetrics: useOtel, - otelAttrs: []attribute.KeyValue{ - attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()), - attribute.String(obsmetrics.TransportKey, cfg.Transport), - }, - } - - // ignore instrument name error as per workaround in https://github.com/open-telemetry/opentelemetry-collector/issues/8346 - // if err := rec.createOtelMetrics(); err != nil { - // return nil, err - // } - if err := rec.createOtelMetrics(); err != nil && !errors.Is(err, sdkmetric.ErrInstrumentName) { - return nil, err - } - - return rec, nil -} - -func (rec *Receiver) createOtelMetrics() error { - if !rec.useOtelForMetrics { - return nil - } - - var errors, err error - - rec.acceptedSpansCounter, err = rec.meter.Int64Counter( - obsmetrics.ReceiverPrefix+obsmetrics.AcceptedSpansKey, - metric.WithDescription("Number of spans successfully pushed into the pipeline."), - metric.WithUnit("1"), - ) - errors = multierr.Append(errors, err) - - rec.refusedSpansCounter, err = rec.meter.Int64Counter( - obsmetrics.ReceiverPrefix+obsmetrics.RefusedSpansKey, - metric.WithDescription("Number of spans that could not be pushed into the pipeline."), - metric.WithUnit("1"), - ) - errors = multierr.Append(errors, err) - - rec.acceptedMetricPointsCounter, err = rec.meter.Int64Counter( - obsmetrics.ReceiverPrefix+obsmetrics.AcceptedMetricPointsKey, - metric.WithDescription("Number of metric points successfully pushed into the pipeline."), - metric.WithUnit("1"), - ) - errors = multierr.Append(errors, err) - - rec.refusedMetricPointsCounter, err = rec.meter.Int64Counter( - obsmetrics.ReceiverPrefix+obsmetrics.RefusedMetricPointsKey, - metric.WithDescription("Number of metric points that could not be pushed into the pipeline."), - metric.WithUnit("1"), - ) - errors = multierr.Append(errors, err) - - rec.acceptedLogRecordsCounter, err = rec.meter.Int64Counter( - obsmetrics.ReceiverPrefix+obsmetrics.AcceptedLogRecordsKey, - metric.WithDescription("Number of log records successfully pushed into the pipeline."), - metric.WithUnit("1"), - ) - errors = multierr.Append(errors, err) - - rec.refusedLogRecordsCounter, err = rec.meter.Int64Counter( - obsmetrics.ReceiverPrefix+obsmetrics.RefusedLogRecordsKey, - metric.WithDescription("Number of log records that could not be pushed into the pipeline."), - metric.WithUnit("1"), - ) - errors = multierr.Append(errors, err) - - return errors -} - -// StartTracesOp is called when a request is received from a client. -// The returned context should be used in other calls to the obsreport functions -// dealing with the same receive operation. -func (rec *Receiver) StartTracesOp(operationCtx context.Context) context.Context { - return rec.startOp(operationCtx, obsmetrics.ReceiveTraceDataOperationSuffix) -} - -// EndTracesOp completes the receive operation that was started with -// StartTracesOp. -func (rec *Receiver) EndTracesOp( - receiverCtx context.Context, - format string, - numReceivedSpans int, - err error, -) { - rec.endOp(receiverCtx, format, numReceivedSpans, err, component.DataTypeTraces) -} - -// StartLogsOp is called when a request is received from a client. -// The returned context should be used in other calls to the obsreport functions -// dealing with the same receive operation. -func (rec *Receiver) StartLogsOp(operationCtx context.Context) context.Context { - return rec.startOp(operationCtx, obsmetrics.ReceiverLogsOperationSuffix) -} - -// EndLogsOp completes the receive operation that was started with -// StartLogsOp. -func (rec *Receiver) EndLogsOp( - receiverCtx context.Context, - format string, - numReceivedLogRecords int, - err error, -) { - rec.endOp(receiverCtx, format, numReceivedLogRecords, err, component.DataTypeLogs) -} - -// StartMetricsOp is called when a request is received from a client. -// The returned context should be used in other calls to the obsreport functions -// dealing with the same receive operation. -func (rec *Receiver) StartMetricsOp(operationCtx context.Context) context.Context { - return rec.startOp(operationCtx, obsmetrics.ReceiverMetricsOperationSuffix) -} - -// EndMetricsOp completes the receive operation that was started with -// StartMetricsOp. -func (rec *Receiver) EndMetricsOp( - receiverCtx context.Context, - format string, - numReceivedPoints int, - err error, -) { - rec.endOp(receiverCtx, format, numReceivedPoints, err, component.DataTypeMetrics) -} - -// startOp creates the span used to trace the operation. Returning -// the updated context with the created span. -func (rec *Receiver) startOp(receiverCtx context.Context, operationSuffix string) context.Context { - ctx, _ := tag.New(receiverCtx, rec.mutators...) - var span trace.Span - spanName := rec.spanNamePrefix + operationSuffix - if !rec.longLivedCtx { - ctx, span = rec.tracer.Start(ctx, spanName) - } else { - // Since the receiverCtx is long lived do not use it to start the span. - // This way this trace ends when the EndTracesOp is called. - // Here is safe to ignore the returned context since it is not used below. - _, span = rec.tracer.Start(context.Background(), spanName, trace.WithLinks(trace.Link{ - SpanContext: trace.SpanContextFromContext(receiverCtx), - })) - - ctx = trace.ContextWithSpan(ctx, span) - } - - if rec.transport != "" { - span.SetAttributes(attribute.String(obsmetrics.TransportKey, rec.transport)) - } - return ctx -} - -// endOp records the observability signals at the end of an operation. -func (rec *Receiver) endOp( - receiverCtx context.Context, - format string, - numReceivedItems int, - err error, - dataType component.DataType, -) { - numAccepted := numReceivedItems - numRefused := 0 - if err != nil { - numAccepted = 0 - numRefused = numReceivedItems - } - - span := trace.SpanFromContext(receiverCtx) - - if rec.level != configtelemetry.LevelNone { - rec.recordMetrics(receiverCtx, dataType, numAccepted, numRefused) - } - - // end span according to errors - if span.IsRecording() { - var acceptedItemsKey, refusedItemsKey string - switch dataType { - case component.DataTypeTraces: - acceptedItemsKey = obsmetrics.AcceptedSpansKey - refusedItemsKey = obsmetrics.RefusedSpansKey - case component.DataTypeMetrics: - acceptedItemsKey = obsmetrics.AcceptedMetricPointsKey - refusedItemsKey = obsmetrics.RefusedMetricPointsKey - case component.DataTypeLogs: - acceptedItemsKey = obsmetrics.AcceptedLogRecordsKey - refusedItemsKey = obsmetrics.RefusedLogRecordsKey - } - - span.SetAttributes( - attribute.String(obsmetrics.FormatKey, format), - attribute.Int64(acceptedItemsKey, int64(numAccepted)), - attribute.Int64(refusedItemsKey, int64(numRefused)), - ) - recordError(span, err) - } - span.End() -} - -func (rec *Receiver) recordMetrics(receiverCtx context.Context, dataType component.DataType, numAccepted, numRefused int) { - if rec.useOtelForMetrics { - rec.recordWithOtel(receiverCtx, dataType, numAccepted, numRefused) - } else { - rec.recordWithOC(receiverCtx, dataType, numAccepted, numRefused) - } -} - -func (rec *Receiver) recordWithOtel(receiverCtx context.Context, dataType component.DataType, numAccepted, numRefused int) { - var acceptedMeasure, refusedMeasure metric.Int64Counter - switch dataType { - case component.DataTypeTraces: - acceptedMeasure = rec.acceptedSpansCounter - refusedMeasure = rec.refusedSpansCounter - case component.DataTypeMetrics: - acceptedMeasure = rec.acceptedMetricPointsCounter - refusedMeasure = rec.refusedMetricPointsCounter - case component.DataTypeLogs: - acceptedMeasure = rec.acceptedLogRecordsCounter - refusedMeasure = rec.refusedLogRecordsCounter - } - - acceptedMeasure.Add(receiverCtx, int64(numAccepted), metric.WithAttributes(rec.otelAttrs...)) - refusedMeasure.Add(receiverCtx, int64(numRefused), metric.WithAttributes(rec.otelAttrs...)) -} - -func (rec *Receiver) recordWithOC(receiverCtx context.Context, dataType component.DataType, numAccepted, numRefused int) { - var acceptedMeasure, refusedMeasure *stats.Int64Measure - switch dataType { - case component.DataTypeTraces: - acceptedMeasure = obsmetrics.ReceiverAcceptedSpans - refusedMeasure = obsmetrics.ReceiverRefusedSpans - case component.DataTypeMetrics: - acceptedMeasure = obsmetrics.ReceiverAcceptedMetricPoints - refusedMeasure = obsmetrics.ReceiverRefusedMetricPoints - case component.DataTypeLogs: - acceptedMeasure = obsmetrics.ReceiverAcceptedLogRecords - refusedMeasure = obsmetrics.ReceiverRefusedLogRecords - } - - stats.Record( - receiverCtx, - acceptedMeasure.M(int64(numAccepted)), - refusedMeasure.M(int64(numRefused))) + return receiverhelper.NewObsReport(cfg) } diff --git a/obsreport/obsreport_scraper.go b/obsreport/obsreport_scraper.go index c107e8735ba..f6ae510b20f 100644 --- a/obsreport/obsreport_scraper.go +++ b/obsreport/obsreport_scraper.go @@ -3,170 +3,21 @@ package obsreport // import "go.opentelemetry.io/collector/obsreport" -import ( - "context" - "errors" - - "go.opencensus.io/stats" - "go.opencensus.io/tag" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/trace" - "go.uber.org/multierr" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/internal/obsreportconfig" - "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/scrapererror" -) - -var ( - scraperName = "scraper" - scraperScope = scopeName + nameSep + scraperName -) +import "go.opentelemetry.io/collector/receiver/scraperhelper" // Scraper is a helper to add observability to a scraper. -type Scraper struct { - level configtelemetry.Level - receiverID component.ID - scraper component.ID - mutators []tag.Mutator - tracer trace.Tracer - - logger *zap.Logger - - useOtelForMetrics bool - otelAttrs []attribute.KeyValue - scrapedMetricsPoints metric.Int64Counter - erroredMetricsPoints metric.Int64Counter -} +// +// Deprecated: [0.85.0] Use scraperhelper.ObsReport instead. +type Scraper = scraperhelper.ObsReport // ScraperSettings are settings for creating a Scraper. -type ScraperSettings struct { - ReceiverID component.ID - Scraper component.ID - ReceiverCreateSettings receiver.CreateSettings -} +// +// Deprecated: [0.85.0] Use scraperhelper.ObsReportSettings instead. +type ScraperSettings = scraperhelper.ObsReportSettings // NewScraper creates a new Scraper. +// +// Deprecated: [0.85.0] Use scraperhelper.NewObsReport instead. func NewScraper(cfg ScraperSettings) (*Scraper, error) { - return newScraper(cfg, obsreportconfig.UseOtelForInternalMetricsfeatureGate.IsEnabled()) -} - -func newScraper(cfg ScraperSettings, useOtel bool) (*Scraper, error) { - scraper := &Scraper{ - level: cfg.ReceiverCreateSettings.TelemetrySettings.MetricsLevel, - receiverID: cfg.ReceiverID, - scraper: cfg.Scraper, - mutators: []tag.Mutator{ - tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)), - tag.Upsert(obsmetrics.TagKeyScraper, cfg.Scraper.String(), tag.WithTTL(tag.TTLNoPropagation))}, - tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.Scraper.String()), - - logger: cfg.ReceiverCreateSettings.Logger, - useOtelForMetrics: useOtel, - otelAttrs: []attribute.KeyValue{ - attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()), - attribute.String(obsmetrics.ScraperKey, cfg.Scraper.String()), - }, - } - - // ignore instrument name error as per workaround in https://github.com/open-telemetry/opentelemetry-collector/issues/8346 - // if err := scraper.createOtelMetrics(cfg); err != nil { - // return nil, err - // } - if err := scraper.createOtelMetrics(cfg); err != nil && !errors.Is(err, sdkmetric.ErrInstrumentName) { - return nil, err - } - - return scraper, nil -} - -func (s *Scraper) createOtelMetrics(cfg ScraperSettings) error { - if !s.useOtelForMetrics { - return nil - } - meter := cfg.ReceiverCreateSettings.MeterProvider.Meter(scraperScope) - - var errors, err error - - s.scrapedMetricsPoints, err = meter.Int64Counter( - obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey, - metric.WithDescription("Number of metric points successfully scraped."), - metric.WithUnit("1"), - ) - errors = multierr.Append(errors, err) - - s.erroredMetricsPoints, err = meter.Int64Counter( - obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey, - metric.WithDescription("Number of metric points that were unable to be scraped."), - metric.WithUnit("1"), - ) - errors = multierr.Append(errors, err) - - return errors -} - -// StartMetricsOp is called when a scrape operation is started. The -// returned context should be used in other calls to the obsreport functions -// dealing with the same scrape operation. -func (s *Scraper) StartMetricsOp(ctx context.Context) context.Context { - ctx, _ = tag.New(ctx, s.mutators...) - - spanName := obsmetrics.ScraperPrefix + s.receiverID.String() + obsmetrics.NameSep + s.scraper.String() + obsmetrics.ScraperMetricsOperationSuffix - ctx, _ = s.tracer.Start(ctx, spanName) - return ctx -} - -// EndMetricsOp completes the scrape operation that was started with -// StartMetricsOp. -func (s *Scraper) EndMetricsOp( - scraperCtx context.Context, - numScrapedMetrics int, - err error, -) { - numErroredMetrics := 0 - if err != nil { - var partialErr scrapererror.PartialScrapeError - if errors.As(err, &partialErr) { - numErroredMetrics = partialErr.Failed - } else { - numErroredMetrics = numScrapedMetrics - numScrapedMetrics = 0 - } - } - - span := trace.SpanFromContext(scraperCtx) - - if s.level != configtelemetry.LevelNone { - s.recordMetrics(scraperCtx, numScrapedMetrics, numErroredMetrics) - } - - // end span according to errors - if span.IsRecording() { - span.SetAttributes( - attribute.String(obsmetrics.FormatKey, string(component.DataTypeMetrics)), - attribute.Int64(obsmetrics.ScrapedMetricPointsKey, int64(numScrapedMetrics)), - attribute.Int64(obsmetrics.ErroredMetricPointsKey, int64(numErroredMetrics)), - ) - recordError(span, err) - } - - span.End() -} - -func (s *Scraper) recordMetrics(scraperCtx context.Context, numScrapedMetrics, numErroredMetrics int) { - if s.useOtelForMetrics { - s.scrapedMetricsPoints.Add(scraperCtx, int64(numScrapedMetrics), metric.WithAttributes(s.otelAttrs...)) - s.erroredMetricsPoints.Add(scraperCtx, int64(numErroredMetrics), metric.WithAttributes(s.otelAttrs...)) - } else { // OC for metrics - stats.Record( - scraperCtx, - obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)), - obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics))) - } + return scraperhelper.NewObsReport(cfg) } diff --git a/obsreport/obsreporttest/obsreporttest_test.go b/obsreport/obsreporttest/obsreporttest_test.go index e1336596b2c..2ff4b850705 100644 --- a/obsreport/obsreporttest/obsreporttest_test.go +++ b/obsreport/obsreporttest/obsreporttest_test.go @@ -12,9 +12,10 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/obsreport/obsreporttest" "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/collector/receiver/scraperhelper" ) const ( @@ -34,7 +35,7 @@ func TestCheckScraperMetricsViews(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - s, err := obsreport.NewScraper(obsreport.ScraperSettings{ + s, err := scraperhelper.NewObsReport(scraperhelper.ObsReportSettings{ ReceiverID: receiver, Scraper: scraper, ReceiverCreateSettings: tt.ToReceiverCreateSettings(), @@ -55,7 +56,7 @@ func TestCheckReceiverTracesViews(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - rec, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + rec, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: receiver, Transport: transport, ReceiverCreateSettings: tt.ToReceiverCreateSettings(), @@ -76,7 +77,7 @@ func TestCheckReceiverMetricsViews(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - rec, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + rec, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: receiver, Transport: transport, ReceiverCreateSettings: tt.ToReceiverCreateSettings(), @@ -97,7 +98,7 @@ func TestCheckReceiverLogsViews(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - rec, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + rec, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: receiver, Transport: transport, ReceiverCreateSettings: tt.ToReceiverCreateSettings(), diff --git a/receiver/go.mod b/receiver/go.mod index 2b52cfdead8..7fea3d1f124 100644 --- a/receiver/go.mod +++ b/receiver/go.mod @@ -4,12 +4,17 @@ go 1.20 require ( github.com/stretchr/testify v1.8.4 + go.opencensus.io v0.24.0 go.opentelemetry.io/collector v0.85.0 go.opentelemetry.io/collector/component v0.85.0 + go.opentelemetry.io/collector/config/configtelemetry v0.85.0 go.opentelemetry.io/collector/consumer v0.85.0 go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 go.opentelemetry.io/otel v1.18.0 + go.opentelemetry.io/otel/metric v1.18.0 go.opentelemetry.io/otel/sdk v1.18.0 + go.opentelemetry.io/otel/sdk/metric v0.41.0 + go.opentelemetry.io/otel/trace v1.18.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.26.0 ) @@ -17,7 +22,6 @@ require ( require ( contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-kit/log v0.2.1 // indirect @@ -43,17 +47,11 @@ require ( github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect github.com/prometheus/statsd_exporter v0.22.7 // indirect - go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.85.0 // indirect go.opentelemetry.io/collector/confmap v0.85.0 // indirect go.opentelemetry.io/collector/exporter v0.85.0 // indirect - go.opentelemetry.io/collector/extension v0.85.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 // indirect go.opentelemetry.io/collector/processor v0.85.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.41.0 // indirect - go.opentelemetry.io/otel/metric v1.18.0 // indirect - go.opentelemetry.io/otel/sdk/metric v0.41.0 // indirect - go.opentelemetry.io/otel/trace v1.18.0 // indirect golang.org/x/net v0.15.0 // indirect golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.13.0 // indirect diff --git a/receiver/go.sum b/receiver/go.sum index 1e7d2abfab4..15d68a28c23 100644 --- a/receiver/go.sum +++ b/receiver/go.sum @@ -46,7 +46,6 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/receiver/otlpreceiver/go.mod b/receiver/otlpreceiver/go.mod index 3a7d751eac7..9c0850d2eec 100644 --- a/receiver/otlpreceiver/go.mod +++ b/receiver/otlpreceiver/go.mod @@ -28,7 +28,6 @@ require ( cloud.google.com/go/compute/metadata v0.2.4-0.20230617002413-005d2dfb6b68 // indirect contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect diff --git a/receiver/otlpreceiver/go.sum b/receiver/otlpreceiver/go.sum index 092b5971bb1..81ac3062768 100644 --- a/receiver/otlpreceiver/go.sum +++ b/receiver/otlpreceiver/go.sum @@ -50,7 +50,6 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/receiver/otlpreceiver/internal/logs/otlp.go b/receiver/otlpreceiver/internal/logs/otlp.go index bf521699043..faf92372d51 100644 --- a/receiver/otlpreceiver/internal/logs/otlp.go +++ b/receiver/otlpreceiver/internal/logs/otlp.go @@ -7,8 +7,8 @@ import ( "context" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) const dataFormatProtobuf = "protobuf" @@ -17,14 +17,14 @@ const dataFormatProtobuf = "protobuf" type Receiver struct { plogotlp.UnimplementedGRPCServer nextConsumer consumer.Logs - obsrecv *obsreport.Receiver + obsreport *receiverhelper.ObsReport } // New creates a new Receiver reference. -func New(nextConsumer consumer.Logs, obsrecv *obsreport.Receiver) *Receiver { +func New(nextConsumer consumer.Logs, obsreport *receiverhelper.ObsReport) *Receiver { return &Receiver{ nextConsumer: nextConsumer, - obsrecv: obsrecv, + obsreport: obsreport, } } @@ -36,9 +36,9 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog return plogotlp.NewExportResponse(), nil } - ctx = r.obsrecv.StartLogsOp(ctx) + ctx = r.obsreport.StartLogsOp(ctx) err := r.nextConsumer.ConsumeLogs(ctx, ld) - r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err) + r.obsreport.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err) return plogotlp.NewExportResponse(), err } diff --git a/receiver/otlpreceiver/internal/logs/otlp_test.go b/receiver/otlpreceiver/internal/logs/otlp_test.go index 147163b95d3..beb64c159e1 100644 --- a/receiver/otlpreceiver/internal/logs/otlp_test.go +++ b/receiver/otlpreceiver/internal/logs/otlp_test.go @@ -18,8 +18,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" ) @@ -78,13 +78,13 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr { set := receivertest.NewNopCreateSettings() set.ID = component.NewIDWithName("otlp", "log") - obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + obsreport, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "grpc", ReceiverCreateSettings: set, }) require.NoError(t, err) - r := New(lc, obsrecv) + r := New(lc, obsreport) // Now run it as a gRPC server srv := grpc.NewServer() plogotlp.RegisterGRPCServer(srv, r) diff --git a/receiver/otlpreceiver/internal/metrics/otlp.go b/receiver/otlpreceiver/internal/metrics/otlp.go index 4b437913f44..59330dcc318 100644 --- a/receiver/otlpreceiver/internal/metrics/otlp.go +++ b/receiver/otlpreceiver/internal/metrics/otlp.go @@ -7,8 +7,8 @@ import ( "context" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) const dataFormatProtobuf = "protobuf" @@ -17,14 +17,14 @@ const dataFormatProtobuf = "protobuf" type Receiver struct { pmetricotlp.UnimplementedGRPCServer nextConsumer consumer.Metrics - obsrecv *obsreport.Receiver + obsreport *receiverhelper.ObsReport } // New creates a new Receiver reference. -func New(nextConsumer consumer.Metrics, obsrecv *obsreport.Receiver) *Receiver { +func New(nextConsumer consumer.Metrics, obsreport *receiverhelper.ObsReport) *Receiver { return &Receiver{ nextConsumer: nextConsumer, - obsrecv: obsrecv, + obsreport: obsreport, } } @@ -36,9 +36,9 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p return pmetricotlp.NewExportResponse(), nil } - ctx = r.obsrecv.StartMetricsOp(ctx) + ctx = r.obsreport.StartMetricsOp(ctx) err := r.nextConsumer.ConsumeMetrics(ctx, md) - r.obsrecv.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err) + r.obsreport.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err) return pmetricotlp.NewExportResponse(), err } diff --git a/receiver/otlpreceiver/internal/metrics/otlp_test.go b/receiver/otlpreceiver/internal/metrics/otlp_test.go index 4dd7b70bd98..d8691daaba1 100644 --- a/receiver/otlpreceiver/internal/metrics/otlp_test.go +++ b/receiver/otlpreceiver/internal/metrics/otlp_test.go @@ -18,8 +18,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" ) @@ -79,13 +79,13 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { set := receivertest.NewNopCreateSettings() set.ID = component.NewIDWithName("otlp", "metrics") - obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + obsreport, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "grpc", ReceiverCreateSettings: set, }) require.NoError(t, err) - r := New(mc, obsrecv) + r := New(mc, obsreport) // Now run it as a gRPC server srv := grpc.NewServer() pmetricotlp.RegisterGRPCServer(srv, r) diff --git a/receiver/otlpreceiver/internal/trace/otlp.go b/receiver/otlpreceiver/internal/trace/otlp.go index deb605b6127..d14779e712b 100644 --- a/receiver/otlpreceiver/internal/trace/otlp.go +++ b/receiver/otlpreceiver/internal/trace/otlp.go @@ -7,8 +7,8 @@ import ( "context" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) const dataFormatProtobuf = "protobuf" @@ -17,14 +17,14 @@ const dataFormatProtobuf = "protobuf" type Receiver struct { ptraceotlp.UnimplementedGRPCServer nextConsumer consumer.Traces - obsrecv *obsreport.Receiver + obsreport *receiverhelper.ObsReport } // New creates a new Receiver reference. -func New(nextConsumer consumer.Traces, obsrecv *obsreport.Receiver) *Receiver { +func New(nextConsumer consumer.Traces, obsreport *receiverhelper.ObsReport) *Receiver { return &Receiver{ nextConsumer: nextConsumer, - obsrecv: obsrecv, + obsreport: obsreport, } } @@ -37,9 +37,9 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt return ptraceotlp.NewExportResponse(), nil } - ctx = r.obsrecv.StartTracesOp(ctx) + ctx = r.obsreport.StartTracesOp(ctx) err := r.nextConsumer.ConsumeTraces(ctx, td) - r.obsrecv.EndTracesOp(ctx, dataFormatProtobuf, numSpans, err) + r.obsreport.EndTracesOp(ctx, dataFormatProtobuf, numSpans, err) return ptraceotlp.NewExportResponse(), err } diff --git a/receiver/otlpreceiver/internal/trace/otlp_test.go b/receiver/otlpreceiver/internal/trace/otlp_test.go index 4f2a0251c81..fcb7611c75f 100644 --- a/receiver/otlpreceiver/internal/trace/otlp_test.go +++ b/receiver/otlpreceiver/internal/trace/otlp_test.go @@ -18,8 +18,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" ) @@ -76,13 +76,13 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { set := receivertest.NewNopCreateSettings() set.ID = component.NewIDWithName("otlp", "trace") - obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + obsreport, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "grpc", ReceiverCreateSettings: set, }) require.NoError(t, err) - r := New(tc, obsrecv) + r := New(tc, obsreport) // Now run it as a gRPC server srv := grpc.NewServer() ptraceotlp.RegisterGRPCServer(srv, r) diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index bc00c34413e..faf9a68fe04 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -18,7 +18,6 @@ import ( "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" @@ -26,6 +25,7 @@ import ( "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) // otlpReceiver is the type that exposes Trace and Metrics reception. @@ -40,8 +40,8 @@ type otlpReceiver struct { logsReceiver *logs.Receiver shutdownWG sync.WaitGroup - obsrepGRPC *obsreport.Receiver - obsrepHTTP *obsreport.Receiver + obsrepGRPC *receiverhelper.ObsReport + obsrepHTTP *receiverhelper.ObsReport settings receiver.CreateSettings } @@ -59,7 +59,7 @@ func newOtlpReceiver(cfg *Config, set receiver.CreateSettings) (*otlpReceiver, e } var err error - r.obsrepGRPC, err = obsreport.NewReceiver(obsreport.ReceiverSettings{ + r.obsrepGRPC, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "grpc", ReceiverCreateSettings: set, @@ -67,7 +67,7 @@ func newOtlpReceiver(cfg *Config, set receiver.CreateSettings) (*otlpReceiver, e if err != nil { return nil, err } - r.obsrepHTTP, err = obsreport.NewReceiver(obsreport.ReceiverSettings{ + r.obsrepHTTP, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "http", ReceiverCreateSettings: set, diff --git a/receiver/receiverhelper/obsreport.go b/receiver/receiverhelper/obsreport.go new file mode 100644 index 00000000000..40caca0cccd --- /dev/null +++ b/receiver/receiverhelper/obsreport.go @@ -0,0 +1,329 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package receiverhelper // import "go.opentelemetry.io/collector/receiver/receiverhelper" + +import ( + "context" + "errors" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/trace" + "go.uber.org/multierr" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/internal/obsreportconfig" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" + "go.opentelemetry.io/collector/receiver" +) + +const ( + receiverName = "receiver" + scopeName = "go.opentelemetry.io/collector/obsreport" + nameSep = "/" + receiverScope = scopeName + nameSep + receiverName +) + +// ObsReport is a helper to add observability to a receiver. +type ObsReport struct { + level configtelemetry.Level + spanNamePrefix string + transport string + longLivedCtx bool + mutators []tag.Mutator + tracer trace.Tracer + meter metric.Meter + logger *zap.Logger + + useOtelForMetrics bool + otelAttrs []attribute.KeyValue + + acceptedSpansCounter metric.Int64Counter + refusedSpansCounter metric.Int64Counter + acceptedMetricPointsCounter metric.Int64Counter + refusedMetricPointsCounter metric.Int64Counter + acceptedLogRecordsCounter metric.Int64Counter + refusedLogRecordsCounter metric.Int64Counter +} + +// ObsReportSettings are settings for creating an ObsReport. +type ObsReportSettings struct { + ReceiverID component.ID + Transport string + // LongLivedCtx when true indicates that the context passed in the call + // outlives the individual receive operation. + // Typically the long lived context is associated to a connection, + // eg.: a gRPC stream, for which many batches of data are received in individual + // operations without a corresponding new context per operation. + LongLivedCtx bool + ReceiverCreateSettings receiver.CreateSettings +} + +// NewObsReport creates a new ObsReport. +func NewObsReport(cfg ObsReportSettings) (*ObsReport, error) { + return newReceiver(cfg, obsreportconfig.UseOtelForInternalMetricsfeatureGate.IsEnabled()) +} + +func newReceiver(cfg ObsReportSettings, useOtel bool) (*ObsReport, error) { + rec := &ObsReport{ + level: cfg.ReceiverCreateSettings.TelemetrySettings.MetricsLevel, + spanNamePrefix: obsmetrics.ReceiverPrefix + cfg.ReceiverID.String(), + transport: cfg.Transport, + longLivedCtx: cfg.LongLivedCtx, + mutators: []tag.Mutator{ + tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)), + tag.Upsert(obsmetrics.TagKeyTransport, cfg.Transport, tag.WithTTL(tag.TTLNoPropagation)), + }, + tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.ReceiverID.String()), + meter: cfg.ReceiverCreateSettings.MeterProvider.Meter(receiverScope), + logger: cfg.ReceiverCreateSettings.Logger, + + useOtelForMetrics: useOtel, + otelAttrs: []attribute.KeyValue{ + attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()), + attribute.String(obsmetrics.TransportKey, cfg.Transport), + }, + } + + // ignore instrument name error as per workaround in https://github.com/open-telemetry/opentelemetry-collector/issues/8346 + // if err := rec.createOtelMetrics(); err != nil { + // return nil, err + // } + if err := rec.createOtelMetrics(); err != nil && !errors.Is(err, sdkmetric.ErrInstrumentName) { + return nil, err + } + + return rec, nil +} + +func (rec *ObsReport) createOtelMetrics() error { + if !rec.useOtelForMetrics { + return nil + } + + var errors, err error + + rec.acceptedSpansCounter, err = rec.meter.Int64Counter( + obsmetrics.ReceiverPrefix+obsmetrics.AcceptedSpansKey, + metric.WithDescription("Number of spans successfully pushed into the pipeline."), + metric.WithUnit("1"), + ) + errors = multierr.Append(errors, err) + + rec.refusedSpansCounter, err = rec.meter.Int64Counter( + obsmetrics.ReceiverPrefix+obsmetrics.RefusedSpansKey, + metric.WithDescription("Number of spans that could not be pushed into the pipeline."), + metric.WithUnit("1"), + ) + errors = multierr.Append(errors, err) + + rec.acceptedMetricPointsCounter, err = rec.meter.Int64Counter( + obsmetrics.ReceiverPrefix+obsmetrics.AcceptedMetricPointsKey, + metric.WithDescription("Number of metric points successfully pushed into the pipeline."), + metric.WithUnit("1"), + ) + errors = multierr.Append(errors, err) + + rec.refusedMetricPointsCounter, err = rec.meter.Int64Counter( + obsmetrics.ReceiverPrefix+obsmetrics.RefusedMetricPointsKey, + metric.WithDescription("Number of metric points that could not be pushed into the pipeline."), + metric.WithUnit("1"), + ) + errors = multierr.Append(errors, err) + + rec.acceptedLogRecordsCounter, err = rec.meter.Int64Counter( + obsmetrics.ReceiverPrefix+obsmetrics.AcceptedLogRecordsKey, + metric.WithDescription("Number of log records successfully pushed into the pipeline."), + metric.WithUnit("1"), + ) + errors = multierr.Append(errors, err) + + rec.refusedLogRecordsCounter, err = rec.meter.Int64Counter( + obsmetrics.ReceiverPrefix+obsmetrics.RefusedLogRecordsKey, + metric.WithDescription("Number of log records that could not be pushed into the pipeline."), + metric.WithUnit("1"), + ) + errors = multierr.Append(errors, err) + + return errors +} + +// StartTracesOp is called when a request is received from a client. +// The returned context should be used in other calls to the obsreport functions +// dealing with the same receive operation. +func (rec *ObsReport) StartTracesOp(operationCtx context.Context) context.Context { + return rec.startOp(operationCtx, obsmetrics.ReceiveTraceDataOperationSuffix) +} + +// EndTracesOp completes the receive operation that was started with +// StartTracesOp. +func (rec *ObsReport) EndTracesOp( + receiverCtx context.Context, + format string, + numReceivedSpans int, + err error, +) { + rec.endOp(receiverCtx, format, numReceivedSpans, err, component.DataTypeTraces) +} + +// StartLogsOp is called when a request is received from a client. +// The returned context should be used in other calls to the obsreport functions +// dealing with the same receive operation. +func (rec *ObsReport) StartLogsOp(operationCtx context.Context) context.Context { + return rec.startOp(operationCtx, obsmetrics.ReceiverLogsOperationSuffix) +} + +// EndLogsOp completes the receive operation that was started with +// StartLogsOp. +func (rec *ObsReport) EndLogsOp( + receiverCtx context.Context, + format string, + numReceivedLogRecords int, + err error, +) { + rec.endOp(receiverCtx, format, numReceivedLogRecords, err, component.DataTypeLogs) +} + +// StartMetricsOp is called when a request is received from a client. +// The returned context should be used in other calls to the obsreport functions +// dealing with the same receive operation. +func (rec *ObsReport) StartMetricsOp(operationCtx context.Context) context.Context { + return rec.startOp(operationCtx, obsmetrics.ReceiverMetricsOperationSuffix) +} + +// EndMetricsOp completes the receive operation that was started with +// StartMetricsOp. +func (rec *ObsReport) EndMetricsOp( + receiverCtx context.Context, + format string, + numReceivedPoints int, + err error, +) { + rec.endOp(receiverCtx, format, numReceivedPoints, err, component.DataTypeMetrics) +} + +// startOp creates the span used to trace the operation. Returning +// the updated context with the created span. +func (rec *ObsReport) startOp(receiverCtx context.Context, operationSuffix string) context.Context { + ctx, _ := tag.New(receiverCtx, rec.mutators...) + var span trace.Span + spanName := rec.spanNamePrefix + operationSuffix + if !rec.longLivedCtx { + ctx, span = rec.tracer.Start(ctx, spanName) + } else { + // Since the receiverCtx is long lived do not use it to start the span. + // This way this trace ends when the EndTracesOp is called. + // Here is safe to ignore the returned context since it is not used below. + _, span = rec.tracer.Start(context.Background(), spanName, trace.WithLinks(trace.Link{ + SpanContext: trace.SpanContextFromContext(receiverCtx), + })) + + ctx = trace.ContextWithSpan(ctx, span) + } + + if rec.transport != "" { + span.SetAttributes(attribute.String(obsmetrics.TransportKey, rec.transport)) + } + return ctx +} + +// endOp records the observability signals at the end of an operation. +func (rec *ObsReport) endOp( + receiverCtx context.Context, + format string, + numReceivedItems int, + err error, + dataType component.DataType, +) { + numAccepted := numReceivedItems + numRefused := 0 + if err != nil { + numAccepted = 0 + numRefused = numReceivedItems + } + + span := trace.SpanFromContext(receiverCtx) + + if rec.level != configtelemetry.LevelNone { + rec.recordMetrics(receiverCtx, dataType, numAccepted, numRefused) + } + + // end span according to errors + if span.IsRecording() { + var acceptedItemsKey, refusedItemsKey string + switch dataType { + case component.DataTypeTraces: + acceptedItemsKey = obsmetrics.AcceptedSpansKey + refusedItemsKey = obsmetrics.RefusedSpansKey + case component.DataTypeMetrics: + acceptedItemsKey = obsmetrics.AcceptedMetricPointsKey + refusedItemsKey = obsmetrics.RefusedMetricPointsKey + case component.DataTypeLogs: + acceptedItemsKey = obsmetrics.AcceptedLogRecordsKey + refusedItemsKey = obsmetrics.RefusedLogRecordsKey + } + + span.SetAttributes( + attribute.String(obsmetrics.FormatKey, format), + attribute.Int64(acceptedItemsKey, int64(numAccepted)), + attribute.Int64(refusedItemsKey, int64(numRefused)), + ) + if err != nil { + span.SetStatus(codes.Error, err.Error()) + } + } + span.End() +} + +func (rec *ObsReport) recordMetrics(receiverCtx context.Context, dataType component.DataType, numAccepted, numRefused int) { + if rec.useOtelForMetrics { + rec.recordWithOtel(receiverCtx, dataType, numAccepted, numRefused) + } else { + rec.recordWithOC(receiverCtx, dataType, numAccepted, numRefused) + } +} + +func (rec *ObsReport) recordWithOtel(receiverCtx context.Context, dataType component.DataType, numAccepted, numRefused int) { + var acceptedMeasure, refusedMeasure metric.Int64Counter + switch dataType { + case component.DataTypeTraces: + acceptedMeasure = rec.acceptedSpansCounter + refusedMeasure = rec.refusedSpansCounter + case component.DataTypeMetrics: + acceptedMeasure = rec.acceptedMetricPointsCounter + refusedMeasure = rec.refusedMetricPointsCounter + case component.DataTypeLogs: + acceptedMeasure = rec.acceptedLogRecordsCounter + refusedMeasure = rec.refusedLogRecordsCounter + } + + acceptedMeasure.Add(receiverCtx, int64(numAccepted), metric.WithAttributes(rec.otelAttrs...)) + refusedMeasure.Add(receiverCtx, int64(numRefused), metric.WithAttributes(rec.otelAttrs...)) +} + +func (rec *ObsReport) recordWithOC(receiverCtx context.Context, dataType component.DataType, numAccepted, numRefused int) { + var acceptedMeasure, refusedMeasure *stats.Int64Measure + switch dataType { + case component.DataTypeTraces: + acceptedMeasure = obsmetrics.ReceiverAcceptedSpans + refusedMeasure = obsmetrics.ReceiverRefusedSpans + case component.DataTypeMetrics: + acceptedMeasure = obsmetrics.ReceiverAcceptedMetricPoints + refusedMeasure = obsmetrics.ReceiverRefusedMetricPoints + case component.DataTypeLogs: + acceptedMeasure = obsmetrics.ReceiverAcceptedLogRecords + refusedMeasure = obsmetrics.ReceiverRefusedLogRecords + } + + stats.Record( + receiverCtx, + acceptedMeasure.M(int64(numAccepted)), + refusedMeasure.M(int64(numRefused))) +} diff --git a/obsreport/obsreport_test.go b/receiver/receiverhelper/obsreport_test.go similarity index 75% rename from obsreport/obsreport_test.go rename to receiver/receiverhelper/obsreport_test.go index e8a3ad91776..4f3d57c945f 100644 --- a/obsreport/obsreport_test.go +++ b/receiver/receiverhelper/obsreport_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package obsreport +package receiverhelper import ( "context" @@ -16,7 +16,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/obsreport/obsreporttest" - "go.opentelemetry.io/collector/receiver/scrapererror" ) const ( @@ -26,10 +25,8 @@ const ( var ( receiverID = component.NewID("fakeReceiver") - scraperID = component.NewID("fakeScraper") - errFake = errors.New("errFake") - partialErrFake = scrapererror.NewPartialScrapeError(errFake, 1) + errFake = errors.New("errFake") ) type testParams struct { @@ -37,24 +34,6 @@ type testParams struct { err error } -func testTelemetry(t *testing.T, id component.ID, testFunc func(t *testing.T, tt obsreporttest.TestTelemetry, useOtel bool)) { - t.Run("WithOC", func(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry(id) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - testFunc(t, tt, false) - }) - - t.Run("WithOTel", func(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry(id) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - testFunc(t, tt, true) - }) -} - func TestReceiveTraceDataOp(t *testing.T) { testTelemetry(t, receiverID, func(t *testing.T, tt obsreporttest.TestTelemetry, useOtel bool) { parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) @@ -65,7 +44,7 @@ func TestReceiveTraceDataOp(t *testing.T) { {items: 42, err: nil}, } for i, param := range params { - rec, err := newReceiver(ReceiverSettings{ + rec, err := newReceiver(ObsReportSettings{ ReceiverID: receiverID, Transport: transport, ReceiverCreateSettings: tt.ToReceiverCreateSettings(), @@ -112,7 +91,7 @@ func TestReceiveLogsOp(t *testing.T) { {items: 42, err: nil}, } for i, param := range params { - rec, err := newReceiver(ReceiverSettings{ + rec, err := newReceiver(ObsReportSettings{ ReceiverID: receiverID, Transport: transport, ReceiverCreateSettings: tt.ToReceiverCreateSettings(), @@ -160,7 +139,7 @@ func TestReceiveMetricsOp(t *testing.T) { {items: 29, err: nil}, } for i, param := range params { - rec, err := newReceiver(ReceiverSettings{ + rec, err := newReceiver(ObsReportSettings{ ReceiverID: receiverID, Transport: transport, ReceiverCreateSettings: tt.ToReceiverCreateSettings(), @@ -199,63 +178,6 @@ func TestReceiveMetricsOp(t *testing.T) { }) } -func TestScrapeMetricsDataOp(t *testing.T) { - testTelemetry(t, receiverID, func(t *testing.T, tt obsreporttest.TestTelemetry, useOtel bool) { - parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) - defer parentSpan.End() - - params := []testParams{ - {items: 23, err: partialErrFake}, - {items: 29, err: errFake}, - {items: 15, err: nil}, - } - for i := range params { - scrp, err := newScraper(ScraperSettings{ - ReceiverID: receiverID, - Scraper: scraperID, - ReceiverCreateSettings: tt.ToReceiverCreateSettings(), - }, useOtel) - require.NoError(t, err) - ctx := scrp.StartMetricsOp(parentCtx) - assert.NotNil(t, ctx) - scrp.EndMetricsOp(ctx, params[i].items, params[i].err) - } - - spans := tt.SpanRecorder.Ended() - require.Equal(t, len(params), len(spans)) - - var scrapedMetricPoints, erroredMetricPoints int - for i, span := range spans { - assert.Equal(t, "scraper/"+receiverID.String()+"/"+scraperID.String()+"/MetricsScraped", span.Name()) - switch { - case params[i].err == nil: - scrapedMetricPoints += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(0)}) - assert.Equal(t, codes.Unset, span.Status().Code) - case errors.Is(params[i].err, errFake): - erroredMetricPoints += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(0)}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) - assert.Equal(t, codes.Error, span.Status().Code) - assert.Equal(t, params[i].err.Error(), span.Status().Description) - - case errors.Is(params[i].err, partialErrFake): - scrapedMetricPoints += params[i].items - erroredMetricPoints++ - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(1)}) - assert.Equal(t, codes.Error, span.Status().Code) - assert.Equal(t, params[i].err.Error(), span.Status().Description) - default: - t.Fatalf("unexpected err param: %v", params[i].err) - } - } - - require.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiverID, scraperID, int64(scrapedMetricPoints), int64(erroredMetricPoints))) - }) -} - func TestReceiveWithLongLivedCtx(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(receiverID) require.NoError(t, err) @@ -271,7 +193,7 @@ func TestReceiveWithLongLivedCtx(t *testing.T) { for i := range params { // Use a new context on each operation to simulate distinct operations // under the same long lived context. - rec, rerr := NewReceiver(ReceiverSettings{ + rec, rerr := NewObsReport(ObsReportSettings{ ReceiverID: receiverID, Transport: transport, LongLivedCtx: true, @@ -309,3 +231,21 @@ func TestReceiveWithLongLivedCtx(t *testing.T) { } } } + +func testTelemetry(t *testing.T, id component.ID, testFunc func(t *testing.T, tt obsreporttest.TestTelemetry, useOtel bool)) { + t.Run("WithOC", func(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(id) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + testFunc(t, tt, false) + }) + + t.Run("WithOTel", func(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(id) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + testFunc(t, tt, true) + }) +} diff --git a/receiver/scraperhelper/obsreport.go b/receiver/scraperhelper/obsreport.go new file mode 100644 index 00000000000..6f687d1fff4 --- /dev/null +++ b/receiver/scraperhelper/obsreport.go @@ -0,0 +1,178 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package scraperhelper // import "go.opentelemetry.io/collector/receiver/scraperhelper" + +import ( + "context" + "errors" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/trace" + "go.uber.org/multierr" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/internal/obsreportconfig" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/scrapererror" +) + +var ( + scraperName = "scraper" + scraperScope = scopeName + nameSep + scraperName + scopeName = "go.opentelemetry.io/collector/obsreport" + nameSep = "/" +) + +// ObsReport is a helper to add observability to a scraper. +type ObsReport struct { + level configtelemetry.Level + receiverID component.ID + scraper component.ID + mutators []tag.Mutator + tracer trace.Tracer + + logger *zap.Logger + + useOtelForMetrics bool + otelAttrs []attribute.KeyValue + scrapedMetricsPoints metric.Int64Counter + erroredMetricsPoints metric.Int64Counter +} + +// ObsReportSettings are settings for creating an ObsReport. +type ObsReportSettings struct { + ReceiverID component.ID + Scraper component.ID + ReceiverCreateSettings receiver.CreateSettings +} + +// NewObsReport creates a new ObsReport. +func NewObsReport(cfg ObsReportSettings) (*ObsReport, error) { + return newScraper(cfg, obsreportconfig.UseOtelForInternalMetricsfeatureGate.IsEnabled()) +} + +func newScraper(cfg ObsReportSettings, useOtel bool) (*ObsReport, error) { + scraper := &ObsReport{ + level: cfg.ReceiverCreateSettings.TelemetrySettings.MetricsLevel, + receiverID: cfg.ReceiverID, + scraper: cfg.Scraper, + mutators: []tag.Mutator{ + tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)), + tag.Upsert(obsmetrics.TagKeyScraper, cfg.Scraper.String(), tag.WithTTL(tag.TTLNoPropagation))}, + tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.Scraper.String()), + + logger: cfg.ReceiverCreateSettings.Logger, + useOtelForMetrics: useOtel, + otelAttrs: []attribute.KeyValue{ + attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()), + attribute.String(obsmetrics.ScraperKey, cfg.Scraper.String()), + }, + } + + // ignore instrument name error as per workaround in https://github.com/open-telemetry/opentelemetry-collector/issues/8346 + // if err := scraper.createOtelMetrics(cfg); err != nil { + // return nil, err + // } + if err := scraper.createOtelMetrics(cfg); err != nil && !errors.Is(err, sdkmetric.ErrInstrumentName) { + return nil, err + } + + return scraper, nil +} + +func (s *ObsReport) createOtelMetrics(cfg ObsReportSettings) error { + if !s.useOtelForMetrics { + return nil + } + meter := cfg.ReceiverCreateSettings.MeterProvider.Meter(scraperScope) + + var errors, err error + + s.scrapedMetricsPoints, err = meter.Int64Counter( + obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey, + metric.WithDescription("Number of metric points successfully scraped."), + metric.WithUnit("1"), + ) + errors = multierr.Append(errors, err) + + s.erroredMetricsPoints, err = meter.Int64Counter( + obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey, + metric.WithDescription("Number of metric points that were unable to be scraped."), + metric.WithUnit("1"), + ) + errors = multierr.Append(errors, err) + + return errors +} + +// StartMetricsOp is called when a scrape operation is started. The +// returned context should be used in other calls to the obsreport functions +// dealing with the same scrape operation. +func (s *ObsReport) StartMetricsOp(ctx context.Context) context.Context { + ctx, _ = tag.New(ctx, s.mutators...) + + spanName := obsmetrics.ScraperPrefix + s.receiverID.String() + obsmetrics.NameSep + s.scraper.String() + obsmetrics.ScraperMetricsOperationSuffix + ctx, _ = s.tracer.Start(ctx, spanName) + return ctx +} + +// EndMetricsOp completes the scrape operation that was started with +// StartMetricsOp. +func (s *ObsReport) EndMetricsOp( + scraperCtx context.Context, + numScrapedMetrics int, + err error, +) { + numErroredMetrics := 0 + if err != nil { + var partialErr scrapererror.PartialScrapeError + if errors.As(err, &partialErr) { + numErroredMetrics = partialErr.Failed + } else { + numErroredMetrics = numScrapedMetrics + numScrapedMetrics = 0 + } + } + + span := trace.SpanFromContext(scraperCtx) + + if s.level != configtelemetry.LevelNone { + s.recordMetrics(scraperCtx, numScrapedMetrics, numErroredMetrics) + } + + // end span according to errors + if span.IsRecording() { + span.SetAttributes( + attribute.String(obsmetrics.FormatKey, string(component.DataTypeMetrics)), + attribute.Int64(obsmetrics.ScrapedMetricPointsKey, int64(numScrapedMetrics)), + attribute.Int64(obsmetrics.ErroredMetricPointsKey, int64(numErroredMetrics)), + ) + + if err != nil { + span.SetStatus(codes.Error, err.Error()) + } + } + + span.End() +} + +func (s *ObsReport) recordMetrics(scraperCtx context.Context, numScrapedMetrics, numErroredMetrics int) { + if s.useOtelForMetrics { + s.scrapedMetricsPoints.Add(scraperCtx, int64(numScrapedMetrics), metric.WithAttributes(s.otelAttrs...)) + s.erroredMetricsPoints.Add(scraperCtx, int64(numErroredMetrics), metric.WithAttributes(s.otelAttrs...)) + } else { // OC for metrics + stats.Record( + scraperCtx, + obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)), + obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics))) + } +} diff --git a/receiver/scraperhelper/obsreport_test.go b/receiver/scraperhelper/obsreport_test.go new file mode 100644 index 00000000000..e658af3e74f --- /dev/null +++ b/receiver/scraperhelper/obsreport_test.go @@ -0,0 +1,108 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package scraperhelper + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" + "go.opentelemetry.io/collector/obsreport/obsreporttest" + "go.opentelemetry.io/collector/receiver/scrapererror" +) + +var ( + receiverID = component.NewID("fakeReceiver") + scraperID = component.NewID("fakeScraper") + + errFake = errors.New("errFake") + partialErrFake = scrapererror.NewPartialScrapeError(errFake, 1) +) + +type testParams struct { + items int + err error +} + +func TestScrapeMetricsDataOp(t *testing.T) { + testTelemetry(t, receiverID, func(t *testing.T, tt obsreporttest.TestTelemetry, useOtel bool) { + parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() + + params := []testParams{ + {items: 23, err: partialErrFake}, + {items: 29, err: errFake}, + {items: 15, err: nil}, + } + for i := range params { + scrp, err := newScraper(ObsReportSettings{ + ReceiverID: receiverID, + Scraper: scraperID, + ReceiverCreateSettings: tt.ToReceiverCreateSettings(), + }, useOtel) + require.NoError(t, err) + ctx := scrp.StartMetricsOp(parentCtx) + assert.NotNil(t, ctx) + scrp.EndMetricsOp(ctx, params[i].items, params[i].err) + } + + spans := tt.SpanRecorder.Ended() + require.Equal(t, len(params), len(spans)) + + var scrapedMetricPoints, erroredMetricPoints int + for i, span := range spans { + assert.Equal(t, "scraper/"+receiverID.String()+"/"+scraperID.String()+"/MetricsScraped", span.Name()) + switch { + case params[i].err == nil: + scrapedMetricPoints += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Unset, span.Status().Code) + case errors.Is(params[i].err, errFake): + erroredMetricPoints += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, params[i].err.Error(), span.Status().Description) + + case errors.Is(params[i].err, partialErrFake): + scrapedMetricPoints += params[i].items + erroredMetricPoints++ + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(1)}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, params[i].err.Error(), span.Status().Description) + default: + t.Fatalf("unexpected err param: %v", params[i].err) + } + } + + require.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiverID, scraperID, int64(scrapedMetricPoints), int64(erroredMetricPoints))) + }) +} + +func testTelemetry(t *testing.T, id component.ID, testFunc func(t *testing.T, tt obsreporttest.TestTelemetry, useOtel bool)) { + t.Run("WithOC", func(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(id) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + testFunc(t, tt, false) + }) + + t.Run("WithOTel", func(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(id) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + testFunc(t, tt, true) + }) +} diff --git a/receiver/scraperhelper/scrapercontroller.go b/receiver/scraperhelper/scrapercontroller.go index 9f3dd0b5f86..fb6da0a3ddf 100644 --- a/receiver/scraperhelper/scrapercontroller.go +++ b/receiver/scraperhelper/scrapercontroller.go @@ -13,9 +13,9 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/scrapererror" ) @@ -51,7 +51,7 @@ type controller struct { nextConsumer consumer.Metrics scrapers []Scraper - obsScrapers []*obsreport.Scraper + obsScrapers []*ObsReport tickerCh <-chan time.Time @@ -59,7 +59,7 @@ type controller struct { done chan struct{} terminated chan struct{} - obsrecv *obsreport.Receiver + obsrecv *receiverhelper.ObsReport recvSettings receiver.CreateSettings } @@ -78,7 +78,7 @@ func NewScraperControllerReceiver( return nil, errors.New("collection_interval must be a positive duration") } - obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "", ReceiverCreateSettings: set, @@ -104,9 +104,9 @@ func NewScraperControllerReceiver( op(sc) } - sc.obsScrapers = make([]*obsreport.Scraper, len(sc.scrapers)) + sc.obsScrapers = make([]*ObsReport, len(sc.scrapers)) for i, scraper := range sc.scrapers { - scrp, err := obsreport.NewScraper(obsreport.ScraperSettings{ + scrp, err := NewObsReport(ObsReportSettings{ ReceiverID: sc.id, Scraper: scraper.ID(), ReceiverCreateSettings: sc.recvSettings,