diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 27d094c441..1d3fc2c5fc 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -233,8 +233,9 @@ type Ingester struct { TSDBState TSDBState // Rate of pushed samples. Only used by V2-ingester to limit global samples push rate. - ingestionRate *util_math.EwmaRate - inflightPushRequests atomic.Int64 + ingestionRate *util_math.EwmaRate + inflightPushRequests atomic.Int64 + maxInflightPushRequests util_math.MaxTracker inflightQueryRequests atomic.Int64 maxInflightQueryRequests util_math.MaxTracker @@ -710,7 +711,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, - &i.inflightPushRequests, + &i.maxInflightPushRequests, &i.maxInflightQueryRequests, cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled || cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled) i.validateMetrics = validation.NewValidateMetrics(registerer) @@ -792,7 +793,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe false, i.getInstanceLimits, nil, - &i.inflightPushRequests, + &i.maxInflightPushRequests, &i.maxInflightQueryRequests, cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled || cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled, ) @@ -918,8 +919,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error { metadataPurgeTicker := time.NewTicker(metadataPurgePeriod) defer metadataPurgeTicker.Stop() - maxInflightRequestResetTicker := time.NewTicker(maxInflightRequestResetPeriod) - defer maxInflightRequestResetTicker.Stop() + maxTrackerResetTicker := time.NewTicker(maxInflightRequestResetPeriod) + defer maxTrackerResetTicker.Stop() for { select { @@ -937,8 +938,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error { case <-activeSeriesTickerChan: i.updateActiveSeries(ctx) - case <-maxInflightRequestResetTicker.C: + case <-maxTrackerResetTicker.C: i.maxInflightQueryRequests.Tick() + i.maxInflightPushRequests.Tick() case <-userTSDBConfigTicker.C: i.updateUserTSDBConfigs() case <-ctx.Done(): @@ -1068,6 +1070,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // We will report *this* request in the error too. inflight := i.inflightPushRequests.Inc() + i.maxInflightPushRequests.Track(inflight) defer i.inflightPushRequests.Dec() gl := i.getInstanceLimits() diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index b1c7edc50d..645f995868 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -3,7 +3,6 @@ package ingester import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" @@ -53,10 +52,12 @@ type ingesterMetrics struct { maxUsersGauge prometheus.GaugeFunc maxSeriesGauge prometheus.GaugeFunc maxIngestionRate prometheus.GaugeFunc - ingestionRate prometheus.GaugeFunc maxInflightPushRequests prometheus.GaugeFunc - inflightRequests prometheus.GaugeFunc - inflightQueryRequests prometheus.GaugeFunc + + // Current Usage + ingestionRate prometheus.GaugeFunc + inflightRequests prometheus.GaugeFunc + inflightQueryRequests prometheus.GaugeFunc // Posting Cache Metrics expandedPostingsCacheMetrics *tsdb.ExpandedPostingsCacheMetrics @@ -67,7 +68,7 @@ func newIngesterMetrics(r prometheus.Registerer, activeSeriesEnabled bool, instanceLimitsFn func() *InstanceLimits, ingestionRate *util_math.EwmaRate, - inflightPushRequests *atomic.Int64, + inflightPushRequests *util_math.MaxTracker, maxInflightQueryRequests *util_math.MaxTracker, postingsCacheEnabled bool, ) *ingesterMetrics { diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index 56214c9a50..031dec9482 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -8,7 +8,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" - "go.uber.org/atomic" util_math "github.com/cortexproject/cortex/pkg/util/math" ) @@ -16,10 +15,10 @@ import ( func TestIngesterMetrics(t *testing.T) { mainReg := prometheus.NewPedanticRegistry() ingestionRate := util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval) - inflightPushRequests := &atomic.Int64{} + inflightPushRequests := util_math.MaxTracker{} maxInflightQueryRequests := util_math.MaxTracker{} maxInflightQueryRequests.Track(98) - inflightPushRequests.Store(14) + inflightPushRequests.Track(14) m := newIngesterMetrics(mainReg, false, @@ -33,7 +32,7 @@ func TestIngesterMetrics(t *testing.T) { } }, ingestionRate, - inflightPushRequests, + &inflightPushRequests, &maxInflightQueryRequests, false)