Skip to content

Commit

Permalink
Add users stats http api to ingester (cortexproject#6178)
Browse files Browse the repository at this point in the history
* Add users stats http api to ingester

Signed-off-by: Daniel Deluiggi <[email protected]>

* Changelog

Signed-off-by: Daniel Deluiggi <[email protected]>

* Change name for loadedBlocks

Signed-off-by: Daniel Deluiggi <[email protected]>

---------

Signed-off-by: Daniel Deluiggi <[email protected]>
Signed-off-by: Daniel Blando <[email protected]>
  • Loading branch information
danielblando authored Sep 3, 2024
1 parent 15ad4de commit c6048fa
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 136 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869
* [ENHANCEMENT] Ingester/Ring: New `READONLY` status on ring to be used by Ingester. New ingester API to change mode of ingester #6163
* [ENHANCEMENT] Ruler: Add query statistics metrics when --ruler.query-stats-enabled=true. #6173
* [ENHANCEMENT] Ingester: Add new API `/ingester/all_user_stats` which shows loaded blocks, active timeseries and ingestion rate for a specific ingester. #6178
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples whose resolution was reduced. #6182

## 1.18.0 in progress
Expand Down
12 changes: 11 additions & 1 deletion docs/api/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ For the sake of clarity, in this document we have grouped API endpoints by servi
| [Flush blocks](#flush-blocks) | Ingester || `GET,POST /ingester/flush` |
| [Shutdown](#shutdown) | Ingester || `GET,POST /ingester/shutdown` |
| [Ingesters ring status](#ingesters-ring-status) | Ingester || `GET /ingester/ring` |
| [Ingester tenants stats](#ingester-tenants-stats) | Ingester || `GET /ingester/all_user_stats` |
| [Ingester mode](#ingester-mode) | Ingester || `GET,POST /ingester/mode` |
| [Instant query](#instant-query) | Querier, Query-frontend || `GET,POST <prometheus-http-prefix>/api/v1/query` |
| [Range query](#range-query) | Querier, Query-frontend || `GET,POST <prometheus-http-prefix>/api/v1/query_range` |
Expand Down Expand Up @@ -242,7 +243,7 @@ GET /distributor/all_user_stats
GET /all_user_stats
```

Displays a web page with per-tenant statistics updated in realtime, including the total number of active series across all ingesters and the current ingestion rate (samples / sec).
Displays a web page with per-tenant statistics updated in realtime, including the total number of loaded blocks and active series across all ingesters as well as the current ingestion rate (samples / sec).

### HA tracker status

Expand Down Expand Up @@ -297,6 +298,15 @@ GET /ring

Displays a web page with the ingesters hash ring status, including the state, health and last heartbeat time of each ingester.

### Ingester tenants stats

```
GET /ingester/all_user_stats
```

Displays a web page with per-tenant statistics updated in realtime, including the total number of loaded blocks and active series from a specific ingester as well as the current ingestion rate (samples / sec).

### Ingester mode

```
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ type Ingester interface {
FlushHandler(http.ResponseWriter, *http.Request)
ShutdownHandler(http.ResponseWriter, *http.Request)
RenewTokenHandler(http.ResponseWriter, *http.Request)
AllUserStatsHandler(http.ResponseWriter, *http.Request)
ModeHandler(http.ResponseWriter, *http.Request)
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
}
Expand All @@ -292,6 +293,8 @@ type Ingester interface {
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
client.RegisterIngesterServer(a.server.GRPC, i)

a.indexPage.AddLink(SectionAdminEndpoints, "/ingester/all_user_stats", "Usage Statistics")

a.indexPage.AddLink(SectionDangerous, "/ingester/flush", "Trigger a Flush of data from Ingester to storage")
a.indexPage.AddLink(SectionDangerous, "/ingester/shutdown", "Trigger Ingester Shutdown (Dangerous)")
a.indexPage.AddLink(SectionDangerous, "/ingester/renewTokens", "Renew Ingester Tokens (10%)")
Expand All @@ -300,6 +303,7 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.

Expand Down
34 changes: 21 additions & 13 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ha"
"github.com/cortexproject/cortex/pkg/ingester"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
Expand Down Expand Up @@ -1319,7 +1320,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad
}

// UserStats returns statistics about the current user.
func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
func (d *Distributor) UserStats(ctx context.Context) (*ingester.UserStats, error) {
replicationSet, err := d.GetIngestersForMetadata(ctx)
if err != nil {
return nil, err
Expand All @@ -1336,7 +1337,7 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
return nil, err
}

totalStats := &UserStats{}
totalStats := &ingester.UserStats{}
for _, resp := range resps {
r := resp.(*ingester_client.UserStatsResponse)
totalStats.IngestionRate += r.IngestionRate
Expand All @@ -1354,17 +1355,11 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
return totalStats, nil
}

// UserIDStats models ingestion statistics for one user, including the user ID
type UserIDStats struct {
UserID string `json:"userID"`
UserStats
}

// AllUserStats returns statistics about all users.
// Note that, unlike UserStats(), it does not divide by the ReplicationFactor.
func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats, error) {
// Add up by user, across all responses from ingesters
perUserTotals := make(map[string]UserStats)
perUserTotals := make(map[string]ingester.UserStats)

req := &ingester_client.UserStatsRequest{}
ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
Expand All @@ -1389,28 +1384,41 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
s.RuleIngestionRate += u.Data.RuleIngestionRate
s.NumSeries += u.Data.NumSeries
s.ActiveSeries += u.Data.ActiveSeries
s.LoadedBlocks += u.Data.LoadedBlocks
perUserTotals[u.UserId] = s
}
}

// Turn aggregated map into a slice for return
response := make([]UserIDStats, 0, len(perUserTotals))
response := make([]ingester.UserIDStats, 0, len(perUserTotals))
for id, stats := range perUserTotals {
response = append(response, UserIDStats{
response = append(response, ingester.UserIDStats{
UserID: id,
UserStats: UserStats{
UserStats: ingester.UserStats{
IngestionRate: stats.IngestionRate,
APIIngestionRate: stats.APIIngestionRate,
RuleIngestionRate: stats.RuleIngestionRate,
NumSeries: stats.NumSeries,
ActiveSeries: stats.ActiveSeries,
LoadedBlocks: stats.LoadedBlocks,
},
})
}

return response, nil
}

// AllUserStatsHandler shows stats for all users.
//
// It gathers the per-user statistics via d.AllUserStats using the request's
// context. If that call fails, the error text is returned to the client with
// an HTTP 500 status. Otherwise the stats are handed to
// ingester.AllUserStatsRender together with the replication factor reported
// by d.ingestersRing.
func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request) {
stats, err := d.AllUserStats(r.Context())
if err != nil {
// Report the failure to the caller and stop; nothing is rendered.
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// Rendering is delegated to the ingester package.
ingester.AllUserStatsRender(w, r, stats, d.ingestersRing.ReplicationFactor())
}

func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if d.distributorsRing != nil {
d.distributorsRing.ServeHTTP(w, req)
Expand Down
9 changes: 0 additions & 9 deletions pkg/distributor/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,6 @@ import (
"github.com/cortexproject/cortex/pkg/util"
)

// UserStats models ingestion statistics for one user.
type UserStats struct {
IngestionRate float64 `json:"ingestionRate"`
NumSeries uint64 `json:"numSeries"`
APIIngestionRate float64 `json:"APIIngestionRate"`
RuleIngestionRate float64 `json:"RuleIngestionRate"`
ActiveSeries uint64 `json:"activeSeries"`
}

// UserStatsHandler handles user stats to the Distributor.
func (d *Distributor) UserStatsHandler(w http.ResponseWriter, r *http.Request) {
stats, err := d.UserStats(r.Context())
Expand Down
Loading

0 comments on commit c6048fa

Please sign in to comment.