[Compaction] Add signals for duplicate rf1 data #4296

Merged
merged 1 commit on Nov 7, 2024
1 change: 1 addition & 0 deletions cmd/tempo-cli/cmd-rewrite-blocks.go
@@ -141,6 +141,7 @@ func rewriteBlock(ctx context.Context, r backend.Reader, w backend.Writer, meta
SpansDiscarded: func(_, _, _ string, _ int) {},
DisconnectedTrace: func() {},
RootlessTrace: func() {},
DedupedSpans: func(_, _ int) {},
}

compactor := enc.NewCompactor(opts)
12 changes: 12 additions & 0 deletions tempodb/compactor.go
@@ -65,6 +65,11 @@ var (
Name: "compaction_outstanding_blocks",
Help: "Number of blocks remaining to be compacted before next maintenance cycle",
}, []string{"tenant"})
metricDedupedSpans = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempodb",
Name: "compaction_spans_combined_total",
Help: "Number of spans that are deduped per replication factor.",
}, []string{"replication_factor"})
)

func (rw *readerWriter) compactionLoop(ctx context.Context) {
@@ -166,6 +171,10 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
metricCompactionErrors.Inc()
}

if !rw.compactorSharder.Owns(hashString) {
level.Warn(rw.logger).Log("msg", "compaction complete but we no longer own the hash", "hashString", hashString)
}

// after a maintenance cycle bail out
if start.Add(rw.compactorCfg.MaxTimePerTenant).Before(time.Now()) {
measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
@@ -263,6 +272,9 @@ func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.Block
RootlessTrace: func() {
dataquality.WarnRootlessTrace(tenantID, dataquality.PhaseTraceCompactorCombine)
},
DedupedSpans: func(replFactor, dedupedSpans int) {
metricDedupedSpans.WithLabelValues(strconv.Itoa(replFactor)).Add(float64(dedupedSpans))
},
}

compactor := enc.NewCompactor(opts)
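
A minimal sketch of how the new hook feeds the counter, outside the readerWriter plumbing (the metric shape mirrors the diff above; the hook variable and the use of testutil are illustrative, not part of the PR):

```go
package main

import (
	"fmt"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Same shape as metricDedupedSpans above: a counter vector labeled by
	// the replication factor of the blocks being compacted.
	dedupedSpans := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "tempodb",
		Name:      "compaction_spans_combined_total",
		Help:      "Number of spans that are deduped per replication factor.",
	}, []string{"replication_factor"})

	// The callback installed in CompactionOptions translates a
	// (replication factor, span delta) pair into a labeled counter add.
	hook := func(replFactor, deduped int) {
		dedupedSpans.WithLabelValues(strconv.Itoa(replFactor)).Add(float64(deduped))
	}

	hook(1, 3) // e.g. three duplicate spans dropped while combining RF1 traces
	hook(1, 2)

	fmt.Println(testutil.ToFloat64(dedupedSpans.WithLabelValues("1"))) // 5
}
```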
1 change: 1 addition & 0 deletions tempodb/encoding/common/interfaces.go
@@ -88,6 +88,7 @@ type CompactionOptions struct {
SpansDiscarded func(traceID string, rootSpanName string, rootServiceName string, spans int)
DisconnectedTrace func()
RootlessTrace func()
DedupedSpans func(replFactor, dedupedSpans int)
}

type Iterator interface {
6 changes: 4 additions & 2 deletions tempodb/encoding/vparquet4/combiner.go
@@ -42,7 +42,8 @@ func (c *Combiner) Consume(tr *Trace) (spanCount int) {
}

// ConsumeWithFinal consumes the trace, but allows for performance savings when
// it is known that this is the last expected input trace. The returned spanCount
// is the number of duplicate spans between the two traces.
func (c *Combiner) ConsumeWithFinal(tr *Trace, final bool) (spanCount int) {
if tr == nil {
return
@@ -110,9 +111,10 @@ func (c *Combiner) ConsumeWithFinal(tr *Trace, final bool) (spanCount int) {

spanCount += len(ils.Spans) - len(notFoundSpans)

if len(notFoundSpans) > 0 {
ils.Spans = notFoundSpans
notFoundILS = append(notFoundILS, ils)
}
}

// if there were some spans not found in A, add everything left in the batch
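
The placement of the new count matters: notFoundSpans is built with ils.Spans[:0] and shares its backing array, so once ils.Spans is reassigned the two lengths are equal and the delta is always zero. A self-contained sketch of the counting pattern (simplified types, not Tempo's generated structs):

```go
package main

import "fmt"

type span struct{ id string }

// dedupe drops spans whose IDs were already seen and reports how many it
// dropped. The count is taken before the slice is narrowed.
func dedupe(seen map[string]bool, spans []span) ([]span, int) {
	notFound := spans[:0] // filter in place, reusing the backing array
	for _, s := range spans {
		if !seen[s.id] {
			seen[s.id] = true
			notFound = append(notFound, s)
		}
	}
	// len(spans) still holds the original length here; after an assignment
	// like spans = notFound the delta would always be zero.
	return notFound, len(spans) - len(notFound)
}

func main() {
	seen := map[string]bool{"a": true} // span "a" already lives in trace A
	kept, dupes := dedupe(seen, []span{{"a"}, {"b"}})
	fmt.Println(len(kept), dupes) // prints: 1 1
}
```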
147 changes: 147 additions & 0 deletions tempodb/encoding/vparquet4/combiner_test.go
@@ -18,6 +18,12 @@ func TestCombiner(t *testing.T) {
c.Consume(b)
return c.Result()
},
func(a, b *Trace) (*Trace, int, bool) {
c := NewCombiner()
c.Consume(a)
c.ConsumeWithFinal(b, true)
return c.Result()
},
}

tests := []struct {
@@ -369,6 +375,147 @@ }
}
}

func TestCombinerReturnsDuplicates(t *testing.T) {
tests := []struct {
name string
traceA *Trace
traceB *Trace
expectedDupes int
}{
{
name: "nil traceA",
traceA: nil,
traceB: &Trace{},
expectedDupes: 0,
},
{
name: "nil traceB",
traceA: &Trace{},
traceB: nil,
expectedDupes: 0,
},
{
name: "empty traces",
traceA: &Trace{},
traceB: &Trace{},
expectedDupes: 0,
},
{
name: "no dupes",
traceA: &Trace{
TraceID: []byte{0x00, 0x01},
RootServiceName: "serviceNameA",
ResourceSpans: []ResourceSpans{
{
Resource: Resource{
ServiceName: "serviceNameA",
},
ScopeSpans: []ScopeSpans{
{
Spans: []Span{
{
SpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
StatusCode: 0,
NestedSetLeft: 1,
NestedSetRight: 2,
},
},
},
},
},
},
},
traceB: &Trace{
TraceID: []byte{0x00, 0x01},
RootServiceName: "serviceNameB",
ResourceSpans: []ResourceSpans{
{
Resource: Resource{
ServiceName: "serviceNameB",
},
ScopeSpans: []ScopeSpans{
{
Spans: []Span{
{
SpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02},
ParentSpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
StatusCode: 0,
},
},
},
},
},
},
},
expectedDupes: 0,
},
{
name: "one dupe",
traceA: &Trace{
TraceID: []byte{0x00, 0x01},
RootServiceName: "serviceNameA",
ResourceSpans: []ResourceSpans{
{
Resource: Resource{
ServiceName: "serviceNameA",
},
ScopeSpans: []ScopeSpans{
{
Spans: []Span{
{
SpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
StatusCode: 0,
NestedSetLeft: 1,
NestedSetRight: 2,
},
},
},
},
},
},
},
traceB: &Trace{
TraceID: []byte{0x00, 0x01},
RootServiceName: "serviceNameB",
ResourceSpans: []ResourceSpans{
{
Resource: Resource{
ServiceName: "serviceNameB",
},
ScopeSpans: []ScopeSpans{
{
Spans: []Span{
{
SpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
StatusCode: 0,
},
{
SpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03},
ParentSpanID: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
StatusCode: 2,
},
},
},
},
},
},
},
expectedDupes: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cmb := NewCombiner()

cmb.Consume(tt.traceA)
actualDupes := cmb.Consume(tt.traceB)

assert.Equal(t, tt.expectedDupes, actualDupes)
})
}
}

func BenchmarkCombine(b *testing.B) {
batchCount := 100
spanCounts := []int{
7 changes: 5 additions & 2 deletions tempodb/encoding/vparquet4/compactor.go
@@ -67,6 +67,7 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,
}

var (
replicationFactor = inputs[0].ReplicationFactor
nextCompactionLevel = compactionLevel + 1
sch = parquet.SchemaOf(new(Trace))
)
@@ -110,15 +111,17 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,

// Time to combine.
cmb := NewCombiner()
dedupedSpans := 0
for i, row := range rows {
tr := new(Trace)
err := sch.Reconstruct(tr, row)
if err != nil {
return nil, err
}
dedupedSpans += cmb.ConsumeWithFinal(tr, i == len(rows)-1)
pool.Put(row)
}
c.opts.DedupedSpans(int(replicationFactor), dedupedSpans)
tr, _, connected := cmb.Result()
if !connected {
c.opts.DisconnectedTrace()
@@ -160,7 +163,7 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,
TenantID: inputs[0].TenantID,
CompactionLevel: nextCompactionLevel,
TotalObjects: recordsPerBlock, // Just an estimate
ReplicationFactor: replicationFactor,
DedicatedColumns: inputs[0].DedicatedColumns,
}

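
Putting the pieces together: each ConsumeWithFinal call returns the duplicates found for one row of the trace, the per-trace total is accumulated, and the total is reported once with the inputs' replication factor. A sketch of that flow, assuming it lives in the vparquet4 package (combineAndReport is a hypothetical helper, not part of the PR):

```go
// combineAndReport combines all rows of a single trace ID, sums the
// duplicate spans the Combiner reports, and emits the DedupedSpans
// callback once with the input blocks' replication factor.
func combineAndReport(traces []*Trace, replicationFactor uint32, dedupedSpansFn func(replFactor, deduped int)) *Trace {
	cmb := NewCombiner()
	deduped := 0
	for i, tr := range traces {
		// the final flag lets the combiner skip bookkeeping on the last input
		deduped += cmb.ConsumeWithFinal(tr, i == len(traces)-1)
	}
	dedupedSpansFn(int(replicationFactor), deduped)

	combined, _, _ := cmb.Result()
	return combined
}
```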