From f00ed6a63e4af65d8df8be72f0f6330bd92a85e3 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Tue, 17 Dec 2024 11:47:53 -0500 Subject: [PATCH] Blocklist update fix (#4446) * Fix issue where block remains if it was added and compacted within the same cycle * cleanup and tests * changelog --- CHANGELOG.md | 1 + tempodb/blocklist/list.go | 102 +++++---- tempodb/blocklist/list_test.go | 399 ++++++++++++--------------------- tempodb/retention_test.go | 6 +- tempodb/tempodb_test.go | 10 +- 5 files changed, 209 insertions(+), 309 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5dc66e5c94f..57039257bd1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -104,6 +104,7 @@ querier: * [BUGFIX] Initialize histogram buckets to 0 to avoid downsampling. [#4366](https://github.com/grafana/tempo/pull/4366) (@javiermolinar) * [BUGFIX] Utilize S3Pass and S3User parameters in tempo-cli options, which were previously unused in the code. [#4259](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov) * [BUGFIX] Fixed an issue in the generator where the first batch was counted 2x against a traces size. [#4365](https://github.com/grafana/tempo/pull/4365) (@joe-elliott) +* [BUGFIX] Fix compaction bug in SingleBinaryMode that could lead to 2x, 3x, etc TraceQL metrics results [#4446](https://github.com/grafana/tempo/pull/4446) (@mdisibio) * [BUGFIX] Unstable compactors can occassionally duplicate data. Check for job ownership during compaction and cancel a job if ownership changes. [#4420](https://github.com/grafana/tempo/pull/4420) (@joe-elliott) # v2.6.1 diff --git a/tempodb/blocklist/list.go b/tempodb/blocklist/list.go index 9514ba5bfa0..55d9c2d8087 100644 --- a/tempodb/blocklist/list.go +++ b/tempodb/blocklist/list.go @@ -1,9 +1,9 @@ package blocklist import ( + "slices" "sync" - "github.com/google/uuid" "github.com/grafana/tempo/tempodb/backend" ) @@ -125,64 +125,70 @@ func (l *List) Update(tenantID string, add []*backend.BlockMeta, remove []*backe // updateInternal exists to do the work of applying updates to held PerTenant and PerTenantCompacted maps // it must be called under lock func (l *List) updateInternal(tenantID string, add []*backend.BlockMeta, remove []*backend.BlockMeta, compactedAdd []*backend.CompactedBlockMeta, compactedRemove []*backend.CompactedBlockMeta) { - // ******** Regular blocks ******** - blocklist := l.metas[tenantID] - - matchedRemovals := make(map[uuid.UUID]struct{}) - for _, b := range blocklist { - for _, rem := range remove { - if b.BlockID == rem.BlockID { - matchedRemovals[(uuid.UUID)(rem.BlockID)] = struct{}{} - break - } + hasID := func(id backend.UUID) func(*backend.BlockMeta) bool { + return func(b *backend.BlockMeta) bool { + return b.BlockID == id } } - existingMetas := make(map[uuid.UUID]struct{}) - newblocklist := make([]*backend.BlockMeta, 0, len(blocklist)-len(matchedRemovals)+len(add)) - // rebuild the blocklist dropping all removals - for _, b := range blocklist { - existingMetas[(uuid.UUID)(b.BlockID)] = struct{}{} - if _, ok := matchedRemovals[(uuid.UUID)(b.BlockID)]; !ok { - newblocklist = append(newblocklist, b) + hasIDC := func(id backend.UUID) func(*backend.CompactedBlockMeta) bool { + return func(b *backend.CompactedBlockMeta) bool { + return b.BlockID == id } } - // add new blocks (only if they don't already exist) - for _, b := range add { - if _, ok := existingMetas[(uuid.UUID)(b.BlockID)]; !ok { - newblocklist = append(newblocklist, b) - } - } - - l.metas[tenantID] = newblocklist - // ******** Compacted blocks ******** - compactedBlocklist := l.compactedMetas[tenantID] - - compactedRemovals := map[uuid.UUID]struct{}{} - for _, c := range compactedBlocklist { - for _, rem := range compactedRemove { - if c.BlockID == rem.BlockID { - compactedRemovals[(uuid.UUID)(rem.BlockID)] = struct{}{} - break + // ******** Regular blocks ******** + if len(add) > 0 || len(remove) > 0 || len(compactedAdd) > 0 || len(compactedRemove) > 0 { + var ( + existing = l.metas[tenantID] + final = make([]*backend.BlockMeta, 0, max(0, len(existing)+len(add)-len(remove))) + ) + + // rebuild dropping all removals + for _, b := range existing { + if slices.ContainsFunc(remove, hasID(b.BlockID)) { + continue } + final = append(final, b) } - } + // add new if they don't already exist and weren't also removed + for _, b := range add { + if slices.ContainsFunc(final, hasID(b.BlockID)) || + slices.ContainsFunc(remove, hasID(b.BlockID)) || + slices.ContainsFunc(compactedAdd, hasIDC(b.BlockID)) || + slices.ContainsFunc(compactedRemove, hasIDC(b.BlockID)) { + continue + } - existingMetas = make(map[uuid.UUID]struct{}) - newCompactedBlocklist := make([]*backend.CompactedBlockMeta, 0, len(compactedBlocklist)-len(compactedRemovals)+len(compactedAdd)) - // rebuild the blocklist dropping all removals - for _, b := range compactedBlocklist { - existingMetas[(uuid.UUID)(b.BlockID)] = struct{}{} - if _, ok := compactedRemovals[(uuid.UUID)(b.BlockID)]; !ok { - newCompactedBlocklist = append(newCompactedBlocklist, b) + final = append(final, b) } + + l.metas[tenantID] = final } - for _, b := range compactedAdd { - if _, ok := existingMetas[(uuid.UUID)(b.BlockID)]; !ok { - newCompactedBlocklist = append(newCompactedBlocklist, b) + + // ******** Compacted blocks ******** + if len(compactedAdd) > 0 || len(compactedRemove) > 0 { + var ( + existing = l.compactedMetas[tenantID] + final = make([]*backend.CompactedBlockMeta, 0, max(0, len(existing)+len(compactedAdd)-len(compactedRemove))) + ) + + // rebuild dropping all removals + for _, b := range existing { + if slices.ContainsFunc(compactedRemove, hasIDC(b.BlockID)) { + continue + } + final = append(final, b) + } + // add new if they don't already exist and weren't also removed + for _, b := range compactedAdd { + if slices.ContainsFunc(existing, hasIDC(b.BlockID)) || + slices.ContainsFunc(compactedRemove, hasIDC(b.BlockID)) { + continue + } + final = append(final, b) } - } - l.compactedMetas[tenantID] = newCompactedBlocklist + l.compactedMetas[tenantID] = final + } } diff --git a/tempodb/blocklist/list_test.go b/tempodb/blocklist/list_test.go index 0bfa3250ff9..7d870368f28 100644 --- a/tempodb/blocklist/list_test.go +++ b/tempodb/blocklist/list_test.go @@ -125,11 +125,21 @@ func TestApplyPollResults(t *testing.T) { } func TestUpdate(t *testing.T) { + var ( + _1 = meta("00000000-0000-0000-0000-000000000001") + _2 = meta("00000000-0000-0000-0000-000000000002") + _3 = meta("00000000-0000-0000-0000-000000000003") + _2c = compactedMeta("00000000-0000-0000-0000-000000000002") + _3c = compactedMeta("00000000-0000-0000-0000-000000000003") + ) + tests := []struct { name string existing []*backend.BlockMeta add []*backend.BlockMeta remove []*backend.BlockMeta + addC []*backend.CompactedBlockMeta + removeC []*backend.CompactedBlockMeta expected []*backend.BlockMeta }{ { @@ -142,160 +152,73 @@ func TestUpdate(t *testing.T) { { name: "add to nil", existing: nil, - add: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - remove: nil, - expected: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, + add: []*backend.BlockMeta{_1}, + remove: nil, + expected: []*backend.BlockMeta{_1}, }, { - name: "add to existing", - existing: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - add: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - remove: nil, - expected: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, + name: "add to existing", + existing: []*backend.BlockMeta{_1}, + add: []*backend.BlockMeta{_2}, + remove: nil, + expected: []*backend.BlockMeta{_1, _2}, }, { name: "remove from nil", existing: nil, add: nil, - remove: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, + remove: []*backend.BlockMeta{_2}, expected: nil, }, { - name: "remove nil", - existing: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - add: nil, - remove: nil, - expected: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, + name: "remove nil", + existing: []*backend.BlockMeta{_2}, + add: nil, + remove: nil, + expected: []*backend.BlockMeta{_2}, }, { - name: "remove existing", - existing: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - add: nil, - remove: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - expected: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, + name: "remove existing", + existing: []*backend.BlockMeta{_1, _2}, + add: nil, + remove: []*backend.BlockMeta{_1}, + expected: []*backend.BlockMeta{_2}, }, { - name: "remove no match", - existing: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - add: nil, - remove: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - expected: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, + name: "remove no match", + existing: []*backend.BlockMeta{_1}, + add: nil, + remove: []*backend.BlockMeta{_2}, + expected: []*backend.BlockMeta{_1}, }, { - name: "add and remove", - existing: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - add: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000003"), - }, - }, - remove: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - expected: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000003"), - }, - }, + name: "add and remove", + existing: []*backend.BlockMeta{_1, _2}, + add: []*backend.BlockMeta{_3}, + remove: []*backend.BlockMeta{_2}, + expected: []*backend.BlockMeta{_1, _3}, }, { - name: "add already exists", - existing: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - add: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - remove: nil, - expected: []*backend.BlockMeta{ - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - { - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, + name: "add already exists", + existing: []*backend.BlockMeta{_1}, + add: []*backend.BlockMeta{_1, _2}, + remove: nil, + expected: []*backend.BlockMeta{_1, _2}, + }, + { + name: "not added if also removed", + existing: []*backend.BlockMeta{_1}, + add: []*backend.BlockMeta{_2}, + remove: []*backend.BlockMeta{_2}, + expected: []*backend.BlockMeta{_1}, + }, + { + name: "not added if also compacted", + existing: []*backend.BlockMeta{_1}, + add: []*backend.BlockMeta{_2, _3}, + addC: []*backend.CompactedBlockMeta{_2c}, + removeC: []*backend.CompactedBlockMeta{_3c}, + expected: []*backend.BlockMeta{_1}, }, } @@ -304,18 +227,21 @@ func TestUpdate(t *testing.T) { l := New() l.metas[testTenantID] = tt.existing - l.Update(testTenantID, tt.add, tt.remove, nil, nil) - - assert.Equal(t, len(tt.expected), len(l.metas[testTenantID])) + l.Update(testTenantID, tt.add, tt.remove, tt.addC, tt.removeC) - for i := range tt.expected { - assert.Equal(t, tt.expected[i].BlockID, l.metas[testTenantID][i].BlockID) - } + require.Equal(t, len(tt.expected), len(l.metas[testTenantID])) + require.ElementsMatch(t, tt.expected, l.metas[testTenantID]) }) } } func TestUpdateCompacted(t *testing.T) { + var ( + _1 = compactedMeta("00000000-0000-0000-0000-000000000001") + _2 = compactedMeta("00000000-0000-0000-0000-000000000002") + _3 = compactedMeta("00000000-0000-0000-0000-000000000003") + ) + tests := []struct { name string existing []*backend.CompactedBlockMeta @@ -332,124 +258,34 @@ func TestUpdateCompacted(t *testing.T) { { name: "add to nil", existing: nil, - add: []*backend.CompactedBlockMeta{ - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - }, - expected: []*backend.CompactedBlockMeta{ - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - }, + add: []*backend.CompactedBlockMeta{_1}, + expected: []*backend.CompactedBlockMeta{_1}, }, { - name: "add to existing", - existing: []*backend.CompactedBlockMeta{ - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - }, - add: []*backend.CompactedBlockMeta{ - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - }, - expected: []*backend.CompactedBlockMeta{ - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - }, + name: "add to existing", + existing: []*backend.CompactedBlockMeta{_1}, + add: []*backend.CompactedBlockMeta{_2}, + expected: []*backend.CompactedBlockMeta{_1, _2}, }, { - name: "add already exists", - existing: []*backend.CompactedBlockMeta{ - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - }, - add: []*backend.CompactedBlockMeta{ - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - }, - expected: []*backend.CompactedBlockMeta{ - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - }, + name: "add already exists", + existing: []*backend.CompactedBlockMeta{_1}, + add: []*backend.CompactedBlockMeta{_1, _2}, + expected: []*backend.CompactedBlockMeta{_1, _2}, }, { - name: "add and remove", - existing: []*backend.CompactedBlockMeta{ - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - }, - add: []*backend.CompactedBlockMeta{ - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000003"), - }, - }, - }, - remove: []*backend.CompactedBlockMeta{ - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000002"), - }, - }, - }, - expected: []*backend.CompactedBlockMeta{ - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000001"), - }, - }, - { - BlockMeta: backend.BlockMeta{ - BlockID: backend.MustParse("00000000-0000-0000-0000-000000000003"), - }, - }, - }, + name: "add and remove", + existing: []*backend.CompactedBlockMeta{_1, _2}, + add: []*backend.CompactedBlockMeta{_3}, + remove: []*backend.CompactedBlockMeta{_2}, + expected: []*backend.CompactedBlockMeta{_1, _3}, + }, + { + name: "not added if also removed", + existing: []*backend.CompactedBlockMeta{_1}, + add: []*backend.CompactedBlockMeta{_2}, + remove: []*backend.CompactedBlockMeta{_2}, + expected: []*backend.CompactedBlockMeta{_1}, }, } @@ -656,3 +492,58 @@ func TestUpdatesSaved(t *testing.T) { assert.Equal(t, tc.expectedCompacted, actualCompacted) } } + +func BenchmarkUpdate(b *testing.B) { + var ( + l = New() + numBlocks = 100000 // Realistic number + existing = make([]*backend.BlockMeta, 0, numBlocks) + add = []*backend.BlockMeta{ + meta("00000000-0000-0000-0000-000000000001"), + meta("00000000-0000-0000-0000-000000000002"), + } + remove = []*backend.BlockMeta{ + meta("00000000-0000-0000-0000-000000000003"), + meta("00000000-0000-0000-0000-000000000004"), + } + numCompacted = 1000 // Realistic number + compacted = make([]*backend.CompactedBlockMeta, 0, numCompacted) + compactedAdd = []*backend.CompactedBlockMeta{ + compactedMeta("00000000-0000-0000-0000-000000000005"), + compactedMeta("00000000-0000-0000-0000-000000000006"), + } + compactedRemove = []*backend.CompactedBlockMeta{ + compactedMeta("00000000-0000-0000-0000-000000000007"), + compactedMeta("00000000-0000-0000-0000-000000000008"), + } + ) + + for i := 0; i < numBlocks; i++ { + existing = append(existing, meta(uuid.NewString())) + } + for i := 0; i < numCompacted; i++ { + compacted = append(compacted, compactedMeta(uuid.NewString())) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + l.metas[testTenantID] = existing + l.compactedMetas[testTenantID] = compacted + l.Update(testTenantID, add, remove, compactedAdd, compactedRemove) + } +} + +func meta(id string) *backend.BlockMeta { + return &backend.BlockMeta{ + BlockID: backend.MustParse(id), + } +} + +func compactedMeta(id string) *backend.CompactedBlockMeta { + return &backend.CompactedBlockMeta{ + BlockMeta: backend.BlockMeta{ + BlockID: backend.MustParse(id), + }, + } +} diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go index af5862276d1..b0c7b4d8134 100644 --- a/tempodb/retention_test.go +++ b/tempodb/retention_test.go @@ -46,8 +46,8 @@ func TestRetention(t *testing.T) { err = c.EnableCompaction(ctx, &CompactorConfig{ ChunkSizeBytes: 10, MaxCompactionRange: time.Hour, - BlockRetention: 0, - CompactedBlockRetention: 0, + BlockRetention: time.Hour, + CompactedBlockRetention: time.Hour, }, &mockSharder{}, &mockOverrides{}) require.NoError(t, err) @@ -71,10 +71,12 @@ func TestRetention(t *testing.T) { checkBlocklists(t, (uuid.UUID)(blockID), 1, 0, rw) // retention should mark it compacted + rw.compactorCfg.BlockRetention = 0 r.(*readerWriter).doRetention(ctx) checkBlocklists(t, (uuid.UUID)(blockID), 0, 1, rw) // retention again should clear it + rw.compactorCfg.CompactedBlockRetention = 0 r.(*readerWriter).doRetention(ctx) checkBlocklists(t, (uuid.UUID)(blockID), 0, 0, rw) } diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 0d993a675aa..7e17616c7fb 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -237,25 +237,25 @@ func checkBlocklists(t *testing.T, expectedID uuid.UUID, expectedB int, expected blocklist := rw.blocklist.Metas(testTenantID) require.Len(t, blocklist, expectedB) if expectedB > 0 && expectedID != uuid.Nil { - assert.Equal(t, expectedID, (uuid.UUID)(blocklist[0].BlockID)) + require.Equal(t, expectedID, (uuid.UUID)(blocklist[0].BlockID)) } // confirm blocklists are in starttime ascending order lastTime := time.Time{} for _, b := range blocklist { - assert.True(t, lastTime.Before(b.StartTime) || lastTime.Equal(b.StartTime)) + require.True(t, lastTime.Before(b.StartTime) || lastTime.Equal(b.StartTime)) lastTime = b.StartTime } compactedBlocklist := rw.blocklist.CompactedMetas(testTenantID) - assert.Len(t, compactedBlocklist, expectedCB) + require.Len(t, compactedBlocklist, expectedCB) if expectedCB > 0 && expectedID != uuid.Nil { - assert.Equal(t, expectedID, (uuid.UUID)(compactedBlocklist[0].BlockID)) + require.Equal(t, expectedID, (uuid.UUID)(compactedBlocklist[0].BlockID)) } lastTime = time.Time{} for _, b := range compactedBlocklist { - assert.True(t, lastTime.Before(b.StartTime) || lastTime.Equal(b.StartTime)) + require.True(t, lastTime.Before(b.StartTime) || lastTime.Equal(b.StartTime)) lastTime = b.StartTime } }