Skip to content

Commit

Permalink
Add per-tenant duration to the blocklist and trace per-tenant polling
Browse files Browse the repository at this point in the history
  • Loading branch information
zalegrala committed Nov 7, 2024
1 parent b6d7289 commit 435bf74
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 23 deletions.
10 changes: 5 additions & 5 deletions integration/poller/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ func TestPollerOwnership(t *testing.T) {
}

l := blocklist.New()
mm, cm, err := blocklistPoller.Do(l)
mm, cm, durs, err := blocklistPoller.Do(l)
require.NoError(t, err)
// t.Logf("mm: %v", mm)
// t.Logf("cm: %v", cm)

l.ApplyPollResults(mm, cm)
l.ApplyPollResults(mm, cm, durs)

for testTenant, expected := range tenantExpected {
metas := l.Metas(testTenant)
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestTenantDeletion(t *testing.T) {
}, OwnsEverythingSharder, r, cc, w, logger)

l := blocklist.New()
mm, cm, err := blocklistPoller.Do(l)
mm, cm, _, err := blocklistPoller.Do(l)
require.NoError(t, err)
t.Logf("mm: %v", mm)
t.Logf("cm: %v", cm)
Expand All @@ -339,7 +339,7 @@ func TestTenantDeletion(t *testing.T) {

time.Sleep(500 * time.Millisecond)

_, _, err = blocklistPoller.Do(l)
_, _, _, err = blocklistPoller.Do(l)
require.NoError(t, err)

tennants, err = r.Tenants(ctx)
Expand All @@ -357,7 +357,7 @@ func TestTenantDeletion(t *testing.T) {
}, OwnsEverythingSharder, r, cc, w, logger)

// Again
_, _, err = blocklistPoller.Do(l)
_, _, _, err = blocklistPoller.Do(l)
require.NoError(t, err)

tennants, err = r.Tenants(ctx)
Expand Down
13 changes: 12 additions & 1 deletion tempodb/blocklist/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blocklist

import (
"sync"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
Expand All @@ -13,11 +14,15 @@ type PerTenant map[string][]*backend.BlockMeta
// PerTenantCompacted is a map of tenant ids to backend.CompactedBlockMetas
type PerTenantCompacted map[string][]*backend.CompactedBlockMeta

// PerTenantPollDuration maps each tenant ID to the time its most recent poll took.
type PerTenantPollDuration map[string]time.Duration

// List controls access to a per tenant blocklist and compacted blocklist
type List struct {
mtx sync.Mutex
metas PerTenant
compactedMetas PerTenantCompacted
pollDurations PerTenantPollDuration

// used by the compactor to track local changes it is aware of
added PerTenant
Expand All @@ -30,6 +35,7 @@ func New() *List {
return &List{
metas: make(PerTenant),
compactedMetas: make(PerTenantCompacted),
pollDurations: make(PerTenantPollDuration),

added: make(PerTenant),
removed: make(PerTenant),
Expand Down Expand Up @@ -81,7 +87,7 @@ func (l *List) CompactedMetas(tenantID string) []*backend.CompactedBlockMeta {
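// ApplyPollResults applies the PerTenant and PerTenantCompacted maps to this blocklist.
// pd carries the per-tenant poll durations returned alongside them by the poller.
// Note that it also applies any known local changes and then wipes them out to be restored
// in the next polling cycle.
// NOTE(review): poll durations are recorded only inside the loop over locally added tenants,
// and the stored map is cleared on every iteration of that loop - confirm this is intended.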
func (l *List) ApplyPollResults(m PerTenant, c PerTenantCompacted) {
func (l *List) ApplyPollResults(m PerTenant, c PerTenantCompacted, pd PerTenantPollDuration) {
l.mtx.Lock()
defer l.mtx.Unlock()

Expand All @@ -91,6 +97,11 @@ func (l *List) ApplyPollResults(m PerTenant, c PerTenantCompacted) {
// now reapply all updates and clear
for tenantID := range l.added {
l.updateInternal(tenantID, l.added[tenantID], l.removed[tenantID], l.compactedAdded[tenantID], l.compactedRemoved[tenantID])

clear(l.pollDurations)
if v, ok := pd[tenantID]; ok {
l.pollDurations[tenantID] = v
}
}

clear(l.added)
Expand Down
5 changes: 3 additions & 2 deletions tempodb/blocklist/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func TestApplyPollResults(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
l := New()
l.ApplyPollResults(tc.metas, tc.compacted)
// TODO: test the durations
l.ApplyPollResults(tc.metas, tc.compacted, nil)

actualTenants := l.Tenants()
sort.Slice(actualTenants, func(i, j int) bool { return actualTenants[i] < actualTenants[j] })
Expand Down Expand Up @@ -639,7 +640,7 @@ func TestUpdatesSaved(t *testing.T) {
for i, tc := range tests {
t.Logf("step %d", i+1)

l.ApplyPollResults(tc.applyMetas, tc.applyCompacted)
l.ApplyPollResults(tc.applyMetas, tc.applyCompacted, nil)
if tc.updateTenant != "" {
l.Update(tc.updateTenant, tc.addMetas, tc.removeMetas, tc.addCompacted, nil)
}
Expand Down
35 changes: 28 additions & 7 deletions tempodb/blocklist/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func NewPoller(cfg *PollerConfig, sharder JobSharder, reader backend.Reader, com
}

// Do polls the backend for each tenant and returns the resulting blocklist, compacted
// blocklist, and per-tenant poll durations.
func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {
func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, PerTenantPollDuration, error) {
start := time.Now()
defer func() {
diff := time.Since(start).Seconds()
Expand All @@ -155,7 +155,7 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {
tenants, err := p.reader.Tenants(ctx)
if err != nil {
metricBlocklistErrors.WithLabelValues("").Inc()
return nil, nil, err
return nil, nil, nil, err
}

var (
Expand All @@ -164,6 +164,7 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {

blocklist = PerTenant{}
compactedBlocklist = PerTenantCompacted{}
pollDurations = PerTenantPollDuration{}

tenantFailuresRemaining = atomic.NewInt32(int32(p.cfg.TolerateTenantFailures))
)
Expand All @@ -177,17 +178,35 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {

wg.Add(1)
go func(tenantID string) {
defer wg.Done()

var (
consecutiveErrorsRemaining = p.cfg.TolerateConsecutiveErrors
newBlockList = make([]*backend.BlockMeta, 0)
newCompactedBlockList = make([]*backend.CompactedBlockMeta, 0)
start = time.Now()
err error
)

funcCtx, funcSpan := tracer.Start(ctx, "Poller.Do.tenant",
trace.WithAttributes(
attribute.String("tenant", tenantID),
attribute.Int("newBlockList", len(newBlockList)),
attribute.Int("newCompactedBlockList", len(newCompactedBlockList)),
),
trace.WithLinks(trace.LinkFromContext(ctx)),
)
defer func() {
if err != nil {
funcSpan.RecordError(err)
funcSpan.SetStatus(codes.Error, "tenant poll failed")
} else {
funcSpan.SetStatus(codes.Error, "")
}
funcSpan.End()
wg.Done()
}()

for consecutiveErrorsRemaining >= 0 {
newBlockList, newCompactedBlockList, err = p.pollTenantAndCreateIndex(ctx, tenantID, previous)
newBlockList, newCompactedBlockList, err = p.pollTenantAndCreateIndex(funcCtx, tenantID, previous)
if err == nil {
break
}
Expand All @@ -202,6 +221,7 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {
level.Error(p.logger).Log("msg", "failed to poll or create index for tenant", "tenant", tenantID, "err", err)
blocklist[tenantID] = previous.Metas(tenantID)
compactedBlocklist[tenantID] = previous.CompactedMetas(tenantID)
pollDurations[tenantID] = time.Since(start)

tenantFailuresRemaining.Dec()

Expand All @@ -211,6 +231,7 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {
if len(newBlockList) > 0 || len(newCompactedBlockList) > 0 {
blocklist[tenantID] = newBlockList
compactedBlocklist[tenantID] = newCompactedBlockList
pollDurations[tenantID] = time.Since(start)

metricBlocklistLength.WithLabelValues(tenantID).Set(float64(len(newBlockList)))

Expand All @@ -231,10 +252,10 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {
wg.Wait()

if tenantFailuresRemaining.Load() < 0 {
return nil, nil, errors.New("too many tenant failures; abandoning polling cycle")
return nil, nil, nil, errors.New("too many tenant failures; abandoning polling cycle")
}

return blocklist, compactedBlocklist, nil
return blocklist, compactedBlocklist, pollDurations, nil
}

func (p *Poller) pollTenantAndCreateIndex(
Expand Down
14 changes: 9 additions & 5 deletions tempodb/blocklist/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ func TestTenantIndexBuilder(t *testing.T) {
}, &mockJobSharder{
owns: true,
}, r, c, w, log.NewNopLogger())
actualList, actualCompactedList, err := poller.Do(b)
actualList, actualCompactedList, durations, err := poller.Do(b)

assert.Equal(t, nil, durations)

// confirm return as expected
assert.Equal(t, tc.expectedList, actualList)
Expand Down Expand Up @@ -279,7 +281,7 @@ func TestTenantIndexFallback(t *testing.T) {
}, &mockJobSharder{
owns: tc.isTenantIndexBuilder,
}, r, c, w, log.NewNopLogger())
_, _, err := poller.Do(b)
_, _, _, err := poller.Do(b)

assert.Equal(t, tc.expectsError, err != nil)
assert.Equal(t, tc.expectsTenantIndexWritten, w.IndexCompactedMeta != nil)
Expand Down Expand Up @@ -646,7 +648,7 @@ func TestPollTolerateConsecutiveErrors(t *testing.T) {
EmptyTenantDeletionAge: testEmptyTenantIndexAge,
}, s, r, c, w, log.NewLogfmtLogger(os.Stdout))

_, _, err := poller.Do(b)
_, _, _, err := poller.Do(b)

if tc.expectedError != nil {
assert.ErrorContains(t, err, tc.expectedError.Error())
Expand Down Expand Up @@ -935,9 +937,11 @@ func TestPollComparePreviousResults(t *testing.T) {
TolerateTenantFailures: tc.tollerateTenantFailures,
}, s, r, c, w, log.NewNopLogger())

metas, compactedMetas, err := poller.Do(previous)
metas, compactedMetas, durations, err := poller.Do(previous)
require.Equal(t, tc.err, err)

require.Equal(t, len(metas), len(durations))

require.Equal(t, len(tc.expectedPerTenant), len(metas))
for tenantID, expectedMetas := range tc.expectedPerTenant {
l := metas[tenantID]
Expand Down Expand Up @@ -1182,7 +1186,7 @@ func newMockReader(list PerTenant, compactedList PerTenantCompacted, expectsErro
func newBlocklist(metas PerTenant, compactedMetas PerTenantCompacted) *List {
l := New()

l.ApplyPollResults(metas, compactedMetas)
l.ApplyPollResults(metas, compactedMetas, nil)

return l
}
2 changes: 1 addition & 1 deletion tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
return
}

// Get the meta file of all non-compacted blocks for the given tenant
// Get the meta file of all non-compacted blocks for the given tenant
blocklist := rw.blocklist.Metas(tenantID)

window := rw.compactorOverrides.MaxCompactionRangeForTenant(tenantID)
Expand Down
4 changes: 2 additions & 2 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,13 +589,13 @@ func (rw *readerWriter) pollingLoop(ctx context.Context) {
}

func (rw *readerWriter) pollBlocklist() {
blocklist, compactedBlocklist, err := rw.blocklistPoller.Do(rw.blocklist)
blocklist, compactedBlocklist, pollDurations, err := rw.blocklistPoller.Do(rw.blocklist)
if err != nil {
level.Error(rw.logger).Log("msg", "failed to poll blocklist", "err", err)
return
}

rw.blocklist.ApplyPollResults(blocklist, compactedBlocklist)
rw.blocklist.ApplyPollResults(blocklist, compactedBlocklist, pollDurations)
}

// includeBlock indicates whether a given block should be included in a backend search
Expand Down

0 comments on commit 435bf74

Please sign in to comment.