Revamp metrics to account the status code #320

Merged · 6 commits · Mar 19, 2024
Changes from 2 commits
34 changes: 19 additions & 15 deletions disperser/apiserver/server.go
@@ -90,6 +90,7 @@ func NewDispersalServer(
}

func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_DisperseBlobAuthenticatedServer) error {
s.metrics.IncrementBlobRequestNum("DisperseBlobAuthenticated")

// Process disperse_request
in, err := stream.Recv()
@@ -169,6 +170,7 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
}

func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlobRequest) (*pb.DisperseBlobReply, error) {
s.metrics.IncrementBlobRequestNum("DisperseBlob")
Contributor:

I'm wondering if it would be more useful to emit this at the end instead of the beginning, because right now IncrementBlobRequestNum records status code = "total", and total should equal "success + failures" anyway.

Contributor (Author):

Having a time series for "total" makes the monitoring query easy (just 1 - 500/total) and more stable: there is no need to update the query when a new failure case is added here.

We currently have 4 failure codes; adding more would make the query itself more complex if "total" had to be expanded.
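
As an illustration of that query shape (the exported metric name and any namespace prefix below are assumptions, not taken from this PR), the error rate could then be expressed roughly as 1 - sum(rate(requests_total{status_code="500"}[5m])) / sum(rate(requests_total{status_code="total"}[5m])), without enumerating each individual failure status.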


blob := getBlobFromRequest(req)

@@ -246,7 +248,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
if err != nil {
for _, param := range securityParams {
quorumId := string(param.QuorumID)
s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob")
s.metrics.HandleFailedRequest("400", quorumId, blobSize, "DisperseBlob")
}
return nil, api.NewInvalidArgError(err.Error())
}
@@ -257,23 +259,18 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
if err != nil {
for _, param := range securityParams {
quorumId := string(param.QuorumID)
s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob")
s.metrics.HandleFailedRequest("400", quorumId, blobSize, "DisperseBlob")
}
return nil, api.NewInvalidArgError(err.Error())
}

if s.ratelimiter != nil {
err := s.checkRateLimitsAndAddRates(ctx, blob, origin, authenticatedAddress)
if err != nil {
for _, param := range securityParams {
Contributor (Author):

This was wrong accounting: checkRateLimitsAndAddRates errors out and returns on the first failure (e.g. the first quorum), but this code incremented the metric for every quorum.

quorumId := string(param.QuorumID)
if errors.Is(err, errSystemBlobRateLimit) || errors.Is(err, errSystemThroughputRateLimit) {
s.metrics.HandleSystemRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
} else if errors.Is(err, errAccountBlobRateLimit) || errors.Is(err, errAccountThroughputRateLimit) {
s.metrics.HandleAccountRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
} else {
s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob")
}
rateLimited := errors.Is(err, errSystemBlobRateLimit) || errors.Is(err, errSystemThroughputRateLimit) || errors.Is(err, errAccountBlobRateLimit) || errors.Is(err, errAccountThroughputRateLimit)
if !rateLimited {
s.metrics.HandleFailedRequest("500", "", blobSize, "DisperseBlob")
return nil, api.NewInternalError(err.Error())
}
return nil, api.NewResourceExhaustedError(err.Error())
}
@@ -369,6 +366,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
for i, param := range blob.RequestHeader.SecurityParams {

rates, ok := s.rateConfig.QuorumRateInfos[param.QuorumID]
quorumId := string(param.QuorumID)
if !ok {
return fmt.Errorf("no configured rate exists for quorum %d", param.QuorumID)
}
@@ -394,6 +392,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
}
if !allowed {
s.logger.Warn("system byte ratelimit exceeded", "systemQuorumKey", systemQuorumKey, "rate", rates.TotalUnauthThroughput)
s.metrics.HandleSystemRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
return errSystemThroughputRateLimit
}

@@ -404,6 +403,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
}
if !allowed {
s.logger.Warn("system blob ratelimit exceeded", "systemQuorumKey", systemQuorumKey, "rate", float32(rates.TotalUnauthBlobRate)/blobRateMultiplier)
s.metrics.HandleSystemRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
return errSystemBlobRateLimit
}

@@ -416,6 +416,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
}
if !allowed {
s.logger.Warn("account byte ratelimit exceeded", "accountQuorumKey", accountQuorumKey, "rate", accountRates.Throughput)
s.metrics.HandleAccountRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
return errAccountThroughputRateLimit
}

@@ -426,6 +427,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *
}
if !allowed {
s.logger.Warn("account blob ratelimit exceeded", "accountQuorumKey", accountQuorumKey, "rate", float32(accountRates.BlobRate)/blobRateMultiplier)
s.metrics.HandleAccountRateLimitedRequest(quorumId, blobSize, "DisperseBlob")
return errAccountBlobRateLimit
}

@@ -442,6 +444,8 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR
}))
defer timer.ObserveDuration()

s.metrics.IncrementBlobRequestNum("GetBlobStatus")

requestID := req.GetRequestId()
if len(requestID) == 0 {
return nil, api.NewInvalidArgError("request_id must not be empty")
@@ -544,6 +548,8 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
}))
defer timer.ObserveDuration()

s.metrics.IncrementBlobRequestNum("RetrieveBlob")

s.logger.Info("received a new blob retrieval request", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex)

batchHeaderHash := req.GetBatchHeaderHash()
@@ -556,17 +562,15 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
blobMetadata, err := s.blobStore.GetMetadataInBatch(ctx, batchHeaderHash32, blobIndex)
if err != nil {
s.logger.Error("Failed to retrieve blob metadata", "err", err)
s.metrics.IncrementFailedBlobRequestNum("", "RetrieveBlob")

// TODO: we need to distinguish NOT_FOUND from actual internal error.
s.metrics.IncrementFailedBlobRequestNum("500", "", "RetrieveBlob")
return nil, api.NewInternalError("failed to get blob metadata, please retry")
}

data, err := s.blobStore.GetBlobContent(ctx, blobMetadata.BlobHash)
if err != nil {
s.logger.Error("Failed to retrieve blob", "err", err)
s.metrics.HandleFailedRequest("", len(data), "RetrieveBlob")

s.metrics.HandleFailedRequest("500", "", len(data), "RetrieveBlob")
return nil, api.NewInternalError("failed to get blob data, please retry")
}

53 changes: 34 additions & 19 deletions disperser/metrics.go
@@ -48,7 +48,7 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
Name: "requests_total",
Help: "the number of blob requests",
},
[]string{"status", "quorum", "method"},
[]string{"status_code", "status", "quorum", "method"},
),
BlobSize: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
@@ -82,9 +82,10 @@ func (g *Metrics) ObserveLatency(method string, latencyMs float64) {
// IncrementSuccessfulBlobRequestNum increments the number of successful blob requests
func (g *Metrics) IncrementSuccessfulBlobRequestNum(quorum string, method string) {
g.NumBlobRequests.With(prometheus.Labels{
"status": "success",
"quorum": quorum,
"method": method,
"status_code": "200",
"status": "success",
"quorum": quorum,
"method": method,
}).Inc()
}

@@ -98,18 +99,29 @@ func (g *Metrics) HandleSuccessfulRequest(quorum string, blobBytes int, method s
}).Add(float64(blobBytes))
}

// IncrementBlobRequestNum increments the number of blob requests received
func (g *Metrics) IncrementBlobRequestNum(method string) {
g.NumBlobRequests.With(prometheus.Labels{
"status_code": "total",
"status": "total",
"quorum": "",
"method": method,
}).Inc()
}

// IncrementFailedBlobRequestNum increments the number of failed blob requests
func (g *Metrics) IncrementFailedBlobRequestNum(quorum string, method string) {
func (g *Metrics) IncrementFailedBlobRequestNum(statusCode string, quorum string, method string) {
Contributor:

We should have these HTTP status codes typed somewhere and use that type instead of taking a raw string. Something like:

type HttpStatusCode string
const (
  SuccessHttpStatusCode = "200"
  ...
)

Contributor (Author):

The HTTP code is already quite standard (it probably is the standard).

Contributor:

It is. My point isn't whether it's the standard or not; it's that we shouldn't take a raw string in this method. Why not use the existing constants defined in the standard library? https://go.dev/src/net/http/status.go

Contributor (Author):

For the same reason quorum isn't a uint8, which doesn't even require a library.
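
For reference, a minimal sketch of what the reviewer's suggestion could look like when backed by the standard library; the type and constant names here are illustrative assumptions, not part of this PR:

import (
    "net/http"
    "strconv"
)

// HttpStatusCode is a typed metric label value, so call sites cannot pass arbitrary strings.
type HttpStatusCode string

// Label values derived from the net/http status constants rather than raw string literals.
var (
    StatusOK              = HttpStatusCode(strconv.Itoa(http.StatusOK))                  // "200"
    StatusBadRequest      = HttpStatusCode(strconv.Itoa(http.StatusBadRequest))          // "400"
    StatusTooManyRequests = HttpStatusCode(strconv.Itoa(http.StatusTooManyRequests))     // "429"
    StatusInternalError   = HttpStatusCode(strconv.Itoa(http.StatusInternalServerError)) // "500"
)

IncrementFailedBlobRequestNum and the other handlers would then accept an HttpStatusCode instead of a raw string.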

g.NumBlobRequests.With(prometheus.Labels{
"status": "failed",
"quorum": quorum,
"method": method,
"status_code": statusCode,
"status": "failed",
"quorum": quorum,
"method": method,
}).Inc()
}

// HandleFailedRequest updates the number of failed requests and the size of the blob
func (g *Metrics) HandleFailedRequest(quorum string, blobBytes int, method string) {
g.IncrementFailedBlobRequestNum(quorum, method)
func (g *Metrics) HandleFailedRequest(statusCode string, quorum string, blobBytes int, method string) {
g.IncrementFailedBlobRequestNum(statusCode, quorum, method)
g.BlobSize.With(prometheus.Labels{
"status": "failed",
"quorum": quorum,
@@ -120,9 +132,10 @@ func (g *Metrics) HandleFailedRequest(quorum string, blobBytes int, method strin
// HandleBlobStoreFailedRequest updates the number of requests failed to store blob and the size of the blob
func (g *Metrics) HandleBlobStoreFailedRequest(quorum string, blobBytes int, method string) {
g.NumBlobRequests.With(prometheus.Labels{
"status": StoreBlobFailure,
"quorum": quorum,
"method": method,
"status_code": "500",
"status": StoreBlobFailure,
"quorum": quorum,
"method": method,
}).Inc()
g.BlobSize.With(prometheus.Labels{
"status": StoreBlobFailure,
@@ -134,9 +147,10 @@ func (g *Metrics) HandleBlobStoreFailedRequest(quorum string, blobBytes int, met
// HandleSystemRateLimitedRequest updates the number of system rate limited requests and the size of the blob
func (g *Metrics) HandleSystemRateLimitedRequest(quorum string, blobBytes int, method string) {
g.NumBlobRequests.With(prometheus.Labels{
"status": SystemRateLimitedFailure,
"quorum": quorum,
"method": method,
"status_code": "429",
"status": SystemRateLimitedFailure,
"quorum": quorum,
"method": method,
}).Inc()
g.BlobSize.With(prometheus.Labels{
"status": SystemRateLimitedFailure,
@@ -148,9 +162,10 @@ func (g *Metrics) HandleSystemRateLimitedRequest(quorum string, blobBytes int, m
// HandleAccountRateLimitedRequest updates the number of account rate limited requests and the size of the blob
func (g *Metrics) HandleAccountRateLimitedRequest(quorum string, blobBytes int, method string) {
g.NumBlobRequests.With(prometheus.Labels{
"status": AccountRateLimitedFailure,
"quorum": quorum,
"method": method,
"status_code": "429",
"status": AccountRateLimitedFailure,
"quorum": quorum,
"method": method,
}).Inc()
g.BlobSize.With(prometheus.Labels{
"status": AccountRateLimitedFailure,
2 changes: 1 addition & 1 deletion operators/churner/churner.go
@@ -110,7 +110,7 @@ func (c *churner) ProcessChurnRequest(ctx context.Context, operatorToRegisterAdd
for _, quorumID := range churnRequest.QuorumIDs {
for _, quorumIDAlreadyRegisteredFor := range quorumIDsAlreadyRegisteredFor {
if quorumIDAlreadyRegisteredFor == quorumID {
return nil, errors.New("operator is already registered in quorum")
return nil, api.NewInvalidArgError("operator is already registered in quorum")
}
}
}
31 changes: 30 additions & 1 deletion operators/churner/metrics.go
@@ -14,6 +14,7 @@ import (

type FailReason string

// Note: failure reason constants must be maintained in sync with statusCodeMap.
const (
FailReasonRateLimitExceeded FailReason = "rate_limit_exceeded" // Rate limited: per operator rate limiting
FailReasonInsufficientStakeToRegister FailReason = "insufficient_stake_to_register" // Operator doesn't have enough stake to be registered
@@ -25,6 +26,18 @@ const (
FailReasonInvalidRequest FailReason = "invalid_request" // Invalid request: request is malformed
)

// Note: statusCodeMap must be maintained in sync with failure reason constants.
var statusCodeMap map[FailReason]string = map[FailReason]string{
FailReasonRateLimitExceeded: "429",
FailReasonInsufficientStakeToRegister: "400",
FailReasonInsufficientStakeToChurn: "400",
FailReasonQuorumIdOutOfRange: "400",
FailReasonPrevApprovalNotExpired: "429",
FailReasonInvalidSignature: "400",
FailReasonProcessChurnRequestFailed: "500",
FailReasonInvalidRequest: "400",
}
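
To enforce the sync note above, a small guard test could iterate the failure reason constants; this is a sketch with an assumed test name, placed in package churner, and is not part of the PR:

package churner

import "testing"

// TestStatusCodeMapCoversAllFailReasons fails when a FailReason constant has no
// corresponding entry in statusCodeMap.
func TestStatusCodeMapCoversAllFailReasons(t *testing.T) {
    reasons := []FailReason{
        FailReasonRateLimitExceeded,
        FailReasonInsufficientStakeToRegister,
        FailReasonInsufficientStakeToChurn,
        FailReasonQuorumIdOutOfRange,
        FailReasonPrevApprovalNotExpired,
        FailReasonInvalidSignature,
        FailReasonProcessChurnRequestFailed,
        FailReasonInvalidRequest,
    }
    for _, r := range reasons {
        if _, ok := statusCodeMap[r]; !ok {
            t.Errorf("no status code mapped for failure reason %q", r)
        }
    }
}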

type MetricsConfig struct {
HTTPPort string
EnableMetrics bool
@@ -76,6 +89,15 @@ func (g *Metrics) ObserveLatency(method string, latencyMs float64) {
g.Latency.WithLabelValues(method).Observe(latencyMs)
}

// IncrementRequestNum increments the total number of requests received
func (g *Metrics) IncrementRequestNum(method string) {
g.NumRequests.With(prometheus.Labels{
"status": "total",
Contributor:

Same comment here: I feel we shouldn't track "total" because it can be computed from failed + success.
"method": method,
"reason": "",
}).Inc()
}

// IncrementSuccessfulRequestNum increments the number of successful requests
func (g *Metrics) IncrementSuccessfulRequestNum(method string) {
g.NumRequests.With(prometheus.Labels{
@@ -87,8 +109,15 @@ func (g *Metrics) IncrementSuccessfulRequestNum(method string) {

// IncrementFailedRequestNum increments the number of failed requests
func (g *Metrics) IncrementFailedRequestNum(method string, reason FailReason) {
code, ok := statusCodeMap[reason]
if !ok {
g.logger.Error("cannot map failure reason to status code", "failure reason", reason)
// Treat this as an internal server error. This is a conservative fallback for the
// case where a failure reason has no mapping to a status code.
code = "500"
}
g.NumRequests.With(prometheus.Labels{
"status": "failed",
"status": code,
"reason": string(reason),
"method": method,
}).Inc()
3 changes: 2 additions & 1 deletion operators/churner/server.go
@@ -55,6 +55,7 @@ func (s *Server) Start(metricsConfig MetricsConfig) error {
}

func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnReply, error) {
s.metrics.IncrementRequestNum("Churn")

err := s.validateChurnRequest(ctx, req)
if err != nil {
@@ -92,10 +93,10 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl

response, err := s.churner.ProcessChurnRequest(ctx, operatorToRegisterAddress, request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonProcessChurnRequestFailed)
if _, ok := status.FromError(err); ok {
return nil, err
}
s.metrics.IncrementFailedRequestNum("Churn", FailReasonProcessChurnRequestFailed)
return nil, api.NewInternalError(fmt.Sprintf("failed to process churn request: %s", err.Error()))
}
