Skip to content

Commit

Permalink
Add matchers to LabelNames() ingester RPC (cortexproject#6209)
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
  • Loading branch information
harry671003 authored Sep 23, 2024
1 parent 8b4630a commit 820c3bf
Show file tree
Hide file tree
Showing 14 changed files with 293 additions and 164 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [ENHANCEMENT] Ingester: Add new API `/ingester/all_user_stats` which shows loaded blocks, active timeseries and ingestion rate for a specific ingester. #6178
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples which resolution was reduced. #6182
* [ENHANCEMENT] StoreGateway: Implement metadata API limit in queryable. #6195
* [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209

## 1.18.0 2024-09-03

Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ querier:
# CLI flag: -querier.ingester-metadata-streaming
[ingester_metadata_streaming: <boolean> | default = true]

# Use LabelNames ingester RPCs with match params.
# CLI flag: -querier.ingester-label-names-with-matchers
[ingester_label_names_with_matchers: <boolean> | default = false]

# Maximum number of samples a single query can load into memory.
# CLI flag: -querier.max-samples
[max_samples: <int> | default = 50000000]
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3701,6 +3701,10 @@ The `querier_config` configures the Cortex querier.
# CLI flag: -querier.ingester-metadata-streaming
[ingester_metadata_streaming: <boolean> | default = true]
# Use LabelNames ingester RPCs with match params.
# CLI flag: -querier.ingester-label-names-with-matchers
[ingester_label_names_with_matchers: <boolean> | default = false]
# Maximum number of samples a single query can load into memory.
# CLI flag: -querier.max-samples
[max_samples: <int> | default = 50000000]
Expand Down
18 changes: 9 additions & 9 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, t
}, matchers...)
}

func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error) {
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelNames", opentracing.Tags{
"start": from.Unix(),
"end": to.Unix(),
Expand All @@ -1123,11 +1123,11 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
}

limit := getLimitFromLabelHints(hints)
req := &ingester_client.LabelNamesRequest{
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Limit: int64(limit),
req, err := ingester_client.ToLabelNamesRequest(from, to, limit, matchers)
if err != nil {
return nil, err
}

resps, err := f(ctx, replicationSet, req)
if err != nil {
return nil, err
Expand All @@ -1152,7 +1152,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
return r, nil
}

func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints) ([]string, error) {
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, hints, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelNamesStream(ctx, req)
Expand All @@ -1174,11 +1174,11 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time,

return allLabelNames, nil
})
})
}, matchers...)
}

// LabelNames returns all the label names.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints) ([]string, error) {
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelNames(ctx, req)
Expand All @@ -1187,7 +1187,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint
}
return resp.LabelNames, nil
})
})
}, matchers...)
}

// MetricsForLabelMatchers gets the metrics that match said matchers
Expand Down
30 changes: 30 additions & 0 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,36 @@ func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, int,
return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, int(req.Limit), matchers, nil
}

// ToLabelNamesRequest builds a LabelNamesRequest proto
func ToLabelNamesRequest(from, to model.Time, limit int, matchers []*labels.Matcher) (*LabelNamesRequest, error) {
ms, err := toLabelMatchers(matchers)
if err != nil {
return nil, err
}

return &LabelNamesRequest{
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Matchers: &LabelMatchers{Matchers: ms},
Limit: int64(limit),
}, nil
}

// FromLabelNamesRequest unpacks a LabelNamesRequest proto
func FromLabelNamesRequest(req *LabelNamesRequest) (int64, int64, int, []*labels.Matcher, error) {
var err error
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
if err != nil {
return 0, 0, 0, nil, err
}
}

return req.StartTimestampMs, req.EndTimestampMs, int(req.Limit), matchers, nil
}

func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) {
result := make([]*LabelMatcher, 0, len(matchers))
for _, matcher := range matchers {
Expand Down
Loading

0 comments on commit 820c3bf

Please sign in to comment.