From 7af1e8963f03c82e7644a681fc526d98c7f00055 Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 17 Dec 2024 13:59:26 -0800 Subject: [PATCH 1/2] Improve streaming on MetricsForLabelMatchersStream method Signed-off-by: alanprot --- pkg/ingester/ingester.go | 70 ++++++++++++++++++++--------------- pkg/ingester/ingester_test.go | 21 +++++++++++ 2 files changed, 61 insertions(+), 30 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 27d094c441..2956ab4ce1 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1696,27 +1696,42 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR // MetricsForLabelMatchers returns all the metrics which match a set of matchers. func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) { - result, cleanup, err := i.metricsForLabelMatchersCommon(ctx, req) + result := &client.MetricsForLabelMatchersResponse{} + cleanup, err := i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error { + result.Metric = append(result.Metric, &cortexpb.Metric{ + Labels: cortexpb.FromLabelsToLabelAdapters(l), + }) + return nil + }) defer cleanup() return result, err } func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatchersRequest, stream client.Ingester_MetricsForLabelMatchersStreamServer) error { - result, cleanup, err := i.metricsForLabelMatchersCommon(stream.Context(), req) + result := &client.MetricsForLabelMatchersStreamResponse{} + + cleanup, err := i.metricsForLabelMatchersCommon(stream.Context(), req, func(l labels.Labels) error { + result.Metric = append(result.Metric, &cortexpb.Metric{ + Labels: cortexpb.FromLabelsToLabelAdapters(l), + }) + + if len(result.Metric) > metadataStreamBatchSize { + err := client.SendMetricsForLabelMatchersStream(stream, result) + if err != nil { + return err + } + result.Metric = result.Metric[:0] + } + return nil + }) defer cleanup() if err != nil { return err } - for i := 0; i < len(result.Metric); i += metadataStreamBatchSize { - j := i + metadataStreamBatchSize - if j > len(result.Metric) { - j = len(result.Metric) - } - resp := &client.MetricsForLabelMatchersStreamResponse{ - Metric: result.Metric[i:j], - } - err := client.SendMetricsForLabelMatchersStream(stream, resp) + // Send last batch + if len(result.Metric) > 0 { + err := client.SendMetricsForLabelMatchersStream(stream, result) if err != nil { return err } @@ -1728,36 +1743,36 @@ func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatc // metricsForLabelMatchersCommon returns all the metrics which match a set of matchers. // this should be used by MetricsForLabelMatchers and MetricsForLabelMatchersStream. // the cleanup function should be called in order to close the querier -func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, func(), error) { +func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *client.MetricsForLabelMatchersRequest, acc func(labels.Labels) error) (func(), error) { cleanup := func() {} if err := i.checkRunning(); err != nil { - return nil, cleanup, err + return cleanup, err } userID, err := tenant.TenantID(ctx) if err != nil { - return nil, cleanup, err + return cleanup, err } db := i.getTSDB(userID) if db == nil { - return &client.MetricsForLabelMatchersResponse{}, cleanup, nil + return cleanup, nil } // Parse the request _, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req) if err != nil { - return nil, cleanup, err + return cleanup, err } mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryIngestersWithin) if err != nil { - return nil, cleanup, err + return cleanup, err } q, err := db.Querier(mint, maxt) if err != nil { - return nil, cleanup, err + return cleanup, err } cleanup = func() { @@ -1780,7 +1795,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien for _, matchers := range matchersSet { // Interrupt if the context has been canceled. if ctx.Err() != nil { - return nil, cleanup, ctx.Err() + return cleanup, ctx.Err() } seriesSet := q.Select(ctx, true, hints, matchers...) @@ -1791,28 +1806,23 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien mergedSet = q.Select(ctx, false, hints, matchersSet[0]...) } - // Generate the response merging all series sets. - result := &client.MetricsForLabelMatchersResponse{ - Metric: make([]*cortexpb.Metric, 0), - } - cnt := 0 for mergedSet.Next() { cnt++ // Interrupt if the context has been canceled. if cnt%util.CheckContextEveryNIterations == 0 && ctx.Err() != nil { - return nil, cleanup, ctx.Err() + return cleanup, ctx.Err() + } + if err := acc(mergedSet.At().Labels()); err != nil { + return cleanup, err } - result.Metric = append(result.Metric, &cortexpb.Metric{ - Labels: cortexpb.FromLabelsToLabelAdapters(mergedSet.At().Labels()), - }) - if limit > 0 && len(result.Metric) >= limit { + if limit > 0 && cnt >= limit { break } } - return result, cleanup, nil + return cleanup, nil } // MetricsMetadata returns all the metric metadata of a user. diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f64368877f..e7ff476518 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2958,6 +2958,12 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) { res, err := i.MetricsForLabelMatchers(ctx, req) require.NoError(t, err) assert.ElementsMatch(t, testData.expected, res.Metric) + + // Stream + ss := mockMetricsForLabelMatchersStreamServer{ctx: ctx} + err = i.MetricsForLabelMatchersStream(req, &ss) + require.NoError(t, err) + assert.ElementsMatch(t, testData.expected, ss.res.Metric) }) } } @@ -3292,6 +3298,21 @@ func writeRequestSingleSeries(lbls labels.Labels, samples []cortexpb.Sample) *co return req } +type mockMetricsForLabelMatchersStreamServer struct { + grpc.ServerStream + ctx context.Context + res client.MetricsForLabelMatchersStreamResponse +} + +func (m *mockMetricsForLabelMatchersStreamServer) Send(response *client.MetricsForLabelMatchersStreamResponse) error { + m.res.Metric = append(m.res.Metric, response.Metric...) + return nil +} + +func (m *mockMetricsForLabelMatchersStreamServer) Context() context.Context { + return m.ctx +} + type mockQueryStreamServer struct { grpc.ServerStream ctx context.Context From e5f807c436c56d477d26d699ef844f53dde0595a Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 17 Dec 2024 17:34:50 -0800 Subject: [PATCH 2/2] Update pkg/ingester/ingester.go Co-authored-by: SungJin1212 Signed-off-by: Alan Protasio --- pkg/ingester/ingester.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 2956ab4ce1..613b96ae4c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1715,7 +1715,7 @@ func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatc Labels: cortexpb.FromLabelsToLabelAdapters(l), }) - if len(result.Metric) > metadataStreamBatchSize { + if len(result.Metric) >= metadataStreamBatchSize { err := client.SendMetricsForLabelMatchersStream(stream, result) if err != nil { return err