From 5bd7e2694864dc92c2102388314d0492320b0a45 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 31 Oct 2024 08:09:32 -0400 Subject: [PATCH 1/4] dont retry failed flushes Signed-off-by: Joe Elliott --- CHANGELOG.md | 1 + .../processor/localblocks/metrics.go | 6 +++++ .../processor/localblocks/processor.go | 26 ++++++++++++++++--- 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f5a405c1291..0e7d6aab889 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ * [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero) * [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero) * [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero) +* [ENHANCEMENT] Add a max flush attempts and metric to the metrics generator [#????](https://github.com/grafana/tempo/pull/????) (@joe-elliott) * [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen) * [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott) * [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno) diff --git a/modules/generator/processor/localblocks/metrics.go b/modules/generator/processor/localblocks/metrics.go index 3573f9982ba..aaa8bbe41f9 100644 --- a/modules/generator/processor/localblocks/metrics.go +++ b/modules/generator/processor/localblocks/metrics.go @@ -74,4 +74,10 @@ var ( Name: "flush_queue_size", Help: "Size of the flush queue", }, []string{"tenant"}) + metricFailedFlushes = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "failed_flushes_total", + Help: "The total number of failed flushes", + }) ) diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go index d2ad9bd7e44..e0d9778a9ca 100644 --- a/modules/generator/processor/localblocks/processor.go +++ b/modules/generator/processor/localblocks/processor.go @@ -34,7 +34,10 @@ import ( var tracer = otel.Tracer("modules/generator/processor/localblocks") -const timeBuffer = 5 * time.Minute +const ( + timeBuffer = 5 * time.Minute + maxFlushAttempts = 100 +) // ProcessorOverrides is just the set of overrides needed here. type ProcessorOverrides interface { @@ -276,13 +279,28 @@ func (p *Processor) flushLoop() { op := o.(*flushOp) op.attempts++ + + if op.attempts > maxFlushAttempts { + _ = level.Error(p.logger).Log("msg", "failed to flush block after max attempts", "tenant", p.tenant, "block", op.blockID, "attempts", op.attempts) + + // attempt to delete the block + p.blocksMtx.Lock() + err := p.wal.LocalBackend().ClearBlock(op.blockID, p.tenant) + if err != nil { + _ = level.Error(p.logger).Log("msg", "failed to clear corrupt block", "tenant", p.tenant, "block", op.blockID, "err", err) + } + delete(p.completeBlocks, op.blockID) + p.blocksMtx.Unlock() + + continue + } + err := p.flushBlock(op.blockID) if err != nil { - _ = level.Error(p.logger).Log("msg", "failed to flush a block", "err", err) - _ = level.Info(p.logger).Log("msg", "re-queueing block for flushing", "block", op.blockID, "attempts", op.attempts) - op.at = time.Now().Add(op.backoff()) + metricFailedFlushes.Inc() + op.at = time.Now().Add(op.backoff()) if _, err := p.flushqueue.Enqueue(op); err != nil { _ = level.Error(p.logger).Log("msg", "failed to requeue block for flushing", "err", err) } From ec38e50624de96ec87d8acede2925c69fbd65b9f Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 31 Oct 2024 08:11:08 -0400 Subject: [PATCH 2/4] changelog Signed-off-by: Joe Elliott --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e7d6aab889..0970cc8b8c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,7 +44,7 @@ * [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero) * [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero) * [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero) -* [ENHANCEMENT] Add a max flush attempts and metric to the metrics generator [#????](https://github.com/grafana/tempo/pull/????) (@joe-elliott) +* [ENHANCEMENT] Add a max flush attempts and metric to the metrics generator [#4254](https://github.com/grafana/tempo/pull/4254) (@joe-elliott) * [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen) * [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott) * [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno) From e57ca0a40290e8106270da1ae8509d4b37a118a2 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 31 Oct 2024 08:16:05 -0400 Subject: [PATCH 3/4] add sleep for backoff Signed-off-by: Joe Elliott --- .../generator/processor/localblocks/processor.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go index e0d9778a9ca..87363e016ee 100644 --- a/modules/generator/processor/localblocks/processor.go +++ b/modules/generator/processor/localblocks/processor.go @@ -300,10 +300,15 @@ func (p *Processor) flushLoop() { _ = level.Info(p.logger).Log("msg", "re-queueing block for flushing", "block", op.blockID, "attempts", op.attempts) metricFailedFlushes.Inc() - op.at = time.Now().Add(op.backoff()) - if _, err := p.flushqueue.Enqueue(op); err != nil { - _ = level.Error(p.logger).Log("msg", "failed to requeue block for flushing", "err", err) - } + delay := op.backoff() + op.at = time.Now().Add(delay) + + go func() { + time.Sleep(delay) + if _, err := p.flushqueue.Enqueue(op); err != nil { + _ = level.Error(p.logger).Log("msg", "failed to requeue block for flushing", "err", err) + } + }() } } } From 7db4f37c150bad10eae8c17519f23b3815e7df9d Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 31 Oct 2024 13:13:46 -0400 Subject: [PATCH 4/4] add err Signed-off-by: Joe Elliott --- modules/generator/processor/localblocks/processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go index 87363e016ee..6591b34087c 100644 --- a/modules/generator/processor/localblocks/processor.go +++ b/modules/generator/processor/localblocks/processor.go @@ -297,7 +297,7 @@ func (p *Processor) flushLoop() { err := p.flushBlock(op.blockID) if err != nil { - _ = level.Info(p.logger).Log("msg", "re-queueing block for flushing", "block", op.blockID, "attempts", op.attempts) + _ = level.Info(p.logger).Log("msg", "re-queueing block for flushing", "block", op.blockID, "attempts", op.attempts, "err", err) metricFailedFlushes.Inc() delay := op.backoff()