From f62245094c55d61f80847320f8123740bc5bdee9 Mon Sep 17 00:00:00 2001
From: Joe Elliott
Date: Thu, 7 Nov 2024 14:42:08 -0500
Subject: [PATCH] add signals for duplicate rf1 data

Signed-off-by: Joe Elliott
---
 cmd/tempo-cli/cmd-rewrite-blocks.go         |   1 +
 tempodb/compactor.go                        |  12 ++
 tempodb/encoding/common/interfaces.go       |   1 +
 tempodb/encoding/vparquet4/combiner.go      |   6 +-
 tempodb/encoding/vparquet4/combiner_test.go | 147 ++++++++++++++++++++
 tempodb/encoding/vparquet4/compactor.go     |   7 +-
 6 files changed, 170 insertions(+), 4 deletions(-)

diff --git a/cmd/tempo-cli/cmd-rewrite-blocks.go b/cmd/tempo-cli/cmd-rewrite-blocks.go
index ea5f22069be..7e20c802e3b 100644
--- a/cmd/tempo-cli/cmd-rewrite-blocks.go
+++ b/cmd/tempo-cli/cmd-rewrite-blocks.go
@@ -141,6 +141,7 @@ func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta
 		SpansDiscarded:    func(_, _, _ string, _ int) {},
 		DisconnectedTrace: func() {},
 		RootlessTrace:     func() {},
+		DedupedSpans:      func(_, _ int) {},
 	}
 
 	compactor := enc.NewCompactor(opts)
diff --git a/tempodb/compactor.go b/tempodb/compactor.go
index 69093448cda..517844cf126 100644
--- a/tempodb/compactor.go
+++ b/tempodb/compactor.go
@@ -65,6 +65,11 @@ var (
 		Name:      "compaction_outstanding_blocks",
 		Help:      "Number of blocks remaining to be compacted before next maintenance cycle",
 	}, []string{"tenant"})
+	metricDedupedSpans = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "tempodb",
+		Name:      "compaction_spans_combined_total",
+		Help:      "Number of spans that are deduped per replication factor.",
+	}, []string{"replication_factor"})
 )
 
 func (rw *readerWriter) compactionLoop(ctx context.Context) {
@@ -166,6 +171,10 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
 			metricCompactionErrors.Inc()
 		}
 
+		if !rw.compactorSharder.Owns(hashString) {
+			level.Warn(rw.logger).Log("msg", "compaction complete but we no longer own the hash", "hashString", hashString)
+		}
+
 		// after a maintenance cycle bail out if
start.Add(rw.compactorCfg.MaxTimePerTenant).Before(time.Now()) { measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns) @@ -263,6 +272,9 @@ func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.Block RootlessTrace: func() { dataquality.WarnRootlessTrace(tenantID, dataquality.PhaseTraceCompactorCombine) }, + DedupedSpans: func(replFactor, dedupedSpans int) { + metricDedupedSpans.WithLabelValues(strconv.Itoa(replFactor)).Add(float64(dedupedSpans)) + }, } compactor := enc.NewCompactor(opts) diff --git a/tempodb/encoding/common/interfaces.go b/tempodb/encoding/common/interfaces.go index 5d9446e63d4..ef3cd6b9969 100644 --- a/tempodb/encoding/common/interfaces.go +++ b/tempodb/encoding/common/interfaces.go @@ -88,6 +88,7 @@ type CompactionOptions struct { SpansDiscarded func(traceID string, rootSpanName string, rootServiceName string, spans int) DisconnectedTrace func() RootlessTrace func() + DedupedSpans func(replFactor, dedupedSpans int) } type Iterator interface { diff --git a/tempodb/encoding/vparquet4/combiner.go b/tempodb/encoding/vparquet4/combiner.go index 838c32cc463..12aaa17d754 100644 --- a/tempodb/encoding/vparquet4/combiner.go +++ b/tempodb/encoding/vparquet4/combiner.go @@ -42,7 +42,8 @@ func (c *Combiner) Consume(tr *Trace) (spanCount int) { } // ConsumeWithFinal consumes the trace, but allows for performance savings when -// it is known that this is the last expected input trace. +// it is known that this is the last expected input trace. 
the spanCount returned
+// is the number of duplicate spans between the two traces
 func (c *Combiner) ConsumeWithFinal(tr *Trace, final bool) (spanCount int) {
 	if tr == nil {
 		return
@@ -110,9 +111,10 @@ func (c *Combiner) ConsumeWithFinal(tr *Trace, final bool) (spanCount int) {
 
+			spanCount += len(ils.Spans) - len(notFoundSpans)
+
 			if len(notFoundSpans) > 0 {
 				ils.Spans = notFoundSpans
-				spanCount += len(notFoundSpans)
 				notFoundILS = append(notFoundILS, ils)
 			}
 		}
 
 		// if there were some spans not found in A, add everything left in the batch
diff --git a/tempodb/encoding/vparquet4/combiner_test.go b/tempodb/encoding/vparquet4/combiner_test.go
index d77823fd8cb..d16ef473a51 100644
--- a/tempodb/encoding/vparquet4/combiner_test.go
+++ b/tempodb/encoding/vparquet4/combiner_test.go
@@ -18,6 +18,12 @@ func TestCombiner(t *testing.T) {
 			c.Consume(b)
 			return c.Result()
 		},
+		func(a, b *Trace) (*Trace, int, bool) {
+			c := NewCombiner()
+			c.Consume(a)
+			c.ConsumeWithFinal(b, true)
+			return c.Result()
+		},
 	}
 
 	tests := []struct {
@@ -369,6 +375,147 @@ func TestCombiner(t *testing.T) {
 	}
 }
 
+func TestCombinerReturnsDuplicates(t *testing.T) {
+	tests := []struct {
+		name          string
+		traceA        *Trace
+		traceB        *Trace
+		expectedDupes int
+	}{
+		{
+			name:          "nil traceA",
+			traceA:        nil,
+			traceB:        &Trace{},
+			expectedDupes: 0,
+		},
+		{
+			name:          "nil traceB",
+			traceA:        &Trace{},
+			traceB:        nil,
+			expectedDupes: 0,
+		},
+		{
+			name:          "empty traces",
+			traceA:        &Trace{},
+			traceB:        &Trace{},
+			expectedDupes: 0,
+		},
+		{
+			name: "no dupes",
+			traceA: &Trace{
+				TraceID:         []byte{0x00, 0x01},
+				RootServiceName: "serviceNameA",
+				ResourceSpans: []ResourceSpans{
+					{
+						Resource: Resource{
+							ServiceName: "serviceNameA",
+						},
+						ScopeSpans: []ScopeSpans{
+							{
+								Spans: []Span{
+									{
+										SpanID:         []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
+										StatusCode:     0,
+										NestedSetLeft:  1,
+										NestedSetRight: 2,
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			traceB: &Trace{
+				TraceID:         []byte{0x00, 0x01},
+				RootServiceName: "serviceNameB",
+				ResourceSpans: []ResourceSpans{
+					{
+						Resource: Resource{
+							ServiceName: "serviceNameB",
+						},
+						ScopeSpans: []ScopeSpans{
+							{
+								Spans: []Span{
+									{
+										SpanID:       []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02},
+										ParentSpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
+										StatusCode:   0,
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			expectedDupes: 0,
+		},
+		{
+			name: "one dupe",
+			traceA: &Trace{
+				TraceID:         []byte{0x00, 0x01},
+				RootServiceName: "serviceNameA",
+				ResourceSpans: []ResourceSpans{
+					{
+						Resource: Resource{
+							ServiceName: "serviceNameA",
+						},
+						ScopeSpans: []ScopeSpans{
+							{
+								Spans: []Span{
+									{
+										SpanID:         []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
+										StatusCode:     0,
+										NestedSetLeft:  1,
+										NestedSetRight: 2,
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			traceB: &Trace{
+				TraceID:         []byte{0x00, 0x01},
+				RootServiceName: "serviceNameB",
+				ResourceSpans: []ResourceSpans{
+					{
+						Resource: Resource{
+							ServiceName: "serviceNameB",
+						},
+						ScopeSpans: []ScopeSpans{
+							{
+								Spans: []Span{
+									{
+										SpanID:     []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
+										StatusCode: 0,
+									},
+									{
+										SpanID:       []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03},
+										ParentSpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
+										StatusCode:   2,
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			expectedDupes: 1,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			cmb := NewCombiner()
+
+			cmb.Consume(tt.traceA)
+			actualDupes := cmb.Consume(tt.traceB)
+
+			assert.Equal(t, tt.expectedDupes, actualDupes)
+		})
+	}
+}
+
 func BenchmarkCombine(b *testing.B) {
 	batchCount := 100
 	spanCounts := []int{
diff --git a/tempodb/encoding/vparquet4/compactor.go b/tempodb/encoding/vparquet4/compactor.go
index cb3532b5540..3b7d258eb19 100644
--- a/tempodb/encoding/vparquet4/compactor.go
+++ b/tempodb/encoding/vparquet4/compactor.go
@@ -67,6 +67,7 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,
 	}
 
 	var (
+		replicationFactor   = 
inputs[0].ReplicationFactor nextCompactionLevel = compactionLevel + 1 sch = parquet.SchemaOf(new(Trace)) ) @@ -110,15 +111,17 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader, // Time to combine. cmb := NewCombiner() + dedupedSpans := 0 for i, row := range rows { tr := new(Trace) err := sch.Reconstruct(tr, row) if err != nil { return nil, err } - cmb.ConsumeWithFinal(tr, i == len(rows)-1) + dedupedSpans += cmb.ConsumeWithFinal(tr, i == len(rows)-1) pool.Put(row) } + c.opts.DedupedSpans(int(replicationFactor), dedupedSpans) tr, _, connected := cmb.Result() if !connected { c.opts.DisconnectedTrace() @@ -160,7 +163,7 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader, TenantID: inputs[0].TenantID, CompactionLevel: nextCompactionLevel, TotalObjects: recordsPerBlock, // Just an estimate - ReplicationFactor: inputs[0].ReplicationFactor, + ReplicationFactor: replicationFactor, DedicatedColumns: inputs[0].DedicatedColumns, }