diff --git a/CHANGELOG.md b/CHANGELOG.md index 844843d0892..3da92c445ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ * [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero) * [ENHANCEMENT] Add a max flush attempts and metric to the metrics generator [#4254](https://github.com/grafana/tempo/pull/4254) (@joe-elliott) * [ENHANCEMENT] Added `insecure-skip-verify` option in tempo-cli to skip SSL certificate validation when connecting to the S3 backend. [#44236](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov) +* [ENHANCEMENT] Collection of query-frontend changes to reduce allocs. [#4242](https://github.com/grafana/tempo/pull/4242) (@joe-elliott) * [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen) * [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott) * [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno) diff --git a/integration/e2e/api_test.go b/integration/e2e/api_test.go index 7633f0e74bd..aa8497491a8 100644 --- a/integration/e2e/api_test.go +++ b/integration/e2e/api_test.go @@ -627,7 +627,7 @@ func callSearchTagValuesV2AndAssert(t *testing.T, svc *e2e.HTTPService, tagName, require.Equal(t, expected.TagValues, actualGrpcResp.TagValues) // assert metrics, and make sure it's non-zero when response is non-empty if len(grpcResp.TagValues) > 0 { - require.Greater(t, grpcResp.Metrics.InspectedBytes, uint64(100)) + require.Greater(t, grpcResp.Metrics.InspectedBytes, uint64(0)) } } diff --git a/modules/frontend/combiner/common.go b/modules/frontend/combiner/common.go index 8e687406675..7b73a93ac36 100644 --- a/modules/frontend/combiner/common.go +++ b/modules/frontend/combiner/common.go @@ -7,6 +7,8 @@ import ( "strings" "sync" + tempo_io "github.com/grafana/tempo/pkg/io" + "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" "github.com/gogo/status" @@ -90,7 +92,7 @@ func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error { switch res.Header.Get(api.HeaderContentType) { case api.HeaderAcceptProtobuf: - b, err := io.ReadAll(res.Body) + b, err := tempo_io.ReadAllWithEstimate(res.Body, res.ContentLength) if err != nil { return fmt.Errorf("error reading response body") } diff --git a/modules/frontend/combiner/trace_by_id.go b/modules/frontend/combiner/trace_by_id.go index dfdd886f431..27ba0948c38 100644 --- a/modules/frontend/combiner/trace_by_id.go +++ b/modules/frontend/combiner/trace_by_id.go @@ -11,6 +11,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/grafana/tempo/pkg/api" + tempo_io "github.com/grafana/tempo/pkg/io" "github.com/grafana/tempo/pkg/model/trace" "github.com/grafana/tempo/pkg/tempopb" ) @@ -67,7 +68,7 @@ func (c *traceByIDCombiner) AddResponse(r PipelineResponse) error { } // Read the body - buff, err := io.ReadAll(res.Body) + buff, err := tempo_io.ReadAllWithEstimate(res.Body, res.ContentLength) if err != nil { c.statusMessage = internalErrorMsg return fmt.Errorf("error reading response body: %w", err) diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index d4f73276f38..f7d856030c8 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ 
-269,7 +269,7 @@ func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineR resp, _, err := resps.Next(req.Context()) // metrics path will only ever have one response level.Info(logger).Log( - "msg", "search tag response", + "msg", "metrics summary response", "tenant", tenant, "path", req.URL.Path, "err", err) @@ -278,6 +278,34 @@ func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineR }) } +// cloneRequestforQueriers returns a cloned pipeline.Request from the passed pipeline.Request ready for queriers. The caller is given an opportunity +// to modify the internal http.Request before it is returned using the modHTTP param. If modHTTP is nil, the internal http.Request is returned. +func cloneRequestforQueriers(parent pipeline.Request, tenant string, modHTTP func(*http.Request) (*http.Request, error)) (pipeline.Request, error) { + // first clone the http request with headers nil'ed out. this prevents the headers from being copied saving allocs + // here and especially downstream in the httpgrpc bridge. prepareRequestForQueriers will add the only headers that + // the queriers actually need. + req := parent.HTTPRequest() + saveHeaders := req.Header + req.Header = nil + clonedHTTPReq := req.Clone(req.Context()) + + req.Header = saveHeaders + clonedHTTPReq.Header = make(http.Header, 2) // cheating here. alloc 2 b/c we know that's how many headers prepareRequestForQueriers will add + + // give the caller a chance to modify the internal http request + if modHTTP != nil { + var err error + clonedHTTPReq, err = modHTTP(clonedHTTPReq) + if err != nil { + return nil, err + } + } + + prepareRequestForQueriers(clonedHTTPReq, tenant) + + return parent.CloneFromHTTPRequest(clonedHTTPReq), nil +} + // prepareRequestForQueriers modifies the request so they will be farmed correctly to the queriers // - adds the tenant header // - sets the requesturi (see below for details) diff --git a/modules/frontend/metrics_query_range_handler.go b/modules/frontend/metrics_query_range_handler.go index 4c0ed89afe1..54ffc29ff67 100644 --- a/modules/frontend/metrics_query_range_handler.go +++ b/modules/frontend/metrics_query_range_handler.go @@ -113,7 +113,7 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper func logQueryRangeResult(logger log.Logger, tenantID string, durationSeconds float64, req *tempopb.QueryRangeRequest, resp *tempopb.QueryRangeResponse, err error) { if resp == nil { level.Info(logger).Log( - "msg", "query range results - no resp", + "msg", "query range response - no resp", "tenant", tenantID, "duration_seconds", durationSeconds, "error", err) @@ -123,7 +123,7 @@ func logQueryRangeResult(logger log.Logger, tenantID string, durationSeconds flo if resp.Metrics == nil { level.Info(logger).Log( - "msg", "query range results - no metrics", + "msg", "query range response - no metrics", "tenant", tenantID, "query", req.Query, "range_nanos", req.End-req.Start, @@ -133,7 +133,7 @@ func logQueryRangeResult(logger log.Logger, tenantID string, durationSeconds flo } level.Info(logger).Log( - "msg", "query range results", + "msg", "query range response", "tenant", tenantID, "query", req.Query, "range_nanos", req.End-req.Start, diff --git a/modules/frontend/metrics_query_range_sharder.go b/modules/frontend/metrics_query_range_sharder.go index 76c66500de4..bfffbf21bf7 100644 --- a/modules/frontend/metrics_query_range_sharder.go +++ b/modules/frontend/metrics_query_range_sharder.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "net/http" 
"time" "github.com/go-kit/log" //nolint:all deprecated @@ -113,7 +114,7 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline cutoff = time.Now().Add(-s.cfg.QueryBackendAfter) ) - generatorReq := s.generatorRequest(ctx, tenantID, pipelineRequest, *req, cutoff) + generatorReq := s.generatorRequest(tenantID, pipelineRequest, *req, cutoff) reqCh := make(chan pipeline.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics if generatorReq != nil { @@ -243,15 +244,13 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s exemplars = max(uint32(float64(exemplars)*float64(m.TotalRecords)/float64(pages)), 1) } - for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { - subR := parent.HTTPRequest().Clone(ctx) - - dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns) - if err != nil { - // errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err)) - continue - } + dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns) + if err != nil { + _ = level.Error(s.logger).Log("msg", "failed to convert dedicated columns in query range sharder. skipping", "block", m.BlockID, "err", err) + continue + } + for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { // Trim and align the request for this block. I.e. if the request is "Last Hour" we don't want to // cache the response for that, we want only the few minutes time range for this block. This has // size savings but the main thing is that the response is reuseable for any overlapping query. @@ -261,31 +260,34 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s continue } - queryRangeReq := &tempopb.QueryRangeRequest{ - Query: searchReq.Query, - Start: start, - End: end, - Step: step, - QueryMode: searchReq.QueryMode, - // New RF1 fields - BlockID: m.BlockID.String(), - StartPage: uint32(startPage), - PagesToSearch: uint32(pages), - Version: m.Version, - Encoding: m.Encoding.String(), - Size_: m.Size_, - FooterSize: m.FooterSize, - // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json - Exemplars: exemplars, + pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + queryRangeReq := &tempopb.QueryRangeRequest{ + Query: searchReq.Query, + Start: start, + End: end, + Step: step, + QueryMode: searchReq.QueryMode, + // New RF1 fields + BlockID: m.BlockID.String(), + StartPage: uint32(startPage), + PagesToSearch: uint32(pages), + Version: m.Version, + Encoding: m.Encoding.String(), + Size_: m.Size_, + FooterSize: m.FooterSize, + // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json + Exemplars: exemplars, + } + + return api.BuildQueryRangeRequest(r, queryRangeReq, dedColsJSON), nil + }) + if err != nil { + _ = level.Error(s.logger).Log("msg", "failed to cloneRequestForQuerirs in the query range sharder. 
skipping", "block", m.BlockID, "err", err) + continue } - subR = api.BuildQueryRangeRequest(subR, queryRangeReq, dedColsJSON) - - prepareRequestForQueriers(subR, tenantID) - pipelineR := parent.CloneFromHTTPRequest(subR) - // TODO: Handle sampling rate - key := queryRangeCacheKey(tenantID, queryHash, int64(queryRangeReq.Start), int64(queryRangeReq.End), m, int(queryRangeReq.StartPage), int(queryRangeReq.PagesToSearch)) + key := queryRangeCacheKey(tenantID, queryHash, int64(start), int64(end), m, int(step), pages) if len(key) > 0 { pipelineR.SetCacheKey(key) } @@ -306,7 +308,7 @@ func max(a, b uint32) uint32 { return b } -func (s *queryRangeSharder) generatorRequest(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time) *pipeline.HTTPRequest { +func (s *queryRangeSharder) generatorRequest(tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time) pipeline.Request { traceql.TrimToAfter(&searchReq, cutoff) // if start == end then we don't need to query it if searchReq.Start == searchReq.End { @@ -315,12 +317,11 @@ func (s *queryRangeSharder) generatorRequest(ctx context.Context, tenantID strin searchReq.QueryMode = querier.QueryModeRecent - subR := parent.HTTPRequest().Clone(ctx) - subR = api.BuildQueryRangeRequest(subR, &searchReq, "") // dedicated cols are never passed to the generators - - prepareRequestForQueriers(subR, tenantID) + subR, _ := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + return api.BuildQueryRangeRequest(r, &searchReq, ""), nil + }) - return parent.CloneFromHTTPRequest(subR) + return subR } // maxDuration returns the max search duration allowed for this tenant. diff --git a/modules/frontend/pipeline/pipeline.go b/modules/frontend/pipeline/pipeline.go index 920cd4eb203..0dc9bae839e 100644 --- a/modules/frontend/pipeline/pipeline.go +++ b/modules/frontend/pipeline/pipeline.go @@ -13,7 +13,9 @@ var tracer = otel.Tracer("modules/frontend/pipeline") type Request interface { HTTPRequest() *http.Request Context() context.Context + WithContext(context.Context) + CloneFromHTTPRequest(request *http.Request) Request Weight() int SetWeight(int) @@ -23,7 +25,6 @@ type Request interface { SetResponseData(any) // add data that will be sent back with this requests response ResponseData() any - CloneFromHTTPRequest(request *http.Request) *HTTPRequest } type HTTPRequest struct { @@ -78,8 +79,13 @@ func (r *HTTPRequest) SetWeight(w int) { r.weight = w } -func (r *HTTPRequest) CloneFromHTTPRequest(request *http.Request) *HTTPRequest { - return &HTTPRequest{req: request, weight: r.weight} +func (r *HTTPRequest) CloneFromHTTPRequest(request *http.Request) Request { + return &HTTPRequest{ + req: request, + weight: r.weight, + cacheKey: r.cacheKey, + responseData: r.responseData, + } } // diff --git a/modules/frontend/search_handlers.go b/modules/frontend/search_handlers.go index 35875832016..1675271a27f 100644 --- a/modules/frontend/search_handlers.go +++ b/modules/frontend/search_handlers.go @@ -145,7 +145,7 @@ func logResult(logger log.Logger, tenantID string, durationSeconds float64, req if resp == nil { level.Info(logger).Log( - "msg", "search results - no resp", + "msg", "search response - no resp", "tenant", tenantID, "duration_seconds", durationSeconds, "status_code", statusCode, @@ -156,7 +156,7 @@ func logResult(logger log.Logger, tenantID string, durationSeconds float64, req if resp.Metrics == nil { level.Info(logger).Log( - "msg", "search 
results - no metrics", + "msg", "search response - no metrics", "tenant", tenantID, "query", req.Query, "range_seconds", req.End-req.Start, @@ -167,7 +167,7 @@ func logResult(logger log.Logger, tenantID string, durationSeconds float64, req } level.Info(logger).Log( - "msg", "search results", + "msg", "search response", "tenant", tenantID, "query", req.Query, "range_seconds", req.End-req.Start, diff --git a/modules/frontend/search_sharder.go b/modules/frontend/search_sharder.go index a8e1df35ec8..ddad508d30f 100644 --- a/modules/frontend/search_sharder.go +++ b/modules/frontend/search_sharder.go @@ -3,6 +3,7 @@ package frontend import ( "context" "fmt" + "net/http" "time" "github.com/go-kit/log" //nolint:all deprecated @@ -95,7 +96,7 @@ func (s asyncSearchSharder) RoundTrip(pipelineRequest pipeline.Request) (pipelin // build request to search ingesters based on query_ingesters_until config and time range // pass subCtx in requests so we can cancel and exit early - err = s.ingesterRequests(ctx, tenantID, pipelineRequest, *searchReq, reqCh) + err = s.ingesterRequests(tenantID, pipelineRequest, *searchReq, reqCh) if err != nil { return nil, err } @@ -199,10 +200,10 @@ func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID strin // that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query. // since this function modifies searchReq.Start and End we are taking a value instead of a pointer to prevent it from // unexpectedly changing the passed searchReq. -func (s *asyncSearchSharder) ingesterRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.SearchRequest, reqCh chan pipeline.Request) error { +func (s *asyncSearchSharder) ingesterRequests(tenantID string, parent pipeline.Request, searchReq tempopb.SearchRequest, reqCh chan pipeline.Request) error { // request without start or end, search only in ingester if searchReq.Start == 0 || searchReq.End == 0 { - return buildIngesterRequest(ctx, tenantID, parent, &searchReq, reqCh) + return buildIngesterRequest(tenantID, parent, &searchReq, reqCh) } ingesterUntil := uint32(time.Now().Add(-s.cfg.QueryIngestersUntil).Unix()) @@ -257,7 +258,7 @@ func (s *asyncSearchSharder) ingesterRequests(ctx context.Context, tenantID stri subReq.Start = shardStart subReq.End = shardEnd - err := buildIngesterRequest(ctx, tenantID, parent, &subReq, reqCh) + err := buildIngesterRequest(tenantID, parent, &subReq, reqCh) if err != nil { return err } @@ -310,36 +311,37 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent pipeline. } blockID := m.BlockID.String() - for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { - subR := parent.HTTPRequest().Clone(ctx) - dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns) - if err != nil { - errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err)) - continue - } + dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns) + if err != nil { + errFn(fmt.Errorf("failed to convert dedicated columns. 
block: %s tempopb: %w", blockID, err)) + continue + } - subR, err = api.BuildSearchBlockRequest(subR, &tempopb.SearchBlockRequest{ - BlockID: blockID, - StartPage: uint32(startPage), - PagesToSearch: uint32(pages), - Encoding: m.Encoding.String(), - IndexPageSize: m.IndexPageSize, - TotalRecords: m.TotalRecords, - DataEncoding: m.DataEncoding, - Version: m.Version, - Size_: m.Size_, - FooterSize: m.FooterSize, - // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json - }, dedColsJSON) + for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { + pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + r, err = api.BuildSearchBlockRequest(r, &tempopb.SearchBlockRequest{ + BlockID: blockID, + StartPage: uint32(startPage), + PagesToSearch: uint32(pages), + Encoding: m.Encoding.String(), + IndexPageSize: m.IndexPageSize, + TotalRecords: m.TotalRecords, + DataEncoding: m.DataEncoding, + Version: m.Version, + Size_: m.Size_, + FooterSize: m.FooterSize, + // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json + }, dedColsJSON) + + return r, err + }) if err != nil { errFn(fmt.Errorf("failed to build search block request. block: %s tempopb: %w", blockID, err)) continue } - prepareRequestForQueriers(subR, tenantID) key := searchJobCacheKey(tenantID, queryHash, int64(searchReq.Start), int64(searchReq.End), m, startPage, pages) - pipelineR := parent.CloneFromHTTPRequest(subR) pipelineR.SetCacheKey(key) select { @@ -396,14 +398,14 @@ func pagesPerRequest(m *backend.BlockMeta, bytesPerRequest int) int { return pagesPerQuery } -func buildIngesterRequest(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, reqCh chan pipeline.Request) error { - subR := parent.HTTPRequest().Clone(ctx) - subR, err := api.BuildSearchRequest(subR, searchReq) +func buildIngesterRequest(tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, reqCh chan pipeline.Request) error { + subR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + return api.BuildSearchRequest(r, searchReq) + }) if err != nil { return err } - prepareRequestForQueriers(subR, tenantID) - reqCh <- parent.CloneFromHTTPRequest(subR) + reqCh <- subR return nil } diff --git a/modules/frontend/search_sharder_test.go b/modules/frontend/search_sharder_test.go index ed8eb3abe6f..3a8730d083c 100644 --- a/modules/frontend/search_sharder_test.go +++ b/modules/frontend/search_sharder_test.go @@ -494,7 +494,7 @@ func TestIngesterRequests(t *testing.T) { pr := pipeline.NewHTTPRequest(req) pr.SetWeight(2) - err = s.ingesterRequests(context.Background(), "test", pr, *searchReq, reqChan) + err = s.ingesterRequests("test", pr, *searchReq, reqChan) if tc.expectedError != nil { assert.Equal(t, tc.expectedError, err) continue diff --git a/modules/frontend/tag_handlers.go b/modules/frontend/tag_handlers.go index b61bc4beb8f..404009a48ca 100644 --- a/modules/frontend/tag_handlers.go +++ b/modules/frontend/tag_handlers.go @@ -45,7 +45,6 @@ func newTagsStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[com if err != nil { return err } - prepareRequestForQueriers(httpReq, tenant) var finalResponse *tempopb.SearchTagsResponse comb := combiner.NewTypedSearchTags(o.MaxBytesPerTagValuesQuery(tenant)) @@ -79,7 +78,6 @@ func 
newTagsV2StreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[c if err != nil { return err } - prepareRequestForQueriers(httpReq, tenant) var finalResponse *tempopb.SearchTagsV2Response comb := combiner.NewTypedSearchTagsV2(o.MaxBytesPerTagValuesQuery(tenant)) @@ -117,7 +115,6 @@ func newTagValuesStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTrippe if err != nil { return err } - prepareRequestForQueriers(httpReq, tenant) var finalResponse *tempopb.SearchTagValuesResponse comb := combiner.NewTypedSearchTagValues(o.MaxBytesPerTagValuesQuery(tenant)) @@ -155,7 +152,6 @@ func newTagValuesV2StreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTrip if err != nil { return err } - prepareRequestForQueriers(httpReq, tenant) var finalResponse *tempopb.SearchTagValuesV2Response comb := combiner.NewTypedSearchTagValuesV2(o.MaxBytesPerTagValuesQuery(tenant)) @@ -394,7 +390,7 @@ func logTagsRequest(logger log.Logger, tenantID, handler, scope string, rangeSec func logTagsResult(logger log.Logger, tenantID, handler, scope string, rangeSeconds uint32, durationSeconds float64, inspectedBytes uint64, err error) { level.Info(logger).Log( - "msg", "search tag results", + "msg", "search tag response", "tenant", tenantID, "handler", handler, "scope", scope, @@ -417,7 +413,7 @@ func logTagValuesRequest(logger log.Logger, tenantID, handler, tagName, query st func logTagValuesResult(logger log.Logger, tenantID, handler, tagName, query string, rangeSeconds uint32, durationSeconds float64, inspectedBytes uint64, err error) { level.Info(logger).Log( - "msg", "search tag values results", + "msg", "search tag values response", "tenant", tenantID, "handler", handler, "tag", tagName, diff --git a/modules/frontend/tag_sharder.go b/modules/frontend/tag_sharder.go index b3862ba889d..55f1f1e7942 100644 --- a/modules/frontend/tag_sharder.go +++ b/modules/frontend/tag_sharder.go @@ -190,9 +190,9 @@ func newAsyncTagSharder(reader tempodb.Reader, o overrides.Interface, cfg Search // until limit results are found func (s searchTagSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) { r := pipelineRequest.HTTPRequest() - requestCtx := r.Context() + ctx := pipelineRequest.Context() - tenantID, err := user.ExtractOrgID(requestCtx) + tenantID, err := user.ExtractOrgID(ctx) if err != nil { return pipeline.NewBadRequest(err), nil } @@ -201,8 +201,9 @@ func (s searchTagSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline. if err != nil { return pipeline.NewBadRequest(err), nil } - ctx, span := tracer.Start(requestCtx, "frontend.ShardSearchTags") + ctx, span := tracer.Start(ctx, "frontend.ShardSearchTags") defer span.End() + pipelineRequest.WithContext(ctx) // calculate and enforce max search duration maxDuration := s.maxDuration(tenantID) @@ -213,7 +214,7 @@ func (s searchTagSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline. // build request to search ingester based on query_ingesters_until config and time range // pass subCtx in requests, so we can cancel and exit early - ingesterReq, err := s.ingesterRequest(ctx, tenantID, pipelineRequest, searchReq) + ingesterReq, err := s.ingesterRequest(tenantID, pipelineRequest, searchReq) if err != nil { return nil, err } @@ -223,7 +224,7 @@ func (s searchTagSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline. 
reqCh <- ingesterReq } - s.backendRequests(ctx, tenantID, r, searchReq, reqCh, func(err error) { + s.backendRequests(ctx, tenantID, pipelineRequest, searchReq, reqCh, func(err error) { // todo: actually find a way to return this error to the user s.logger.Log("msg", "failed to build backend requests", "err", err) }) @@ -252,7 +253,7 @@ func (s searchTagSharder) blockMetas(start, end int64, tenantID string) []*backe // backendRequest builds backend requests to search backend blocks. backendRequest takes ownership of reqCh and closes it. // it returns 3 int values: totalBlocks, totalBlockBytes, and estimated jobs -func (s searchTagSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tagSearchReq, reqCh chan<- pipeline.Request, errFn func(error)) { +func (s searchTagSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tagSearchReq, reqCh chan<- pipeline.Request, errFn func(error)) { var blocks []*backend.BlockMeta // request without start or end, search only in ingester @@ -282,7 +283,7 @@ func (s searchTagSharder) backendRequests(ctx context.Context, tenantID string, // buildBackendRequests returns a slice of requests that cover all blocks in the store // that are covered by start/end. -func (s searchTagSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- pipeline.Request, errFn func(error), searchReq tagSearchReq) { +func (s searchTagSharder) buildBackendRequests(ctx context.Context, tenantID string, parent pipeline.Request, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- pipeline.Request, errFn func(error), searchReq tagSearchReq) { defer close(reqCh) hash := searchReq.hash() @@ -296,14 +297,13 @@ func (s searchTagSharder) buildBackendRequests(ctx context.Context, tenantID str blockID := m.BlockID.String() for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { - subR := parent.Clone(ctx) - subR, err := searchReq.buildTagSearchBlockRequest(subR, blockID, startPage, pages, m) + pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + return searchReq.buildTagSearchBlockRequest(r, blockID, startPage, pages, m) + }) if err != nil { errFn(err) - return + continue } - prepareRequestForQueriers(subR, tenantID) - pipelineR := pipeline.NewHTTPRequest(subR) key := cacheKey(keyPrefix, tenantID, hash, int64(searchReq.start()), int64(searchReq.end()), m, startPage, pages) pipelineR.SetCacheKey(key) @@ -321,10 +321,10 @@ func (s searchTagSharder) buildBackendRequests(ctx context.Context, tenantID str // that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query. // we should do a copy of the searchReq before use this function, as it is an interface, we cannot guaranteed be passed // by value. 
-func (s searchTagSharder) ingesterRequest(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tagSearchReq) (*pipeline.HTTPRequest, error) { +func (s searchTagSharder) ingesterRequest(tenantID string, parent pipeline.Request, searchReq tagSearchReq) (pipeline.Request, error) { // request without start or end, search only in ingester if searchReq.start() == 0 || searchReq.end() == 0 { - return s.buildIngesterRequest(ctx, tenantID, parent, searchReq) + return s.buildIngesterRequest(tenantID, parent, searchReq) } now := time.Now() @@ -349,17 +349,17 @@ func (s searchTagSharder) ingesterRequest(ctx context.Context, tenantID string, } newSearchReq := searchReq.newWithRange(ingesterStart, ingesterEnd) - return s.buildIngesterRequest(ctx, tenantID, parent, newSearchReq) + return s.buildIngesterRequest(tenantID, parent, newSearchReq) } -func (s searchTagSharder) buildIngesterRequest(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tagSearchReq) (*pipeline.HTTPRequest, error) { - subR := parent.HTTPRequest().Clone(ctx) - subR, err := searchReq.buildSearchTagRequest(subR) +func (s searchTagSharder) buildIngesterRequest(tenantID string, parent pipeline.Request, searchReq tagSearchReq) (pipeline.Request, error) { + subR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + return searchReq.buildSearchTagRequest(r) + }) if err != nil { return nil, err } - prepareRequestForQueriers(subR, tenantID) - return parent.CloneFromHTTPRequest(subR), nil + return subR, nil } // maxDuration returns the max search duration allowed for this tenant. diff --git a/modules/frontend/tag_sharder_test.go b/modules/frontend/tag_sharder_test.go index 150cd5e0dc2..e47fa87f35c 100644 --- a/modules/frontend/tag_sharder_test.go +++ b/modules/frontend/tag_sharder_test.go @@ -161,7 +161,7 @@ func TestTagsBackendRequests(t *testing.T) { req.endValue = uint32(tc.params.end) } - s.backendRequests(context.TODO(), "test", r, &req, reqCh, func(err error) { + s.backendRequests(context.TODO(), "test", pipeline.NewHTTPRequest(r), &req, reqCh, func(err error) { require.Equal(t, tc.expectedError, err) }) @@ -265,7 +265,7 @@ func TestTagsIngesterRequest(t *testing.T) { } copyReq := searchReq - actualReq, err := s.ingesterRequest(context.Background(), "test", pipelineReq, &searchReq) + actualReq, err := s.ingesterRequest("test", pipelineReq, &searchReq) if tc.expectedError != nil { assert.Equal(t, tc.expectedError, err) continue diff --git a/modules/frontend/traceid_sharder.go b/modules/frontend/traceid_sharder.go index 21bdfa47d24..cb17c09c12a 100644 --- a/modules/frontend/traceid_sharder.go +++ b/modules/frontend/traceid_sharder.go @@ -1,13 +1,12 @@ package frontend import ( - "context" "encoding/hex" "net/http" "github.com/go-kit/log" //nolint:all //deprecated - "github.com/grafana/dskit/user" + "github.com/grafana/dskit/user" "github.com/grafana/tempo/modules/frontend/combiner" "github.com/grafana/tempo/modules/frontend/pipeline" "github.com/grafana/tempo/modules/querier" @@ -40,13 +39,11 @@ func newAsyncTraceIDSharder(cfg *TraceByIDConfig, logger log.Logger) pipeline.As // RoundTrip implements http.RoundTripper func (s asyncTraceSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) { - r := pipelineRequest.HTTPRequest() - - ctx, span := tracer.Start(r.Context(), "frontend.ShardQuery") + ctx, span := tracer.Start(pipelineRequest.Context(), "frontend.ShardQuery") defer span.End() - r = 
r.WithContext(ctx) + pipelineRequest.WithContext(ctx) - reqs, err := s.buildShardedRequests(ctx, r) + reqs, err := s.buildShardedRequests(pipelineRequest) if err != nil { return nil, err } @@ -66,35 +63,43 @@ func (s asyncTraceSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline } return pipeline.NewAsyncSharderFunc(ctx, int(concurrentShards), len(reqs), func(i int) pipeline.Request { - pipelineReq := pipelineRequest.CloneFromHTTPRequest(reqs[i]) + pipelineReq := reqs[i] return pipelineReq }, s.next), nil } // buildShardedRequests returns a slice of requests sharded on the precalculated // block boundaries -func (s *asyncTraceSharder) buildShardedRequests(ctx context.Context, parent *http.Request) ([]*http.Request, error) { +func (s *asyncTraceSharder) buildShardedRequests(parent pipeline.Request) ([]pipeline.Request, error) { userID, err := user.ExtractOrgID(parent.Context()) if err != nil { return nil, err } - reqs := make([]*http.Request, s.cfg.QueryShards) + reqs := make([]pipeline.Request, s.cfg.QueryShards) params := map[string]string{} + + reqs[0], err = cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) { + params[querier.QueryModeKey] = querier.QueryModeIngesters + return api.BuildQueryRequest(r, params), nil + }) + if err != nil { + return nil, err + } + // build sharded block queries - for i := 0; i < len(s.blockBoundaries); i++ { - reqs[i] = parent.Clone(ctx) - if i == 0 { - // ingester query - params[querier.QueryModeKey] = querier.QueryModeIngesters - } else { + for i := 1; i < len(s.blockBoundaries); i++ { + i := i // save the loop variable locally to make sure the closure grabs the correct var. + pipelineR, _ := cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) { // block queries params[querier.BlockStartKey] = hex.EncodeToString(s.blockBoundaries[i-1]) params[querier.BlockEndKey] = hex.EncodeToString(s.blockBoundaries[i]) params[querier.QueryModeKey] = querier.QueryModeBlocks - } - reqs[i] = api.BuildQueryRequest(reqs[i], params) - prepareRequestForQueriers(reqs[i], userID) + + return api.BuildQueryRequest(r, params), nil + }) + + reqs[i] = pipelineR } return reqs, nil diff --git a/modules/frontend/traceid_sharder_test.go b/modules/frontend/traceid_sharder_test.go index 74e3339c6c2..4bc9dbca631 100644 --- a/modules/frontend/traceid_sharder_test.go +++ b/modules/frontend/traceid_sharder_test.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/dskit/user" "github.com/stretchr/testify/require" + "github.com/grafana/tempo/modules/frontend/pipeline" "github.com/grafana/tempo/pkg/blockboundary" ) @@ -24,10 +25,10 @@ func TestBuildShardedRequests(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "blerg") req := httptest.NewRequest("GET", "/", nil).WithContext(ctx) - shardedReqs, err := sharder.buildShardedRequests(ctx, req) + shardedReqs, err := sharder.buildShardedRequests(pipeline.NewHTTPRequest(req)) require.NoError(t, err) require.Len(t, shardedReqs, queryShards) - require.Equal(t, "/querier?mode=ingesters", shardedReqs[0].RequestURI) - urisEqual(t, []string{"/querier?blockEnd=ffffffffffffffffffffffffffffffff&blockStart=00000000000000000000000000000000&mode=blocks"}, []string{shardedReqs[1].RequestURI}) + require.Equal(t, "/querier?mode=ingesters", shardedReqs[0].HTTPRequest().RequestURI) + urisEqual(t, []string{"/querier?blockEnd=ffffffffffffffffffffffffffffffff&blockStart=00000000000000000000000000000000&mode=blocks"}, []string{shardedReqs[1].HTTPRequest().RequestURI}) } diff --git 
a/modules/generator/generator_test.go b/modules/generator/generator_test.go index db05dfb5ef8..a3ad47c1b6c 100644 --- a/modules/generator/generator_test.go +++ b/modules/generator/generator_test.go @@ -225,7 +225,7 @@ func BenchmarkCollect(b *testing.B) { spanMetricsDimensions: []string{"k8s.cluster.name", "k8s.namespace.name"}, spanMetricsEnableTargetInfo: true, spanMetricsTargetInfoExcludedDimensions: []string{"excluded}"}, - // nativeHistograms: overrides.HistogramMethodBoth, + nativeHistograms: overrides.HistogramMethodBoth, } ) diff --git a/modules/generator/registry/counter.go b/modules/generator/registry/counter.go index 8f7f5af1a09..907a9e57b1f 100644 --- a/modules/generator/registry/counter.go +++ b/modules/generator/registry/counter.go @@ -137,6 +137,7 @@ func (c *counter) collectMetrics(appender storage.Appender, timeMs int64, extern // add metric name baseLabels = append(baseLabels, labels.Label{Name: labels.MetricName, Value: c.metricName}) + // TODO: avoid allocation on each collection lb := labels.NewBuilder(baseLabels) for _, s := range c.series { @@ -166,7 +167,7 @@ func (c *counter) collectMetrics(appender storage.Appender, timeMs int64, extern return } - // TODO support exemplars + // TODO: support exemplars } return diff --git a/modules/generator/registry/gauge.go b/modules/generator/registry/gauge.go index 3a8ec22d86b..e0ca82c882a 100644 --- a/modules/generator/registry/gauge.go +++ b/modules/generator/registry/gauge.go @@ -149,6 +149,7 @@ func (g *gauge) collectMetrics(appender storage.Appender, timeMs int64, external baseLabels = append(baseLabels, labels.Label{Name: name, Value: value}) } + // TODO: avoid allocation on each collection lb := labels.NewBuilder(baseLabels) for _, s := range g.series { @@ -166,7 +167,7 @@ func (g *gauge) collectMetrics(appender storage.Appender, timeMs int64, external if err != nil { return } - // TODO support exemplars + // TODO: support exemplars } return diff --git a/modules/generator/registry/native_histogram.go b/modules/generator/registry/native_histogram.go index 95596f6f315..9ea5f761ae8 100644 --- a/modules/generator/registry/native_histogram.go +++ b/modules/generator/registry/native_histogram.go @@ -1,6 +1,7 @@ package registry import ( + "fmt" "math" "sync" "time" @@ -39,11 +40,19 @@ type nativeHistogram struct { // generate. A diff in the configured value on the processors will cause a // reload of the process, and a new instance of the histogram to be created. histogramOverride HistogramMode + + externalLabels map[string]string + + // classic + nameCount string + nameSum string + nameBucket string } type nativeHistogramSeries struct { // labels should not be modified after creation - labels LabelPair + lb *labels.Builder + labels labels.Labels promHistogram prometheus.Histogram lastUpdated int64 histogram *dto.Histogram @@ -53,6 +62,11 @@ type nativeHistogramSeries struct { // This avoids Prometheus throwing away the first value in the series, // due to the transition from null -> x. 
firstSeries *atomic.Bool + + // classic + countLabels labels.Labels + sumLabels labels.Labels + // bucketLabels []labels.Labels } func (hs *nativeHistogramSeries) isNew() bool { @@ -68,7 +82,7 @@ var ( _ metric = (*nativeHistogram)(nil) ) -func newNativeHistogram(name string, buckets []float64, onAddSeries func(uint32) bool, onRemoveSeries func(count uint32), traceIDLabelName string, histogramOverride HistogramMode) *nativeHistogram { +func newNativeHistogram(name string, buckets []float64, onAddSeries func(uint32) bool, onRemoveSeries func(count uint32), traceIDLabelName string, histogramOverride HistogramMode, externalLabels map[string]string) *nativeHistogram { if onAddSeries == nil { onAddSeries = func(uint32) bool { return true @@ -90,6 +104,12 @@ func newNativeHistogram(name string, buckets []float64, onAddSeries func(uint32) traceIDLabelName: traceIDLabelName, buckets: buckets, histogramOverride: histogramOverride, + externalLabels: externalLabels, + + // classic + nameCount: fmt.Sprintf("%s_count", name), + nameSum: fmt.Sprintf("%s_sum", name), + nameBucket: fmt.Sprintf("%s_bucket", name), } } @@ -109,19 +129,15 @@ func (h *nativeHistogram) ObserveWithExemplar(labelValueCombo *LabelValueCombo, return } - newSeries := h.newSeries(labelValueCombo, value, traceID, multiplier) - h.series[hash] = newSeries + h.series[hash] = h.newSeries(labelValueCombo, value, traceID, multiplier) } func (h *nativeHistogram) newSeries(labelValueCombo *LabelValueCombo, value float64, traceID string, multiplier float64) *nativeHistogramSeries { newSeries := &nativeHistogramSeries{ - // TODO move these labels in HistogramOpts.ConstLabels? - labels: labelValueCombo.getLabelPair(), promHistogram: prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: h.name(), - Help: "Native histogram for metric " + h.name(), - Buckets: h.buckets, - // TODO check if these values are sensible and break them out + Name: h.name(), + Help: "Native histogram for metric " + h.name(), + Buckets: h.buckets, NativeHistogramBucketFactor: 1.1, NativeHistogramMaxBucketNumber: 100, NativeHistogramMinResetDuration: 15 * time.Minute, @@ -132,6 +148,31 @@ func (h *nativeHistogram) newSeries(labelValueCombo *LabelValueCombo, value floa h.updateSeries(newSeries, value, traceID, multiplier) + lbls := labelValueCombo.getLabelPair() + lb := labels.NewBuilder(make(labels.Labels, 1+len(lbls.names))) + + // set series labels + for i, name := range lbls.names { + lb.Set(name, lbls.values[i]) + } + // set external labels + for name, value := range h.externalLabels { + lb.Set(name, value) + } + + lb.Set(labels.MetricName, h.metricName) + + newSeries.labels = lb.Labels() + newSeries.lb = lb + + // _count + lb.Set(labels.MetricName, h.nameCount) + newSeries.countLabels = lb.Labels() + + // _sum + lb.Set(labels.MetricName, h.nameSum) + newSeries.sumLabels = lb.Labels() + return newSeries } @@ -149,28 +190,13 @@ func (h *nativeHistogram) name() string { return h.metricName } -func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64, externalLabels map[string]string) (activeSeries int, err error) { +func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64, _ map[string]string) (activeSeries int, err error) { h.seriesMtx.Lock() defer h.seriesMtx.Unlock() - lbls := make(labels.Labels, 1+len(externalLabels)) - lb := labels.NewBuilder(lbls) activeSeries = 0 - lb.Set(labels.MetricName, h.metricName) - - // set external labels - for name, value := range externalLabels { - lb.Set(name, value) - } - for _, s := 
range h.series { - - // Set series-specific labels - for i, name := range s.labels.names { - lb.Set(name, s.labels.values[i]) - } - // Extract histogram encodedMetric := &dto.Metric{} @@ -180,14 +206,15 @@ func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64 return activeSeries, err } - // NOTE: Store the encoded histogram here so we can keep track of the exemplars - // that have been sent. The value is updated here, but the pointers remain - // the same, and so Reset() call below can be used to clear the exemplars. + // NOTE: Store the encoded histogram here so we can keep track of the + // exemplars that have been sent. The value is updated here, but the + // pointers remain the same, and so Reset() call below can be used to clear + // the exemplars. s.histogram = encodedMetric.GetHistogram() // If we are in "both" or "classic" mode, also emit classic histograms. if hasClassicHistograms(h.histogramOverride) { - classicSeries, classicErr := h.classicHistograms(appender, lb, timeMs, s) + classicSeries, classicErr := h.classicHistograms(appender, timeMs, s) if classicErr != nil { return activeSeries, classicErr } @@ -196,7 +223,7 @@ func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64 // If we are in "both" or "native" mode, also emit native histograms. if hasNativeHistograms(h.histogramOverride) { - nativeErr := h.nativeHistograms(appender, lb, timeMs, s) + nativeErr := h.nativeHistograms(appender, s.labels, timeMs, s) if nativeErr != nil { return activeSeries, nativeErr } @@ -215,7 +242,7 @@ func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64 Ts: convertTimestampToMs(encodedExemplar.GetTimestamp()), HasTs: true, } - _, err = appender.AppendExemplar(0, lb.Labels(), e) + _, err = appender.AppendExemplar(0, s.labels, e) if err != nil { return activeSeries, err } @@ -243,7 +270,7 @@ func (h *nativeHistogram) activeSeriesPerHistogramSerie() uint32 { return 1 } -func (h *nativeHistogram) nativeHistograms(appender storage.Appender, lb *labels.Builder, timeMs int64, s *nativeHistogramSeries) (err error) { +func (h *nativeHistogram) nativeHistograms(appender storage.Appender, lbls labels.Labels, timeMs int64, s *nativeHistogramSeries) (err error) { // Decode to Prometheus representation hist := promhistogram.Histogram{ Schema: s.histogram.GetSchema(), @@ -273,8 +300,7 @@ func (h *nativeHistogram) nativeHistograms(appender storage.Appender, lb *labels } hist.NegativeBuckets = s.histogram.NegativeDelta - lb.Set(labels.MetricName, h.metricName) - _, err = appender.AppendHistogram(0, lb.Labels(), timeMs, &hist, nil) + _, err = appender.AppendHistogram(0, lbls, timeMs, &hist, nil) if err != nil { return err } @@ -282,11 +308,10 @@ func (h *nativeHistogram) nativeHistograms(appender storage.Appender, lb *labels return } -func (h *nativeHistogram) classicHistograms(appender storage.Appender, lb *labels.Builder, timeMs int64, s *nativeHistogramSeries) (activeSeries int, err error) { +func (h *nativeHistogram) classicHistograms(appender storage.Appender, timeMs int64, s *nativeHistogramSeries) (activeSeries int, err error) { if s.isNew() { - lb.Set(labels.MetricName, h.metricName+"_count") endOfLastMinuteMs := getEndOfLastMinuteMs(timeMs) - _, err = appender.Append(0, lb.Labels(), endOfLastMinuteMs, 0) + _, err = appender.Append(0, s.countLabels, endOfLastMinuteMs, 0) if err != nil { return activeSeries, err } @@ -294,23 +319,21 @@ func (h *nativeHistogram) classicHistograms(appender storage.Appender, lb *label } // sum - 
lb.Set(labels.MetricName, h.metricName+"_sum") - _, err = appender.Append(0, lb.Labels(), timeMs, s.histogram.GetSampleSum()) + _, err = appender.Append(0, s.sumLabels, timeMs, s.histogram.GetSampleSum()) if err != nil { return activeSeries, err } activeSeries++ // count - lb.Set(labels.MetricName, h.metricName+"_count") - _, err = appender.Append(0, lb.Labels(), timeMs, getIfGreaterThenZeroOr(s.histogram.GetSampleCountFloat(), s.histogram.GetSampleCount())) + _, err = appender.Append(0, s.countLabels, timeMs, getIfGreaterThenZeroOr(s.histogram.GetSampleCountFloat(), s.histogram.GetSampleCount())) if err != nil { return activeSeries, err } activeSeries++ // bucket - lb.Set(labels.MetricName, h.metricName+"_bucket") + s.lb.Set(labels.MetricName, h.metricName+"_bucket") // the Prometheus histogram will sometimes add the +Inf bucket, it depends on whether there is an exemplar // for that bucket or not. To avoid adding it twice, keep track of it with this boolean. @@ -318,21 +341,20 @@ func (h *nativeHistogram) classicHistograms(appender storage.Appender, lb *label for _, bucket := range s.histogram.Bucket { // add "le" label - lb.Set(labels.BucketLabel, formatFloat(bucket.GetUpperBound())) + s.lb.Set(labels.BucketLabel, formatFloat(bucket.GetUpperBound())) if bucket.GetUpperBound() == math.Inf(1) { infBucketWasAdded = true } - ref, appendErr := appender.Append(0, lb.Labels(), timeMs, getIfGreaterThenZeroOr(bucket.GetCumulativeCountFloat(), bucket.GetCumulativeCount())) + ref, appendErr := appender.Append(0, s.lb.Labels(), timeMs, getIfGreaterThenZeroOr(bucket.GetCumulativeCountFloat(), bucket.GetCumulativeCount())) if appendErr != nil { return activeSeries, appendErr } activeSeries++ if bucket.Exemplar != nil && len(bucket.Exemplar.Label) > 0 { - // TODO are we appending the same exemplar twice? 
- _, err = appender.AppendExemplar(ref, lb.Labels(), exemplar.Exemplar{ + _, err = appender.AppendExemplar(ref, s.lb.Labels(), exemplar.Exemplar{ Labels: convertLabelPairToLabels(bucket.Exemplar.GetLabel()), Value: bucket.Exemplar.GetValue(), Ts: timeMs, @@ -346,9 +368,9 @@ func (h *nativeHistogram) classicHistograms(appender storage.Appender, lb *label if !infBucketWasAdded { // Add +Inf bucket - lb.Set(labels.BucketLabel, "+Inf") + s.lb.Set(labels.BucketLabel, "+Inf") - _, err = appender.Append(0, lb.Labels(), timeMs, getIfGreaterThenZeroOr(s.histogram.GetSampleCountFloat(), s.histogram.GetSampleCount())) + _, err = appender.Append(0, s.lb.Labels(), timeMs, getIfGreaterThenZeroOr(s.histogram.GetSampleCountFloat(), s.histogram.GetSampleCount())) if err != nil { return activeSeries, err } @@ -356,7 +378,7 @@ func (h *nativeHistogram) classicHistograms(appender storage.Appender, lb *label } // drop "le" label again - lb.Del(labels.BucketLabel) + s.lb.Del(labels.BucketLabel) return } diff --git a/modules/generator/registry/native_histogram_test.go b/modules/generator/registry/native_histogram_test.go index 91ad19e34e2..504c6d4d44e 100644 --- a/modules/generator/registry/native_histogram_test.go +++ b/modules/generator/registry/native_histogram_test.go @@ -20,7 +20,7 @@ func Test_ObserveWithExemplar_duplicate(t *testing.T) { return true } - h := newNativeHistogram("my_histogram", []float64{0.1, 0.2}, onAdd, nil, "trace_id", HistogramModeBoth) + h := newNativeHistogram("my_histogram", []float64{0.1, 0.2}, onAdd, nil, "trace_id", HistogramModeBoth, nil) lv := newLabelValueCombo([]string{"label"}, []string{"value-1"}) @@ -463,7 +463,7 @@ func Test_Histograms(t *testing.T) { } onAdd := func(uint32) bool { return true } - h := newNativeHistogram("test_histogram", tc.buckets, onAdd, nil, "trace_id", HistogramModeBoth) + h := newNativeHistogram("test_histogram", tc.buckets, onAdd, nil, "trace_id", HistogramModeBoth, nil) testHistogram(t, h, tc.collections) }) }) diff --git a/modules/generator/registry/registry.go b/modules/generator/registry/registry.go index 5b96f586b25..d7fca905032 100644 --- a/modules/generator/registry/registry.go +++ b/modules/generator/registry/registry.go @@ -156,7 +156,7 @@ func (r *ManagedRegistry) NewHistogram(name string, buckets []float64, histogram // are disabled, eventually the new implementation can handle all cases if hasNativeHistograms(histogramOverride) { - h = newNativeHistogram(name, buckets, r.onAddMetricSeries, r.onRemoveMetricSeries, traceIDLabelName, histogramOverride) + h = newNativeHistogram(name, buckets, r.onAddMetricSeries, r.onRemoveMetricSeries, traceIDLabelName, histogramOverride, r.externalLabels) } else { h = newHistogram(name, buckets, r.onAddMetricSeries, r.onRemoveMetricSeries, traceIDLabelName, r.externalLabels) } diff --git a/pkg/api/query_builder.go b/pkg/api/query_builder.go index 7a43191ffec..fdabf1ceb06 100644 --- a/pkg/api/query_builder.go +++ b/pkg/api/query_builder.go @@ -15,6 +15,7 @@ func newQueryBuilder(init string) *queryBuilder { builder: strings.Builder{}, } + qb.builder.Grow(100) // pre-allocate some space. ideally the caller could indicate roughly the expected size, but starting with 100 bytes significantly outperforms 0 qb.builder.WriteString(init) return qb }