diff --git a/CHANGELOG.md b/CHANGELOG.md index f5a405c1291..0970cc8b8c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ * [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero) * [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero) * [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero) +* [ENHANCEMENT] Add a max flush attempts and metric to the metrics generator [#4254](https://github.com/grafana/tempo/pull/4254) (@joe-elliott) * [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen) * [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott) * [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno) diff --git a/modules/generator/processor/localblocks/metrics.go b/modules/generator/processor/localblocks/metrics.go index 3573f9982ba..aaa8bbe41f9 100644 --- a/modules/generator/processor/localblocks/metrics.go +++ b/modules/generator/processor/localblocks/metrics.go @@ -74,4 +74,10 @@ var ( Name: "flush_queue_size", Help: "Size of the flush queue", }, []string{"tenant"}) + metricFailedFlushes = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "failed_flushes_total", + Help: "The total number of failed flushes", + }) ) diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go index d2ad9bd7e44..6591b34087c 100644 --- a/modules/generator/processor/localblocks/processor.go +++ b/modules/generator/processor/localblocks/processor.go @@ -34,7 +34,10 @@ import ( var tracer = otel.Tracer("modules/generator/processor/localblocks") -const timeBuffer = 5 * time.Minute +const ( + timeBuffer = 5 * time.Minute + maxFlushAttempts = 100 +) // ProcessorOverrides is just the set of overrides needed here. type ProcessorOverrides interface { @@ -276,16 +279,36 @@ func (p *Processor) flushLoop() { op := o.(*flushOp) op.attempts++ + + if op.attempts > maxFlushAttempts { + _ = level.Error(p.logger).Log("msg", "failed to flush block after max attempts", "tenant", p.tenant, "block", op.blockID, "attempts", op.attempts) + + // attempt to delete the block + p.blocksMtx.Lock() + err := p.wal.LocalBackend().ClearBlock(op.blockID, p.tenant) + if err != nil { + _ = level.Error(p.logger).Log("msg", "failed to clear corrupt block", "tenant", p.tenant, "block", op.blockID, "err", err) + } + delete(p.completeBlocks, op.blockID) + p.blocksMtx.Unlock() + + continue + } + err := p.flushBlock(op.blockID) if err != nil { - _ = level.Error(p.logger).Log("msg", "failed to flush a block", "err", err) + _ = level.Info(p.logger).Log("msg", "re-queueing block for flushing", "block", op.blockID, "attempts", op.attempts, "err", err) + metricFailedFlushes.Inc() - _ = level.Info(p.logger).Log("msg", "re-queueing block for flushing", "block", op.blockID, "attempts", op.attempts) - op.at = time.Now().Add(op.backoff()) + delay := op.backoff() + op.at = time.Now().Add(delay) - if _, err := p.flushqueue.Enqueue(op); err != nil { - _ = level.Error(p.logger).Log("msg", "failed to requeue block for flushing", "err", err) - } + go func() { + time.Sleep(delay) + if _, err := p.flushqueue.Enqueue(op); err != nil { + _ = level.Error(p.logger).Log("msg", "failed to requeue block for flushing", "err", err) + } + }() } } }