Skip to content

Commit

Permalink
FIX-699: Add capability to completely flush on shutdown (#2538)
Browse files Browse the repository at this point in the history
  • Loading branch information
mghildiy authored Jun 7, 2023
1 parent 3795ff5 commit ad6532f
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* [CHANGE] Make vParquet2 the default block format [#2526](https://github.com/grafana/tempo/pull/2526) (@stoewer)
* [CHANGE] Disable tempo-query by default in Jsonnet libs. [#2462](https://github.com/grafana/tempo/pull/2462) (@electron0zero)
* [FEATURE] New experimental API to derive on-demand RED metrics grouped by any attribute, and new metrics generator processor [#2368](https://github.com/grafana/tempo/pull/2368) [#2418](https://github.com/grafana/tempo/pull/2418) [#2424](https://github.com/grafana/tempo/pull/2424) [#2442](https://github.com/grafana/tempo/pull/2442) [#2480](https://github.com/grafana/tempo/pull/2480) [#2481](https://github.com/grafana/tempo/pull/2481) [#2501](https://github.com/grafana/tempo/pull/2501) (@mdisibio @zalegrala)
* [ENHANCEMENT] Add capability to flush all remaining traces to backend when ingester is stopped [#2538](https://github.com/grafana/tempo/pull/2538)
* [ENHANCEMENT] Fill parent ID column and nested set columns [#2487](https://github.com/grafana/tempo/pull/2487) (@stoewer)
* [ENHANCEMENT] Add metrics generator config option to allow customizable ring port [#2399](https://github.com/grafana/tempo/pull/2399) (@mdisibio)
* [ENHANCEMENT] Improve performance of TraceQL regex [#2484](https://github.com/grafana/tempo/pull/2484) (@mdisibio)
Expand Down
3 changes: 3 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ ingester:
# duration to keep blocks in the ingester after they have been flushed
# (default: 15m)
[ complete_block_timeout: <duration>]

# Flush all traces to backend when ingester is stopped
[flush_all_on_shutdown: <bool> | default = false]
```
## Metrics-generator
Expand Down
2 changes: 2 additions & 0 deletions modules/ingester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Config struct {
MaxBlockBytes uint64 `yaml:"max_block_bytes"`
CompleteBlockTimeout time.Duration `yaml:"complete_block_timeout"`
OverrideRingKey string `yaml:"override_ring_key"`
FlushAllOnShutdown bool `yaml:"flush_all_on_shutdown"`

AutocompleteFilteringEnabled bool `yaml:"-"`
}
Expand All @@ -40,6 +41,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.ConcurrentFlushes = 4
cfg.FlushCheckPeriod = 10 * time.Second
cfg.FlushOpTimeout = 5 * time.Minute
cfg.FlushAllOnShutdown = false

f.DurationVar(&cfg.MaxTraceIdle, prefix+".trace-idle-period", 10*time.Second, "Duration after which to consider a trace complete if no spans have been received")
f.DurationVar(&cfg.MaxBlockDuration, prefix+".max-block-duration", 30*time.Minute, "Maximum duration which the head block can be appended to before cutting it.")
Expand Down
8 changes: 2 additions & 6 deletions modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,8 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request) {
// stop accepting new writes
i.markUnavailable()

// move all data into flushQueue
i.sweepAllInstances(true)

for !i.flushQueues.IsEmpty() {
time.Sleep(100 * time.Millisecond)
}
// flush any remaining traces
i.flushRemaining()

// stop ingester service
_ = services.StopAndAwaitTerminated(context.Background(), i)
Expand Down
18 changes: 18 additions & 0 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,28 @@ func (i *Ingester) loop(ctx context.Context) error {
}
}

// complete the flushing
// ExclusiveQueues.activekeys keeps track of flush operations due for processing
// ExclusiveQueues.IsEmpty check uses ExclusiveQueues.activeKeys to determine if flushQueues is empty or not
// sweepAllInstances prepares remaining traces to be flushed by flushLoop routine, also updating ExclusiveQueues.activekeys with keys for new flush operations
// ExclusiveQueues.activeKeys is cleared of a flush operation when a processing of flush operation is either successful or doesn't return retry signal
// This ensures that i.flushQueues is empty only when all traces are flushed
func (i *Ingester) flushRemaining() {
i.sweepAllInstances(true)
for !i.flushQueues.IsEmpty() {
time.Sleep(100 * time.Millisecond)
}
}

// stopping is run when ingester is asked to stop
func (i *Ingester) stopping(_ error) error {
i.markUnavailable()

// flush any remaining traces
if i.cfg.FlushAllOnShutdown {
i.flushRemaining()
}

if i.flushQueues != nil {
i.flushQueues.Stop()
i.flushQueuesDone.Wait()
Expand Down

0 comments on commit ad6532f

Please sign in to comment.