diff --git a/CHANGELOG.md b/CHANGELOG.md
index a323b244ba5..a9b8bbb742b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -53,6 +53,7 @@
 * [BUGFIX] Pushes a 0 to classic histogram's counter when the series is new to allow Prometheus to start from a non-null value. [#4140](https://github.com/grafana/tempo/pull/4140) (@mapno)
 * [BUGFIX] Fix counter samples being downsampled by backdating the initial sample to the previous minute when the series is new. [#4236](https://github.com/grafana/tempo/pull/4236) (@javiermolinar)
 * [BUGFIX] Skip computing exemplars for instant queries. [#4204](https://github.com/grafana/tempo/pull/4204) (@javiermolinar)
+* [BUGFIX] Gave context to orphaned spans related to various maintenance processes. [#4260](https://github.com/grafana/tempo/pull/4260) (@joe-elliott)
 
 # v2.6.1
diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go
index 6591b34087c..ec3458eda43 100644
--- a/modules/generator/processor/localblocks/processor.go
+++ b/modules/generator/processor/localblocks/processor.go
@@ -354,6 +354,9 @@ func (p *Processor) completeBlock() error {
 		b = firstWalBlock
 	)
 
+	ctx, span := tracer.Start(ctx, "Processor.CompleteBlock")
+	defer span.End()
+
 	iter, err := b.Iterator()
 	if err != nil {
 		return err
diff --git a/modules/ingester/flush.go b/modules/ingester/flush.go
index ebb39ec00cc..9bb3afd2291 100644
--- a/modules/ingester/flush.go
+++ b/modules/ingester/flush.go
@@ -176,7 +176,7 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
 	}
 
 	if blockID != uuid.Nil {
-		level.Info(log.Logger).Log("msg", "head block cut. enqueueing flush op", "userid", instance.instanceID, "block", blockID)
+		level.Info(log.Logger).Log("msg", "head block cut. enqueueing flush op", "tenant", instance.instanceID, "block", blockID)
 		// jitter to help when flushing many instances at the same time
 		// no jitter if immediate (initiated via /flush handler for example)
 		i.enqueue(&flushOp{
@@ -211,7 +211,7 @@ func (i *Ingester) flushLoop(j int) {
 		var err error
 
 		if op.kind == opKindComplete {
-			retry, err = i.handleComplete(op)
+			retry, err = i.handleComplete(context.Background(), op)
 		} else {
 			retry, err = i.handleFlush(context.Background(), op.userID, op.blockID)
 		}
@@ -243,7 +243,11 @@ func handleAbandonedOp(op *flushOp) {
 		"op", op.kind, "block", op.blockID.String(), "attempts", op.attempts)
 }
 
-func (i *Ingester) handleComplete(op *flushOp) (retry bool, err error) {
+func (i *Ingester) handleComplete(ctx context.Context, op *flushOp) (retry bool, err error) {
+	ctx, sp := tracer.Start(ctx, "ingester.Complete", trace.WithAttributes(attribute.String("tenant", op.userID), attribute.String("blockID", op.blockID.String())))
+	defer sp.End()
+	withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "tenant", op.userID, "block", op.blockID.String())
+
 	// No point in proceeding if shutdown has been initiated since
 	// we won't be able to queue up the next flush op
 	if i.flushQueues.IsStopped() {
@@ -252,20 +256,20 @@ func (i *Ingester) handleComplete(op *flushOp) (retry bool, err error) {
 	}
 
 	start := time.Now()
-	level.Info(log.Logger).Log("msg", "completing block", "userid", op.userID, "blockID", op.blockID)
+	level.Info(log.Logger).Log("msg", "completing block", "tenant", op.userID, "blockID", op.blockID)
 
 	instance, err := i.getOrCreateInstance(op.userID)
 	if err != nil {
 		return false, err
 	}
 
-	err = instance.CompleteBlock(op.blockID)
-	level.Info(log.Logger).Log("msg", "block completed", "userid", op.userID, "blockID", op.blockID, "duration", time.Since(start))
+	err = instance.CompleteBlock(ctx, op.blockID)
+	level.Info(log.Logger).Log("msg", "block completed", "tenant", op.userID, "blockID", op.blockID, "duration", time.Since(start))
 	if err != nil {
 		handleFailedOp(op, err)
 
 		if op.attempts >= maxCompleteAttempts {
 			level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "Block exceeded max completion errors. Deleting. POSSIBLE DATA LOSS",
-				"userID", op.userID, "attempts", op.attempts, "block", op.blockID.String())
+				"tenant", op.userID, "attempts", op.attempts, "block", op.blockID.String())
 
 			// Delete WAL and move on
 			err = instance.ClearCompletingBlock(op.blockID)
@@ -306,9 +310,9 @@ func withSpan(logger gklog.Logger, sp trace.Span) gklog.Logger {
 }
 
 func (i *Ingester) handleFlush(ctx context.Context, userID string, blockID uuid.UUID) (retry bool, err error) {
-	ctx, sp := tracer.Start(ctx, "flush", trace.WithAttributes(attribute.String("organization", userID), attribute.String("blockID", blockID.String())))
+	ctx, sp := tracer.Start(ctx, "ingester.Flush", trace.WithAttributes(attribute.String("tenant", userID), attribute.String("blockID", blockID.String())))
 	defer sp.End()
-	withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "userid", userID, "block", blockID.String())
+	withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "tenant", userID, "block", blockID.String())
 
 	instance, err := i.getOrCreateInstance(userID)
 	if err != nil {
diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go
index f9d40b5aa52..6aafa5c5c9c 100644
--- a/modules/ingester/ingester_test.go
+++ b/modules/ingester/ingester_test.go
@@ -193,7 +193,7 @@ func TestWalDropsZeroLength(t *testing.T) {
 	blockID, err := instance.CutBlockIfReady(0, 0, true)
 	require.NoError(t, err)
 
-	err = instance.CompleteBlock(blockID)
+	err = instance.CompleteBlock(context.Background(), blockID)
 	require.NoError(t, err)
 
 	err = instance.ClearCompletingBlock(blockID)
@@ -408,7 +408,7 @@ func TestDedicatedColumns(t *testing.T) {
 	inst.blocksMtx.RUnlock()
 
 	// Complete block
-	err = inst.CompleteBlock(blockID)
+	err = inst.CompleteBlock(context.Background(), blockID)
 	require.NoError(t, err)
 
 	// TODO: This check should be included as part of the read path
diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go
index 0614a58d3cf..5d467e1e3fa 100644
--- a/modules/ingester/instance.go
+++ b/modules/ingester/instance.go
@@ -313,7 +313,7 @@ func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes
 }
 
 // CompleteBlock moves a completingBlock to a completeBlock. The new completeBlock has the same ID.
-func (i *instance) CompleteBlock(blockID uuid.UUID) error {
+func (i *instance) CompleteBlock(ctx context.Context, blockID uuid.UUID) error {
 	i.blocksMtx.Lock()
 	var completingBlock common.WALBlock
 	for _, iterBlock := range i.completingBlocks {
@@ -328,8 +328,6 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) error {
 		return fmt.Errorf("error finding completingBlock")
 	}
 
-	ctx := context.Background()
-
 	backendBlock, err := i.writer.CompleteBlockWithBackend(ctx, completingBlock, i.localReader, i.localWriter)
 	if err != nil {
 		return fmt.Errorf("error completing wal block with local backend: %w", err)
diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go
index 98349bb0356..2de7b25f4a6 100644
--- a/modules/ingester/instance_search.go
+++ b/modules/ingester/instance_search.go
@@ -296,7 +296,7 @@ func (i *instance) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsRequ
 	}
 
 	if distinctValues.Exceeded() {
-		level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "userID", userID, "limit", limit)
+		level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "tenant", userID, "limit", limit)
 	}
 
 	collected := distinctValues.Strings()
@@ -382,7 +382,7 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop
 	}
 
 	if distinctValues.Exceeded() {
-		level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID, "limit", limit, "size", distinctValues.Size())
+		level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "tenant", userID, "limit", limit, "size", distinctValues.Size())
 	}
 
 	return &tempopb.SearchTagValuesResponse{
@@ -589,7 +589,7 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
 	}
 
 	if valueCollector.Exceeded() {
-		_ = level.Warn(log.Logger).Log("msg", "size of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "size", valueCollector.Size())
+		_ = level.Warn(log.Logger).Log("msg", "size of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "tenant", userID, "limit", limit, "size", valueCollector.Size())
 	}
 
 	resp := &tempopb.SearchTagValuesV2Response{
diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go
index cc91b3b4b27..84a20a920d2 100644
--- a/modules/ingester/instance_search_test.go
+++ b/modules/ingester/instance_search_test.go
@@ -65,7 +65,7 @@ func TestInstanceSearch(t *testing.T) {
 	checkEqual(t, ids, sr)
 
 	// Test after completing a block
-	err = i.CompleteBlock(blockID)
+	err = i.CompleteBlock(context.Background(), blockID)
 	require.NoError(t, err)
 
 	sr, err = i.Search(context.Background(), req)
@@ -133,7 +133,7 @@ func TestInstanceSearchTraceQL(t *testing.T) {
 	checkEqual(t, ids, sr)
 
 	// Test after completing a block
-	err = i.CompleteBlock(blockID)
+	err = i.CompleteBlock(context.Background(), blockID)
 	require.NoError(t, err)
 
 	sr, err = i.Search(context.Background(), req)
@@ -222,7 +222,7 @@ func TestInstanceSearchWithStartAndEnd(t *testing.T) {
 	searchAndAssert(req, uint32(100))
 
 	// Test after completing a block
-	err = i.CompleteBlock(blockID)
+	err = i.CompleteBlock(context.Background(), blockID)
 	require.NoError(t, err)
 
 	searchAndAssert(req, uint32(200))
@@ -267,7 +267,7 @@ func TestInstanceSearchTags(t *testing.T) {
 	testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues)
 
 	// Test after completing a block
-	err = i.CompleteBlock(blockID)
+	err = i.CompleteBlock(context.Background(), blockID)
 	require.NoError(t, err)
 
 	testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues)
@@ -332,7 +332,7 @@ func TestInstanceSearchTagAndValuesV2(t *testing.T) {
 	testSearchTagsAndValuesV2(t, userCtx, i, tagKey, queryThatMatches, expectedTagValues, expectedEventTagValues, expectedLinkTagValues)
 
 	// Test after completing a block
-	err = i.CompleteBlock(blockID)
+	err = i.CompleteBlock(context.Background(), blockID)
 	require.NoError(t, err)
 	require.NoError(t, i.ClearCompletingBlock(blockID)) // Clear the completing block
@@ -681,7 +681,7 @@ func TestInstanceSearchDoesNotRace(t *testing.T) {
 		// Cut wal, complete, delete wal, then flush
 		blockID, _ := i.CutBlockIfReady(0, 0, true)
 		if blockID != uuid.Nil {
-			err := i.CompleteBlock(blockID)
+			err := i.CompleteBlock(context.Background(), blockID)
 			require.NoError(t, err)
 			err = i.ClearCompletingBlock(blockID)
 			require.NoError(t, err)
@@ -837,7 +837,7 @@ func TestInstanceSearchMetrics(t *testing.T) {
 	require.Less(t, numBytes, m.InspectedBytes)
 
 	// Test after completing a block
-	err = i.CompleteBlock(blockID)
+	err = i.CompleteBlock(context.Background(), blockID)
 	require.NoError(t, err)
 	err = i.ClearCompletingBlock(blockID)
 	require.NoError(t, err)
diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go
index 414d6fdba74..8ffed343daa 100644
--- a/modules/ingester/instance_test.go
+++ b/modules/ingester/instance_test.go
@@ -50,7 +50,7 @@ func TestInstance(t *testing.T) {
 	require.NoError(t, err, "unexpected error cutting block")
 	require.NotEqual(t, blockID, uuid.Nil)
 
-	err = i.CompleteBlock(blockID)
+	err = i.CompleteBlock(context.Background(), blockID)
 	require.NoError(t, err, "unexpected error completing block")
 
 	block := i.GetBlockToBeFlushed(blockID)
@@ -103,7 +103,7 @@ func TestInstanceFind(t *testing.T) {
 
 	queryAll(t, i, ids, traces)
 
-	err = i.CompleteBlock(blockID)
+	err = i.CompleteBlock(context.Background(), blockID)
 	require.NoError(t, err)
 
 	queryAll(t, i, ids, traces)
@@ -185,7 +185,7 @@ func TestInstanceDoesNotRace(t *testing.T) {
 	go concurrent(func() {
 		blockID, _ := i.CutBlockIfReady(0, 0, false)
 		if blockID != uuid.Nil {
-			err := i.CompleteBlock(blockID)
+			err := i.CompleteBlock(context.Background(), blockID)
 			require.NoError(t, err, "unexpected error completing block")
 			block := i.GetBlockToBeFlushed(blockID)
 			require.NotNil(t, block)
@@ -487,7 +487,7 @@ func TestInstanceCutBlockIfReady(t *testing.T) {
 			blockID, err := instance.CutBlockIfReady(tc.maxBlockLifetime, tc.maxBlockBytes, tc.immediate)
 			require.NoError(t, err)
 
-			err = instance.CompleteBlock(blockID)
+			err = instance.CompleteBlock(context.Background(), blockID)
 			if tc.expectedToCutBlock {
 				require.NoError(t, err, "unexpected error completing block")
 			}
@@ -738,7 +738,7 @@ func BenchmarkInstanceFindTraceByIDFromCompleteBlock(b *testing.B) {
 	require.NoError(b, err)
 	id, err := instance.CutBlockIfReady(0, 0, true)
 	require.NoError(b, err)
-	err = instance.CompleteBlock(id)
+	err = instance.CompleteBlock(context.Background(), id)
 	require.NoError(b, err)
 
 	require.Equal(b, 1, len(instance.completeBlocks))
@@ -776,7 +776,7 @@ func benchmarkInstanceSearch(b testing.TB) {
 	// force the traces to be in a complete block
 	id, err := instance.CutBlockIfReady(0, 0, true)
 	require.NoError(b, err)
-	err = instance.CompleteBlock(id)
+	err = instance.CompleteBlock(context.Background(), id)
 	require.NoError(b, err)
 
 	require.Equal(b, 1, len(instance.completeBlocks))
@@ -880,7 +880,7 @@ func BenchmarkInstanceContention(t *testing.B) {
 	go concurrent(func() {
 		blockID, _ := i.CutBlockIfReady(0, 0, false)
 		if blockID != uuid.Nil {
-			err := i.CompleteBlock(blockID)
+			err := i.CompleteBlock(context.Background(), blockID)
 			require.NoError(t, err, "unexpected error completing block")
 			err = i.ClearCompletingBlock(blockID)
 			require.NoError(t, err, "unexpected error clearing wal block")
diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go
index 843b079b96d..ef026c45701 100644
--- a/pkg/usagestats/reporter.go
+++ b/pkg/usagestats/reporter.go
@@ -18,6 +18,7 @@ import (
 	"github.com/grafana/dskit/multierror"
 	"github.com/grafana/dskit/services"
 	"github.com/prometheus/client_golang/prometheus"
+	"go.opentelemetry.io/otel"
 
 	"github.com/grafana/tempo/cmd/tempo/build"
 	"github.com/grafana/tempo/tempodb/backend"
@@ -36,6 +37,8 @@ var (
 	stabilityCheckInterval   = 5 * time.Second
 	stabilityMinimumRequired = 6
+
+	tracer = otel.Tracer("usagestats/Reporter")
 )
 
 type Reporter struct {
@@ -158,6 +161,9 @@ func ensureStableKey(ctx context.Context, kvClient kv.Client, logger log.Logger)
 }
 
 func (rep *Reporter) init(ctx context.Context) {
+	ctx, span := tracer.Start(ctx, "UsageReporter.init")
+	defer span.End()
+
 	if rep.conf.Leader {
 		rep.cluster = rep.initLeader(ctx)
 		return
diff --git a/tempodb/blocklist/poller.go b/tempodb/blocklist/poller.go
index e2fc1ed2751..e8571497510 100644
--- a/tempodb/blocklist/poller.go
+++ b/tempodb/blocklist/poller.go
@@ -149,7 +149,7 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	_, span := tracer.Start(ctx, "Poller.Do")
+	ctx, span := tracer.Start(ctx, "Poller.Do")
 	defer span.End()
 
 	tenants, err := p.reader.Tenants(ctx)
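
The pattern these hunks apply is uniform: create a package-level tracer via `otel.Tracer`, start a named span carrying `tenant`/`blockID` attributes, `defer span.End()`, propagate the returned `ctx` into downstream calls so their spans become children rather than orphans, and stamp related log lines with the span's trace ID. Below is a minimal standalone sketch of that pattern; the `withSpan` helper mirrors the one in `modules/ingester/flush.go`, while the tracer name and the `completeBlock`/`flushToBackend` functions are hypothetical stand-ins, not code from this diff.

```go
package main

import (
	"context"

	gklog "github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// Package-level tracer, as in the diff's `tracer = otel.Tracer(...)` vars.
var tracer = otel.Tracer("example/maintenance")

// withSpan decorates a logger with the span's trace ID so log lines can be
// correlated with the trace (mirrors the helper in flush.go).
func withSpan(logger gklog.Logger, sp trace.Span) gklog.Logger {
	sc := sp.SpanContext()
	if !sc.HasTraceID() {
		return logger
	}
	return gklog.With(logger, "traceID", sc.TraceID().String())
}

// completeBlock is a hypothetical maintenance op: start a named span with
// tenant/blockID attributes, defer End, and thread ctx through to child work.
func completeBlock(ctx context.Context, logger gklog.Logger, tenant, blockID string) error {
	ctx, sp := tracer.Start(ctx, "example.CompleteBlock",
		trace.WithAttributes(attribute.String("tenant", tenant), attribute.String("blockID", blockID)))
	defer sp.End()

	withSpan(level.Info(logger), sp).Log("msg", "completing block", "tenant", tenant, "block", blockID)

	// Passing ctx down means spans started here attach as children of
	// example.CompleteBlock instead of becoming roots of orphaned traces.
	return flushToBackend(ctx, blockID)
}

func flushToBackend(ctx context.Context, blockID string) error {
	_, sp := tracer.Start(ctx, "example.flushToBackend") // child span
	defer sp.End()
	return nil
}

func main() {
	_ = completeBlock(context.Background(), gklog.NewNopLogger(), "tenant-1", "1b2e3c4d-0000-0000-0000-000000000000")
}
```

Without a configured SDK, `otel.Tracer` hands back the global no-op tracer, so the sketch runs as-is; wiring a real `TracerProvider` with an exporter is what makes the spans visible.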