Skip to content

Commit

Permalink
Metrics Generator: Max limit on number of failed flushes (#4254)
Browse files Browse the repository at this point in the history
* dont retry failed flushes

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

* add sleep for backoff

Signed-off-by: Joe Elliott <[email protected]>

* add err

Signed-off-by: Joe Elliott <[email protected]>

---------

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored Oct 31, 2024
1 parent 61fa0a5 commit d035888
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
* [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero)
* [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero)
* [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero)
* [ENHANCEMENT] Add a max flush attempts and metric to the metrics generator [#4254](https://github.com/grafana/tempo/pull/4254) (@joe-elliott)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
Expand Down
6 changes: 6 additions & 0 deletions modules/generator/processor/localblocks/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,10 @@ var (
Name: "flush_queue_size",
Help: "Size of the flush queue",
}, []string{"tenant"})
metricFailedFlushes = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "failed_flushes_total",
Help: "The total number of failed flushes",
})
)
37 changes: 30 additions & 7 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ import (

var tracer = otel.Tracer("modules/generator/processor/localblocks")

const timeBuffer = 5 * time.Minute
const (
timeBuffer = 5 * time.Minute
maxFlushAttempts = 100
)

// ProcessorOverrides is just the set of overrides needed here.
type ProcessorOverrides interface {
Expand Down Expand Up @@ -276,16 +279,36 @@ func (p *Processor) flushLoop() {

op := o.(*flushOp)
op.attempts++

if op.attempts > maxFlushAttempts {
_ = level.Error(p.logger).Log("msg", "failed to flush block after max attempts", "tenant", p.tenant, "block", op.blockID, "attempts", op.attempts)

// attempt to delete the block
p.blocksMtx.Lock()
err := p.wal.LocalBackend().ClearBlock(op.blockID, p.tenant)
if err != nil {
_ = level.Error(p.logger).Log("msg", "failed to clear corrupt block", "tenant", p.tenant, "block", op.blockID, "err", err)
}
delete(p.completeBlocks, op.blockID)
p.blocksMtx.Unlock()

continue
}

err := p.flushBlock(op.blockID)
if err != nil {
_ = level.Error(p.logger).Log("msg", "failed to flush a block", "err", err)
_ = level.Info(p.logger).Log("msg", "re-queueing block for flushing", "block", op.blockID, "attempts", op.attempts, "err", err)
metricFailedFlushes.Inc()

_ = level.Info(p.logger).Log("msg", "re-queueing block for flushing", "block", op.blockID, "attempts", op.attempts)
op.at = time.Now().Add(op.backoff())
delay := op.backoff()
op.at = time.Now().Add(delay)

if _, err := p.flushqueue.Enqueue(op); err != nil {
_ = level.Error(p.logger).Log("msg", "failed to requeue block for flushing", "err", err)
}
go func() {
time.Sleep(delay)
if _, err := p.flushqueue.Enqueue(op); err != nil {
_ = level.Error(p.logger).Log("msg", "failed to requeue block for flushing", "err", err)
}
}()
}
}
}
Expand Down

0 comments on commit d035888

Please sign in to comment.