diff --git a/.chloggen/exp-status.yaml b/.chloggen/exp-status.yaml new file mode 100644 index 00000000000..05e3daab901 --- /dev/null +++ b/.chloggen/exp-status.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otlpexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add runtime status reporting for OTLP exporters. + +# One or more tracking issues or pull requests related to the change +issues: [9957] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index 81166979506..49a932041db 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -59,7 +59,7 @@ func createTraces( oce := newExporter(cfg, set) oCfg := cfg.(*Config) return exporterhelper.NewTraces(ctx, set, cfg, - oce.pushTraces, + oce.pushTracesWithStatus, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(oCfg.TimeoutConfig), exporterhelper.WithRetry(oCfg.RetryConfig), @@ -78,7 +78,7 @@ func createMetrics( oce := newExporter(cfg, set) oCfg := cfg.(*Config) return exporterhelper.NewMetrics(ctx, set, cfg, - oce.pushMetrics, + oce.pushMetricsWithStatus, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(oCfg.TimeoutConfig), exporterhelper.WithRetry(oCfg.RetryConfig), @@ -97,7 +97,7 @@ func createLogs( oce := newExporter(cfg, set) oCfg := cfg.(*Config) return exporterhelper.NewLogs(ctx, set, cfg, - oce.pushLogs, + oce.pushLogsWithStatus, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(oCfg.TimeoutConfig), exporterhelper.WithRetry(oCfg.RetryConfig), @@ -116,7 +116,7 @@ func createProfilesExporter( oce := newExporter(cfg, set) oCfg := cfg.(*Config) return exporterhelperprofiles.NewProfilesExporter(ctx, set, cfg, - oce.pushProfiles, + oce.pushProfilesWithStatus, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(oCfg.TimeoutConfig), exporterhelper.WithRetry(oCfg.RetryConfig), diff --git a/exporter/otlpexporter/go.mod b/exporter/otlpexporter/go.mod index 4fa5d0430cc..f3a79bcbc0c 100644 --- a/exporter/otlpexporter/go.mod +++ b/exporter/otlpexporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.114.0 go.opentelemetry.io/collector/component v0.114.0 + go.opentelemetry.io/collector/component/componentstatus v0.114.0 go.opentelemetry.io/collector/component/componenttest v0.114.0 go.opentelemetry.io/collector/config/configauth v0.114.0 go.opentelemetry.io/collector/config/configcompression v1.20.0 diff --git a/exporter/otlpexporter/go.sum b/exporter/otlpexporter/go.sum index 9db2fbfc8a3..4467611f456 100644 --- a/exporter/otlpexporter/go.sum +++ b/exporter/otlpexporter/go.sum @@ -60,6 +60,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector/component/componentstatus v0.114.0 h1:y9my/xink8KB5lK8zFAjgB2+pEh0QYy5TM972fxZY9w= +go.opentelemetry.io/collector/component/componentstatus v0.114.0/go.mod h1:RIoeCYZpPaae7QLE/1RacqzhHuXBmzRAk9H/EwYtIIs= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 h1:yMkBS9yViCc7U7yeLzJPM2XizlfdVvBRSmsQDWu6qc0= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0/go.mod h1:n8MR6/liuGB5EmTETUBeU5ZgqMOlqKRxUaqPQBOANZ8= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index 20bf32f8411..f61d53b97d9 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc/status" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" @@ -44,6 +45,7 @@ type baseExporter struct { metadata metadata.MD callOptions []grpc.CallOption + host component.Host settings component.TelemetrySettings // Default user-agent header. @@ -78,6 +80,7 @@ func (e *baseExporter) start(ctx context.Context, host component.Host) (err erro e.callOptions = []grpc.CallOption{ grpc.WaitForReady(e.config.ClientConfig.WaitForReady), } + e.host = host return } @@ -105,6 +108,15 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { return nil } +func (e *baseExporter) pushTracesWithStatus(ctx context.Context, td ptrace.Traces) error { + if err := e.pushTraces(ctx, td); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { req := pmetricotlp.NewExportRequestFromMetrics(md) resp, respErr := e.metricExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) @@ -121,6 +133,15 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro return nil } +func (e *baseExporter) pushMetricsWithStatus(ctx context.Context, md pmetric.Metrics) error { + if err := e.pushMetrics(ctx, md); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { req := plogotlp.NewExportRequestFromLogs(ld) resp, respErr := e.logExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) @@ -137,6 +158,15 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { return nil } +func (e *baseExporter) pushLogsWithStatus(ctx context.Context, ld plog.Logs) error { + if err := e.pushLogs(ctx, ld); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) error { req := pprofileotlp.NewExportRequestFromProfiles(td) resp, respErr := e.profileExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) @@ -153,6 +183,15 @@ func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) e return nil } +func (e *baseExporter) pushProfilesWithStatus(ctx context.Context, td pprofile.Profiles) error { + if err := e.pushProfiles(ctx, td); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) enhanceContext(ctx context.Context) context.Context { if e.metadata.Len() > 0 { return metadata.NewOutgoingContext(ctx, e.metadata) @@ -173,7 +212,6 @@ func processError(err error) error { return nil } - // Now, this is a real error. retryInfo := getRetryInfo(st) if !shouldRetry(st.Code(), retryInfo) { diff --git a/exporter/otlpexporter/otlp_test.go b/exporter/otlpexporter/otlp_test.go index ac50ce7027b..adc3cc5feb9 100644 --- a/exporter/otlpexporter/otlp_test.go +++ b/exporter/otlpexporter/otlp_test.go @@ -25,6 +25,8 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configopaque" @@ -1034,3 +1036,196 @@ func TestSendProfilesWhenEndpointHasHttpScheme(t *testing.T) { }) } } + +func TestComponentStatus(t *testing.T) { + tests := []struct { + name string + exportError error + componentStatus componentstatus.Status + }{ + { + name: "No Error", + exportError: nil, + componentStatus: componentstatus.StatusOK, + }, + { + name: "Resource Exhausted", + exportError: status.Error(codes.ResourceExhausted, "resource exhausted"), + componentStatus: componentstatus.StatusRecoverableError, + }, + } + + t.Run("traces", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false) + rcv.setExportError(tt.exportError) + defer rcv.srv.GracefulStop() + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.QueueConfig.Enabled = false + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + + set := exportertest.NewNopSettings() + host := &testHost{Host: componenttest.NewNopHost()} + + exp, err := factory.CreateTraces(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + require.NoError(t, exp.Start(context.Background(), host)) + + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + td := ptrace.NewTraces() + err = exp.ConsumeTraces(context.Background(), td) + + assert.Equal(t, tt.componentStatus != componentstatus.StatusOK, err != nil) + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) + + t.Run("metrics", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + + rcv := otlpMetricsReceiverOnGRPCServer(ln) + rcv.setExportError(tt.exportError) + defer rcv.srv.GracefulStop() + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.QueueConfig.Enabled = false + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + + set := exportertest.NewNopSettings() + host := &testHost{Host: componenttest.NewNopHost()} + + exp, err := factory.CreateMetrics(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + require.NoError(t, exp.Start(context.Background(), host)) + + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + md := pmetric.NewMetrics() + err = exp.ConsumeMetrics(context.Background(), md) + assert.Equal(t, tt.componentStatus != componentstatus.StatusOK, err != nil) + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) + + t.Run("logs", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + + rcv := otlpLogsReceiverOnGRPCServer(ln) + rcv.setExportError(tt.exportError) + defer rcv.srv.GracefulStop() + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.QueueConfig.Enabled = false + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + + set := exportertest.NewNopSettings() + host := &testHost{Host: componenttest.NewNopHost()} + + exp, err := factory.CreateLogs(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + require.NoError(t, exp.Start(context.Background(), host)) + + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + ld := plog.NewLogs() + err = exp.ConsumeLogs(context.Background(), ld) + assert.Equal(t, tt.componentStatus != componentstatus.StatusOK, err != nil) + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) + + t.Run("profiles", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + + rcv, _ := otlpProfilesReceiverOnGRPCServer(ln, false) + rcv.setExportError(tt.exportError) + defer rcv.srv.GracefulStop() + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.QueueConfig.Enabled = false + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + + set := exportertest.NewNopSettings() + host := &testHost{Host: componenttest.NewNopHost()} + + exp, err := factory.(exporterprofiles.Factory).CreateProfiles(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + require.NoError(t, exp.Start(context.Background(), host)) + + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + pd := pprofile.NewProfiles() + err = exp.ConsumeProfiles(context.Background(), pd) + assert.Equal(t, tt.componentStatus != componentstatus.StatusOK, err != nil) + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) +} + +var _ component.Host = (*testHost)(nil) +var _ componentstatus.Reporter = (*testHost)(nil) + +type testHost struct { + component.Host + lastStatus componentstatus.Status +} + +func (h *testHost) Report(ev *componentstatus.Event) { + if h.lastStatus != componentstatus.StatusPermanentError { + h.lastStatus = ev.Status() + } +} diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index c3a91a44bb9..2844d3b3348 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -90,7 +90,7 @@ func createTraces( } return exporterhelper.NewTraces(ctx, set, cfg, - oce.pushTraces, + oce.pushTracesWithStatus, exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. @@ -116,7 +116,7 @@ func createMetrics( } return exporterhelper.NewMetrics(ctx, set, cfg, - oce.pushMetrics, + oce.pushMetricsWithStatus, exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. @@ -141,7 +141,7 @@ func createLogs( } return exporterhelper.NewLogs(ctx, set, cfg, - oce.pushLogs, + oce.pushLogsWithStatus, exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. @@ -167,7 +167,7 @@ func createProfiles( } return exporterhelperprofiles.NewProfilesExporter(ctx, set, cfg, - oce.pushProfiles, + oce.pushProfilesWithStatus, exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. diff --git a/exporter/otlphttpexporter/go.mod b/exporter/otlphttpexporter/go.mod index fdddca7c960..79e1a774a41 100644 --- a/exporter/otlphttpexporter/go.mod +++ b/exporter/otlphttpexporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.114.0 go.opentelemetry.io/collector/component v0.114.0 + go.opentelemetry.io/collector/component/componentstatus v0.114.0 go.opentelemetry.io/collector/component/componenttest v0.114.0 go.opentelemetry.io/collector/config/configcompression v1.20.0 go.opentelemetry.io/collector/config/confighttp v0.114.0 diff --git a/exporter/otlphttpexporter/go.sum b/exporter/otlphttpexporter/go.sum index 502920bff56..e156ee84206 100644 --- a/exporter/otlphttpexporter/go.sum +++ b/exporter/otlphttpexporter/go.sum @@ -64,6 +64,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector/component/componentstatus v0.114.0 h1:y9my/xink8KB5lK8zFAjgB2+pEh0QYy5TM972fxZY9w= +go.opentelemetry.io/collector/component/componentstatus v0.114.0/go.mod h1:RIoeCYZpPaae7QLE/1RacqzhHuXBmzRAk9H/EwYtIIs= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 0decd18315a..0eeb756bae1 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -20,6 +20,7 @@ import ( "google.golang.org/protobuf/proto" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -43,6 +44,7 @@ type baseExporter struct { logsURL string profilesURL string logger *zap.Logger + host component.Host settings component.TelemetrySettings // Default user-agent header. userAgent string @@ -87,6 +89,7 @@ func (e *baseExporter) start(ctx context.Context, host component.Host) error { return err } e.client = client + e.host = host return nil } @@ -111,6 +114,15 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { return e.export(ctx, e.tracesURL, request, e.tracesPartialSuccessHandler) } +func (e *baseExporter) pushTracesWithStatus(ctx context.Context, td ptrace.Traces) error { + if err := e.pushTraces(ctx, td); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { tr := pmetricotlp.NewExportRequestFromMetrics(md) @@ -131,6 +143,15 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro return e.export(ctx, e.metricsURL, request, e.metricsPartialSuccessHandler) } +func (e *baseExporter) pushMetricsWithStatus(ctx context.Context, md pmetric.Metrics) error { + if err := e.pushMetrics(ctx, md); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { tr := plogotlp.NewExportRequestFromLogs(ld) @@ -152,6 +173,15 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { return e.export(ctx, e.logsURL, request, e.logsPartialSuccessHandler) } +func (e *baseExporter) pushLogsWithStatus(ctx context.Context, ld plog.Logs) error { + if err := e.pushLogs(ctx, ld); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) error { tr := pprofileotlp.NewExportRequestFromProfiles(td) @@ -173,6 +203,15 @@ func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) e return e.export(ctx, e.profilesURL, request, e.profilesPartialSuccessHandler) } +func (e *baseExporter) pushProfilesWithStatus(ctx context.Context, td pprofile.Profiles) error { + if err := e.pushProfiles(ctx, td); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error { e.logger.Debug("Preparing to make HTTP request", zap.String("url", url)) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request)) diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 65a2e5bacc7..e1a4e540e56 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -23,6 +23,8 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" @@ -1137,6 +1139,175 @@ func TestEncoding(t *testing.T) { }) } +func TestComponentStatus(t *testing.T) { + tests := []struct { + name string + responseStatus int + componentStatus componentstatus.Status + }{ + { + name: "200", + responseStatus: http.StatusOK, + componentStatus: componentstatus.StatusOK, + }, + { + name: "503", + responseStatus: http.StatusServiceUnavailable, + componentStatus: componentstatus.StatusRecoverableError, + }, + } + + t.Run("traces", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := createBackend("/v1/traces", func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(tt.responseStatus) + }) + defer srv.Close() + + cfg := &Config{ + Encoding: EncodingProto, + TracesEndpoint: srv.URL + "%s/v1/traces", + } + + set := exportertest.NewNopSettings() + host := &testHost{ + Host: componenttest.NewNopHost(), + } + + exp, err := createTraces(context.Background(), set, cfg) + require.NoError(t, err) + + err = exp.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + traces := ptrace.NewTraces() + err = exp.ConsumeTraces(context.Background(), traces) + if tt.componentStatus != componentstatus.StatusOK { + require.Error(t, err) + } + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) + + t.Run("metrics", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(tt.responseStatus) + }) + defer srv.Close() + + cfg := &Config{ + Encoding: EncodingProto, + MetricsEndpoint: srv.URL + "%s/v1/metrics", + } + + set := exportertest.NewNopSettings() + host := &testHost{ + Host: componenttest.NewNopHost(), + } + + exp, err := createMetrics(context.Background(), set, cfg) + require.NoError(t, err) + + err = exp.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + metrics := pmetric.NewMetrics() + err = exp.ConsumeMetrics(context.Background(), metrics) + if tt.componentStatus != componentstatus.StatusOK { + require.Error(t, err) + } + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) + + t.Run("logs", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := createBackend("/v1/logs", func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(tt.responseStatus) + }) + defer srv.Close() + + cfg := &Config{ + Encoding: EncodingProto, + LogsEndpoint: srv.URL + "%s/v1/logs", + } + + set := exportertest.NewNopSettings() + host := &testHost{ + Host: componenttest.NewNopHost(), + } + + exp, err := createLogs(context.Background(), set, cfg) + require.NoError(t, err) + + err = exp.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + logs := plog.NewLogs() + err = exp.ConsumeLogs(context.Background(), logs) + if tt.componentStatus != componentstatus.StatusOK { + require.Error(t, err) + } + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) + + t.Run("profiles", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := createBackend("/v1development/profiles", func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(tt.responseStatus) + }) + defer srv.Close() + + cfg := &Config{ + ClientConfig: confighttp.ClientConfig{ + Endpoint: srv.URL, + }, + Encoding: EncodingProto, + } + + set := exportertest.NewNopSettings() + host := &testHost{ + Host: componenttest.NewNopHost(), + } + + exp, err := createProfiles(context.Background(), set, cfg) + require.NoError(t, err) + + err = exp.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + pd := pprofile.NewProfiles() + err = exp.ConsumeProfiles(context.Background(), pd) + if tt.componentStatus != componentstatus.StatusOK { + require.Error(t, err) + } + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) +} + func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc(endpoint, handler) @@ -1151,3 +1322,17 @@ type badReader struct{} func (b badReader) Read([]byte) (int, error) { return 0, errors.New("Bad read") } + +var _ component.Host = (*testHost)(nil) +var _ componentstatus.Reporter = (*testHost)(nil) + +type testHost struct { + component.Host + lastStatus componentstatus.Status +} + +func (h *testHost) Report(ev *componentstatus.Event) { + if h.lastStatus != componentstatus.StatusPermanentError { + h.lastStatus = ev.Status() + } +}