From 05867e63ac871040b790d91ffb85efdc4148b45f Mon Sep 17 00:00:00 2001 From: Joshua Jones Date: Thu, 21 Mar 2024 14:33:02 -0400 Subject: [PATCH] [otlphttpexporter] return nil from partial success handler when HTTP response body is empty (#9667) **Description:** Fixing a bug - When exporting using the otlphttpexporter, after receiving a successful HTTP response, when the response body's content length is 0 and the content type is specified as either "application/json" or "application/x-protobuf", an attempt will be made to unmarshal a nil value within any of the partial success response handler functions. This results in an error, and a potential resend of the original export request. To fix this scenario, a check was added to the `tracesPartialSuccessHandler`, `metricsPartialSuccessHandler`, and `logsPartialSuccessHandler` functions for a `nil` value in the `protoBytes` argument. When `nil`, the function will return with a `nil` value, indicating the absence of any error. **Link to tracking Issue:** #9666 --- .chloggen/fix_empty_response.yaml | 25 ++ exporter/otlphttpexporter/otlp.go | 9 + exporter/otlphttpexporter/otlp_test.go | 324 ++++++++++++++++++++----- 3 files changed, 298 insertions(+), 60 deletions(-) create mode 100644 .chloggen/fix_empty_response.yaml diff --git a/.chloggen/fix_empty_response.yaml b/.chloggen/fix_empty_response.yaml new file mode 100644 index 00000000000..66f1338c7ee --- /dev/null +++ b/.chloggen/fix_empty_response.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otlphttpexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: prevent error on empty response body when content type is application/json + +# One or more tracking issues or pull requests related to the change +issues: [9666] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 6ebc50e2cc0..8a8d428121b 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -301,6 +301,9 @@ func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler par type partialSuccessHandler func(bytes []byte, contentType string) error func (e *baseExporter) tracesPartialSuccessHandler(protoBytes []byte, contentType string) error { + if protoBytes == nil { + return nil + } exportResponse := ptraceotlp.NewExportResponse() switch contentType { case protobufContentType: @@ -328,6 +331,9 @@ func (e *baseExporter) tracesPartialSuccessHandler(protoBytes []byte, contentTyp } func (e *baseExporter) metricsPartialSuccessHandler(protoBytes []byte, contentType string) error { + if protoBytes == nil { + return nil + } exportResponse := pmetricotlp.NewExportResponse() switch contentType { case protobufContentType: @@ -355,6 +361,9 @@ func (e *baseExporter) metricsPartialSuccessHandler(protoBytes []byte, contentTy } func (e *baseExporter) logsPartialSuccessHandler(protoBytes []byte, contentType string) error { + if protoBytes == nil { + return nil + } exportResponse := plogotlp.NewExportResponse() switch contentType { case protobufContentType: diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 0116803d816..470a4d872e2 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -37,6 +37,41 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" ) +const tracesTelemetryType = "traces" +const metricsTelemetryType = "metrics" +const logsTelemetryType = "logs" + +type responseSerializer interface { + MarshalJSON() ([]byte, error) + MarshalProto() ([]byte, error) +} + +type responseSerializerProvider = func() responseSerializer + +func provideTracesResponseSerializer() responseSerializer { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + return response +} + +func provideMetricsResponseSerializer() responseSerializer { + response := pmetricotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedDataPoints(1) + return response +} + +func provideLogsResponseSerializer() responseSerializer { + response := plogotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedLogRecords(1) + return response +} + func TestErrorResponses(t *testing.T) { errMsgPrefix := func(srv *httptest.Server) string { return fmt.Sprintf("error exporting items, request to %s/v1/traces responded with HTTP Status Code ", srv.URL) @@ -467,22 +502,66 @@ func TestPartialResponse_missingHeaderButHasBody(t *testing.T) { exp, err := newExporter(cfg, set) require.NoError(t, err) - response := ptraceotlp.NewExportResponse() - partial := response.PartialSuccess() - partial.SetErrorMessage("hello") - partial.SetRejectedSpans(1) - data, err := response.MarshalProto() - require.NoError(t, err) - resp := &http.Response{ - // `-1` indicates a missing Content-Length header in the Go http standard library - ContentLength: -1, - Body: io.NopCloser(bytes.NewReader(data)), - Header: map[string][]string{ - "Content-Type": {"application/x-protobuf"}, + contentTypes := []struct { + contentType string + }{ + {contentType: protobufContentType}, + {contentType: jsonContentType}, + } + + telemetryTypes := []struct { + telemetryType string + handler partialSuccessHandler + serializer responseSerializerProvider + }{ + { + telemetryType: tracesTelemetryType, + handler: exp.tracesPartialSuccessHandler, + serializer: provideTracesResponseSerializer, + }, + { + telemetryType: metricsTelemetryType, + handler: exp.metricsPartialSuccessHandler, + serializer: provideMetricsResponseSerializer, + }, + { + telemetryType: logsTelemetryType, + handler: exp.logsPartialSuccessHandler, + serializer: provideLogsResponseSerializer, }, } - err = handlePartialSuccessResponse(resp, exp.tracesPartialSuccessHandler) - assert.NoError(t, err) + + for _, ct := range contentTypes { + for _, tt := range telemetryTypes { + t.Run(tt.telemetryType+" "+ct.contentType, func(t *testing.T) { + serializer := tt.serializer() + + var data []byte + var err error + + switch ct.contentType { + case jsonContentType: + data, err = serializer.MarshalJSON() + case protobufContentType: + data, err = serializer.MarshalProto() + default: + require.Fail(t, "unsupported content type: %s", ct.contentType) + } + require.NoError(t, err) + + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(bytes.NewReader(data)), + Header: map[string][]string{ + "Content-Type": {ct.contentType}, + }, + } + err = handlePartialSuccessResponse(resp, tt.handler) + assert.NoError(t, err) + }) + } + } } func TestPartialResponse_missingHeaderAndBody(t *testing.T) { @@ -491,16 +570,47 @@ func TestPartialResponse_missingHeaderAndBody(t *testing.T) { exp, err := newExporter(cfg, set) require.NoError(t, err) - resp := &http.Response{ - // `-1` indicates a missing Content-Length header in the Go http standard library - ContentLength: -1, - Body: io.NopCloser(bytes.NewReader([]byte{})), - Header: map[string][]string{ - "Content-Type": {"application/x-protobuf"}, + contentTypes := []struct { + contentType string + }{ + {contentType: protobufContentType}, + {contentType: jsonContentType}, + } + + telemetryTypes := []struct { + telemetryType string + handler partialSuccessHandler + }{ + { + telemetryType: tracesTelemetryType, + handler: exp.tracesPartialSuccessHandler, + }, + { + telemetryType: metricsTelemetryType, + handler: exp.metricsPartialSuccessHandler, + }, + { + telemetryType: logsTelemetryType, + handler: exp.logsPartialSuccessHandler, }, } - err = handlePartialSuccessResponse(resp, exp.tracesPartialSuccessHandler) - assert.Nil(t, err) + + for _, ct := range contentTypes { + for _, tt := range telemetryTypes { + t.Run(tt.telemetryType+" "+ct.contentType, func(t *testing.T) { + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(bytes.NewReader([]byte{})), + Header: map[string][]string{ + "Content-Type": {ct.contentType}, + }, + } + err = handlePartialSuccessResponse(resp, tt.handler) + assert.Nil(t, err) + }) + } + } } func TestPartialResponse_nonErrUnexpectedEOFError(t *testing.T) { @@ -524,53 +634,147 @@ func TestPartialSuccess_shortContentLengthHeader(t *testing.T) { exp, err := newExporter(cfg, set) require.NoError(t, err) - response := ptraceotlp.NewExportResponse() - partial := response.PartialSuccess() - partial.SetErrorMessage("hello") - partial.SetRejectedSpans(1) - data, err := response.MarshalProto() - require.NoError(t, err) - resp := &http.Response{ - ContentLength: 3, - Body: io.NopCloser(bytes.NewReader(data)), - Header: map[string][]string{ - "Content-Type": {"application/x-protobuf"}, + contentTypes := []struct { + contentType string + }{ + {contentType: protobufContentType}, + {contentType: jsonContentType}, + } + + telemetryTypes := []struct { + telemetryType string + handler partialSuccessHandler + serializer responseSerializerProvider + }{ + { + telemetryType: tracesTelemetryType, + handler: exp.tracesPartialSuccessHandler, + serializer: provideTracesResponseSerializer, + }, + { + telemetryType: metricsTelemetryType, + handler: exp.metricsPartialSuccessHandler, + serializer: provideMetricsResponseSerializer, + }, + { + telemetryType: logsTelemetryType, + handler: exp.logsPartialSuccessHandler, + serializer: provideLogsResponseSerializer, }, } - // For short content-length, a real error happens. - err = handlePartialSuccessResponse(resp, exp.tracesPartialSuccessHandler) - assert.Error(t, err) + + for _, ct := range contentTypes { + for _, tt := range telemetryTypes { + t.Run(tt.telemetryType+" "+ct.contentType, func(t *testing.T) { + serializer := tt.serializer() + + var data []byte + var err error + + switch ct.contentType { + case jsonContentType: + data, err = serializer.MarshalJSON() + case protobufContentType: + data, err = serializer.MarshalProto() + default: + require.Fail(t, "unsupported content type: %s", ct.contentType) + } + require.NoError(t, err) + + resp := &http.Response{ + ContentLength: 3, + Body: io.NopCloser(bytes.NewReader(data)), + Header: map[string][]string{ + "Content-Type": {ct.contentType}, + }, + } + // For short content-length, a real error happens. + err = handlePartialSuccessResponse(resp, tt.handler) + assert.Error(t, err) + }) + } + } } func TestPartialSuccess_longContentLengthHeader(t *testing.T) { - cfg := createDefaultConfig() - set := exportertest.NewNopCreateSettings() + contentTypes := []struct { + contentType string + }{ + {contentType: protobufContentType}, + {contentType: jsonContentType}, + } - logger, observed := observer.New(zap.DebugLevel) - set.TelemetrySettings.Logger = zap.New(logger) + telemetryTypes := []struct { + telemetryType string + serializer responseSerializerProvider + }{ + { + telemetryType: tracesTelemetryType, + serializer: provideTracesResponseSerializer, + }, + { + telemetryType: metricsTelemetryType, + serializer: provideMetricsResponseSerializer, + }, + { + telemetryType: logsTelemetryType, + serializer: provideLogsResponseSerializer, + }, + } - exp, err := newExporter(cfg, set) - require.NoError(t, err) + for _, ct := range contentTypes { + for _, tt := range telemetryTypes { + t.Run(tt.telemetryType+" "+ct.contentType, func(t *testing.T) { + cfg := createDefaultConfig() + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.DebugLevel) + set.TelemetrySettings.Logger = zap.New(logger) + exp, err := newExporter(cfg, set) + require.NoError(t, err) - response := ptraceotlp.NewExportResponse() - partial := response.PartialSuccess() - partial.SetErrorMessage("hello") - partial.SetRejectedSpans(1) - data, err := response.MarshalProto() - require.NoError(t, err) - resp := &http.Response{ - ContentLength: 4096, - Body: io.NopCloser(bytes.NewReader(data)), - Header: map[string][]string{ - "Content-Type": {"application/x-protobuf"}, - }, + serializer := tt.serializer() + + var handler partialSuccessHandler + + switch tt.telemetryType { + case tracesTelemetryType: + handler = exp.tracesPartialSuccessHandler + case metricsTelemetryType: + handler = exp.metricsPartialSuccessHandler + case logsTelemetryType: + handler = exp.logsPartialSuccessHandler + default: + require.Fail(t, "unsupported telemetry type: %s", ct.contentType) + } + + var data []byte + + switch ct.contentType { + case jsonContentType: + data, err = serializer.MarshalJSON() + case protobufContentType: + data, err = serializer.MarshalProto() + default: + require.Fail(t, "unsupported content type: %s", ct.contentType) + } + require.NoError(t, err) + + resp := &http.Response{ + ContentLength: 4096, + Body: io.NopCloser(bytes.NewReader(data)), + Header: map[string][]string{ + "Content-Type": {ct.contentType}, + }, + } + // No real error happens for long content length, so the partial + // success is handled as success with a warning. + err = handlePartialSuccessResponse(resp, handler) + assert.NoError(t, err) + assert.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1) + assert.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "Partial success") + }) + } } - // No real error happens for long content length, so the partial - // success is handled as success with a warning. - err = handlePartialSuccessResponse(resp, exp.tracesPartialSuccessHandler) - assert.NoError(t, err) - assert.Len(t, observed.FilterLevelExact(zap.WarnLevel).All(), 1) - assert.Contains(t, observed.FilterLevelExact(zap.WarnLevel).All()[0].Message, "Partial success") } func TestPartialSuccessInvalidResponseBody(t *testing.T) {