Skip to content

Commit

Permalink
fix(merge): add unit test and correct some corner cases
Browse files Browse the repository at this point in the history
  • Loading branch information
SOF3 committed Aug 18, 2023
1 parent 6e6c908 commit 49c47cc
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 71 deletions.
79 changes: 38 additions & 41 deletions pkg/frontend/reader/merge/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
"k8s.io/apimachinery/pkg/util/sets"

jaegerbackend "github.com/kubewharf/kelemetry/pkg/frontend/backend"
Expand Down Expand Up @@ -55,24 +54,24 @@ func (tr RawTree) FromThumbnail(self *RawTree, tt *jaegerbackend.TraceThumbnail)
self.Tree = tt.Spans
}

func (merger *Merger[M]) AddTraces(trees []TraceWithMetadata[M]) error {
func (merger *Merger[M]) AddTraces(trees []TraceWithMetadata[M]) (_affected sets.Set[objKey], _err error) {
if merger.objects == nil {
merger.objects = make(map[objKey]*object[M])
}

affected := sets.New[objKey]()
for _, trace := range trees {
key, _ := zconstants.ObjectKeyFromSpan(trace.Tree.Root)
key := zconstants.ObjectKeyFromSpan(trace.Tree.Root)
affected.Insert(key)

if obj, hasPrev := merger.objects[key]; hasPrev {
if err := obj.merge(trace.Tree, trace.Metadata); err != nil {
return err
return nil, err
}
} else {
obj, err := newObject[M](key, trace.Tree, trace.Metadata)
if err != nil {
return err
return nil, err
}

merger.objects[key] = obj
Expand All @@ -83,7 +82,7 @@ func (merger *Merger[M]) AddTraces(trees []TraceWithMetadata[M]) error {
merger.objects[key].identifyLinks()
}

return nil
return affected, nil
}

type followLinkPool[M any] struct {
Expand All @@ -94,12 +93,15 @@ type followLinkPool[M any] struct {
merger *Merger[M]
}

func (fl *followLinkPool[M]) scheduleKnown(obj *object[M], limit *atomic.Int32, linkSelector tfconfig.LinkSelector) {
func (fl *followLinkPool[M]) scheduleFrom(obj *object[M], followLimit *atomic.Int32, linkSelector tfconfig.LinkSelector) {
admittedLinks := []TargetLink{}

for _, link := range obj.links {
if _, known := fl.knownKeys[link.Key]; known {
admittedLinks = append(admittedLinks, link)
continue
}
if limit.Add(-1) < 0 {
if followLimit.Add(-1) < 0 {
continue
}

Expand All @@ -110,21 +112,31 @@ func (fl *followLinkPool[M]) scheduleKnown(obj *object[M], limit *atomic.Int32,

subSelector := linkSelector.Admit(parentKey, childKey, parentIsSource, link.Class)
if subSelector != nil {
admittedLinks = append(admittedLinks, link)
fl.knownKeys.Insert(link.Key)
fl.schedule(link.Key, subSelector, int32(fl.endTime.Sub(fl.startTime)/(time.Minute*30)))
fl.schedule(link.Key, subSelector, followLimit, int32(fl.endTime.Sub(fl.startTime)/(time.Minute*30)))
}
}

obj.links = admittedLinks
}

func (fl *followLinkPool[M]) schedule(key objKey, linkSelector tfconfig.LinkSelector, limit int32) {
func (fl *followLinkPool[M]) schedule(key objKey, linkSelector tfconfig.LinkSelector, followLimit *atomic.Int32, traceLimit int32) {
fl.sem.Schedule(func(ctx context.Context) (semaphore.Publish, error) {
thumbnails, err := fl.lister(ctx, key, fl.startTime, fl.endTime, int(limit))
thumbnails, err := fl.lister(ctx, key, fl.startTime, fl.endTime, int(traceLimit))
if err != nil {
return nil, fmt.Errorf("fetching linked traces: %w", err)
}

return func() error {
fl.merger.AddTraces(thumbnails)
affected, err := fl.merger.AddTraces(thumbnails)
if err != nil {
return err
}

for key := range affected {
fl.scheduleFrom(fl.merger.objects[key], followLimit, linkSelector)
}

return nil
}, nil
Expand All @@ -133,30 +145,6 @@ func (fl *followLinkPool[M]) schedule(key objKey, linkSelector tfconfig.LinkSele

type ListFunc[M any] func(ctx context.Context, key objKey, startTime time.Time, endTime time.Time, limit int) ([]TraceWithMetadata[M], error)

Check failure on line 146 in pkg/frontend/reader/merge/merge.go

View workflow job for this annotation

GitHub Actions / golangci-lint

line is 141 characters (lll)

func ListWithBackend[M any](backend jaegerbackend.Backend, convertMetadata func(any) M) ListFunc[M] {
return func(ctx context.Context, key objKey, startTime time.Time, endTime time.Time, limit int) ([]TraceWithMetadata[M], error) {
tts, err := backend.List(ctx, &spanstore.TraceQueryParameters{
Tags: zconstants.KeyToSpanTags(key),
StartTimeMin: startTime,
StartTimeMax: endTime,
NumTraces: int(limit),
})
if err != nil {
return nil, err
}

twmList := make([]TraceWithMetadata[M], len(tts))
for i, tt := range tts {
twmList[i] = TraceWithMetadata[M]{
Tree: tt.Spans,
Metadata: convertMetadata(tt.Identifier),
}
}

return twmList, nil
}
}

func (merger *Merger[M]) FollowLinks(
ctx context.Context,
linkSelector tfconfig.LinkSelector,
Expand Down Expand Up @@ -191,7 +179,7 @@ func (merger *Merger[M]) FollowLinks(
remainingLimit.Store(limit)
}

fl.scheduleKnown(obj, remainingLimit, linkSelector)
fl.scheduleFrom(obj, remainingLimit, linkSelector)
}

if err := fl.sem.Run(ctx); err != nil {
Expand Down Expand Up @@ -558,6 +546,7 @@ func mergeLinkedTraces[M any](objects []*object[M], abLinks abLinkMap) (*tftree.
})

tree := trees[rootKey].tree
treeObjects := sets.New(rootKey)

pendingObjects := []objKey{rootKey}
for len(pendingObjects) > 0 {
Expand All @@ -571,31 +560,39 @@ func mergeLinkedTraces[M any](objects []*object[M], abLinks abLinkMap) (*tftree.

parentSpan := trees[subj].tree.Root
if link.Class != "" {
virtualSpan := createVirtualSpan(tree.Root.TraceID, parentSpan, link.Class)
virtualSpan := createVirtualSpan(tree.Root.TraceID, parentSpan, "", link.Class)
tree.Add(virtualSpan, parentSpan.SpanID)
parentSpan = virtualSpan
}

if treeObjects.Has(link.Key) {
parentSpan.Warnings = append(parentSpan.Warnings, fmt.Sprintf("repeated object %v omitted", link.Key))
// duplicate
continue
}

subtree, hasSubtree := trees[link.Key]
if !hasSubtree {
// this link was not fetched, e.g. because of fetch limit or link selector
continue
}

tree.AddTree(subtree.tree, parentSpan.SpanID)
treeObjects.Insert(link.Key)
pendingObjects = append(pendingObjects, link.Key)
}
}

return tree, nil
}

func createVirtualSpan(traceId model.TraceID, span *model.Span, class string) *model.Span {
func createVirtualSpan(traceId model.TraceID, span *model.Span, opName string, svcName string) *model.Span {
spanId := model.SpanID(rand.Uint64())

return &model.Span{
TraceID: traceId,
SpanID: spanId,
OperationName: class,
OperationName: opName,
Flags: 0,
StartTime: span.StartTime,
Duration: span.Duration,
Expand All @@ -607,7 +604,7 @@ func createVirtualSpan(traceId model.TraceID, span *model.Span, class string) *m
},
},
Process: &model.Process{
ServiceName: class,
ServiceName: svcName,
},
ProcessID: "1",
}
Expand Down
Loading

0 comments on commit 49c47cc

Please sign in to comment.