Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott committed Nov 1, 2024
1 parent 02eb410 commit 4730c54
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 21 deletions.
5 changes: 3 additions & 2 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,9 @@ func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineR
})
}

// cloneChildRequest returns a cloned http.Request from the pipeline.Request.
func cloneChildRequest(parent pipeline.Request, tenant string, modHTTP func(*http.Request) (*http.Request, error)) (pipeline.Request, error) {
// cloneRequestforQueriers returns a cloned pipeline.Request from the passed pipeline.Request ready for queriers. The caller is given an opportunity
// to modify the internal http.Request before it is returned using the modHTTP param. If modHTTP is nil, the internal http.Request is returned.
func cloneRequestforQueriers(parent pipeline.Request, tenant string, modHTTP func(*http.Request) (*http.Request, error)) (pipeline.Request, error) {
// first clone the http request with headers nil'ed out. this prevents the headers from being copied saving allocs
// here and especially downstream in the httpgrpc bridge. prepareRequestForQueriers will add the only headers that
// the queriers actually need.
Expand Down
10 changes: 7 additions & 3 deletions modules/frontend/metrics_query_range_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s

dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns)
if err != nil {
// errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err))
_ = level.Error(s.logger).Log("msg", "failed to convert dedicated columns in query range sharder. skipping", "block", m.BlockID, "err", err)
continue
}

Expand All @@ -260,7 +260,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
continue
}

pipelineR, _ := cloneChildRequest(parent, tenantID, func(r *http.Request) (*http.Request, error) {
pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
queryRangeReq := &tempopb.QueryRangeRequest{
Query: searchReq.Query,
Start: start,
Expand All @@ -281,6 +281,10 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s

return api.BuildQueryRangeRequest(r, queryRangeReq, dedColsJSON), nil
})
if err != nil {
_ = level.Error(s.logger).Log("msg", "failed to cloneRequestForQuerirs in the query range sharder. skipping", "block", m.BlockID, "err", err)
continue
}

// TODO: Handle sampling rate
key := queryRangeCacheKey(tenantID, queryHash, int64(start), int64(end), m, int(step), pages)
Expand Down Expand Up @@ -313,7 +317,7 @@ func (s *queryRangeSharder) generatorRequest(tenantID string, parent pipeline.Re

searchReq.QueryMode = querier.QueryModeRecent

subR, _ := cloneChildRequest(parent, tenantID, func(r *http.Request) (*http.Request, error) {
subR, _ := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
return api.BuildQueryRangeRequest(r, &searchReq, ""), nil
})

Expand Down
4 changes: 2 additions & 2 deletions modules/frontend/search_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent pipeline.
}

for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
pipelineR, err := cloneChildRequest(parent, tenantID, func(r *http.Request) (*http.Request, error) {
pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
r, err = api.BuildSearchBlockRequest(r, &tempopb.SearchBlockRequest{
BlockID: blockID,
StartPage: uint32(startPage),
Expand Down Expand Up @@ -399,7 +399,7 @@ func pagesPerRequest(m *backend.BlockMeta, bytesPerRequest int) int {
}

func buildIngesterRequest(tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, reqCh chan pipeline.Request) error {
subR, err := cloneChildRequest(parent, tenantID, func(r *http.Request) (*http.Request, error) {
subR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
return api.BuildSearchRequest(r, searchReq)
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions modules/frontend/tag_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (s searchTagSharder) buildBackendRequests(ctx context.Context, tenantID str

blockID := m.BlockID.String()
for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
pipelineR, err := cloneChildRequest(parent, tenantID, func(r *http.Request) (*http.Request, error) {
pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
return searchReq.buildTagSearchBlockRequest(r, blockID, startPage, pages, m)
})
if err != nil {
Expand Down Expand Up @@ -352,7 +352,7 @@ func (s searchTagSharder) ingesterRequest(tenantID string, parent pipeline.Reque
}

func (s searchTagSharder) buildIngesterRequest(tenantID string, parent pipeline.Request, searchReq tagSearchReq) (pipeline.Request, error) {
subR, err := cloneChildRequest(parent, tenantID, func(r *http.Request) (*http.Request, error) {
subR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
return searchReq.buildSearchTagRequest(r)
})
if err != nil {
Expand Down
29 changes: 17 additions & 12 deletions modules/frontend/traceid_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"net/http"

"github.com/go-kit/log" //nolint:all //deprecated
"github.com/grafana/dskit/user"

"github.com/grafana/dskit/user"
"github.com/grafana/tempo/modules/frontend/combiner"
"github.com/grafana/tempo/modules/frontend/pipeline"
"github.com/grafana/tempo/modules/querier"
Expand Down Expand Up @@ -78,19 +78,24 @@ func (s *asyncTraceSharder) buildShardedRequests(parent pipeline.Request) ([]pip

reqs := make([]pipeline.Request, s.cfg.QueryShards)
params := map[string]string{}

reqs[0], err = cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) {
params[querier.QueryModeKey] = querier.QueryModeIngesters
return api.BuildQueryRequest(r, params), nil
})
if err != nil {
return nil, err
}

// build sharded block queries
for i := 0; i < len(s.blockBoundaries); i++ {
for i := 1; i < len(s.blockBoundaries); i++ {
i := i // save the loop variable locally to make sure the closure grabs the correct var.
pipelineR, _ := cloneChildRequest(parent, userID, func(r *http.Request) (*http.Request, error) {
if i == 0 {
// ingester query
params[querier.QueryModeKey] = querier.QueryModeIngesters
} else {
// block queries
params[querier.BlockStartKey] = hex.EncodeToString(s.blockBoundaries[i-1])
params[querier.BlockEndKey] = hex.EncodeToString(s.blockBoundaries[i])
params[querier.QueryModeKey] = querier.QueryModeBlocks
}
pipelineR, _ := cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) {
// block queries
params[querier.BlockStartKey] = hex.EncodeToString(s.blockBoundaries[i-1])
params[querier.BlockEndKey] = hex.EncodeToString(s.blockBoundaries[i])
params[querier.QueryModeKey] = querier.QueryModeBlocks

return api.BuildQueryRequest(r, params), nil
})

Expand Down

0 comments on commit 4730c54

Please sign in to comment.