diff --git a/modules/frontend/combiner/common.go b/modules/frontend/combiner/common.go index 7b73a93ac36..d594092b15d 100644 --- a/modules/frontend/combiner/common.go +++ b/modules/frontend/combiner/common.go @@ -8,6 +8,7 @@ import ( "sync" tempo_io "github.com/grafana/tempo/pkg/io" + "github.com/grafana/tempo/pkg/util" "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" @@ -217,6 +218,9 @@ func (c *genericCombiner[T]) erroredResponse() (*http.Response, error) { grpcErr = status.Error(codes.ResourceExhausted, c.httpRespBody) case http.StatusBadRequest: grpcErr = status.Error(codes.InvalidArgument, c.httpRespBody) + case util.StatusClientClosedRequest: + // HTTP 499 is mapped to codes.Canceled grpc error + grpcErr = status.Error(codes.Canceled, c.httpRespBody) default: if c.httpStatusCode/100 == 5 { grpcErr = status.Error(codes.Internal, c.httpRespBody) @@ -226,7 +230,7 @@ func (c *genericCombiner[T]) erroredResponse() (*http.Response, error) { } httpResp := &http.Response{ StatusCode: c.httpStatusCode, - Status: http.StatusText(c.httpStatusCode), + Status: util.StatusText(c.httpStatusCode), Body: io.NopCloser(strings.NewReader(c.httpRespBody)), } diff --git a/modules/frontend/combiner/common_test.go b/modules/frontend/combiner/common_test.go index 8bee69c1bb4..c17df1ee09e 100644 --- a/modules/frontend/combiner/common_test.go +++ b/modules/frontend/combiner/common_test.go @@ -13,6 +13,7 @@ import ( "github.com/gogo/status" "github.com/grafana/tempo/pkg/api" "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/util" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" ) @@ -55,6 +56,17 @@ func TestErroredResponse(t *testing.T) { }, expectedErr: status.Error(codes.InvalidArgument, "foo"), }, + { + name: "499", + statusCode: util.StatusClientClosedRequest, + respBody: "foo", + expectedResp: &http.Response{ + StatusCode: util.StatusClientClosedRequest, + Status: util.StatusTextClientClosedRequest, + Body: io.NopCloser(strings.NewReader("foo")), + }, + expectedErr: status.Error(codes.Canceled, "foo"), + }, } for _, tc := range tests { diff --git a/modules/frontend/handler.go b/modules/frontend/handler.go index 4a9fbe2e3ff..9a8ff0455d1 100644 --- a/modules/frontend/handler.go +++ b/modules/frontend/handler.go @@ -10,6 +10,7 @@ import ( "time" "github.com/grafana/dskit/flagext" + "github.com/grafana/tempo/pkg/util" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -22,14 +23,12 @@ import ( ) const ( - // StatusClientClosedRequest is the status code for when a client request cancellation of an http request - StatusClientClosedRequest = 499 // nil response in ServeHTTP NilResponseError = "nil resp in ServeHTTP" ) var ( - errCanceled = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error()) + errCanceled = httpgrpc.Errorf(util.StatusClientClosedRequest, context.Canceled.Error()) errDeadlineExceeded = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error()) errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large") ) @@ -88,7 +87,7 @@ func (f *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { logMessage = append( logMessage, "status", statusCode, - "err", err.Error(), + "error", err.Error(), "response_size", 0, ) level.Info(f.logger).Log(logMessage...) diff --git a/modules/frontend/pipeline/collector_grpc.go b/modules/frontend/pipeline/collector_grpc.go index cd63181f543..2f5b82ac23d 100644 --- a/modules/frontend/pipeline/collector_grpc.go +++ b/modules/frontend/pipeline/collector_grpc.go @@ -2,6 +2,7 @@ package pipeline import ( "context" + "errors" "net/http" "time" @@ -83,5 +84,11 @@ func grpcError(err error) error { return err } + // if this is context cancelled, we return a grpc cancelled error + if errors.Is(err, context.Canceled) { + return status.Error(codes.Canceled, err.Error()) + } + + // rest all fall into internal server error return status.Error(codes.Internal, err.Error()) } diff --git a/modules/frontend/search_handlers_test.go b/modules/frontend/search_handlers_test.go index 05766eb78ba..56c719237d6 100644 --- a/modules/frontend/search_handlers_test.go +++ b/modules/frontend/search_handlers_test.go @@ -295,7 +295,7 @@ func runnerClientCancelContext(t *testing.T, f *QueryFrontend) { }() grpcReq := &tempopb.SearchRequest{} err := f.streamingSearch(grpcReq, srv) - require.Equal(t, status.Error(codes.Internal, "context canceled"), err) + require.Equal(t, status.Error(codes.Canceled, "context canceled"), err) } func TestSearchLimitHonored(t *testing.T) { diff --git a/modules/frontend/slos.go b/modules/frontend/slos.go index e2e7c8cc918..f038bb02a84 100644 --- a/modules/frontend/slos.go +++ b/modules/frontend/slos.go @@ -1,10 +1,13 @@ package frontend import ( + "context" + "errors" "net/http" "time" "github.com/gogo/status" + "github.com/grafana/tempo/pkg/util" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc/codes" @@ -89,11 +92,15 @@ func sloHook(allByTenantCounter, withinSLOByTenantCounter *prometheus.CounterVec // most errors are SLO violations if err != nil { - // however, if this is a grpc resource exhausted error (429) or invalid argument (400) then we are within SLO + // However, gRPC resource exhausted error (429), invalid argument (400), not found (404) and + // request cancellations are considered within the SLO. switch status.Code(err) { - case codes.ResourceExhausted, - codes.InvalidArgument, - codes.NotFound: + case codes.ResourceExhausted, codes.InvalidArgument, codes.NotFound, codes.Canceled: + withinSLOByTenantCounter.WithLabelValues(tenant).Inc() + } + + // we don't always get a gRPC codes.Canceled status code, so check for context.Canceled and http 499 as well + if errors.Is(err, context.Canceled) || (resp != nil && resp.StatusCode == util.StatusClientClosedRequest) { withinSLOByTenantCounter.WithLabelValues(tenant).Inc() } return diff --git a/modules/frontend/slos_test.go b/modules/frontend/slos_test.go index c88481d73cc..f24b5bb201c 100644 --- a/modules/frontend/slos_test.go +++ b/modules/frontend/slos_test.go @@ -1,6 +1,7 @@ package frontend import ( + "context" "errors" "io" "net/http" @@ -8,6 +9,7 @@ import ( "testing" "time" + "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/test" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -165,3 +167,101 @@ func TestBadRequest(t *testing.T) { require.Equal(t, 1.0, actualAll) require.Equal(t, 1.0, actualSLO) } + +func TestCanceledRequest(t *testing.T) { + tests := []struct { + name string + statusCode int + err error + withInSLO bool + }{ + { + name: "random error with http response has 499 status code", + statusCode: util.StatusClientClosedRequest, + err: errors.New("foo"), + withInSLO: true, + }, + { + name: "context.Canceled error with http response has 499 status code", + statusCode: util.StatusClientClosedRequest, + err: context.Canceled, + withInSLO: true, + }, + { + name: "context.Canceled error with 500 status code", + statusCode: http.StatusInternalServerError, + err: context.Canceled, + withInSLO: true, + }, + { + name: "context.Canceled error with 200 status code", + statusCode: http.StatusOK, + err: context.Canceled, + withInSLO: true, + }, + { + name: "grpc codes.Canceled error with 500 status code", + statusCode: http.StatusInternalServerError, + err: status.Error(codes.Canceled, "foo"), + withInSLO: true, + }, + { + name: "grpc codes.Canceled error with 200 status code", + statusCode: http.StatusOK, + err: status.Error(codes.Canceled, "foo"), + withInSLO: true, + }, + { + name: "no error with 200 status code", + statusCode: http.StatusOK, + err: nil, + withInSLO: false, + }, + { + name: "no error with 500 status code", + statusCode: http.StatusInternalServerError, + err: nil, + withInSLO: false, + }, + { + name: "no error with http response has 499 status code", + statusCode: util.StatusClientClosedRequest, + err: nil, + withInSLO: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + allCounter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "all"}, []string{"tenant"}) + sloCounter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "slo"}, []string{"tenant"}) + throughputVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: "throughput"}, []string{"tenant"}) + + hook := sloHook(allCounter, sloCounter, throughputVec, SLOConfig{ + DurationSLO: 10 * time.Second, + ThroughputBytesSLO: 100, + }) + + res := &http.Response{ + StatusCode: tt.statusCode, + Status: "context canceled", + Body: io.NopCloser(strings.NewReader("foo")), + } + + // latency is below DurationSLO threshold + hook(res, "tenant", 0, 15*time.Second, tt.err) + + actualAll, err := test.GetCounterValue(allCounter.WithLabelValues("tenant")) + require.NoError(t, err) + require.Equal(t, 1.0, actualAll) + + actualSLO, err := test.GetCounterValue(sloCounter.WithLabelValues("tenant")) + require.NoError(t, err) + if tt.withInSLO { + require.Equal(t, 1.0, actualSLO) + } else { + require.Equal(t, 0.0, actualSLO) + } + }) + } +} diff --git a/modules/frontend/tag_handlers_test.go b/modules/frontend/tag_handlers_test.go index 23f46147f83..77915e55b72 100644 --- a/modules/frontend/tag_handlers_test.go +++ b/modules/frontend/tag_handlers_test.go @@ -128,7 +128,7 @@ func runnerTagsV2ClientCancelContext(t *testing.T, f *QueryFrontend) { }() grpcReq := &tempopb.SearchTagsRequest{} err := f.streamingTagsV2(grpcReq, srv) - require.Equal(t, status.Error(codes.Internal, "context canceled"), err) + require.Equal(t, status.Error(codes.Canceled, "context canceled"), err) } func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) { @@ -160,7 +160,7 @@ func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) { TagName: "foo", } err := f.streamingTagValuesV2(grpcReq, srv) - require.Equal(t, status.Error(codes.Internal, "context canceled"), err) + require.Equal(t, status.Error(codes.Canceled, "context canceled"), err) } // todo: a lot of code is replicated between all of these "failure propagates from queriers" tests. we should refactor diff --git a/pkg/util/errors.go b/pkg/util/errors.go index 626325ac51c..56c041ab0a4 100644 --- a/pkg/util/errors.go +++ b/pkg/util/errors.go @@ -3,34 +3,26 @@ package util import ( "errors" "fmt" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "net/http" ) var ( // ErrTraceNotFound can be used when we don't find a trace ErrTraceNotFound = errors.New("trace not found") - // ErrSearchKeyValueNotFound is used to indicate the requested key/value pair was not found. - ErrSearchKeyValueNotFound = errors.New("key/value not found") + // StatusClientClosedRequest is the status code for when a client request cancellation of an http request + StatusClientClosedRequest = 499 + StatusTextClientClosedRequest = "Request Cancelled" ErrUnsupported = fmt.Errorf("unsupported") ) -// IsConnCanceled returns true, if error is from a closed gRPC connection. -// copied from https://github.com/etcd-io/etcd/blob/7f47de84146bdc9225d2080ec8678ca8189a2d2b/clientv3/client.go#L646 -func IsConnCanceled(err error) bool { - if err == nil { - return false - } - - // >= gRPC v1.23.x - s, ok := status.FromError(err) - if ok { - // connection is canceled or server has already closed the connection - return s.Code() == codes.Canceled || s.Message() == "transport is closing" +func StatusText(code int) string { + switch code { + case StatusClientClosedRequest: + // 499 doesn't have status text in http package, so we define it here + return StatusTextClientClosedRequest + default: + return http.StatusText(code) } - - return false }