Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott committed Nov 7, 2024
1 parent 871f75e commit f09ea56
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 23 deletions.
30 changes: 15 additions & 15 deletions modules/frontend/combiner/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ func (s *ShardCompletionTracker) addShards(shards []SearchShards) uint32 {
s.foundResponses = temp
}

for s.incrementCurShardIfComplete() {
}
s.incrementCurShardIfComplete()

return s.completedThroughSeconds
}
Expand Down Expand Up @@ -205,23 +204,24 @@ func (s *ShardCompletionTracker) addShardIdx(shardIdx int) uint32 {
}

s.foundResponses[shardIdx]++
for s.incrementCurShardIfComplete() {
}
s.incrementCurShardIfComplete()

return s.completedThroughSeconds
}

func (s *ShardCompletionTracker) incrementCurShardIfComplete() bool {
if s.curShard >= len(s.shards) {
return false
}

if s.foundResponses[s.curShard] == int(s.shards[s.curShard].TotalJobs) {
s.completedThroughSeconds = s.shards[s.curShard].CompletedThroughSeconds
s.curShard++
// incrementCurShardIfComplete tests to see if the current shard is complete and increments it if so.
// it does this repeatedly until it finds a shard that is not complete.
func (s *ShardCompletionTracker) incrementCurShardIfComplete() {
for {
if s.curShard >= len(s.shards) {
break
}

return true
if s.foundResponses[s.curShard] == int(s.shards[s.curShard].TotalJobs) {
s.completedThroughSeconds = s.shards[s.curShard].CompletedThroughSeconds
s.curShard++
} else {
break
}
}

return false
}
10 changes: 4 additions & 6 deletions modules/frontend/search_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s asyncSearchSharder) RoundTrip(pipelineRequest pipeline.Request) (pipelin
}

// pass subCtx in requests so we can cancel and exit early
jobMetrics = s.backendRequests(ctx, tenantID, pipelineRequest, searchReq, jobMetrics, reqCh, func(err error) {
s.backendRequests(ctx, tenantID, pipelineRequest, searchReq, jobMetrics, reqCh, func(err error) {
// todo: actually find a way to return this error to the user
s.logger.Log("msg", "search: failed to build backend requests", "err", err)
})
Expand Down Expand Up @@ -136,11 +136,11 @@ func (s *asyncSearchSharder) blockMetas(start, end uint32, tenantID string) []*b

// backendRequest builds backend requests to search backend blocks. backendRequest takes ownership of reqCh and closes it.
// it returns 3 int values: totalBlocks, totalBlockBytes, and estimated jobs
func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, resp *combiner.SearchJobResponse, reqCh chan<- pipeline.Request, errFn func(error)) *combiner.SearchJobResponse {
func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, resp *combiner.SearchJobResponse, reqCh chan<- pipeline.Request, errFn func(error)) {
// request without start or end, search only in ingester
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
return resp
return
}

// calculate duration (start and end) to search the backend blocks
Expand All @@ -149,7 +149,7 @@ func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID strin
// no need to search backend
if start == end {
close(reqCh)
return resp
return
}

blocks := s.blockMetas(start, end, tenantID)
Expand All @@ -171,8 +171,6 @@ func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID strin
go func() {
buildBackendRequests(ctx, tenantID, parent, searchReq, blockIter, reqCh, errFn)
}()

return resp
}

// ingesterRequest returns a new start and end time range for the backend as well as an http request
Expand Down
4 changes: 3 additions & 1 deletion modules/frontend/search_sharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ func TestBackendRequests(t *testing.T) {

ctx, cancelCause := context.WithCancelCause(context.Background())
pipelineRequest := pipeline.NewHTTPRequest(r)
searchJobResponse := s.backendRequests(ctx, "test", pipelineRequest, searchReq, &combiner.SearchJobResponse{}, reqCh, cancelCause)

searchJobResponse := &combiner.SearchJobResponse{}
s.backendRequests(ctx, "test", pipelineRequest, searchReq, searchJobResponse, reqCh, cancelCause)
require.Equal(t, tc.expectedJobs, searchJobResponse.TotalJobs)
require.Equal(t, tc.expectedBlocks, searchJobResponse.TotalBlocks)
require.Equal(t, tc.expectedBlockBytes, searchJobResponse.TotalBytes)
Expand Down
1 change: 0 additions & 1 deletion pkg/traceql/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func newAnyCombiner(limit int) *anyCombiner {
// addSpanset adds a new spanset to the combiner. It only performs the asTraceSearchMetadata
// conversion if the spanset will be added
func (c *anyCombiner) addSpanset(new *Spanset) {
// else let's see if it's worth converting this to a metadata and adding it
// if it's already in the list, then we should add it
if _, ok := c.trs[util.TraceIDToHexString(new.TraceID)]; ok {
c.AddMetadata(asTraceSearchMetadata(new))
Expand Down

0 comments on commit f09ea56

Please sign in to comment.