From 77c7138d8112cf9837fbc571b5c9f823ec04df31 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 7 May 2024 14:19:42 -0700 Subject: [PATCH] Clean up ingester client inflightPushRequests on close (#5932) * Clean up ingester client inflightPushRequests on close Signed-off-by: Alex Le * fix test Signed-off-by: Alex Le --------- Signed-off-by: Alex Le --- pkg/ingester/client/client.go | 11 +++++++---- pkg/ingester/client/client_test.go | 3 ++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 7bb1a6a29c..c62b9c59f4 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -48,9 +48,10 @@ type closableHealthAndIngesterClient struct { IngesterClient grpc_health_v1.HealthClient conn ClosableClientConn + addr string maxInflightPushRequests int64 inflightRequests atomic.Int64 - inflightPushRequests prometheus.Gauge + inflightPushRequests *prometheus.GaugeVec } func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { @@ -72,9 +73,9 @@ func (c *closableHealthAndIngesterClient) Push(ctx context.Context, in *cortexpb func (c *closableHealthAndIngesterClient) handlePushRequest(mainFunc func() (*cortexpb.WriteResponse, error)) (*cortexpb.WriteResponse, error) { currentInflight := c.inflightRequests.Inc() - c.inflightPushRequests.Inc() + c.inflightPushRequests.WithLabelValues(c.addr).Inc() defer func() { - c.inflightPushRequests.Dec() + c.inflightPushRequests.WithLabelValues(c.addr).Dec() c.inflightRequests.Dec() }() if c.maxInflightPushRequests > 0 && currentInflight > c.maxInflightPushRequests { @@ -97,12 +98,14 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error IngesterClient: NewIngesterClient(conn), HealthClient: grpc_health_v1.NewHealthClient(conn), conn: conn, + addr: addr, maxInflightPushRequests: cfg.MaxInflightPushRequests, - inflightPushRequests: ingesterClientInflightPushRequests.WithLabelValues(addr), + inflightPushRequests: ingesterClientInflightPushRequests, }, nil } func (c *closableHealthAndIngesterClient) Close() error { + c.inflightPushRequests.DeleteLabelValues(c.addr) return c.conn.Close() } diff --git a/pkg/ingester/client/client_test.go b/pkg/ingester/client/client_test.go index 270db7d458..55e9a40a9e 100644 --- a/pkg/ingester/client/client_test.go +++ b/pkg/ingester/client/client_test.go @@ -104,8 +104,9 @@ func createTestIngesterClient(maxInflightPushRequests int64, currentInflightRequ client := &closableHealthAndIngesterClient{ IngesterClient: &mockIngester{}, conn: &mockClientConn{}, + addr: "dummy_addr", maxInflightPushRequests: maxInflightPushRequests, - inflightPushRequests: prometheus.NewGauge(prometheus.GaugeOpts{}), + inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}), } client.inflightRequests.Add(currentInflightRequests) return client