Skip to content

Commit

Permalink
add signals for duplicate rf1 data (#4296)
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored Nov 7, 2024
1 parent b6d7289 commit 403fdcf
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 4 deletions.
1 change: 1 addition & 0 deletions cmd/tempo-cli/cmd-rewrite-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta
SpansDiscarded: func(_, _, _ string, _ int) {},
DisconnectedTrace: func() {},
RootlessTrace: func() {},
DedupedSpans: func(_, _ int) {},
}

compactor := enc.NewCompactor(opts)
Expand Down
12 changes: 12 additions & 0 deletions tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ var (
Name: "compaction_outstanding_blocks",
Help: "Number of blocks remaining to be compacted before next maintenance cycle",
}, []string{"tenant"})
// metricDedupedSpans accumulates the number of spans deduped during
// compaction, labelled by the replication factor of the compacted blocks.
// It is registered as a counter (not a gauge): the name carries the `_total`
// suffix and the value is only ever increased via Add, so exposing it with
// counter type keeps rate()/increase() queries and reset handling correct.
metricDedupedSpans = promauto.NewCounterVec(prometheus.CounterOpts{
	Namespace: "tempodb",
	Name:      "compaction_spans_combined_total",
	Help:      "Number of spans that are deduped per replication factor.",
}, []string{"replication_factor"})
)

func (rw *readerWriter) compactionLoop(ctx context.Context) {
Expand Down Expand Up @@ -166,6 +171,10 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
metricCompactionErrors.Inc()
}

if !rw.compactorSharder.Owns(hashString) {
level.Warn(rw.logger).Log("msg", "compaction complete but we no longer own the hash", "hashString", hashString)
}

// after a maintenance cycle bail out
if start.Add(rw.compactorCfg.MaxTimePerTenant).Before(time.Now()) {
measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
Expand Down Expand Up @@ -263,6 +272,9 @@ func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.Block
RootlessTrace: func() {
dataquality.WarnRootlessTrace(tenantID, dataquality.PhaseTraceCompactorCombine)
},
DedupedSpans: func(replFactor, dedupedSpans int) {
metricDedupedSpans.WithLabelValues(strconv.Itoa(replFactor)).Add(float64(dedupedSpans))
},
}

compactor := enc.NewCompactor(opts)
Expand Down
1 change: 1 addition & 0 deletions tempodb/encoding/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type CompactionOptions struct {
SpansDiscarded func(traceID string, rootSpanName string, rootServiceName string, spans int)
DisconnectedTrace func()
RootlessTrace func()
DedupedSpans func(replFactor, dedupedSpans int)
}

type Iterator interface {
Expand Down
6 changes: 4 additions & 2 deletions tempodb/encoding/vparquet4/combiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func (c *Combiner) Consume(tr *Trace) (spanCount int) {
}

// ConsumeWithFinal consumes the trace, but allows for performance savings when
// it is known that this is the last expected input trace.
// it is known that this is the last expected input trace. The spanCount returned
// is the number of duplicate spans between the two traces.
func (c *Combiner) ConsumeWithFinal(tr *Trace, final bool) (spanCount int) {
if tr == nil {
return
Expand Down Expand Up @@ -110,9 +111,10 @@ func (c *Combiner) ConsumeWithFinal(tr *Trace, final bool) (spanCount int) {

if len(notFoundSpans) > 0 {
ils.Spans = notFoundSpans
spanCount += len(notFoundSpans)
notFoundILS = append(notFoundILS, ils)
}

spanCount += len(ils.Spans) - len(notFoundSpans)
}

// if there were some spans not found in A, add everything left in the batch
Expand Down
147 changes: 147 additions & 0 deletions tempodb/encoding/vparquet4/combiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ func TestCombiner(t *testing.T) {
c.Consume(b)
return c.Result()
},
func(a, b *Trace) (*Trace, int, bool) {
c := NewCombiner()
c.Consume(a)
c.ConsumeWithFinal(b, true)
return c.Result()
},
}

tests := []struct {
Expand Down Expand Up @@ -369,6 +375,147 @@ func TestCombiner(t *testing.T) {
}
}

func TestCombinerReturnsDuplicates(t *testing.T) {
tests := []struct {
name string
traceA *Trace
traceB *Trace
expectedDupes int
}{
{
name: "nil traceA",
traceA: nil,
traceB: &Trace{},
expectedDupes: 0,
},
{
name: "nil traceB",
traceA: &Trace{},
traceB: nil,
expectedDupes: 0,
},
{
name: "empty traces",
traceA: &Trace{},
traceB: &Trace{},
expectedDupes: 0,
},
{
name: "no dupes",
traceA: &Trace{
TraceID: []byte{0x00, 0x01},
RootServiceName: "serviceNameA",
ResourceSpans: []ResourceSpans{
{
Resource: Resource{
ServiceName: "serviceNameA",
},
ScopeSpans: []ScopeSpans{
{
Spans: []Span{
{
SpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
StatusCode: 0,
NestedSetLeft: 1,
NestedSetRight: 2,
},
},
},
},
},
},
},
traceB: &Trace{
TraceID: []byte{0x00, 0x01},
RootServiceName: "serviceNameB",
ResourceSpans: []ResourceSpans{
{
Resource: Resource{
ServiceName: "serviceNameB",
},
ScopeSpans: []ScopeSpans{
{
Spans: []Span{
{
SpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02},
ParentSpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
StatusCode: 0,
},
},
},
},
},
},
},
expectedDupes: 0,
},
{
name: "one dupe",
traceA: &Trace{
TraceID: []byte{0x00, 0x01},
RootServiceName: "serviceNameA",
ResourceSpans: []ResourceSpans{
{
Resource: Resource{
ServiceName: "serviceNameA",
},
ScopeSpans: []ScopeSpans{
{
Spans: []Span{
{
SpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
StatusCode: 0,
NestedSetLeft: 1,
NestedSetRight: 2,
},
},
},
},
},
},
},
traceB: &Trace{
TraceID: []byte{0x00, 0x01},
RootServiceName: "serviceNameB",
ResourceSpans: []ResourceSpans{
{
Resource: Resource{
ServiceName: "serviceNameB",
},
ScopeSpans: []ScopeSpans{
{
Spans: []Span{
{
SpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
StatusCode: 0,
},
{
SpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03},
ParentSpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
StatusCode: 2,
},
},
},
},
},
},
},
expectedDupes: 0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cmb := NewCombiner()

cmb.Consume(tt.traceA)
actualDupes := cmb.Consume(tt.traceB)

assert.Equal(t, tt.expectedDupes, actualDupes)
})
}
}

func BenchmarkCombine(b *testing.B) {
batchCount := 100
spanCounts := []int{
Expand Down
7 changes: 5 additions & 2 deletions tempodb/encoding/vparquet4/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,
}

var (
replicationFactor = inputs[0].ReplicationFactor
nextCompactionLevel = compactionLevel + 1
sch = parquet.SchemaOf(new(Trace))
)
Expand Down Expand Up @@ -110,15 +111,17 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,

// Time to combine.
cmb := NewCombiner()
dedupedSpans := 0
for i, row := range rows {
tr := new(Trace)
err := sch.Reconstruct(tr, row)
if err != nil {
return nil, err
}
cmb.ConsumeWithFinal(tr, i == len(rows)-1)
dedupedSpans += cmb.ConsumeWithFinal(tr, i == len(rows)-1)
pool.Put(row)
}
c.opts.DedupedSpans(int(replicationFactor), dedupedSpans)
tr, _, connected := cmb.Result()
if !connected {
c.opts.DisconnectedTrace()
Expand Down Expand Up @@ -160,7 +163,7 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,
TenantID: inputs[0].TenantID,
CompactionLevel: nextCompactionLevel,
TotalObjects: recordsPerBlock, // Just an estimate
ReplicationFactor: inputs[0].ReplicationFactor,
ReplicationFactor: replicationFactor,
DedicatedColumns: inputs[0].DedicatedColumns,
}

Expand Down

0 comments on commit 403fdcf

Please sign in to comment.