Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester: Add matchers to LabelNames() ingester RPC #6209

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [ENHANCEMENT] Ingester: Add new API `/ingester/all_user_stats` which shows loaded blocks, active timeseries and ingestion rate for a specific ingester. #6178
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples which resolution was reduced. #6182
* [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195
* [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209

## 1.18.0 2024-09-03

Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ querier:
# CLI flag: -querier.ingester-metadata-streaming
[ingester_metadata_streaming: <boolean> | default = true]

# Use LabelNames ingester RPCs with match params.
# CLI flag: -querier.ingester-label-names-with-matchers
[ingester_label_names_with_matchers: <boolean> | default = false]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this config?
If the matcher is in the request, its not cause we always wanna filter out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature flag is for safe guarding during a deployment.
In this PR, queriers will start calling ingester LabelNames with Matchers. But if ingesters haven't restarted yet, then ingesters will ignore that and return label names without filtering out.

We can remove this flag in 2 releases and make this behavior the default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we add TODO or create an issue about removing/deprecating this flag now. Otherwise we will forget

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that if the ingesters ignored the matchers in the request from the querier since the ingesters had not restarted yet, wouldn't that be the same behavior today since the ingesters already do not handle the matchers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Today's behavior for label names calls with matchers is like this:

  • Querier will retrieve all the series using MetricsForLabelMatchers call. It'll then get the unique label names from the retrieved series.

The new behaviour is for label names calls with matchers:

  • Querier will directly call LabelNames call and pass the matchers.
  • If ingesters ignore the matchers, this will break the API.

I think having the feature flag is unavoidable.

Created #6222 for tracking the deprecation of the flag.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thank you for the clarification @harry671003.


# Maximum number of samples a single query can load into memory.
# CLI flag: -querier.max-samples
[max_samples: <int> | default = 50000000]
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3701,6 +3701,10 @@ The `querier_config` configures the Cortex querier.
# CLI flag: -querier.ingester-metadata-streaming
[ingester_metadata_streaming: <boolean> | default = true]

# Use LabelNames ingester RPCs with match params.
# CLI flag: -querier.ingester-label-names-with-matchers
[ingester_label_names_with_matchers: <boolean> | default = false]

# Maximum number of samples a single query can load into memory.
# CLI flag: -querier.max-samples
[max_samples: <int> | default = 50000000]
Expand Down
18 changes: 9 additions & 9 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, t
}, matchers...)
}

func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error) {
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelNames", opentracing.Tags{
"start": from.Unix(),
"end": to.Unix(),
Expand All @@ -1113,11 +1113,11 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
}

limit := getLimitFromLabelHints(hints)
req := &ingester_client.LabelNamesRequest{
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Limit: int64(limit),
req, err := ingester_client.ToLabelNamesRequest(from, to, limit, matchers)
if err != nil {
return nil, err
}

resps, err := f(ctx, replicationSet, req)
if err != nil {
return nil, err
Expand All @@ -1142,7 +1142,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
return r, nil
}

func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints) ([]string, error) {
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, hints, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelNamesStream(ctx, req)
Expand All @@ -1164,11 +1164,11 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time,

return allLabelNames, nil
})
})
}, matchers...)
}

// LabelNames returns all the label names.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints) ([]string, error) {
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelNames(ctx, req)
Expand All @@ -1177,7 +1177,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint
}
return resp.LabelNames, nil
})
})
}, matchers...)
}

// MetricsForLabelMatchers gets the metrics that match said matchers
Expand Down
30 changes: 30 additions & 0 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,36 @@ func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, int,
return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, int(req.Limit), matchers, nil
}

// ToLabelNamesRequest builds a LabelNamesRequest proto
func ToLabelNamesRequest(from, to model.Time, limit int, matchers []*labels.Matcher) (*LabelNamesRequest, error) {
ms, err := toLabelMatchers(matchers)
if err != nil {
return nil, err
}

return &LabelNamesRequest{
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Matchers: &LabelMatchers{Matchers: ms},
Limit: int64(limit),
}, nil
}

// FromLabelNamesRequest unpacks a LabelNamesRequest proto
func FromLabelNamesRequest(req *LabelNamesRequest) (int64, int64, int, []*labels.Matcher, error) {
var err error
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
if err != nil {
return 0, 0, 0, nil, err
}
}

return req.StartTimestampMs, req.EndTimestampMs, int(req.Limit), matchers, nil
}

func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) {
result := make([]*LabelMatcher, 0, len(matchers))
for _, matcher := range matchers {
Expand Down
Loading
Loading