diff --git a/integration/poller/poller_test.go b/integration/poller/poller_test.go index b8b83bbbbe2..e9c5bf934d9 100644 --- a/integration/poller/poller_test.go +++ b/integration/poller/poller_test.go @@ -178,12 +178,12 @@ func TestPollerOwnership(t *testing.T) { } l := blocklist.New() - mm, cm, err := blocklistPoller.Do(l) + mm, cm, durs, err := blocklistPoller.Do(l) require.NoError(t, err) // t.Logf("mm: %v", mm) // t.Logf("cm: %v", cm) - l.ApplyPollResults(mm, cm) + l.ApplyPollResults(mm, cm, durs) for testTenant, expected := range tenantExpected { metas := l.Metas(testTenant) @@ -321,7 +321,7 @@ func TestTenantDeletion(t *testing.T) { }, OwnsEverythingSharder, r, cc, w, logger) l := blocklist.New() - mm, cm, err := blocklistPoller.Do(l) + mm, cm, _, err := blocklistPoller.Do(l) require.NoError(t, err) t.Logf("mm: %v", mm) t.Logf("cm: %v", cm) @@ -339,7 +339,7 @@ func TestTenantDeletion(t *testing.T) { time.Sleep(500 * time.Millisecond) - _, _, err = blocklistPoller.Do(l) + _, _, _, err = blocklistPoller.Do(l) require.NoError(t, err) tennants, err = r.Tenants(ctx) @@ -357,7 +357,7 @@ func TestTenantDeletion(t *testing.T) { }, OwnsEverythingSharder, r, cc, w, logger) // Again - _, _, err = blocklistPoller.Do(l) + _, _, _, err = blocklistPoller.Do(l) require.NoError(t, err) tennants, err = r.Tenants(ctx) diff --git a/tempodb/blocklist/list.go b/tempodb/blocklist/list.go index 9514ba5bfa0..d54b54112c2 100644 --- a/tempodb/blocklist/list.go +++ b/tempodb/blocklist/list.go @@ -2,6 +2,7 @@ package blocklist import ( "sync" + "time" "github.com/google/uuid" "github.com/grafana/tempo/tempodb/backend" @@ -13,11 +14,15 @@ type PerTenant map[string][]*backend.BlockMeta // PerTenantCompacted is a map of tenant ids to backend.CompactedBlockMetas type PerTenantCompacted map[string][]*backend.CompactedBlockMeta +// PerTenantPollDuration is a map of tenant ids to duration of the last poll. +type PerTenantPollDuration map[string]time.Duration + // List controls access to a per tenant blocklist and compacted blocklist type List struct { mtx sync.Mutex metas PerTenant compactedMetas PerTenantCompacted + pollDurations PerTenantPollDuration // used by the compactor to track local changes it is aware of added PerTenant @@ -30,6 +35,7 @@ func New() *List { return &List{ metas: make(PerTenant), compactedMetas: make(PerTenantCompacted), + pollDurations: make(PerTenantPollDuration), added: make(PerTenant), removed: make(PerTenant), @@ -81,7 +87,7 @@ func (l *List) CompactedMetas(tenantID string) []*backend.CompactedBlockMeta { // ApplyPollResults applies the PerTenant and PerTenantCompacted maps to this blocklist // Note that it also applies any known local changes and then wipes them out to be restored // in the next polling cycle. -func (l *List) ApplyPollResults(m PerTenant, c PerTenantCompacted) { +func (l *List) ApplyPollResults(m PerTenant, c PerTenantCompacted, pd PerTenantPollDuration) { l.mtx.Lock() defer l.mtx.Unlock() @@ -91,6 +97,11 @@ func (l *List) ApplyPollResults(m PerTenant, c PerTenantCompacted) { // now reapply all updates and clear for tenantID := range l.added { l.updateInternal(tenantID, l.added[tenantID], l.removed[tenantID], l.compactedAdded[tenantID], l.compactedRemoved[tenantID]) + + clear(l.pollDurations) + if v, ok := pd[tenantID]; ok { + l.pollDurations[tenantID] = v + } } clear(l.added) diff --git a/tempodb/blocklist/list_test.go b/tempodb/blocklist/list_test.go index 0bfa3250ff9..0d32c15abd1 100644 --- a/tempodb/blocklist/list_test.go +++ b/tempodb/blocklist/list_test.go @@ -107,7 +107,8 @@ func TestApplyPollResults(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { l := New() - l.ApplyPollResults(tc.metas, tc.compacted) + // TODO: test the durations + l.ApplyPollResults(tc.metas, tc.compacted, nil) actualTenants := l.Tenants() sort.Slice(actualTenants, func(i, j int) bool { return actualTenants[i] < actualTenants[j] }) @@ -639,7 +640,7 @@ func TestUpdatesSaved(t *testing.T) { for i, tc := range tests { t.Logf("step %d", i+1) - l.ApplyPollResults(tc.applyMetas, tc.applyCompacted) + l.ApplyPollResults(tc.applyMetas, tc.applyCompacted, nil) if tc.updateTenant != "" { l.Update(tc.updateTenant, tc.addMetas, tc.removeMetas, tc.addCompacted, nil) } diff --git a/tempodb/blocklist/poller.go b/tempodb/blocklist/poller.go index e8571497510..fb8c98c8651 100644 --- a/tempodb/blocklist/poller.go +++ b/tempodb/blocklist/poller.go @@ -137,7 +137,7 @@ func NewPoller(cfg *PollerConfig, sharder JobSharder, reader backend.Reader, com } // Do does the doing of getting a blocklist -func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) { +func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, PerTenantPollDuration, error) { start := time.Now() defer func() { diff := time.Since(start).Seconds() @@ -155,7 +155,7 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) { tenants, err := p.reader.Tenants(ctx) if err != nil { metricBlocklistErrors.WithLabelValues("").Inc() - return nil, nil, err + return nil, nil, nil, err } var ( @@ -164,6 +164,7 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) { blocklist = PerTenant{} compactedBlocklist = PerTenantCompacted{} + pollDurations = PerTenantPollDuration{} tenantFailuresRemaining = atomic.NewInt32(int32(p.cfg.TolerateTenantFailures)) ) @@ -177,17 +178,35 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) { wg.Add(1) go func(tenantID string) { - defer wg.Done() - var ( consecutiveErrorsRemaining = p.cfg.TolerateConsecutiveErrors newBlockList = make([]*backend.BlockMeta, 0) newCompactedBlockList = make([]*backend.CompactedBlockMeta, 0) + start = time.Now() err error ) + funcCtx, funcSpan := tracer.Start(ctx, "Poller.Do.tenant", + trace.WithAttributes( + attribute.String("tenant", tenantID), + attribute.Int("newBlockList", len(newBlockList)), + attribute.Int("newCompactedBlockList", len(newCompactedBlockList)), + ), + trace.WithLinks(trace.LinkFromContext(ctx)), + ) + defer func() { + if err != nil { + funcSpan.RecordError(err) + funcSpan.SetStatus(codes.Error, "tenant poll failed") + } else { + funcSpan.SetStatus(codes.Error, "") + } + funcSpan.End() + wg.Done() + }() + for consecutiveErrorsRemaining >= 0 { - newBlockList, newCompactedBlockList, err = p.pollTenantAndCreateIndex(ctx, tenantID, previous) + newBlockList, newCompactedBlockList, err = p.pollTenantAndCreateIndex(funcCtx, tenantID, previous) if err == nil { break } @@ -202,6 +221,7 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) { level.Error(p.logger).Log("msg", "failed to poll or create index for tenant", "tenant", tenantID, "err", err) blocklist[tenantID] = previous.Metas(tenantID) compactedBlocklist[tenantID] = previous.CompactedMetas(tenantID) + pollDurations[tenantID] = time.Since(start) tenantFailuresRemaining.Dec() @@ -211,6 +231,7 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) { if len(newBlockList) > 0 || len(newCompactedBlockList) > 0 { blocklist[tenantID] = newBlockList compactedBlocklist[tenantID] = newCompactedBlockList + pollDurations[tenantID] = time.Since(start) metricBlocklistLength.WithLabelValues(tenantID).Set(float64(len(newBlockList))) @@ -231,10 +252,10 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) { wg.Wait() if tenantFailuresRemaining.Load() < 0 { - return nil, nil, errors.New("too many tenant failures; abandoning polling cycle") + return nil, nil, nil, errors.New("too many tenant failures; abandoning polling cycle") } - return blocklist, compactedBlocklist, nil + return blocklist, compactedBlocklist, pollDurations, nil } func (p *Poller) pollTenantAndCreateIndex( diff --git a/tempodb/blocklist/poller_test.go b/tempodb/blocklist/poller_test.go index d96f552a417..0dd5b732c57 100644 --- a/tempodb/blocklist/poller_test.go +++ b/tempodb/blocklist/poller_test.go @@ -173,7 +173,9 @@ func TestTenantIndexBuilder(t *testing.T) { }, &mockJobSharder{ owns: true, }, r, c, w, log.NewNopLogger()) - actualList, actualCompactedList, err := poller.Do(b) + actualList, actualCompactedList, durations, err := poller.Do(b) + + assert.Equal(t, nil, durations) // confirm return as expected assert.Equal(t, tc.expectedList, actualList) @@ -279,7 +281,7 @@ func TestTenantIndexFallback(t *testing.T) { }, &mockJobSharder{ owns: tc.isTenantIndexBuilder, }, r, c, w, log.NewNopLogger()) - _, _, err := poller.Do(b) + _, _, _, err := poller.Do(b) assert.Equal(t, tc.expectsError, err != nil) assert.Equal(t, tc.expectsTenantIndexWritten, w.IndexCompactedMeta != nil) @@ -646,7 +648,7 @@ func TestPollTolerateConsecutiveErrors(t *testing.T) { EmptyTenantDeletionAge: testEmptyTenantIndexAge, }, s, r, c, w, log.NewLogfmtLogger(os.Stdout)) - _, _, err := poller.Do(b) + _, _, _, err := poller.Do(b) if tc.expectedError != nil { assert.ErrorContains(t, err, tc.expectedError.Error()) @@ -935,9 +937,11 @@ func TestPollComparePreviousResults(t *testing.T) { TolerateTenantFailures: tc.tollerateTenantFailures, }, s, r, c, w, log.NewNopLogger()) - metas, compactedMetas, err := poller.Do(previous) + metas, compactedMetas, durations, err := poller.Do(previous) require.Equal(t, tc.err, err) + require.Equal(t, len(metas), len(durations)) + require.Equal(t, len(tc.expectedPerTenant), len(metas)) for tenantID, expectedMetas := range tc.expectedPerTenant { l := metas[tenantID] @@ -1182,7 +1186,7 @@ func newMockReader(list PerTenant, compactedList PerTenantCompacted, expectsErro func newBlocklist(metas PerTenant, compactedMetas PerTenantCompacted) *List { l := New() - l.ApplyPollResults(metas, compactedMetas) + l.ApplyPollResults(metas, compactedMetas, nil) return l } diff --git a/tempodb/compactor.go b/tempodb/compactor.go index 69093448cda..1982b8a085d 100644 --- a/tempodb/compactor.go +++ b/tempodb/compactor.go @@ -113,7 +113,7 @@ func (rw *readerWriter) doCompaction(ctx context.Context) { return } - // Get the meta file of all non-compacted blocks for the given tenant + // Get the meta file of all non-compacted blocks for the given tenant, blocklist := rw.blocklist.Metas(tenantID) window := rw.compactorOverrides.MaxCompactionRangeForTenant(tenantID) diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 86ceebed9bc..d4e32b9f090 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -589,13 +589,13 @@ func (rw *readerWriter) pollingLoop(ctx context.Context) { } func (rw *readerWriter) pollBlocklist() { - blocklist, compactedBlocklist, err := rw.blocklistPoller.Do(rw.blocklist) + blocklist, compactedBlocklist, pollDurations, err := rw.blocklistPoller.Do(rw.blocklist) if err != nil { level.Error(rw.logger).Log("msg", "failed to poll blocklist", "err", err) return } - rw.blocklist.ApplyPollResults(blocklist, compactedBlocklist) + rw.blocklist.ApplyPollResults(blocklist, compactedBlocklist, pollDurations) } // includeBlock indicates whether a given block should be included in a backend search