diff --git a/api/errors.go b/api/errors.go new file mode 100644 index 0000000000..701280b51e --- /dev/null +++ b/api/errors.go @@ -0,0 +1,39 @@ +package api + +import ( + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// The canonical errors from the EigenDA gRPC API endpoints. +// +// Notes: +// - Start the error space small (but sufficient), and expand when there is an important +// failure case to separate out. +// - Avoid simply wrapping system-internal errors without checking if they are appropriate +// in user-facing errors defined here. Consider map and convert system-internal errors +// before return to users from APIs. + +func NewGRPCError(code codes.Code, msg string) error { + return status.Errorf(code, msg) +} + +// HTTP Mapping: 400 Bad Request +func NewInvalidArgError(msg string) error { + return NewGRPCError(codes.InvalidArgument, msg) +} + +// HTTP Mapping: 404 Not Found +func NewNotFoundError(msg string) error { + return NewGRPCError(codes.NotFound, msg) +} + +// HTTP Mapping: 429 Too Many Requests +func NewResourceExhaustedError(msg string) error { + return NewGRPCError(codes.ResourceExhausted, msg) +} + +// HTTP Mapping: 500 Internal Server Error +func NewInternalError(msg string) error { + return NewGRPCError(codes.Internal, msg) +} diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index d2a89f56d8..d8831ec3ef 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/Layr-Labs/eigenda/api" commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" pb "github.com/Layr-Labs/eigenda/api/grpc/disperser" "github.com/Layr-Labs/eigenda/common" @@ -93,12 +94,12 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse // Process disperse_request in, err := stream.Recv() if err != nil { - return fmt.Errorf("error receiving next message: %v", err) + return api.NewInvalidArgError(fmt.Sprintf("error receiving next message: %v", err)) } request, ok := in.Payload.(*pb.AuthenticatedRequest_DisperseRequest) if !ok { - return errors.New("expected DisperseBlobRequest") + return api.NewInvalidArgError("missing DisperseBlobRequest") } blob := getBlobFromRequest(request.DisperseRequest) @@ -107,12 +108,12 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse // Decode public key publicKeyBytes, err := hexutil.Decode(blob.RequestHeader.AccountID) if err != nil { - return fmt.Errorf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err) + return api.NewInvalidArgError(fmt.Sprintf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err)) } pubKey, err := crypto.UnmarshalPubkey(publicKeyBytes) if err != nil { - return fmt.Errorf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err) + return api.NewInvalidArgError(fmt.Sprintf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err)) } authenticatedAddress := crypto.PubkeyToAddress(*pubKey).String() @@ -131,12 +132,12 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse // Recieve challenge_reply in, err = stream.Recv() if err != nil { - return fmt.Errorf("error receiving next message: %v", err) + return api.NewInvalidArgError(fmt.Sprintf("error receiving next message: %v", err)) } challengeReply, ok := in.Payload.(*pb.AuthenticatedRequest_AuthenticationData) if !ok { - return errors.New("expected AuthenticationData") + return api.NewInvalidArgError("expected AuthenticationData") } blob.RequestHeader.Nonce = challenge @@ -144,7 +145,7 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse err = s.authenticator.AuthenticateBlobRequest(blob.RequestHeader.BlobAuthHeader) if err != nil { - return fmt.Errorf("failed to authenticate blob request: %v", err) + return api.NewInvalidArgError(fmt.Sprintf("failed to authenticate blob request: %v", err)) } // Disperse the blob @@ -247,7 +248,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut quorumId := string(param.QuorumID) s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob") } - return nil, err + return nil, api.NewInvalidArgError(err.Error()) } s.logger.Debug("received a new blob request", "origin", origin, "securityParams", strings.Join(securityParamsStrings, ", ")) @@ -258,7 +259,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut quorumId := string(param.QuorumID) s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob") } - return nil, err + return nil, api.NewInvalidArgError(err.Error()) } if s.ratelimiter != nil { @@ -274,7 +275,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut s.metrics.HandleFailedRequest(quorumId, blobSize, "DisperseBlob") } } - return nil, err + return nil, api.NewResourceExhaustedError(err.Error()) } } @@ -286,7 +287,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut s.metrics.HandleBlobStoreFailedRequest(quorumId, blobSize, "DisperseBlob") } s.logger.Error("failed to store blob", "err", err) - return nil, errors.New("failed to store blob, please try again later") + return nil, api.NewInternalError("failed to store blob, please try again later") } for _, param := range securityParams { @@ -443,24 +444,25 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR requestID := req.GetRequestId() if len(requestID) == 0 { - return nil, errors.New("invalid request: request_id must not be empty") + return nil, api.NewInvalidArgError("request_id must not be empty") } s.logger.Info("received a new blob status request", "requestID", string(requestID)) metadataKey, err := disperser.ParseBlobKey(string(requestID)) if err != nil { - return nil, err + return nil, api.NewInvalidArgError(fmt.Sprintf("failed to parse the requestID: %s", err.Error())) } s.logger.Debug("metadataKey", "metadataKey", metadataKey.String()) metadata, err := s.blobStore.GetBlobMetadata(ctx, metadataKey) if err != nil { - return nil, err + // TODO: we need to distinguish NOT_FOUND from actual internal error. + return nil, api.NewInternalError(fmt.Sprintf("failed to get blob metadata, blobkey: %s", metadataKey.String())) } isConfirmed, err := metadata.IsConfirmed() if err != nil { - return nil, err + return nil, api.NewInternalError(fmt.Sprintf("missing confirmation information: %s", err.Error())) } s.logger.Debug("isConfirmed", "metadata", metadata, "isConfirmed", isConfirmed) @@ -556,7 +558,8 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob s.logger.Error("Failed to retrieve blob metadata", "err", err) s.metrics.IncrementFailedBlobRequestNum("", "RetrieveBlob") - return nil, err + // TODO: we need to distinguish NOT_FOUND from actual internal error. + return nil, api.NewInternalError("failed to get blob metadata, please retry") } data, err := s.blobStore.GetBlobContent(ctx, blobMetadata.BlobHash) @@ -564,7 +567,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob s.logger.Error("Failed to retrieve blob", "err", err) s.metrics.HandleFailedRequest("", len(data), "RetrieveBlob") - return nil, err + return nil, api.NewInternalError("failed to get blob data, please retry") } s.metrics.HandleSuccessfulRequest("", len(data), "RetrieveBlob") diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index ff596faaf7..41c0dc238d 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -86,7 +86,7 @@ func TestDisperseBlobWithInvalidQuorum(t *testing.T) { }, }, }) - assert.ErrorContains(t, err, "invalid request: the quorum_id must be in range [0, 1], but found 2") + assert.Equal(t, err.Error(), "rpc error: code = InvalidArgument desc = invalid request: the quorum_id must be in range [0, 1], but found 2") _, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{ Data: data, @@ -103,7 +103,7 @@ func TestDisperseBlobWithInvalidQuorum(t *testing.T) { }, }, }) - assert.ErrorContains(t, err, "invalid request: security_params must not contain duplicate quorum_id") + assert.Equal(t, err.Error(), "rpc error: code = InvalidArgument desc = invalid request: security_params must not contain duplicate quorum_id") } func TestGetBlobStatus(t *testing.T) { @@ -246,7 +246,9 @@ func TestRetrieveBlobFailsWhenBlobNotConfirmed(t *testing.T) { // Try to retrieve the blob before it is confirmed _, err = retrieveBlob(t, dispersalServer, 2) - assert.Error(t, err) + assert.NotNil(t, err) + assert.Equal(t, err.Error(), "rpc error: code = Internal desc = failed to get blob metadata, please retry") + } func TestDisperseBlobWithExceedSizeLimit(t *testing.T) { @@ -277,7 +279,7 @@ func TestDisperseBlobWithExceedSizeLimit(t *testing.T) { }, }) assert.NotNil(t, err) - assert.Equal(t, err.Error(), "blob size cannot exceed 2 MiB") + assert.Equal(t, err.Error(), "rpc error: code = InvalidArgument desc = blob size cannot exceed 2 MiB") } func setup(m *testing.M) { diff --git a/operators/churner/churner.go b/operators/churner/churner.go index 3471e20f9d..93f4b7c858 100644 --- a/operators/churner/churner.go +++ b/operators/churner/churner.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/Layr-Labs/eigenda/api" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/eth" @@ -229,7 +230,7 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op // register needs to have 1.1 times the stake of the lowest-stake operator. if new(big.Int).Mul(lowestStake, churnBIPsOfOperatorStake).Cmp(new(big.Int).Mul(operatorToRegisterStake, bipMultiplier)) >= 0 { c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToRegister) - return nil, fmt.Errorf("registering operator must have %f%% more than the stake of the lowest-stake operator. Stake of registering operator: %d, stake of lowest-stake operator: %d, quorum ID: %d", float64(operatorSetParams.ChurnBIPsOfOperatorStake)/100.0-100.0, operatorToRegisterStake, lowestStake, quorumID) + return nil, api.NewInvalidArgError(fmt.Sprintf("registering operator must have %f%% more than the stake of the lowest-stake operator. Stake of registering operator: %d, stake of lowest-stake operator: %d, quorum ID: %d", float64(operatorSetParams.ChurnBIPsOfOperatorStake)/100.0-100.0, operatorToRegisterStake, lowestStake, quorumID)) } // verify the lowest stake against the total stake @@ -241,7 +242,7 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op // stake. if new(big.Int).Mul(lowestStake, bipMultiplier).Cmp(new(big.Int).Mul(totalStake, churnBIPsOfTotalStake)) >= 0 { c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToChurn) - return nil, fmt.Errorf("operator to churn out must have less than %f%% of the total stake. Stake of the operator to churn: %d, total stake in quorum: %d, quorum ID: %d", float64(operatorSetParams.ChurnBIPsOfTotalStake)/100.0, lowestStake, totalStake, quorumID) + return nil, api.NewInvalidArgError(fmt.Sprintf("operator to churn out must have less than %f%% of the total stake. Stake of the operator to churn: %d, total stake in quorum: %d, quorum ID: %d", float64(operatorSetParams.ChurnBIPsOfTotalStake)/100.0, lowestStake, totalStake, quorumID)) } operatorToChurnAddress, err := c.Transactor.OperatorIDToAddress(ctx, lowestStakeOperatorId) diff --git a/operators/churner/server.go b/operators/churner/server.go index 6ef7cc0b4d..cfb64a790d 100644 --- a/operators/churner/server.go +++ b/operators/churner/server.go @@ -6,11 +6,13 @@ import ( "fmt" "time" + "github.com/Layr-Labs/eigenda/api" pb "github.com/Layr-Labs/eigenda/api/grpc/churner" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc/status" ) type Server struct { @@ -57,7 +59,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl err := s.validateChurnRequest(ctx, req) if err != nil { s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidRequest) - return nil, fmt.Errorf("invalid request: %w", err) + return nil, api.NewInvalidArgError(fmt.Sprintf("invalid request: %s", err.Error())) } timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { @@ -67,10 +69,10 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl s.logger.Info("Received request: ", "QuorumIds", req.GetQuorumIds()) now := time.Now() - // check that we are after the previous approval's expiry + // Global rate limiting: check that we are after the previous approval's expiry if now.Unix() < s.latestExpiry { s.metrics.IncrementFailedRequestNum("Churn", FailReasonPrevApprovalNotExpired) - return nil, fmt.Errorf("previous approval not expired, retry in %d", s.latestExpiry-now.Unix()) + return nil, api.NewResourceExhaustedError(fmt.Sprintf("previous approval not expired, retry in %d", s.latestExpiry-now.Unix())) } request := createChurnRequest(req) @@ -78,20 +80,23 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl operatorToRegisterAddress, err := s.churner.VerifyRequestSignature(ctx, request) if err != nil { s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidSignature) - return nil, fmt.Errorf("failed to verify request signature: %w", err) + return nil, api.NewInvalidArgError(fmt.Sprintf("failed to verify request signature: %s", err.Error())) } - // check if the request should be rate limited + // Per-operator rate limiting: check if the request should be rate limited err = s.checkShouldBeRateLimited(now, *request) if err != nil { s.metrics.IncrementFailedRequestNum("Churn", FailReasonRateLimitExceeded) - return nil, fmt.Errorf("rate limiter error: %w", err) + return nil, api.NewResourceExhaustedError(fmt.Sprintf("rate limiter error: %s", err.Error())) } response, err := s.churner.ProcessChurnRequest(ctx, operatorToRegisterAddress, request) if err != nil { s.metrics.IncrementFailedRequestNum("Churn", FailReasonProcessChurnRequestFailed) - return nil, fmt.Errorf("failed to process churn request: %w", err) + if _, ok := status.FromError(err); ok { + return nil, err + } + return nil, api.NewInternalError(fmt.Sprintf("failed to process churn request: %s", err.Error())) } // update the latest expiry diff --git a/operators/churner/server_test.go b/operators/churner/server_test.go index d641f7d024..4c95421e1e 100644 --- a/operators/churner/server_test.go +++ b/operators/churner/server_test.go @@ -90,7 +90,8 @@ func TestChurn(t *testing.T) { // retry prior to expiry should fail _, err = s.Churn(ctx, request) - assert.ErrorContains(t, err, "previous approval not expired, retry in") + assert.NotNil(t, err) + assert.Equal(t, err.Error(), "rpc error: code = ResourceExhausted desc = previous approval not expired, retry in 90") } func TestChurnWithInvalidQuorum(t *testing.T) { @@ -122,7 +123,8 @@ func TestChurnWithInvalidQuorum(t *testing.T) { }, nil) _, err := s.Churn(ctx, request) - assert.ErrorContains(t, err, "invalid request: the quorum_id must be in range [0, 1], but found 2") + assert.NotNil(t, err) + assert.Equal(t, err.Error(), "rpc error: code = InvalidArgument desc = invalid request: invalid request: the quorum_id must be in range [0, 1], but found 2") } func setupMockTransactor() {