feat: query pprof from store-gateway (#2694)
* feat: query pprof from store-gateway

* Add basic unit test
kolesnikovae authored Nov 16, 2023
1 parent 826665a commit 67c47fd
Showing 7 changed files with 514 additions and 173 deletions.
125 changes: 89 additions & 36 deletions pkg/phlaredb/block_querier.go
@@ -1181,59 +1181,112 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
		otlog.String("end", model.Time(request.End).Time().String()),
		otlog.String("selector", request.LabelSelector),
		otlog.String("profile_id", request.Type.ID),
		otlog.Object("hints", request.Hints),
	)

	queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End), request.Hints)
	if err != nil {
		return err
	}

	deduplicationNeeded := true
	if request.Hints != nil && request.Hints.Block != nil {
		deduplicationNeeded = request.Hints.Block.Deduplication
	}

	var lock sync.Mutex
	result := make([]*profile.Profile, 0, len(queriers))
	g, ctx := errgroup.WithContext(ctx)

	// depending on whether we need deduplication or not there are two different code paths.
	if !deduplicationNeeded {
		// signal the end of the profile streaming by sending an empty response.
		sp.LogFields(otlog.String("msg", "no profile streaming as no deduplication needed"))
		if err = stream.Send(&ingestv1.MergeProfilesPprofResponse{}); err != nil {
			return err
		}

		// in this path we can just merge the profiles from each block and send the result to the client.
		for _, querier := range queriers {
			querier := querier
			g.Go(util.RecoverPanic(func() error {
				iters, err := querier.SelectMatchingProfiles(ctx, request)
				if err != nil {
					return err
				}
				defer func() {
					iters.Close()
				}()

				profiles, err := iter.Slice(iters)
				if err != nil {
					return err
				}

				if len(profiles) == 0 {
					return nil
				}

				merge, err := querier.MergePprof(ctx, iter.NewSliceIterator(querier.Sort(profiles)))
				if err != nil {
					return err
				}

				lock.Lock()
				result = append(result, merge)
				lock.Unlock()
				return nil
			}))
		}
	} else {
		// in this path we have to go through every profile and deduplicate them.
		iters, err := SelectMatchingProfiles(ctx, request, queriers)
		if err != nil {
			return err
		}

		// send batches of profiles to the client and filter via the bidi stream.
		selectedProfiles, err := filterProfiles[
			BidiServerMerge[*ingestv1.MergeProfilesPprofResponse, *ingestv1.MergeProfilesPprofRequest],
			*ingestv1.MergeProfilesPprofResponse,
			*ingestv1.MergeProfilesPprofRequest](ctx, iters, defaultBatchSize, stream)
		if err != nil {
			return err
		}

		for i, querier := range queriers {
			querier := querier
			i := i
			if len(selectedProfiles[i]) == 0 {
				continue
			}
			// Sort profiles for better read locality.
			// Merge the result asynchronously so we can continue streaming profiles.
			g.Go(util.RecoverPanic(func() error {
				merge, err := querier.MergePprof(ctx, iter.NewSliceIterator(querier.Sort(selectedProfiles[i])))
				if err != nil {
					return err
				}
				lock.Lock()
				result = append(result, merge)
				lock.Unlock()
				return nil
			}))
		}

		// Signals the end of the profile streaming by sending an empty response.
		// This allows the client to not block other streaming ingesters.
		sp.LogFields(otlog.String("msg", "signaling the end of the profile streaming"))
		if err = stream.Send(&ingestv1.MergeProfilesPprofResponse{}); err != nil {
			return err
		}
	}

	if err = g.Wait(); err != nil {
		return err
	}

	if len(result) == 0 {
		result = append(result, &profile.Profile{})
	}
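The fast path added above hinges on the Deduplication flag carried in the block hints: when the planner already knows that the blocks assigned to a replica are not duplicated elsewhere, it sets Deduplication to false and the block querier merges each block locally instead of streaming profiles back for filtering. A minimal sketch of how a caller could shape the initial bidi request, assuming a hypothetical helper; only the message types (MergeProfilesPprofRequest, SelectProfilesRequest, Hints, BlockHints) come from this commit:

package example

import (
	ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
)

// buildPprofMergeRequest shapes the first message of a MergeProfilesPprof
// bidi stream. Passing block hints with Deduplication set to false selects
// the fast path added above; leaving hints nil keeps the previous behaviour,
// where profiles are streamed back to the client for deduplication.
// The helper itself is illustrative and not part of the commit.
func buildPprofMergeRequest(sel *ingestv1.SelectProfilesRequest, hints *ingestv1.BlockHints) *ingestv1.MergeProfilesPprofRequest {
	if hints != nil {
		sel.Hints = &ingestv1.Hints{Block: hints}
	}
	return &ingestv1.MergeProfilesPprofRequest{Request: sel}
}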
57 changes: 57 additions & 0 deletions pkg/querier/ingester_querier.go
@@ -11,6 +11,7 @@ import (
	"github.com/prometheus/prometheus/promql/parser"
	"golang.org/x/sync/errgroup"

	googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
	ingesterv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
	ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
	querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
@@ -138,6 +139,62 @@ func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.Se
	return selectMergeTree(gCtx, responses)
}

func (q *Querier) selectProfileFromIngesters(ctx context.Context, req *querierv1.SelectMergeProfileRequest, plan blockPlan) (*googlev1.Profile, error) {
	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectProfile Ingesters")
	defer sp.Finish()
	profileType, err := phlaremodel.ParseProfileTypeSelector(req.ProfileTypeID)
	if err != nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, err)
	}
	_, err = parser.ParseMetricSelector(req.LabelSelector)
	if err != nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, err)
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var responses []ResponseFromReplica[clientpool.BidiClientMergeProfilesPprof]
	if plan != nil {
		responses, err = forAllPlannedIngesters(ctx, q.ingesterQuerier, plan, func(ctx context.Context, ic IngesterQueryClient, hints *ingestv1.Hints) (clientpool.BidiClientMergeProfilesPprof, error) {
			return ic.MergeProfilesPprof(ctx), nil
		})
	} else {
		responses, err = forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesPprof, error) {
			return ic.MergeProfilesPprof(ctx), nil
		})
	}
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}
	// send the first initial request to all ingesters.
	g, gCtx := errgroup.WithContext(ctx)
	for idx := range responses {
		r := responses[idx]
		hints, ok := plan[r.addr]
		if !ok && plan != nil {
			return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("no hints found for replica %s", r.addr))
		}

		g.Go(util.RecoverPanic(func() error {
			return r.response.Send(&ingestv1.MergeProfilesPprofRequest{
				Request: &ingestv1.SelectProfilesRequest{
					LabelSelector: req.LabelSelector,
					Start:         req.Start,
					End:           req.End,
					Type:          profileType,
					Hints:         &ingestv1.Hints{Block: hints},
				},
			})
		}))
	}
	if err = g.Wait(); err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	// merge all profiles
	return selectMergePprofProfile(gCtx, profileType, responses)
}

func (q *Querier) selectSeriesFromIngesters(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest, plan map[string]*ingestv1.BlockHints) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries Ingesters")
	defer sp.Finish()
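selectProfileFromIngesters fans the initial request out with an errgroup and looks up each replica's hints in the plan by address (plan[r.addr]); a nil plan falls back to querying every ingester in the replication set. A rough illustration of the shape such a plan takes, assuming hypothetical replica addresses; the map type mirrors the selectSeriesFromIngesters signature above:

package example

import (
	ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
)

// examplePlan sketches a per-replica block plan: each replica address maps
// to the hints for the blocks that replica should read. Deduplication set
// to false marks replicas whose blocks can be merged without cross-replica
// filtering. The addresses are made up for the example.
func examplePlan() map[string]*ingestv1.BlockHints {
	return map[string]*ingestv1.BlockHints{
		"ingester-zone-a-0:9095": {Deduplication: false},
		"ingester-zone-b-0:9095": {Deduplication: false},
	}
}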
104 changes: 69 additions & 35 deletions pkg/querier/querier.go
@@ -33,7 +33,7 @@ import (
	"github.com/grafana/pyroscope/pkg/clientpool"
	"github.com/grafana/pyroscope/pkg/iter"
	phlaremodel "github.com/grafana/pyroscope/pkg/model"
	"github.com/grafana/pyroscope/pkg/pprof"
	"github.com/grafana/pyroscope/pkg/util/math"
	"github.com/grafana/pyroscope/pkg/util/spanlogger"
)
@@ -678,6 +678,15 @@ func (sq storeQuery) MergeSpanProfileRequest(req *querierv1.SelectMergeSpanProfi
	}
}

func (sq storeQuery) MergeProfileRequest(req *querierv1.SelectMergeProfileRequest) *querierv1.SelectMergeProfileRequest {
	return &querierv1.SelectMergeProfileRequest{
		ProfileTypeID: req.ProfileTypeID,
		LabelSelector: req.LabelSelector,
		Start:         int64(sq.start),
		End:           int64(sq.end),
	}
}

func (sq storeQuery) SeriesRequest(req *querierv1.SeriesRequest) *ingestv1.SeriesRequest {
	return &ingestv1.SeriesRequest{
		Start: int64(sq.start),
@@ -741,50 +750,75 @@ func (q *Querier) SelectMergeProfile(ctx context.Context, req *connect.Request[q
		sp.Finish()
	}()

	profile, err := q.selectProfile(ctx, req.Msg)
	if err != nil {
		return nil, err
	}
	profile.DurationNanos = model.Time(req.Msg.End).UnixNano() - model.Time(req.Msg.Start).UnixNano()
	profile.TimeNanos = model.Time(req.Msg.End).UnixNano()
	return connect.NewResponse(profile), nil
}

func (q *Querier) selectProfile(ctx context.Context, req *querierv1.SelectMergeProfileRequest) (*googlev1.Profile, error) {
	// determine the block hints
	plan, err := q.blockSelect(ctx, model.Time(req.Start), model.Time(req.End))
	if isEndpointNotExistingErr(err) {
		level.Warn(spanlogger.FromContext(ctx, q.logger)).Log(
			"msg", "block select not supported on at least one component, fallback to use full dataset",
			"err", err,
		)
		plan = nil
	} else if err != nil {
		return nil, fmt.Errorf("error during block select: %w", err)
	}

	// no store gateways configured, so just query the ingesters
	if q.storeGatewayQuerier == nil {
		return q.selectProfileFromIngesters(ctx, req, plan)
	}

	storeQueries := splitQueryToStores(model.Time(req.Start), model.Time(req.End), model.Now(), q.cfg.QueryStoreAfter, plan)
	if !storeQueries.ingester.shouldQuery && !storeQueries.storeGateway.shouldQuery {
		return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("start and end time are outside of the ingester and store gateway retention"))
	}

	storeQueries.Log(level.Debug(spanlogger.FromContext(ctx, q.logger)))

	if plan == nil && !storeQueries.ingester.shouldQuery {
		return q.selectProfileFromStoreGateway(ctx, storeQueries.storeGateway.MergeProfileRequest(req), plan)
	}
	if plan == nil && !storeQueries.storeGateway.shouldQuery {
		return q.selectProfileFromIngesters(ctx, storeQueries.ingester.MergeProfileRequest(req), plan)
	}

	g, ctx := errgroup.WithContext(ctx)
	var lock sync.Mutex
	var merge pprof.ProfileMerge
	g.Go(func() error {
		ingesterProfile, err := q.selectProfileFromIngesters(ctx, storeQueries.ingester.MergeProfileRequest(req), plan)
		if err != nil {
			return err
		}
		lock.Lock()
		err = merge.Merge(ingesterProfile)
		lock.Unlock()
		return err
	})
	g.Go(func() error {
		storegatewayProfile, err := q.selectProfileFromStoreGateway(ctx, storeQueries.storeGateway.MergeProfileRequest(req), plan)
		if err != nil {
			return err
		}
		lock.Lock()
		err = merge.Merge(storegatewayProfile)
		lock.Unlock()
		return err
	})
	if err := g.Wait(); err != nil {
		return nil, err
	}

	return merge.Profile(), nil
}

func (q *Querier) SelectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest]) (*connect.Response[querierv1.SelectSeriesResponse], error) {
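When both the ingesters and the store gateways have to be queried, the two partial results are folded into a single profile with pprof.ProfileMerge, guarded by a mutex because both goroutines write into the same merger. A small usage sketch of that merge pattern in a sequential setting, assuming a hypothetical helper; only ProfileMerge, Merge, and Profile are taken from the code above:

package example

import (
	googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
	"github.com/grafana/pyroscope/pkg/pprof"
)

// mergeProfiles folds any number of pprof profiles into one result using
// the same ProfileMerge used above. The mutex in the querier suggests
// ProfileMerge is not safe for concurrent use; a sequential loop like this
// needs no locking. The helper itself is illustrative, not from the commit.
func mergeProfiles(profiles []*googlev1.Profile) (*googlev1.Profile, error) {
	var m pprof.ProfileMerge
	for _, p := range profiles {
		if p == nil {
			continue
		}
		if err := m.Merge(p); err != nil {
			return nil, err
		}
	}
	return m.Profile(), nil
}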