Skip to content

Commit

Permalink
fix: exemplars metadata and timestamps (grafana#1299)
Browse files Browse the repository at this point in the history
* fix: load exemplars metadata from a segment stub

* fix: fail fast

* fix: make exemplar queries honor the time range

* fix: incorrect time field
  • Loading branch information
kolesnikovae authored Jul 25, 2022
1 parent c163b37 commit 1a239f2
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 42 deletions.
31 changes: 25 additions & 6 deletions pkg/server/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/sirupsen/logrus"

Expand All @@ -29,6 +30,10 @@ type mergeRequest struct {
EndTime string `json:"endTime"`
Profiles []string `json:"profiles"`
MaxNodes int `json:"maxNodes"`

// For consistency with render handler: `startTime` and `endTime` take precedence.
From string `json:"from"`
Until string `json:"until"`
}

type mergeResponse struct {
Expand Down Expand Up @@ -71,12 +76,7 @@ func (mh *MergeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
maxNodes = req.MaxNodes
}

out, err := mh.storage.MergeExemplars(r.Context(), storage.MergeExemplarsInput{
StartTime: attime.Parse(req.StartTime),
EndTime: attime.Parse(req.EndTime),
AppName: req.AppName,
ProfileIDs: req.Profiles,
})
out, err := mh.storage.MergeExemplars(r.Context(), mergeExemplarsInputFromMergeRequest(req))
if err != nil {
mh.httpUtils.WriteInternalServerError(r, w, err, "failed to retrieve data")
return
Expand Down Expand Up @@ -106,3 +106,22 @@ func (mh *MergeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
mh.stats.StatsInc("merge")
mh.httpUtils.WriteResponseJSON(r, w, resp)
}

// mergeExemplarsInputFromMergeRequest translates a decoded merge request into
// the input structure passed to the storage MergeExemplars call.
//
// The time bounds are resolved by pickTime: startTime/endTime are used when
// set, otherwise from/until are used.
func mergeExemplarsInputFromMergeRequest(req mergeRequest) storage.MergeExemplarsInput {
	var in storage.MergeExemplarsInput
	in.AppName = req.AppName
	in.ProfileIDs = req.Profiles
	in.StartTime = pickTime(req.StartTime, req.From)
	in.EndTime = pickTime(req.EndTime, req.Until)
	return in
}

// pickTime parses the first non-empty value among primary and fallback (in
// that order) with attime.Parse. When both are empty, the zero time.Time is
// returned.
func pickTime(primary, fallback string) time.Time {
	for _, candidate := range [...]string{primary, fallback} {
		if candidate != "" {
			return attime.Parse(candidate)
		}
	}
	return time.Time{}
}
24 changes: 24 additions & 0 deletions pkg/storage/exemplars.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ func (e *exemplars) fetch(ctx context.Context, appName string, profileIDs []stri
return err
case errors.Is(err, badger.ErrKeyNotFound):
case err == nil:
// TODO(kolesnikovae): Optimize:
// It makes sense to lookup the dictionary keys only after all
// exemplars fetched and merged.
err = item.Value(func(val []byte) error {
e.metrics.exemplarsReadBytes.Observe(float64(len(val)))
var x exemplarEntry
Expand Down Expand Up @@ -365,6 +368,27 @@ func (e *exemplars) truncateN(before time.Time, count int) (bool, error) {
return true, err
}

// ensureAppSegmentExists obtains the segment stored in the segments cache
// under the app-level key (segment.AppSegmentKey of the input's app name) via
// GetOrCreate. If the segment metadata differs from the metadata carried by
// the input, the segment is updated and put back into the cache.
//
// An error is returned only when GetOrCreate fails.
func (s *Storage) ensureAppSegmentExists(in *PutInput) error {
	key := segment.AppSegmentKey(in.Key.AppName())
	cached, err := s.segments.GetOrCreate(key)
	if err != nil {
		return fmt.Errorf("segments cache for %v: %w", key, err)
	}
	seg := cached.(*segment.Segment)
	if isMetadataEqual(seg, in) {
		// Metadata is already up to date: nothing to write.
		return nil
	}
	seg.SetMetadata(in.SpyName, in.SampleRate, in.Units, in.AggregationType)
	s.segments.Put(key, seg)
	return nil
}

// isMetadataEqual reports whether the spy name, aggregation type, sample rate
// and units of the segment are all equal to those of the put input.
func isMetadataEqual(s *segment.Segment, in *PutInput) bool {
	// Cases are checked top to bottom and stop at the first mismatch.
	switch {
	case in.SpyName != s.SpyName():
		return false
	case in.AggregationType != s.AggregationType():
		return false
	case in.SampleRate != s.SampleRate():
		return false
	case in.Units != s.Units():
		return false
	}
	return true
}

func (b *exemplarsBatch) insert(_ context.Context, input *PutInput) error {
if len(b.entries) == exemplarsPerBatch {
return errBatchIsFull
Expand Down
85 changes: 65 additions & 20 deletions pkg/storage/exemplars_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/pyroscope-io/pyroscope/pkg/testing"
)

var _ = Describe("exemplars", func() {
var _ = Describe("Exemplars retrieval", func() {
st := time.Now()
et := st.Add(10 * time.Second)

Expand All @@ -54,11 +54,6 @@ var _ = Describe("exemplars", func() {
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())

put(s, map[string]string{
"__name__": "app.cpu",
"span_name": "foo",
// w/o profile_id, just to create the segment.
})
put(s, map[string]string{
"__name__": "app.cpu",
"span_name": "foo",
Expand Down Expand Up @@ -201,14 +196,6 @@ var _ = Describe("Exemplars retention policy", func() {
Val: tree.Clone(big.NewRat(1, 1)),
})).ToNot(HaveOccurred())

// Just to create the segment.
k3, _ := segment.ParseKey("app.cpu{}")
Expect(s.Put(context.TODO(), &PutInput{
StartTime: t3,
EndTime: t4,
Key: k3,
Val: tree.Clone(big.NewRat(1, 1)),
})).ToNot(HaveOccurred())
s.exemplars.Sync()
rp := &segment.RetentionPolicy{ExemplarsRetentionTime: t3}
s.exemplars.enforceRetentionPolicy(context.Background(), rp)
Expand All @@ -232,12 +219,6 @@ var _ = Describe("Exemplars retention policy", func() {
})
})

func randomBytesHex(n int) string {
b := make([]byte, n)
rand.Read(b)
return hex.EncodeToString(b)
}

var _ = Describe("Concurrent exemplars insertion", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
Expand Down Expand Up @@ -355,3 +336,67 @@ var _ = Describe("Exemplar serialization", func() {
})
})
})

var _ = Describe("Exemplar timestamps", func() {
	Context("exemplars query", func() {
		It("selects all entries if no time range is provided or timestamps are not present", func() {
			// Each of the four low bits of mask switches one timestamp
			// between 0 and 1. The all-ones mask (0xF) is not visited, so
			// every combination has at least one zero timestamp.
			for mask := 0; mask < 0xF; mask++ {
				entry := exemplarEntry{
					StartTime: bitAt(mask, 3),
					EndTime:   bitAt(mask, 2),
				}
				from, until := bitAt(mask, 1), bitAt(mask, 0)

				Expect(exemplarMatchesTimeRange(entry, from, until)).To(BeTrue())
			}
		})

		It("selects matched entries", func() {
			begin := time.Now().UnixNano()
			finish := begin + 3
			entry := exemplarEntry{
				StartTime: begin,
				EndTime:   finish,
			}

			// Offsets added to the exemplar's own start and end to build
			// query ranges that must match.
			matchingOffsets := [][2]int64{
				{0, 0},
				{1, -1},
				{-1, 1},

				{0, 1},
				{1, 0},
				{1, 1},

				{0, -1},
				{-1, 0},
				{-1, -1},
			}
			for _, off := range matchingOffsets {
				Expect(exemplarMatchesTimeRange(entry, begin+off[0], finish+off[1])).To(BeTrue())
			}

			// Absolute query ranges that are empty or only touch one of the
			// exemplar's boundaries: none of them must match.
			nonMatchingRanges := [][2]int64{
				{finish, finish},
				{finish, finish + 1},
				{begin, begin},
				{begin - 1, begin},
			}
			for _, q := range nonMatchingRanges {
				Expect(exemplarMatchesTimeRange(entry, q[0], q[1])).To(BeFalse())
			}
		})
	})
})

// randomBytesHex fills an n-byte buffer using rand.Read and returns its
// lowercase hexadecimal encoding (2*n characters).
func randomBytesHex(n int) string {
	raw := make([]byte, n)
	// The result of rand.Read is deliberately discarded, as before.
	_, _ = rand.Read(raw)
	encoded := make([]byte, hex.EncodedLen(len(raw)))
	hex.Encode(encoded, raw)
	return string(encoded)
}

// bitAt returns 1 if bit b of n is set and 0 otherwise.
func bitAt(n, b int) int64 {
	// Compare the masked value against zero rather than testing that it is
	// positive: when b addresses the sign bit, the masked value is negative,
	// and a "> 0" check would wrongly report the bit as unset.
	if n&(1<<b) != 0 {
		return 1
	}
	return 0
}
2 changes: 2 additions & 0 deletions pkg/storage/segment/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func (k *Key) ProfileID() (string, bool) {
return id, ok
}

// AppSegmentKey returns the key of the app-level segment for the given
// application name: the name immediately followed by "{}".
func AppSegmentKey(appName string) string {
	const suffix = "{}"
	return appName + suffix
}

// TreeKey builds a key of the form "<k>:<depth>:<unixTime>", with depth and
// unixTime rendered as base-10 integers.
func TreeKey(k string, depth int, unixTime int64) string {
	// Room for the prefix, two separators and two decimal int64 values
	// (at most 20 characters each, sign included).
	buf := make([]byte, 0, len(k)+2+2*20)
	buf = append(buf, k...)
	buf = append(buf, ':')
	buf = strconv.AppendInt(buf, int64(depth), 10)
	buf = append(buf, ':')
	buf = strconv.AppendInt(buf, unixTime, 10)
	return string(buf)
}
Expand Down
88 changes: 72 additions & 16 deletions pkg/storage/storage_merge_exemplars.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"context"
"fmt"
"math"
"math/big"
"time"

Expand Down Expand Up @@ -61,31 +62,86 @@ type exemplarsMerge struct {

func (s *Storage) mergeExemplars(ctx context.Context, mi MergeExemplarsInput) (out exemplarsMerge, err error) {
out.tree = tree.New()
startTime := unixNano(mi.StartTime)
endTime := unixNano(mi.EndTime)
err = s.exemplars.fetch(ctx, mi.AppName, mi.ProfileIDs, func(e exemplarEntry) error {
out.tree.Merge(e.Tree)
out.count++
out.lastEntry = &e
if exemplarMatchesTimeRange(e, startTime, endTime) {
out.tree.Merge(e.Tree)
out.count++
out.lastEntry = &e
}
return nil
})
if err != nil || out.lastEntry == nil {
return out, err
}
out.segment, err = s.findSegmentForExemplar(out.lastEntry)
return out, err
// Note that exemplar entry labels don't contain the app name and profile ID.
if out.lastEntry.Labels == nil {
out.lastEntry.Labels = make(map[string]string)
}
r, ok := s.segments.Lookup(segment.AppSegmentKey(mi.AppName))
if !ok {
return out, fmt.Errorf("no metadata found for app %q", mi.AppName)
}
out.segment = r.(*segment.Segment)
return out, nil
}

func (s *Storage) findSegmentForExemplar(e *exemplarEntry) (*segment.Segment, error) {
// Note that exemplar entry labels doesn't contain the app name and profile ID.
if e.Labels == nil {
e.Labels = make(map[string]string)
func unixNano(t time.Time) int64 {
if t.IsZero() {
return 0
}
labels := map[string]string{"__name__": e.AppName}
for k, v := range e.Labels {
labels[k] = v
return t.UnixNano()
}

// exemplarMatchesTimeRange reports whether the exemplar is eligible for the
// given time range. Potentially, we could take exact fraction and scale the
// exemplar proportionally, in the way we do it in aggregate queries. However,
// with exemplars down-sampling does not seem to be a good idea as it may be
// confusing.
//
// For backward compatibility, an exemplar is considered eligible if the time
// range is not specified, or if the exemplar does not have timestamps.
func exemplarMatchesTimeRange(e exemplarEntry, startTime, endTime int64) bool {
if startTime == 0 || endTime == 0 || e.StartTime == 0 || e.EndTime == 0 {
return true
}
r, ok := s.segments.Lookup(segment.NewKey(labels).Normalized())
if !ok {
return nil, fmt.Errorf("no metadata found for profile %q", e.ProfileID)
return !math.IsNaN(overlap(startTime, endTime, e.StartTime, e.EndTime))
}

// overlap intersects the query range with the exemplar range and returns the
// length of the intersection as a fraction of the exemplar span.
//
//	query:    from  – until
//	exemplar: start – end
//
// Special cases:
//
//	+Inf - the intersection is as long as the exemplar span
//	       (query matches or includes exemplar)
//	NaN  - the intersection is empty (ranges don't overlap)
func overlap(from, until, start, end int64) float64 {
	// Bounds of the intersection: the later start and the earlier end.
	lo, hi := from, until
	if start > lo {
		lo = start
	}
	if end < hi {
		hi = end
	}

	covered := hi - lo
	if covered <= 0 {
		return math.NaN()
	}
	exemplarSpan := end - start
	if covered == exemplarSpan {
		return math.Inf(0)
	}
	return float64(covered) / float64(exemplarSpan)
}

// min returns the smaller of a and b.
func min(a, b int64) int64 {
	if a <= b {
		return a
	}
	return b
}

func max(a, b int64) int64 {
if b > a {
return b
}
return r.(*segment.Segment), nil
return a
}
3 changes: 3 additions & 0 deletions pkg/storage/storage_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func (s *Storage) Put(ctx context.Context, pi *PutInput) error {

s.putTotal.Inc()
if pi.Key.HasProfileID() {
if err := s.ensureAppSegmentExists(pi); err != nil {
return err
}
return s.exemplars.insert(ctx, pi)
}

Expand Down

0 comments on commit 1a239f2

Please sign in to comment.