Skip to content

Commit

Permalink
slo: include all request cancellations within SLO
Browse files Browse the repository at this point in the history
  • Loading branch information
electron0zero committed Nov 20, 2024
1 parent e6d1e07 commit 9484702
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 31 deletions.
6 changes: 5 additions & 1 deletion modules/frontend/combiner/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

tempo_io "github.com/grafana/tempo/pkg/io"
"github.com/grafana/tempo/pkg/util"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -217,6 +218,9 @@ func (c *genericCombiner[T]) erroredResponse() (*http.Response, error) {
grpcErr = status.Error(codes.ResourceExhausted, c.httpRespBody)
case http.StatusBadRequest:
grpcErr = status.Error(codes.InvalidArgument, c.httpRespBody)
case util.StatusClientClosedRequest:
// HTTP 499 is mapped to codes.Canceled grpc error
grpcErr = status.Error(codes.Canceled, c.httpRespBody)
default:
if c.httpStatusCode/100 == 5 {
grpcErr = status.Error(codes.Internal, c.httpRespBody)
Expand All @@ -226,7 +230,7 @@ func (c *genericCombiner[T]) erroredResponse() (*http.Response, error) {
}
httpResp := &http.Response{
StatusCode: c.httpStatusCode,
Status: http.StatusText(c.httpStatusCode),
Status: util.StatusText(c.httpStatusCode),
Body: io.NopCloser(strings.NewReader(c.httpRespBody)),
}

Expand Down
12 changes: 12 additions & 0 deletions modules/frontend/combiner/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/gogo/status"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
)
Expand Down Expand Up @@ -55,6 +56,17 @@ func TestErroredResponse(t *testing.T) {
},
expectedErr: status.Error(codes.InvalidArgument, "foo"),
},
{
name: "499",
statusCode: util.StatusClientClosedRequest,
respBody: "foo",
expectedResp: &http.Response{
StatusCode: util.StatusClientClosedRequest,
Status: util.StatusTextClientClosedRequest,
Body: io.NopCloser(strings.NewReader("foo")),
},
expectedErr: status.Error(codes.Canceled, "foo"),
},
}

for _, tc := range tests {
Expand Down
7 changes: 3 additions & 4 deletions modules/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/grafana/dskit/flagext"
"github.com/grafana/tempo/pkg/util"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

Expand All @@ -22,14 +23,12 @@ import (
)

const (
// StatusClientClosedRequest is the status code for when a client request cancellation of an http request
StatusClientClosedRequest = 499
// nil response in ServeHTTP
NilResponseError = "nil resp in ServeHTTP"
)

var (
errCanceled = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error())
errCanceled = httpgrpc.Errorf(util.StatusClientClosedRequest, context.Canceled.Error())
errDeadlineExceeded = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large")
)
Expand Down Expand Up @@ -88,7 +87,7 @@ func (f *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logMessage = append(
logMessage,
"status", statusCode,
"err", err.Error(),
"error", err.Error(),
"response_size", 0,
)
level.Info(f.logger).Log(logMessage...)
Expand Down
7 changes: 7 additions & 0 deletions modules/frontend/pipeline/collector_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pipeline

import (
"context"
"errors"
"net/http"
"time"

Expand Down Expand Up @@ -83,5 +84,11 @@ func grpcError(err error) error {
return err
}

// if this is context cancelled, we return a grpc cancelled error
if errors.Is(err, context.Canceled) {
return status.Error(codes.Canceled, err.Error())
}

// rest all fall into internal server error
return status.Error(codes.Internal, err.Error())
}
2 changes: 1 addition & 1 deletion modules/frontend/search_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func runnerClientCancelContext(t *testing.T, f *QueryFrontend) {
}()
grpcReq := &tempopb.SearchRequest{}
err := f.streamingSearch(grpcReq, srv)
require.Equal(t, status.Error(codes.Internal, "context canceled"), err)
require.Equal(t, status.Error(codes.Canceled, "context canceled"), err)
}

func TestSearchLimitHonored(t *testing.T) {
Expand Down
15 changes: 11 additions & 4 deletions modules/frontend/slos.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package frontend

import (
"context"
"errors"
"net/http"
"time"

"github.com/gogo/status"
"github.com/grafana/tempo/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -89,11 +92,15 @@ func sloHook(allByTenantCounter, withinSLOByTenantCounter *prometheus.CounterVec

// most errors are SLO violations
if err != nil {
// however, if this is a grpc resource exhausted error (429) or invalid argument (400) then we are within SLO
// However, gRPC resource exhausted error (429), invalid argument (400), not found (404) and
// request cancellations are considered within the SLO.
switch status.Code(err) {
case codes.ResourceExhausted,
codes.InvalidArgument,
codes.NotFound:
case codes.ResourceExhausted, codes.InvalidArgument, codes.NotFound, codes.Canceled:
withinSLOByTenantCounter.WithLabelValues(tenant).Inc()
}

// we don't always get a gRPC codes.Canceled status code, so check for context.Canceled and http 499 as well
if errors.Is(err, context.Canceled) || (resp != nil && resp.StatusCode == util.StatusClientClosedRequest) {
withinSLOByTenantCounter.WithLabelValues(tenant).Inc()
}
return
Expand Down
100 changes: 100 additions & 0 deletions modules/frontend/slos_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package frontend

import (
"context"
"errors"
"io"
"net/http"
"strings"
"testing"
"time"

"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/test"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -165,3 +167,101 @@ func TestBadRequest(t *testing.T) {
require.Equal(t, 1.0, actualAll)
require.Equal(t, 1.0, actualSLO)
}

func TestCanceledRequest(t *testing.T) {
tests := []struct {
name string
statusCode int
err error
withInSLO bool
}{
{
name: "random error with http response has 499 status code",
statusCode: util.StatusClientClosedRequest,
err: errors.New("foo"),
withInSLO: true,
},
{
name: "context.Canceled error with http response has 499 status code",
statusCode: util.StatusClientClosedRequest,
err: context.Canceled,
withInSLO: true,
},
{
name: "context.Canceled error with 500 status code",
statusCode: http.StatusInternalServerError,
err: context.Canceled,
withInSLO: true,
},
{
name: "context.Canceled error with 200 status code",
statusCode: http.StatusOK,
err: context.Canceled,
withInSLO: true,
},
{
name: "grpc codes.Canceled error with 500 status code",
statusCode: http.StatusInternalServerError,
err: status.Error(codes.Canceled, "foo"),
withInSLO: true,
},
{
name: "grpc codes.Canceled error with 200 status code",
statusCode: http.StatusOK,
err: status.Error(codes.Canceled, "foo"),
withInSLO: true,
},
{
name: "no error with 200 status code",
statusCode: http.StatusOK,
err: nil,
withInSLO: false,
},
{
name: "no error with 500 status code",
statusCode: http.StatusInternalServerError,
err: nil,
withInSLO: false,
},
{
name: "no error with http response has 499 status code",
statusCode: util.StatusClientClosedRequest,
err: nil,
withInSLO: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
allCounter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "all"}, []string{"tenant"})
sloCounter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "slo"}, []string{"tenant"})
throughputVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: "throughput"}, []string{"tenant"})

hook := sloHook(allCounter, sloCounter, throughputVec, SLOConfig{
DurationSLO: 10 * time.Second,
ThroughputBytesSLO: 100,
})

res := &http.Response{
StatusCode: tt.statusCode,
Status: "context canceled",
Body: io.NopCloser(strings.NewReader("foo")),
}

// latency is below DurationSLO threshold
hook(res, "tenant", 0, 15*time.Second, tt.err)

actualAll, err := test.GetCounterValue(allCounter.WithLabelValues("tenant"))
require.NoError(t, err)
require.Equal(t, 1.0, actualAll)

actualSLO, err := test.GetCounterValue(sloCounter.WithLabelValues("tenant"))
require.NoError(t, err)
if tt.withInSLO {
require.Equal(t, 1.0, actualSLO)
} else {
require.Equal(t, 0.0, actualSLO)
}
})
}
}
4 changes: 2 additions & 2 deletions modules/frontend/tag_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func runnerTagsV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
}()
grpcReq := &tempopb.SearchTagsRequest{}
err := f.streamingTagsV2(grpcReq, srv)
require.Equal(t, status.Error(codes.Internal, "context canceled"), err)
require.Equal(t, status.Error(codes.Canceled, "context canceled"), err)
}

func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
Expand Down Expand Up @@ -160,7 +160,7 @@ func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
TagName: "foo",
}
err := f.streamingTagValuesV2(grpcReq, srv)
require.Equal(t, status.Error(codes.Internal, "context canceled"), err)
require.Equal(t, status.Error(codes.Canceled, "context canceled"), err)
}

// todo: a lot of code is replicated between all of these "failure propagates from queriers" tests. we should refactor
Expand Down
30 changes: 11 additions & 19 deletions pkg/util/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,26 @@ package util
import (
"errors"
"fmt"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"net/http"
)

var (
// ErrTraceNotFound can be used when we don't find a trace
ErrTraceNotFound = errors.New("trace not found")

// ErrSearchKeyValueNotFound is used to indicate the requested key/value pair was not found.
ErrSearchKeyValueNotFound = errors.New("key/value not found")
// StatusClientClosedRequest is the status code for when a client request cancellation of an http request
StatusClientClosedRequest = 499
StatusTextClientClosedRequest = "Request Cancelled"

ErrUnsupported = fmt.Errorf("unsupported")
)

// IsConnCanceled returns true, if error is from a closed gRPC connection.
// copied from https://github.com/etcd-io/etcd/blob/7f47de84146bdc9225d2080ec8678ca8189a2d2b/clientv3/client.go#L646
func IsConnCanceled(err error) bool {
if err == nil {
return false
}

// >= gRPC v1.23.x
s, ok := status.FromError(err)
if ok {
// connection is canceled or server has already closed the connection
return s.Code() == codes.Canceled || s.Message() == "transport is closing"
func StatusText(code int) string {
switch code {
case StatusClientClosedRequest:
// 499 doesn't have status text in http package, so we define it here
return StatusTextClientClosedRequest
default:
return http.StatusText(code)
}

return false
}

0 comments on commit 9484702

Please sign in to comment.