Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standardize the error codes from API endpoints #317

Merged
merged 8 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions common/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package common
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this just go in the api package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! The only question is api is currently all protobuf stuff, it otherwise fits better in api since these are all around APIs. I'll move it over unless other folks have different thought/suggestion.


import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// The canonical errors from the EigenDA gRPC API endpoints.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need one for timeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may add if it's becoming a more clear case to cover

//
// Notes:
// - Start the error space small (but sufficient), and expand when there is an important
// failure case to separate out.
// - Avoid simply wrapping system-internal errors without checking if they are appropriate
// in user-facing errors defined here. Consider map and convert system-internal errors
// before return to users from APIs.

func NewGRPCError(code codes.Code, msg string) error {
return status.Errorf(code, msg)
}

// HTTP Mapping: 400 Bad Request
func NewInvalidArgError(msg string) error {
return NewGRPCError(codes.InvalidArgument, msg)
}

// HTTP Mapping: 404 Not Found
func NewNotFoundError(msg string) error {
return NewGRPCError(codes.NotFound, msg)
}

// HTTP Mapping: 429 Too Many Requests
func NewResourceExhaustedError(msg string) error {
return NewGRPCError(codes.ResourceExhausted, msg)
}

// HTTP Mapping: 500 Internal Server Error
func NewInternalError(msg string) error {
return NewGRPCError(codes.Internal, msg)
}
36 changes: 19 additions & 17 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
// Process disperse_request
in, err := stream.Recv()
if err != nil {
return fmt.Errorf("error receiving next message: %v", err)
return common.NewInvalidArgError(fmt.Sprintf("error receiving next message: %v", err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is this really due to invalid argument? Feel like stream.Revc() breaking could be due to pod dying in the middle or something like that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, this may be not a clear cut whether it's client side or server side issue, as it may be caused by various reasons, like network connection, malformed message as well as server side resource issue etc.
I think overall it may fit better as client-side, as this is still at the gate of the system at this point.

}

request, ok := in.Payload.(*pb.AuthenticatedRequest_DisperseRequest)
if !ok {
return errors.New("expected DisperseBlobRequest")
return common.NewInvalidArgError("missing DisperseBlobRequest")
}

blob := getBlobFromRequest(request.DisperseRequest)
Expand All @@ -107,12 +107,12 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
// Decode public key
publicKeyBytes, err := hexutil.Decode(blob.RequestHeader.AccountID)
if err != nil {
return fmt.Errorf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err)
return common.NewInvalidArgError(fmt.Sprintf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err))
}

pubKey, err := crypto.UnmarshalPubkey(publicKeyBytes)
if err != nil {
return fmt.Errorf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err)
return common.NewInvalidArgError(fmt.Sprintf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err))
}

authenticatedAddress := crypto.PubkeyToAddress(*pubKey).String()
Expand All @@ -131,20 +131,20 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
// Recieve challenge_reply
in, err = stream.Recv()
if err != nil {
return fmt.Errorf("error receiving next message: %v", err)
return common.NewInvalidArgError(fmt.Sprintf("error receiving next message: %v", err))
}

challengeReply, ok := in.Payload.(*pb.AuthenticatedRequest_AuthenticationData)
if !ok {
return errors.New("expected AuthenticationData")
return common.NewInvalidArgError("expected AuthenticationData")
}

blob.RequestHeader.Nonce = challenge
blob.RequestHeader.AuthenticationData = challengeReply.AuthenticationData.AuthenticationData

err = s.authenticator.AuthenticateBlobRequest(blob.RequestHeader.BlobAuthHeader)
if err != nil {
return fmt.Errorf("failed to authenticate blob request: %v", err)
return common.NewInvalidArgError(fmt.Sprintf("failed to authenticate blob request: %v", err))
}

// Disperse the blob
Expand Down Expand Up @@ -247,7 +247,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
quorumId := string(param.QuorumID)
s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob")
}
return nil, err
return nil, common.NewInvalidArgError(err.Error())
}

s.logger.Debug("received a new blob request", "origin", origin, "securityParams", strings.Join(securityParamsStrings, ", "))
Expand All @@ -258,7 +258,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
quorumId := string(param.QuorumID)
s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob")
}
return nil, err
return nil, common.NewInvalidArgError(err.Error())
}

if s.ratelimiter != nil {
Expand All @@ -274,7 +274,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob")
}
}
return nil, err
return nil, common.NewResourceExhaustedError(err.Error())
}
}

Expand All @@ -286,7 +286,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
s.metrics.HandleBlobStoreFailedRequest(quorumId, blobSize, "DisperseBlob")
}
s.logger.Error("failed to store blob", "err", err)
return nil, errors.New("failed to store blob, please try again later")
return nil, common.NewInternalError("failed to store blob, please try again later")
}

for _, param := range securityParams {
Expand Down Expand Up @@ -443,24 +443,25 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR

requestID := req.GetRequestId()
if len(requestID) == 0 {
return nil, errors.New("invalid request: request_id must not be empty")
return nil, common.NewInvalidArgError("request_id must not be empty")
}

s.logger.Info("received a new blob status request", "requestID", string(requestID))
metadataKey, err := disperser.ParseBlobKey(string(requestID))
if err != nil {
return nil, err
return nil, common.NewInvalidArgError(fmt.Sprintf("failed to parse the requestID: %s", err.Error()))
}

s.logger.Debug("metadataKey", "metadataKey", metadataKey.String())
metadata, err := s.blobStore.GetBlobMetadata(ctx, metadataKey)
if err != nil {
return nil, err
// TODO: we need to distinguish NOT_FOUND from actual internal error.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would probably need to standarize the error message and then check that here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, we could check the message, or surface an error with NewNotFoundError inside the GetBlobMetaata (this approach was used in the Churner, L95 in churner/server.go)

return nil, common.NewInternalError(fmt.Sprintf("failed to get blob metadata, blobkey: %s", metadataKey.String()))
}

isConfirmed, err := metadata.IsConfirmed()
if err != nil {
return nil, err
return nil, common.NewInternalError(fmt.Sprintf("missing confirmation information: %s", err.Error()))
}

s.logger.Debug("isConfirmed", "metadata", metadata, "isConfirmed", isConfirmed)
Expand Down Expand Up @@ -556,15 +557,16 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
s.logger.Error("Failed to retrieve blob metadata", "err", err)
s.metrics.IncrementFailedBlobRequestNum("", "RetrieveBlob")

return nil, err
// TODO: we need to distinguish NOT_FOUND from actual internal error.
return nil, common.NewInternalError("failed to get blob metadata, please retry")
}

data, err := s.blobStore.GetBlobContent(ctx, blobMetadata.BlobHash)
if err != nil {
s.logger.Error("Failed to retrieve blob", "err", err)
s.metrics.HandleFailedRequest("", len(data), "RetrieveBlob")

return nil, err
return nil, common.NewInternalError("failed to get blob data, please retry")
}

s.metrics.HandleSuccessfulRequest("", len(data), "RetrieveBlob")
Expand Down
10 changes: 6 additions & 4 deletions disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestDisperseBlobWithInvalidQuorum(t *testing.T) {
},
},
})
assert.ErrorContains(t, err, "invalid request: the quorum_id must be in range [0, 1], but found 2")
assert.Equal(t, err.Error(), "rpc error: code = InvalidArgument desc = invalid request: the quorum_id must be in range [0, 1], but found 2")

_, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{
Data: data,
Expand All @@ -103,7 +103,7 @@ func TestDisperseBlobWithInvalidQuorum(t *testing.T) {
},
},
})
assert.ErrorContains(t, err, "invalid request: security_params must not contain duplicate quorum_id")
assert.Equal(t, err.Error(), "rpc error: code = InvalidArgument desc = invalid request: security_params must not contain duplicate quorum_id")
}

func TestGetBlobStatus(t *testing.T) {
Expand Down Expand Up @@ -246,7 +246,9 @@ func TestRetrieveBlobFailsWhenBlobNotConfirmed(t *testing.T) {

// Try to retrieve the blob before it is confirmed
_, err = retrieveBlob(t, dispersalServer, 2)
assert.Error(t, err)
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "rpc error: code = Internal desc = failed to get blob metadata, please retry")

}

func TestDisperseBlobWithExceedSizeLimit(t *testing.T) {
Expand Down Expand Up @@ -277,7 +279,7 @@ func TestDisperseBlobWithExceedSizeLimit(t *testing.T) {
},
})
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "blob size cannot exceed 2 MiB")
assert.Equal(t, err.Error(), "rpc error: code = InvalidArgument desc = blob size cannot exceed 2 MiB")
}

func setup(m *testing.M) {
Expand Down
4 changes: 2 additions & 2 deletions operators/churner/churner.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op
// register needs to have 1.1 times the stake of the lowest-stake operator.
if new(big.Int).Mul(lowestStake, churnBIPsOfOperatorStake).Cmp(new(big.Int).Mul(operatorToRegisterStake, bipMultiplier)) >= 0 {
c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToRegister)
return nil, fmt.Errorf("registering operator must have %f%% more than the stake of the lowest-stake operator. Stake of registering operator: %d, stake of lowest-stake operator: %d, quorum ID: %d", float64(operatorSetParams.ChurnBIPsOfOperatorStake)/100.0-100.0, operatorToRegisterStake, lowestStake, quorumID)
return nil, common.NewInvalidArgError(fmt.Sprintf("registering operator must have %f%% more than the stake of the lowest-stake operator. Stake of registering operator: %d, stake of lowest-stake operator: %d, quorum ID: %d", float64(operatorSetParams.ChurnBIPsOfOperatorStake)/100.0-100.0, operatorToRegisterStake, lowestStake, quorumID))
}

// verify the lowest stake against the total stake
Expand All @@ -241,7 +241,7 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op
// stake.
if new(big.Int).Mul(lowestStake, bipMultiplier).Cmp(new(big.Int).Mul(totalStake, churnBIPsOfTotalStake)) >= 0 {
c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToChurn)
return nil, fmt.Errorf("operator to churn out must have less than %f%% of the total stake. Stake of the operator to churn: %d, total stake in quorum: %d, quorum ID: %d", float64(operatorSetParams.ChurnBIPsOfTotalStake)/100.0, lowestStake, totalStake, quorumID)
return nil, common.NewInvalidArgError(fmt.Sprintf("operator to churn out must have less than %f%% of the total stake. Stake of the operator to churn: %d, total stake in quorum: %d, quorum ID: %d", float64(operatorSetParams.ChurnBIPsOfTotalStake)/100.0, lowestStake, totalStake, quorumID))
}

operatorToChurnAddress, err := c.Transactor.OperatorIDToAddress(ctx, lowestStakeOperatorId)
Expand Down
18 changes: 11 additions & 7 deletions operators/churner/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/Layr-Labs/eigenda/core"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/status"
)

type Server struct {
Expand Down Expand Up @@ -57,7 +58,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
err := s.validateChurnRequest(ctx, req)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidRequest)
return nil, fmt.Errorf("invalid request: %w", err)
return nil, common.NewInvalidArgError(fmt.Sprintf("invalid request: %s", err.Error()))
}

timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
Expand All @@ -67,31 +68,34 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
s.logger.Info("Received request: ", "QuorumIds", req.GetQuorumIds())

now := time.Now()
// check that we are after the previous approval's expiry
// Global rate limiting: check that we are after the previous approval's expiry
if now.Unix() < s.latestExpiry {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonPrevApprovalNotExpired)
return nil, fmt.Errorf("previous approval not expired, retry in %d", s.latestExpiry-now.Unix())
return nil, common.NewResourceExhaustedError(fmt.Sprintf("previous approval not expired, retry in %d", s.latestExpiry-now.Unix()))
}

request := createChurnRequest(req)

operatorToRegisterAddress, err := s.churner.VerifyRequestSignature(ctx, request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidSignature)
return nil, fmt.Errorf("failed to verify request signature: %w", err)
return nil, common.NewInvalidArgError(fmt.Sprintf("failed to verify request signature: %s", err.Error()))
}

// check if the request should be rate limited
// Per-operator rate limiting: check if the request should be rate limited
err = s.checkShouldBeRateLimited(now, *request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonRateLimitExceeded)
return nil, fmt.Errorf("rate limiter error: %w", err)
return nil, common.NewResourceExhaustedError(fmt.Sprintf("rate limiter error: %s", err.Error()))
}

response, err := s.churner.ProcessChurnRequest(ctx, operatorToRegisterAddress, request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonProcessChurnRequestFailed)
return nil, fmt.Errorf("failed to process churn request: %w", err)
if _, ok := status.FromError(err); ok {
return nil, err
}
return nil, common.NewInternalError(fmt.Sprintf("failed to process churn request: %s", err.Error()))
}

// update the latest expiry
Expand Down
6 changes: 4 additions & 2 deletions operators/churner/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func TestChurn(t *testing.T) {

// retry prior to expiry should fail
_, err = s.Churn(ctx, request)
assert.ErrorContains(t, err, "previous approval not expired, retry in")
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "rpc error: code = ResourceExhausted desc = previous approval not expired, retry in 90")
}

func TestChurnWithInvalidQuorum(t *testing.T) {
Expand Down Expand Up @@ -122,7 +123,8 @@ func TestChurnWithInvalidQuorum(t *testing.T) {
}, nil)

_, err := s.Churn(ctx, request)
assert.ErrorContains(t, err, "invalid request: the quorum_id must be in range [0, 1], but found 2")
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "rpc error: code = InvalidArgument desc = invalid request: invalid request: the quorum_id must be in range [0, 1], but found 2")
}

func setupMockTransactor() {
Expand Down
Loading