From 4d8e3a6e5808fb143fba13f811f98c61fc75bfcc Mon Sep 17 00:00:00 2001 From: Matthew Wear Date: Fri, 4 Oct 2024 09:35:21 -0700 Subject: [PATCH 1/9] Report runtime status from otlpexporter --- exporter/otlpexporter/go.mod | 1 + exporter/otlpexporter/otlp.go | 70 +++++++++--- exporter/otlpexporter/otlp_test.go | 170 +++++++++++++++++++++++++++++ 3 files changed, 227 insertions(+), 14 deletions(-) diff --git a/exporter/otlpexporter/go.mod b/exporter/otlpexporter/go.mod index 4fa5d0430cc..f3a79bcbc0c 100644 --- a/exporter/otlpexporter/go.mod +++ b/exporter/otlpexporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.114.0 go.opentelemetry.io/collector/component v0.114.0 + go.opentelemetry.io/collector/component/componentstatus v0.114.0 go.opentelemetry.io/collector/component/componenttest v0.114.0 go.opentelemetry.io/collector/config/configauth v0.114.0 go.opentelemetry.io/collector/config/configcompression v1.20.0 diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index 20bf32f8411..8933236ae8b 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc/status" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" @@ -44,6 +45,7 @@ type baseExporter struct { metadata metadata.MD callOptions []grpc.CallOption + host component.Host settings component.TelemetrySettings // Default user-agent header. @@ -78,6 +80,7 @@ func (e *baseExporter) start(ctx context.Context, host component.Host) (err erro e.callOptions = []grpc.CallOption{ grpc.WaitForReady(e.config.ClientConfig.WaitForReady), } + e.host = host return } @@ -89,11 +92,14 @@ func (e *baseExporter) shutdown(context.Context) error { return nil } -func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { +func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) (err error) { + defer func() { + e.reportStatusFromError(err) + }() req := ptraceotlp.NewExportRequestFromTraces(td) resp, respErr := e.traceExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err := processError(respErr); err != nil { - return err + if err = e.processError(respErr); err != nil { + return } partialSuccess := resp.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) { @@ -102,14 +108,17 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { zap.Int64("dropped_spans", resp.PartialSuccess().RejectedSpans()), ) } - return nil + return } -func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { +func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) (err error) { + defer func() { + e.reportStatusFromError(err) + }() req := pmetricotlp.NewExportRequestFromMetrics(md) resp, respErr := e.metricExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err := processError(respErr); err != nil { - return err + if err = e.processError(respErr); err != nil { + return } partialSuccess := resp.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) { @@ -118,14 +127,17 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro zap.Int64("dropped_data_points", resp.PartialSuccess().RejectedDataPoints()), ) } - return nil + return } -func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { +func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) (err error) { + defer func() { + e.reportStatusFromError(err) + }() req := plogotlp.NewExportRequestFromLogs(ld) resp, respErr := e.logExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err := processError(respErr); err != nil { - return err + if err = e.processError(respErr); err != nil { + return } partialSuccess := resp.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { @@ -134,13 +146,13 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { zap.Int64("dropped_log_records", resp.PartialSuccess().RejectedLogRecords()), ) } - return nil + return } func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) error { req := pprofileotlp.NewExportRequestFromProfiles(td) resp, respErr := e.profileExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err := processError(respErr); err != nil { + if err := e.processError(respErr); err != nil { return err } partialSuccess := resp.PartialSuccess() @@ -160,7 +172,7 @@ func (e *baseExporter) enhanceContext(ctx context.Context) context.Context { return ctx } -func processError(err error) error { +func (e *baseExporter) processError(err error) error { if err == nil { // Request is successful, we are done. return nil @@ -174,6 +186,10 @@ func processError(err error) error { } // Now, this is a real error. + if isComponentPermanentError(st) { + componentstatus.ReportStatus(e.host, componentstatus.NewPermanentErrorEvent(err)) + } + retryInfo := getRetryInfo(st) if !shouldRetry(st.Code(), retryInfo) { @@ -192,6 +208,14 @@ func processError(err error) error { return err } +func (e *baseExporter) reportStatusFromError(err error) { + if err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) +} + func shouldRetry(code codes.Code, retryInfo *errdetails.RetryInfo) bool { switch code { case codes.Canceled, @@ -229,3 +253,21 @@ func getThrottleDuration(t *errdetails.RetryInfo) time.Duration { } return 0 } + +// A component status of PermanentError indicates the component is in a state that will require user +// intervention to fix. Typically this is a misconfiguration detected at runtime. A component +// PermanentError has different semantics than a consumererror. For more information, see: +// https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-status.md +func isComponentPermanentError(st *status.Status) bool { + switch st.Code() { + case codes.NotFound: + return true + case codes.PermissionDenied: + return true + case codes.Unauthenticated: + return true + default: + return false + } + +} diff --git a/exporter/otlpexporter/otlp_test.go b/exporter/otlpexporter/otlp_test.go index ac50ce7027b..8de2b4bf123 100644 --- a/exporter/otlpexporter/otlp_test.go +++ b/exporter/otlpexporter/otlp_test.go @@ -25,6 +25,8 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configopaque" @@ -1034,3 +1036,171 @@ func TestSendProfilesWhenEndpointHasHttpScheme(t *testing.T) { }) } } + +func TestComponentStatus(t *testing.T) { + tests := []struct { + name string + exportError error + componentStatus componentstatus.Status + }{ + { + name: "No Error", + exportError: nil, + componentStatus: componentstatus.StatusOK, + }, + { + name: "Permission Denied", + exportError: status.Error(codes.PermissionDenied, "permission denied"), + componentStatus: componentstatus.StatusPermanentError, + }, + { + name: "Not Found", + exportError: status.Error(codes.NotFound, "not found"), + componentStatus: componentstatus.StatusPermanentError, + }, + { + name: "Unauthenticated", + exportError: status.Error(codes.Unauthenticated, "unauthenticated"), + componentStatus: componentstatus.StatusPermanentError, + }, + { + name: "Resource Exhausted", + exportError: status.Error(codes.ResourceExhausted, "resource exhausted"), + componentStatus: componentstatus.StatusRecoverableError, + }, + } + + t.Run("traces", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false) + rcv.setExportError(tt.exportError) + defer rcv.srv.GracefulStop() + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.QueueConfig.Enabled = false + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + + set := exportertest.NewNopSettings() + host := &testHost{Host: componenttest.NewNopHost()} + + exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + assert.NoError(t, exp.Start(context.Background(), host)) + + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + td := ptrace.NewTraces() + err = exp.ConsumeTraces(context.Background(), td) + + assert.Equal(t, tt.componentStatus != componentstatus.StatusOK, err != nil) + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) + + t.Run("metrics", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + + rcv := otlpMetricsReceiverOnGRPCServer(ln) + rcv.setExportError(tt.exportError) + defer rcv.srv.GracefulStop() + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.QueueConfig.Enabled = false + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + + set := exportertest.NewNopSettings() + host := &testHost{Host: componenttest.NewNopHost()} + + exp, err := factory.CreateMetricsExporter(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + assert.NoError(t, exp.Start(context.Background(), host)) + + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + md := pmetric.NewMetrics() + err = exp.ConsumeMetrics(context.Background(), md) + assert.Equal(t, tt.componentStatus != componentstatus.StatusOK, err != nil) + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) + + t.Run("logs", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + + rcv := otlpLogsReceiverOnGRPCServer(ln) + rcv.setExportError(tt.exportError) + defer rcv.srv.GracefulStop() + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.QueueConfig.Enabled = false + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + + set := exportertest.NewNopSettings() + host := &testHost{Host: componenttest.NewNopHost()} + + exp, err := factory.CreateLogsExporter(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + assert.NoError(t, exp.Start(context.Background(), host)) + + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + ld := plog.NewLogs() + err = exp.ConsumeLogs(context.Background(), ld) + assert.Equal(t, tt.componentStatus != componentstatus.StatusOK, err != nil) + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) +} + +var _ component.Host = (*testHost)(nil) +var _ componentstatus.Reporter = (*testHost)(nil) + +type testHost struct { + component.Host + lastStatus componentstatus.Status +} + +func (h *testHost) Report(ev *componentstatus.Event) { + if h.lastStatus != componentstatus.StatusPermanentError { + h.lastStatus = ev.Status() + } +} From 4a1fc30a004d2eb2bd31c64d6294b694cc089191 Mon Sep 17 00:00:00 2001 From: Matthew Wear Date: Fri, 4 Oct 2024 14:13:09 -0700 Subject: [PATCH 2/9] Report runtime status from otlphttpexporter --- exporter/otlphttpexporter/go.mod | 1 + exporter/otlphttpexporter/otlp.go | 45 +++++- exporter/otlphttpexporter/otlp_test.go | 186 +++++++++++++++++++++++++ 3 files changed, 231 insertions(+), 1 deletion(-) diff --git a/exporter/otlphttpexporter/go.mod b/exporter/otlphttpexporter/go.mod index fdddca7c960..79e1a774a41 100644 --- a/exporter/otlphttpexporter/go.mod +++ b/exporter/otlphttpexporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.114.0 go.opentelemetry.io/collector/component v0.114.0 + go.opentelemetry.io/collector/component/componentstatus v0.114.0 go.opentelemetry.io/collector/component/componenttest v0.114.0 go.opentelemetry.io/collector/config/configcompression v1.20.0 go.opentelemetry.io/collector/config/confighttp v0.114.0 diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 0decd18315a..c823a56a4cd 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -20,6 +20,7 @@ import ( "google.golang.org/protobuf/proto" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -43,6 +44,7 @@ type baseExporter struct { logsURL string profilesURL string logger *zap.Logger + host component.Host settings component.TelemetrySettings // Default user-agent header. userAgent string @@ -87,6 +89,7 @@ func (e *baseExporter) start(ctx context.Context, host component.Host) error { return err } e.client = client + e.host = host return nil } @@ -173,7 +176,10 @@ func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) e return e.export(ctx, e.profilesURL, request, e.profilesPartialSuccessHandler) } -func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error { +func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) (err error) { + defer func() { + e.reportStatusFromError(err) + }() e.logger.Debug("Preparing to make HTTP request", zap.String("url", url)) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request)) if err != nil { @@ -222,6 +228,10 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p } formattedErr = httphelper.NewStatusFromMsgAndHTTPCode(errString, resp.StatusCode).Err() + if isComponentPermanentError(resp.StatusCode) { + componentstatus.ReportStatus(e.host, componentstatus.NewPermanentErrorEvent(formattedErr)) + } + if isRetryableStatusCode(resp.StatusCode) { // A retry duration of 0 seconds will trigger the default backoff policy // of our caller (retry handler). @@ -242,6 +252,14 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p return consumererror.NewPermanent(formattedErr) } +func (e *baseExporter) reportStatusFromError(err error) { + if err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) +} + // Determine if the status code is retryable according to the specification. // For more, see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures-1 func isRetryableStatusCode(code int) bool { @@ -259,6 +277,31 @@ func isRetryableStatusCode(code int) bool { } } +// A component status of PermanentError indicates the component is in a state that will require user +// intervention to fix. Typically this is a misconfiguration detected at runtime. A component +// PermanentError has different semantics than a consumererror. For more information, see: +// https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-status.md +func isComponentPermanentError(code int) bool { + switch code { + case http.StatusUnauthorized: + return true + case http.StatusForbidden: + return true + case http.StatusNotFound: + return true + case http.StatusMethodNotAllowed: + return true + case http.StatusRequestEntityTooLarge: + return true + case http.StatusRequestURITooLong: + return true + case http.StatusRequestHeaderFieldsTooLarge: + return true + default: + return false + } +} + func readResponseBody(resp *http.Response) ([]byte, error) { if resp.ContentLength == 0 { return nil, nil diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 65a2e5bacc7..fe46bcd8a2c 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -23,6 +23,8 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" @@ -1137,6 +1139,176 @@ func TestEncoding(t *testing.T) { }) } +func TestComponentStatus(t *testing.T) { + tests := []struct { + name string + responseStatus int + componentStatus componentstatus.Status + }{ + { + name: "200", + responseStatus: http.StatusOK, + componentStatus: componentstatus.StatusOK, + }, + { + name: "401", + responseStatus: http.StatusUnauthorized, + componentStatus: componentstatus.StatusPermanentError, + }, + { + name: "403", + responseStatus: http.StatusForbidden, + componentStatus: componentstatus.StatusPermanentError, + }, + { + name: "404", + responseStatus: http.StatusNotFound, + componentStatus: componentstatus.StatusPermanentError, + }, + { + name: "405", + responseStatus: http.StatusMethodNotAllowed, + componentStatus: componentstatus.StatusPermanentError, + }, + { + name: "413", + responseStatus: http.StatusRequestEntityTooLarge, + componentStatus: componentstatus.StatusPermanentError, + }, + { + name: "414", + responseStatus: http.StatusRequestURITooLong, + componentStatus: componentstatus.StatusPermanentError, + }, + { + name: "419", + responseStatus: http.StatusTooManyRequests, + componentStatus: componentstatus.StatusRecoverableError, + }, + { + name: "431", + responseStatus: http.StatusRequestHeaderFieldsTooLarge, + componentStatus: componentstatus.StatusPermanentError, + }, + { + name: "503", + responseStatus: http.StatusServiceUnavailable, + componentStatus: componentstatus.StatusRecoverableError, + }, + } + + t.Run("traces", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + writer.WriteHeader(tt.responseStatus) + }) + defer srv.Close() + + cfg := &Config{ + Encoding: EncodingProto, + TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), + } + + set := exportertest.NewNopSettings() + host := &testHost{ + Host: componenttest.NewNopHost(), + } + + exp, err := createTraces(context.Background(), set, cfg) + require.NoError(t, err) + + err = exp.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + traces := ptrace.NewTraces() + err = exp.ConsumeTraces(context.Background(), traces) + if tt.componentStatus != componentstatus.StatusOK { + assert.Error(t, err) + } + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) + + t.Run("metrics", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + writer.WriteHeader(tt.responseStatus) + }) + defer srv.Close() + + cfg := &Config{ + Encoding: EncodingProto, + MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), + } + + set := exportertest.NewNopSettings() + host := &testHost{ + Host: componenttest.NewNopHost(), + } + + exp, err := createMetrics(context.Background(), set, cfg) + require.NoError(t, err) + + err = exp.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + metrics := pmetric.NewMetrics() + err = exp.ConsumeMetrics(context.Background(), metrics) + if tt.componentStatus != componentstatus.StatusOK { + assert.Error(t, err) + } + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) + + t.Run("logs", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + writer.WriteHeader(tt.responseStatus) + }) + defer srv.Close() + + cfg := &Config{ + Encoding: EncodingProto, + LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), + } + + set := exportertest.NewNopSettings() + host := &testHost{ + Host: componenttest.NewNopHost(), + } + + exp, err := createLogs(context.Background(), set, cfg) + require.NoError(t, err) + + err = exp.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + logs := plog.NewLogs() + err = exp.ConsumeLogs(context.Background(), logs) + if tt.componentStatus != componentstatus.StatusOK { + assert.Error(t, err) + } + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) +} + func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc(endpoint, handler) @@ -1151,3 +1323,17 @@ type badReader struct{} func (b badReader) Read([]byte) (int, error) { return 0, errors.New("Bad read") } + +var _ component.Host = (*testHost)(nil) +var _ componentstatus.Reporter = (*testHost)(nil) + +type testHost struct { + component.Host + lastStatus componentstatus.Status +} + +func (h *testHost) Report(ev *componentstatus.Event) { + if h.lastStatus != componentstatus.StatusPermanentError { + h.lastStatus = ev.Status() + } +} From 5e2ba13f93283a1a1fb6802de734deb110751161 Mon Sep 17 00:00:00 2001 From: Matthew Wear Date: Fri, 4 Oct 2024 15:06:04 -0700 Subject: [PATCH 3/9] Add changelog --- .chloggen/exp-status.yaml | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 .chloggen/exp-status.yaml diff --git a/.chloggen/exp-status.yaml b/.chloggen/exp-status.yaml new file mode 100644 index 00000000000..05e3daab901 --- /dev/null +++ b/.chloggen/exp-status.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otlpexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add runtime status reporting for OTLP exporters. + +# One or more tracking issues or pull requests related to the change +issues: [9957] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From ed5842a9e6ff40d18e7c4ac0bd47d0348e9f1483 Mon Sep 17 00:00:00 2001 From: Matthew Wear Date: Fri, 4 Oct 2024 15:11:20 -0700 Subject: [PATCH 4/9] Lint --- exporter/otlpexporter/otlp_test.go | 6 +++--- exporter/otlphttpexporter/otlp_test.go | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/exporter/otlpexporter/otlp_test.go b/exporter/otlpexporter/otlp_test.go index 8de2b4bf123..ad1241b5f04 100644 --- a/exporter/otlpexporter/otlp_test.go +++ b/exporter/otlpexporter/otlp_test.go @@ -1095,7 +1095,7 @@ func TestComponentStatus(t *testing.T) { exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) require.NoError(t, err) require.NotNil(t, exp) - assert.NoError(t, exp.Start(context.Background(), host)) + require.NoError(t, exp.Start(context.Background(), host)) defer func() { assert.NoError(t, exp.Shutdown(context.Background())) @@ -1136,7 +1136,7 @@ func TestComponentStatus(t *testing.T) { exp, err := factory.CreateMetricsExporter(context.Background(), set, cfg) require.NoError(t, err) require.NotNil(t, exp) - assert.NoError(t, exp.Start(context.Background(), host)) + require.NoError(t, exp.Start(context.Background(), host)) defer func() { assert.NoError(t, exp.Shutdown(context.Background())) @@ -1176,7 +1176,7 @@ func TestComponentStatus(t *testing.T) { exp, err := factory.CreateLogsExporter(context.Background(), set, cfg) require.NoError(t, err) require.NotNil(t, exp) - assert.NoError(t, exp.Start(context.Background(), host)) + require.NoError(t, exp.Start(context.Background(), host)) defer func() { assert.NoError(t, exp.Shutdown(context.Background())) diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index fe46bcd8a2c..9a4fe9cd887 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -1200,7 +1200,7 @@ func TestComponentStatus(t *testing.T) { t.Run("traces", func(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/traces", func(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(tt.responseStatus) }) defer srv.Close() @@ -1227,7 +1227,7 @@ func TestComponentStatus(t *testing.T) { traces := ptrace.NewTraces() err = exp.ConsumeTraces(context.Background(), traces) if tt.componentStatus != componentstatus.StatusOK { - assert.Error(t, err) + require.Error(t, err) } assert.Equal(t, tt.componentStatus, host.lastStatus) }) @@ -1237,7 +1237,7 @@ func TestComponentStatus(t *testing.T) { t.Run("metrics", func(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(tt.responseStatus) }) defer srv.Close() @@ -1264,7 +1264,7 @@ func TestComponentStatus(t *testing.T) { metrics := pmetric.NewMetrics() err = exp.ConsumeMetrics(context.Background(), metrics) if tt.componentStatus != componentstatus.StatusOK { - assert.Error(t, err) + require.Error(t, err) } assert.Equal(t, tt.componentStatus, host.lastStatus) }) @@ -1274,7 +1274,7 @@ func TestComponentStatus(t *testing.T) { t.Run("logs", func(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - srv := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/logs", func(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(tt.responseStatus) }) defer srv.Close() @@ -1301,7 +1301,7 @@ func TestComponentStatus(t *testing.T) { logs := plog.NewLogs() err = exp.ConsumeLogs(context.Background(), logs) if tt.componentStatus != componentstatus.StatusOK { - assert.Error(t, err) + require.Error(t, err) } assert.Equal(t, tt.componentStatus, host.lastStatus) }) From 763363a1376cbe5a907ba82aa6ac2b7872d072c2 Mon Sep 17 00:00:00 2001 From: "matt.wear" Date: Fri, 18 Oct 2024 15:10:32 -0700 Subject: [PATCH 5/9] Handle all errors as recoverable --- exporter/otlpexporter/otlp.go | 33 ++++----------------- exporter/otlpexporter/otlp_test.go | 21 ++------------ exporter/otlphttpexporter/otlp.go | 29 ------------------- exporter/otlphttpexporter/otlp_test.go | 40 -------------------------- 4 files changed, 8 insertions(+), 115 deletions(-) diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index 8933236ae8b..2ca4deadd93 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -98,7 +98,7 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) (err er }() req := ptraceotlp.NewExportRequestFromTraces(td) resp, respErr := e.traceExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err = e.processError(respErr); err != nil { + if err = processError(respErr); err != nil { return } partialSuccess := resp.PartialSuccess() @@ -117,7 +117,7 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) (err }() req := pmetricotlp.NewExportRequestFromMetrics(md) resp, respErr := e.metricExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err = e.processError(respErr); err != nil { + if err = processError(respErr); err != nil { return } partialSuccess := resp.PartialSuccess() @@ -136,7 +136,7 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) (err error) { }() req := plogotlp.NewExportRequestFromLogs(ld) resp, respErr := e.logExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err = e.processError(respErr); err != nil { + if err = processError(respErr); err != nil { return } partialSuccess := resp.PartialSuccess() @@ -152,7 +152,7 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) (err error) { func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) error { req := pprofileotlp.NewExportRequestFromProfiles(td) resp, respErr := e.profileExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err := e.processError(respErr); err != nil { + if err := processError(respErr); err != nil { return err } partialSuccess := resp.PartialSuccess() @@ -172,7 +172,7 @@ func (e *baseExporter) enhanceContext(ctx context.Context) context.Context { return ctx } -func (e *baseExporter) processError(err error) error { +func processError(err error) error { if err == nil { // Request is successful, we are done. return nil @@ -185,11 +185,6 @@ func (e *baseExporter) processError(err error) error { return nil } - // Now, this is a real error. - if isComponentPermanentError(st) { - componentstatus.ReportStatus(e.host, componentstatus.NewPermanentErrorEvent(err)) - } - retryInfo := getRetryInfo(st) if !shouldRetry(st.Code(), retryInfo) { @@ -253,21 +248,3 @@ func getThrottleDuration(t *errdetails.RetryInfo) time.Duration { } return 0 } - -// A component status of PermanentError indicates the component is in a state that will require user -// intervention to fix. Typically this is a misconfiguration detected at runtime. A component -// PermanentError has different semantics than a consumererror. For more information, see: -// https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-status.md -func isComponentPermanentError(st *status.Status) bool { - switch st.Code() { - case codes.NotFound: - return true - case codes.PermissionDenied: - return true - case codes.Unauthenticated: - return true - default: - return false - } - -} diff --git a/exporter/otlpexporter/otlp_test.go b/exporter/otlpexporter/otlp_test.go index ad1241b5f04..4f4fb32e5d7 100644 --- a/exporter/otlpexporter/otlp_test.go +++ b/exporter/otlpexporter/otlp_test.go @@ -1048,21 +1048,6 @@ func TestComponentStatus(t *testing.T) { exportError: nil, componentStatus: componentstatus.StatusOK, }, - { - name: "Permission Denied", - exportError: status.Error(codes.PermissionDenied, "permission denied"), - componentStatus: componentstatus.StatusPermanentError, - }, - { - name: "Not Found", - exportError: status.Error(codes.NotFound, "not found"), - componentStatus: componentstatus.StatusPermanentError, - }, - { - name: "Unauthenticated", - exportError: status.Error(codes.Unauthenticated, "unauthenticated"), - componentStatus: componentstatus.StatusPermanentError, - }, { name: "Resource Exhausted", exportError: status.Error(codes.ResourceExhausted, "resource exhausted"), @@ -1092,7 +1077,7 @@ func TestComponentStatus(t *testing.T) { set := exportertest.NewNopSettings() host := &testHost{Host: componenttest.NewNopHost()} - exp, err := factory.CreateTracesExporter(context.Background(), set, cfg) + exp, err := factory.CreateTraces(context.Background(), set, cfg) require.NoError(t, err) require.NotNil(t, exp) require.NoError(t, exp.Start(context.Background(), host)) @@ -1133,7 +1118,7 @@ func TestComponentStatus(t *testing.T) { set := exportertest.NewNopSettings() host := &testHost{Host: componenttest.NewNopHost()} - exp, err := factory.CreateMetricsExporter(context.Background(), set, cfg) + exp, err := factory.CreateMetrics(context.Background(), set, cfg) require.NoError(t, err) require.NotNil(t, exp) require.NoError(t, exp.Start(context.Background(), host)) @@ -1173,7 +1158,7 @@ func TestComponentStatus(t *testing.T) { set := exportertest.NewNopSettings() host := &testHost{Host: componenttest.NewNopHost()} - exp, err := factory.CreateLogsExporter(context.Background(), set, cfg) + exp, err := factory.CreateLogs(context.Background(), set, cfg) require.NoError(t, err) require.NotNil(t, exp) require.NoError(t, exp.Start(context.Background(), host)) diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index c823a56a4cd..ad066cb343e 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -228,10 +228,6 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p } formattedErr = httphelper.NewStatusFromMsgAndHTTPCode(errString, resp.StatusCode).Err() - if isComponentPermanentError(resp.StatusCode) { - componentstatus.ReportStatus(e.host, componentstatus.NewPermanentErrorEvent(formattedErr)) - } - if isRetryableStatusCode(resp.StatusCode) { // A retry duration of 0 seconds will trigger the default backoff policy // of our caller (retry handler). @@ -277,31 +273,6 @@ func isRetryableStatusCode(code int) bool { } } -// A component status of PermanentError indicates the component is in a state that will require user -// intervention to fix. Typically this is a misconfiguration detected at runtime. A component -// PermanentError has different semantics than a consumererror. For more information, see: -// https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-status.md -func isComponentPermanentError(code int) bool { - switch code { - case http.StatusUnauthorized: - return true - case http.StatusForbidden: - return true - case http.StatusNotFound: - return true - case http.StatusMethodNotAllowed: - return true - case http.StatusRequestEntityTooLarge: - return true - case http.StatusRequestURITooLong: - return true - case http.StatusRequestHeaderFieldsTooLarge: - return true - default: - return false - } -} - func readResponseBody(resp *http.Response) ([]byte, error) { if resp.ContentLength == 0 { return nil, nil diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 9a4fe9cd887..459c9447427 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -1150,46 +1150,6 @@ func TestComponentStatus(t *testing.T) { responseStatus: http.StatusOK, componentStatus: componentstatus.StatusOK, }, - { - name: "401", - responseStatus: http.StatusUnauthorized, - componentStatus: componentstatus.StatusPermanentError, - }, - { - name: "403", - responseStatus: http.StatusForbidden, - componentStatus: componentstatus.StatusPermanentError, - }, - { - name: "404", - responseStatus: http.StatusNotFound, - componentStatus: componentstatus.StatusPermanentError, - }, - { - name: "405", - responseStatus: http.StatusMethodNotAllowed, - componentStatus: componentstatus.StatusPermanentError, - }, - { - name: "413", - responseStatus: http.StatusRequestEntityTooLarge, - componentStatus: componentstatus.StatusPermanentError, - }, - { - name: "414", - responseStatus: http.StatusRequestURITooLong, - componentStatus: componentstatus.StatusPermanentError, - }, - { - name: "419", - responseStatus: http.StatusTooManyRequests, - componentStatus: componentstatus.StatusRecoverableError, - }, - { - name: "431", - responseStatus: http.StatusRequestHeaderFieldsTooLarge, - componentStatus: componentstatus.StatusPermanentError, - }, { name: "503", responseStatus: http.StatusServiceUnavailable, From 0942592ab432f88bf908feeee69e66d070bd6b1d Mon Sep 17 00:00:00 2001 From: "matt.wear" Date: Wed, 23 Oct 2024 13:41:51 -0700 Subject: [PATCH 6/9] Use wrapper function instead of defer --- exporter/otlpexporter/factory.go | 8 +-- exporter/otlpexporter/otlp.go | 77 +++++++++++++++++----------- exporter/otlphttpexporter/factory.go | 8 +-- exporter/otlphttpexporter/otlp.go | 49 +++++++++++++----- 4 files changed, 93 insertions(+), 49 deletions(-) diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index 81166979506..49a932041db 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -59,7 +59,7 @@ func createTraces( oce := newExporter(cfg, set) oCfg := cfg.(*Config) return exporterhelper.NewTraces(ctx, set, cfg, - oce.pushTraces, + oce.pushTracesWithStatus, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(oCfg.TimeoutConfig), exporterhelper.WithRetry(oCfg.RetryConfig), @@ -78,7 +78,7 @@ func createMetrics( oce := newExporter(cfg, set) oCfg := cfg.(*Config) return exporterhelper.NewMetrics(ctx, set, cfg, - oce.pushMetrics, + oce.pushMetricsWithStatus, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(oCfg.TimeoutConfig), exporterhelper.WithRetry(oCfg.RetryConfig), @@ -97,7 +97,7 @@ func createLogs( oce := newExporter(cfg, set) oCfg := cfg.(*Config) return exporterhelper.NewLogs(ctx, set, cfg, - oce.pushLogs, + oce.pushLogsWithStatus, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(oCfg.TimeoutConfig), exporterhelper.WithRetry(oCfg.RetryConfig), @@ -116,7 +116,7 @@ func createProfilesExporter( oce := newExporter(cfg, set) oCfg := cfg.(*Config) return exporterhelperprofiles.NewProfilesExporter(ctx, set, cfg, - oce.pushProfiles, + oce.pushProfilesWithStatus, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(oCfg.TimeoutConfig), exporterhelper.WithRetry(oCfg.RetryConfig), diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index 2ca4deadd93..f61d53b97d9 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -92,14 +92,11 @@ func (e *baseExporter) shutdown(context.Context) error { return nil } -func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) (err error) { - defer func() { - e.reportStatusFromError(err) - }() +func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { req := ptraceotlp.NewExportRequestFromTraces(td) resp, respErr := e.traceExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err = processError(respErr); err != nil { - return + if err := processError(respErr); err != nil { + return err } partialSuccess := resp.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) { @@ -108,17 +105,23 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) (err er zap.Int64("dropped_spans", resp.PartialSuccess().RejectedSpans()), ) } - return + return nil +} + +func (e *baseExporter) pushTracesWithStatus(ctx context.Context, td ptrace.Traces) error { + if err := e.pushTraces(ctx, td); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil } -func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) (err error) { - defer func() { - e.reportStatusFromError(err) - }() +func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { req := pmetricotlp.NewExportRequestFromMetrics(md) resp, respErr := e.metricExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err = processError(respErr); err != nil { - return + if err := processError(respErr); err != nil { + return err } partialSuccess := resp.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) { @@ -127,17 +130,23 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) (err zap.Int64("dropped_data_points", resp.PartialSuccess().RejectedDataPoints()), ) } - return + return nil } -func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) (err error) { - defer func() { - e.reportStatusFromError(err) - }() +func (e *baseExporter) pushMetricsWithStatus(ctx context.Context, md pmetric.Metrics) error { + if err := e.pushMetrics(ctx, md); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + +func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { req := plogotlp.NewExportRequestFromLogs(ld) resp, respErr := e.logExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err = processError(respErr); err != nil { - return + if err := processError(respErr); err != nil { + return err } partialSuccess := resp.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { @@ -146,7 +155,16 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) (err error) { zap.Int64("dropped_log_records", resp.PartialSuccess().RejectedLogRecords()), ) } - return + return nil +} + +func (e *baseExporter) pushLogsWithStatus(ctx context.Context, ld plog.Logs) error { + if err := e.pushLogs(ctx, ld); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil } func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) error { @@ -165,6 +183,15 @@ func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) e return nil } +func (e *baseExporter) pushProfilesWithStatus(ctx context.Context, td pprofile.Profiles) error { + if err := e.pushProfiles(ctx, td); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) enhanceContext(ctx context.Context) context.Context { if e.metadata.Len() > 0 { return metadata.NewOutgoingContext(ctx, e.metadata) @@ -203,14 +230,6 @@ func processError(err error) error { return err } -func (e *baseExporter) reportStatusFromError(err error) { - if err != nil { - componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) - return - } - componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) -} - func shouldRetry(code codes.Code, retryInfo *errdetails.RetryInfo) bool { switch code { case codes.Canceled, diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index c3a91a44bb9..2844d3b3348 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -90,7 +90,7 @@ func createTraces( } return exporterhelper.NewTraces(ctx, set, cfg, - oce.pushTraces, + oce.pushTracesWithStatus, exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. @@ -116,7 +116,7 @@ func createMetrics( } return exporterhelper.NewMetrics(ctx, set, cfg, - oce.pushMetrics, + oce.pushMetricsWithStatus, exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. @@ -141,7 +141,7 @@ func createLogs( } return exporterhelper.NewLogs(ctx, set, cfg, - oce.pushLogs, + oce.pushLogsWithStatus, exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. @@ -167,7 +167,7 @@ func createProfiles( } return exporterhelperprofiles.NewProfilesExporter(ctx, set, cfg, - oce.pushProfiles, + oce.pushProfilesWithStatus, exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index ad066cb343e..0eeb756bae1 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -114,6 +114,15 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { return e.export(ctx, e.tracesURL, request, e.tracesPartialSuccessHandler) } +func (e *baseExporter) pushTracesWithStatus(ctx context.Context, td ptrace.Traces) error { + if err := e.pushTraces(ctx, td); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { tr := pmetricotlp.NewExportRequestFromMetrics(md) @@ -134,6 +143,15 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro return e.export(ctx, e.metricsURL, request, e.metricsPartialSuccessHandler) } +func (e *baseExporter) pushMetricsWithStatus(ctx context.Context, md pmetric.Metrics) error { + if err := e.pushMetrics(ctx, md); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { tr := plogotlp.NewExportRequestFromLogs(ld) @@ -155,6 +173,15 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { return e.export(ctx, e.logsURL, request, e.logsPartialSuccessHandler) } +func (e *baseExporter) pushLogsWithStatus(ctx context.Context, ld plog.Logs) error { + if err := e.pushLogs(ctx, ld); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) error { tr := pprofileotlp.NewExportRequestFromProfiles(td) @@ -176,10 +203,16 @@ func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) e return e.export(ctx, e.profilesURL, request, e.profilesPartialSuccessHandler) } -func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) (err error) { - defer func() { - e.reportStatusFromError(err) - }() +func (e *baseExporter) pushProfilesWithStatus(ctx context.Context, td pprofile.Profiles) error { + if err := e.pushProfiles(ctx, td); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + +func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error { e.logger.Debug("Preparing to make HTTP request", zap.String("url", url)) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request)) if err != nil { @@ -248,14 +281,6 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p return consumererror.NewPermanent(formattedErr) } -func (e *baseExporter) reportStatusFromError(err error) { - if err != nil { - componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) - return - } - componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) -} - // Determine if the status code is retryable according to the specification. // For more, see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures-1 func isRetryableStatusCode(code int) bool { From 6a2376a02344f61703b4cdba2d771d32067ca1f1 Mon Sep 17 00:00:00 2001 From: "matt.wear" Date: Wed, 23 Oct 2024 16:51:26 -0700 Subject: [PATCH 7/9] Coverage --- exporter/otlpexporter/otlp_test.go | 40 ++++++++++++++++++++++++++ exporter/otlphttpexporter/otlp_test.go | 39 +++++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/exporter/otlpexporter/otlp_test.go b/exporter/otlpexporter/otlp_test.go index 4f4fb32e5d7..adc3cc5feb9 100644 --- a/exporter/otlpexporter/otlp_test.go +++ b/exporter/otlpexporter/otlp_test.go @@ -1174,6 +1174,46 @@ func TestComponentStatus(t *testing.T) { }) } }) + + t.Run("profiles", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + + rcv, _ := otlpProfilesReceiverOnGRPCServer(ln, false) + rcv.setExportError(tt.exportError) + defer rcv.srv.GracefulStop() + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.QueueConfig.Enabled = false + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + + set := exportertest.NewNopSettings() + host := &testHost{Host: componenttest.NewNopHost()} + + exp, err := factory.(exporterprofiles.Factory).CreateProfiles(context.Background(), set, cfg) + require.NoError(t, err) + require.NotNil(t, exp) + require.NoError(t, exp.Start(context.Background(), host)) + + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }() + + pd := pprofile.NewProfiles() + err = exp.ConsumeProfiles(context.Background(), pd) + assert.Equal(t, tt.componentStatus != componentstatus.StatusOK, err != nil) + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) } var _ component.Host = (*testHost)(nil) diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 459c9447427..73e43ba88d7 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -1267,6 +1267,45 @@ func TestComponentStatus(t *testing.T) { }) } }) + + t.Run("profiles", func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := createBackend("/v1development/profiles", func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(tt.responseStatus) + }) + defer srv.Close() + + cfg := &Config{ + ClientConfig: confighttp.ClientConfig{ + Endpoint: srv.URL, + }, + Encoding: EncodingProto, + } + + set := exportertest.NewNopSettings() + host := &testHost{ + Host: componenttest.NewNopHost(), + } + + exp, err := createProfiles(context.Background(), set, cfg) + require.NoError(t, err) + + err = exp.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + pd := pprofile.NewProfiles() + err = exp.ConsumeProfiles(context.Background(), pd) + if tt.componentStatus != componentstatus.StatusOK { + require.Error(t, err) + } + assert.Equal(t, tt.componentStatus, host.lastStatus) + }) + } + }) } func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server { From 2043ebaa8c68d4b78f06360105c01008c252143d Mon Sep 17 00:00:00 2001 From: "matt.wear" Date: Fri, 22 Nov 2024 15:54:20 -0800 Subject: [PATCH 8/9] go mod tidy --- exporter/otlpexporter/go.sum | 2 ++ exporter/otlphttpexporter/go.sum | 2 ++ 2 files changed, 4 insertions(+) diff --git a/exporter/otlpexporter/go.sum b/exporter/otlpexporter/go.sum index 9db2fbfc8a3..4467611f456 100644 --- a/exporter/otlpexporter/go.sum +++ b/exporter/otlpexporter/go.sum @@ -60,6 +60,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector/component/componentstatus v0.114.0 h1:y9my/xink8KB5lK8zFAjgB2+pEh0QYy5TM972fxZY9w= +go.opentelemetry.io/collector/component/componentstatus v0.114.0/go.mod h1:RIoeCYZpPaae7QLE/1RacqzhHuXBmzRAk9H/EwYtIIs= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 h1:yMkBS9yViCc7U7yeLzJPM2XizlfdVvBRSmsQDWu6qc0= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0/go.mod h1:n8MR6/liuGB5EmTETUBeU5ZgqMOlqKRxUaqPQBOANZ8= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= diff --git a/exporter/otlphttpexporter/go.sum b/exporter/otlphttpexporter/go.sum index 502920bff56..e156ee84206 100644 --- a/exporter/otlphttpexporter/go.sum +++ b/exporter/otlphttpexporter/go.sum @@ -64,6 +64,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector/component/componentstatus v0.114.0 h1:y9my/xink8KB5lK8zFAjgB2+pEh0QYy5TM972fxZY9w= +go.opentelemetry.io/collector/component/componentstatus v0.114.0/go.mod h1:RIoeCYZpPaae7QLE/1RacqzhHuXBmzRAk9H/EwYtIIs= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= From 4557415bbac307b1e843dfc238dc65a0dad62369 Mon Sep 17 00:00:00 2001 From: "matt.wear" Date: Fri, 22 Nov 2024 16:08:26 -0800 Subject: [PATCH 9/9] Lint --- exporter/otlphttpexporter/otlp_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 73e43ba88d7..e1a4e540e56 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -1167,7 +1167,7 @@ func TestComponentStatus(t *testing.T) { cfg := &Config{ Encoding: EncodingProto, - TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), + TracesEndpoint: srv.URL + "%s/v1/traces", } set := exportertest.NewNopSettings() @@ -1204,7 +1204,7 @@ func TestComponentStatus(t *testing.T) { cfg := &Config{ Encoding: EncodingProto, - MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), + MetricsEndpoint: srv.URL + "%s/v1/metrics", } set := exportertest.NewNopSettings() @@ -1241,7 +1241,7 @@ func TestComponentStatus(t *testing.T) { cfg := &Config{ Encoding: EncodingProto, - LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), + LogsEndpoint: srv.URL + "%s/v1/logs", } set := exportertest.NewNopSettings()