Merge branch 'main' into fix-tempo-cli-arguments
faridtmammadov authored Nov 1, 2024
2 parents 2b56810 + 44c18cc commit bbf6c8d
Showing 23 changed files with 256 additions and 188 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -47,6 +47,7 @@
* [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero)
* [ENHANCEMENT] Add a max flush attempts and metric to the metrics generator [#4254](https://github.com/grafana/tempo/pull/4254) (@joe-elliott)
* [ENHANCEMENT] Added `insecure-skip-verify` option in tempo-cli to skip SSL certificate validation when connecting to the S3 backend. [#4259](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov)
* [ENHANCEMENT] Collection of query-frontend changes to reduce allocs. [#4242](https://github.com/grafana/tempo/pull/4242) (@joe-elliott)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
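The new `insecure-skip-verify` option above amounts to disabling TLS certificate verification on the S3 client. A minimal Go sketch of what such an option typically toggles (illustrative only; the helper name and wiring are hypothetical, not Tempo's actual code):

package example

import (
	"crypto/tls"
	"net/http"
)

// newBackendHTTPClient shows what an insecure-skip-verify style option
// usually controls: TLS certificate verification on the backend client's
// transport. Hypothetical helper, not Tempo's actual wiring.
func newBackendHTTPClient(insecureSkipVerify bool) *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				// skips both certificate-chain and hostname verification;
				// only appropriate for backends you fully trust
				InsecureSkipVerify: insecureSkipVerify,
			},
		},
	}
}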
2 changes: 1 addition & 1 deletion integration/e2e/api_test.go
@@ -627,7 +627,7 @@ func callSearchTagValuesV2AndAssert(t *testing.T, svc *e2e.HTTPService, tagName,
require.Equal(t, expected.TagValues, actualGrpcResp.TagValues)
// assert metrics, and make sure it's non-zero when response is non-empty
if len(grpcResp.TagValues) > 0 {
require.Greater(t, grpcResp.Metrics.InspectedBytes, uint64(100))
require.Greater(t, grpcResp.Metrics.InspectedBytes, uint64(0))
}
}

4 changes: 3 additions & 1 deletion modules/frontend/combiner/common.go
@@ -7,6 +7,8 @@ import (
"strings"
"sync"

tempo_io "github.com/grafana/tempo/pkg/io"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/gogo/status"
@@ -90,7 +92,7 @@ func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error {

switch res.Header.Get(api.HeaderContentType) {
case api.HeaderAcceptProtobuf:
b, err := io.ReadAll(res.Body)
b, err := tempo_io.ReadAllWithEstimate(res.Body, res.ContentLength)
if err != nil {
return fmt.Errorf("error reading response body")
}
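Both combiner changes in this commit (here and in trace_by_id.go below) replace io.ReadAll with tempo_io.ReadAllWithEstimate, passing res.ContentLength as a size hint. A rough sketch of what such a helper does, assuming the pkg/io implementation follows its call sites (the real code may differ): pre-sizing the buffer avoids io.ReadAll's repeated grow-and-copy cycles, and a negative Content-Length (unknown length) must still be handled.

package tempoio

import (
	"bytes"
	"io"
)

// ReadAllWithEstimate reads r to EOF into a buffer pre-sized from a
// caller-supplied estimate such as http.Response.ContentLength.
// Assumed implementation, sketched from the call sites in this commit.
func ReadAllWithEstimate(r io.Reader, estimatedBytes int64) ([]byte, error) {
	if estimatedBytes < 0 {
		estimatedBytes = 0 // ContentLength is -1 when the length is unknown
	}
	// +1 capacity so a body of exactly estimatedBytes doesn't force one final grow
	buf := bytes.NewBuffer(make([]byte, 0, estimatedBytes+1))
	if _, err := buf.ReadFrom(r); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}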
3 changes: 2 additions & 1 deletion modules/frontend/combiner/trace_by_id.go
@@ -11,6 +11,7 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/api"
tempo_io "github.com/grafana/tempo/pkg/io"
"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
)
@@ -67,7 +68,7 @@ func (c *traceByIDCombiner) AddResponse(r PipelineResponse) error {
}

// Read the body
buff, err := io.ReadAll(res.Body)
buff, err := tempo_io.ReadAllWithEstimate(res.Body, res.ContentLength)
if err != nil {
c.statusMessage = internalErrorMsg
return fmt.Errorf("error reading response body: %w", err)
30 changes: 29 additions & 1 deletion modules/frontend/frontend.go
@@ -269,7 +269,7 @@ func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineR
resp, _, err := resps.Next(req.Context()) // metrics path will only ever have one response

level.Info(logger).Log(
"msg", "search tag response",
"msg", "metrics summary response",
"tenant", tenant,
"path", req.URL.Path,
"err", err)
@@ -278,6 +278,34 @@
})
}

// cloneRequestforQueriers returns a clone of the passed pipeline.Request, prepared for the queriers. The caller is given an opportunity
// to modify the internal http.Request before it is used via the modHTTP param. If modHTTP is nil, the cloned request is used unmodified.
func cloneRequestforQueriers(parent pipeline.Request, tenant string, modHTTP func(*http.Request) (*http.Request, error)) (pipeline.Request, error) {
// first clone the http request with headers nil'ed out. this prevents the headers from being copied, saving allocs
// here and especially downstream in the httpgrpc bridge. prepareRequestForQueriers will add the only headers that
// the queriers actually need.
req := parent.HTTPRequest()
saveHeaders := req.Header
req.Header = nil
clonedHTTPReq := req.Clone(req.Context())

req.Header = saveHeaders
clonedHTTPReq.Header = make(http.Header, 2) // cheating here. alloc 2 b/c we know that's how many headers prepareRequestForQueriers will add

// give the caller a chance to modify the internal http request
if modHTTP != nil {
var err error
clonedHTTPReq, err = modHTTP(clonedHTTPReq)
if err != nil {
return nil, err
}
}

prepareRequestForQueriers(clonedHTTPReq, tenant)

return parent.CloneFromHTTPRequest(clonedHTTPReq), nil
}

// prepareRequestForQueriers modifies the request so they will be farmed correctly to the queriers
// - adds the tenant header
// - sets the requesturi (see below for details)
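The sharder changes below call this helper in place of the previous hand-rolled Clone + BuildQueryRangeRequest + prepareRequestForQueriers sequence. Reduced to its shape, the calling pattern looks like this (a sketch; buildShardRequest is a hypothetical wrapper, and shardReq and dedColsJSON stand in for the caller's values):

// buildShardRequest clones the parent once per shard, rewrites the clone for
// that shard inside modHTTP, and lets the helper attach the two headers the
// queriers need.
func buildShardRequest(parent pipeline.Request, tenantID string, shardReq *tempopb.QueryRangeRequest, dedColsJSON string) (pipeline.Request, error) {
	return cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
		return api.BuildQueryRangeRequest(r, shardReq, dedColsJSON), nil
	})
}

Nil'ing the headers before http.Request.Clone is the alloc win: Clone deep-copies the header map, and these sub-requests are created once per block page range, so skipping that copy saves work on every shard.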
6 changes: 3 additions & 3 deletions modules/frontend/metrics_query_range_handler.go
@@ -113,7 +113,7 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper
func logQueryRangeResult(logger log.Logger, tenantID string, durationSeconds float64, req *tempopb.QueryRangeRequest, resp *tempopb.QueryRangeResponse, err error) {
if resp == nil {
level.Info(logger).Log(
"msg", "query range results - no resp",
"msg", "query range response - no resp",
"tenant", tenantID,
"duration_seconds", durationSeconds,
"error", err)
@@ -123,7 +123,7 @@ func logQueryRangeResult(logger log.Logger, tenantID string, durationSeconds flo

if resp.Metrics == nil {
level.Info(logger).Log(
"msg", "query range results - no metrics",
"msg", "query range response - no metrics",
"tenant", tenantID,
"query", req.Query,
"range_nanos", req.End-req.Start,
@@ -133,7 +133,7 @@
}

level.Info(logger).Log(
"msg", "query range results",
"msg", "query range response",
"tenant", tenantID,
"query", req.Query,
"range_nanos", req.End-req.Start,
75 changes: 38 additions & 37 deletions modules/frontend/metrics_query_range_sharder.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math"
"net/http"
"time"

"github.com/go-kit/log" //nolint:all deprecated
@@ -113,7 +114,7 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
cutoff = time.Now().Add(-s.cfg.QueryBackendAfter)
)

generatorReq := s.generatorRequest(ctx, tenantID, pipelineRequest, *req, cutoff)
generatorReq := s.generatorRequest(tenantID, pipelineRequest, *req, cutoff)
reqCh := make(chan pipeline.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics

if generatorReq != nil {
@@ -243,15 +244,13 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
exemplars = max(uint32(float64(exemplars)*float64(m.TotalRecords)/float64(pages)), 1)
}

for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
subR := parent.HTTPRequest().Clone(ctx)

dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns)
if err != nil {
// errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err))
continue
}
dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns)
if err != nil {
_ = level.Error(s.logger).Log("msg", "failed to convert dedicated columns in query range sharder. skipping", "block", m.BlockID, "err", err)
continue
}

for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
// Trim and align the request for this block. I.e. if the request is "Last Hour" we don't want to
// cache the response for that, we want only the few minutes time range for this block. This has
// size savings but the main thing is that the response is reuseable for any overlapping query.
@@ -261,31 +260,34 @@
continue
}

queryRangeReq := &tempopb.QueryRangeRequest{
Query: searchReq.Query,
Start: start,
End: end,
Step: step,
QueryMode: searchReq.QueryMode,
// New RF1 fields
BlockID: m.BlockID.String(),
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Version: m.Version,
Encoding: m.Encoding.String(),
Size_: m.Size_,
FooterSize: m.FooterSize,
// DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json
Exemplars: exemplars,
pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
queryRangeReq := &tempopb.QueryRangeRequest{
Query: searchReq.Query,
Start: start,
End: end,
Step: step,
QueryMode: searchReq.QueryMode,
// New RF1 fields
BlockID: m.BlockID.String(),
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Version: m.Version,
Encoding: m.Encoding.String(),
Size_: m.Size_,
FooterSize: m.FooterSize,
// DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json
Exemplars: exemplars,
}

return api.BuildQueryRangeRequest(r, queryRangeReq, dedColsJSON), nil
})
if err != nil {
_ = level.Error(s.logger).Log("msg", "failed to cloneRequestforQueriers in the query range sharder. skipping", "block", m.BlockID, "err", err)
continue
}

subR = api.BuildQueryRangeRequest(subR, queryRangeReq, dedColsJSON)

prepareRequestForQueriers(subR, tenantID)
pipelineR := parent.CloneFromHTTPRequest(subR)

// TODO: Handle sampling rate
key := queryRangeCacheKey(tenantID, queryHash, int64(queryRangeReq.Start), int64(queryRangeReq.End), m, int(queryRangeReq.StartPage), int(queryRangeReq.PagesToSearch))
key := queryRangeCacheKey(tenantID, queryHash, int64(start), int64(end), m, startPage, pages)
if len(key) > 0 {
pipelineR.SetCacheKey(key)
}
@@ -306,7 +308,7 @@ func max(a, b uint32) uint32 {
return b
}

func (s *queryRangeSharder) generatorRequest(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time) *pipeline.HTTPRequest {
func (s *queryRangeSharder) generatorRequest(tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time) pipeline.Request {
traceql.TrimToAfter(&searchReq, cutoff)
// if start == end then we don't need to query it
if searchReq.Start == searchReq.End {
@@ -315,12 +317,11 @@ func (s *queryRangeSharder) generatorRequest(ctx context.Context, tenantID strin

searchReq.QueryMode = querier.QueryModeRecent

subR := parent.HTTPRequest().Clone(ctx)
subR = api.BuildQueryRangeRequest(subR, &searchReq, "") // dedicated cols are never passed to the generators

prepareRequestForQueriers(subR, tenantID)
subR, _ := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
return api.BuildQueryRangeRequest(r, &searchReq, ""), nil
})

return parent.CloneFromHTTPRequest(subR)
return subR
}

// maxDuration returns the max search duration allowed for this tenant.
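The "Trim and align" comment in the hunk above is the cache-friendly windowing: rather than caching a response keyed to the caller's arbitrary time range, each block's sub-request is clamped to the block and snapped to step boundaries, so overlapping queries hit the same cache entries. A sketch of the idea (hypothetical helper; the actual trimming lives in the sharder and traceql):

// alignForCache clamps the request window to the block's bounds, then snaps
// the start down and the end up to step boundaries. Requires Go 1.21+ for the
// min/max builtins and assumes step > 0.
func alignForCache(reqStart, reqEnd, blockStart, blockEnd, step uint64) (start, end uint64) {
	start = max(reqStart, blockStart)
	end = min(reqEnd, blockEnd)
	start -= start % step           // align start down to a step boundary
	end += (step - end%step) % step // align end up to a step boundary
	return start, end
}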
12 changes: 9 additions & 3 deletions modules/frontend/pipeline/pipeline.go
@@ -13,7 +13,9 @@ var tracer = otel.Tracer("modules/frontend/pipeline")
type Request interface {
HTTPRequest() *http.Request
Context() context.Context

WithContext(context.Context)
CloneFromHTTPRequest(request *http.Request) Request

Weight() int
SetWeight(int)
@@ -23,7 +25,6 @@ type Request interface {

SetResponseData(any) // add data that will be sent back with this requests response
ResponseData() any
CloneFromHTTPRequest(request *http.Request) *HTTPRequest
}

type HTTPRequest struct {
@@ -78,8 +79,13 @@ func (r *HTTPRequest) SetWeight(w int) {
r.weight = w
}

func (r *HTTPRequest) CloneFromHTTPRequest(request *http.Request) *HTTPRequest {
return &HTTPRequest{req: request, weight: r.weight}
func (r *HTTPRequest) CloneFromHTTPRequest(request *http.Request) Request {
return &HTTPRequest{
req: request,
weight: r.weight,
cacheKey: r.cacheKey,
responseData: r.responseData,
}
}

//
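Two things change in this file: CloneFromHTTPRequest moves into the main body of the Request interface and returns the interface type instead of the concrete *HTTPRequest, and the clone now carries the cache key and response data forward instead of silently dropping them. The caller-side effect, as a sketch (cloneWithContext is a hypothetical helper):

// cloneWithContext rewraps a cloned http.Request without ever naming the
// concrete *HTTPRequest type; after this commit the result keeps the parent's
// weight, cache key, and response data.
func cloneWithContext(parent pipeline.Request, ctx context.Context) pipeline.Request {
	return parent.CloneFromHTTPRequest(parent.HTTPRequest().Clone(ctx))
}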
6 changes: 3 additions & 3 deletions modules/frontend/search_handlers.go
@@ -145,7 +145,7 @@ func logResult(logger log.Logger, tenantID string, durationSeconds float64, req

if resp == nil {
level.Info(logger).Log(
"msg", "search results - no resp",
"msg", "search response - no resp",
"tenant", tenantID,
"duration_seconds", durationSeconds,
"status_code", statusCode,
@@ -156,7 +156,7 @@

if resp.Metrics == nil {
level.Info(logger).Log(
"msg", "search results - no metrics",
"msg", "search response - no metrics",
"tenant", tenantID,
"query", req.Query,
"range_seconds", req.End-req.Start,
@@ -167,7 +167,7 @@
}

level.Info(logger).Log(
"msg", "search results",
"msg", "search response",
"tenant", tenantID,
"query", req.Query,
"range_seconds", req.End-req.Start,
