Skip to content

Commit

Permalink
refactor(eigenda-client): return grpc errors + one special ErrorFailo…
Browse files Browse the repository at this point in the history
…ver error (#843)
  • Loading branch information
samlaf authored Oct 29, 2024
1 parent d62c2ce commit 8eeb3e0
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 119 deletions.
54 changes: 24 additions & 30 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/hex"
"fmt"
"net"
"net/http"
"time"

"github.com/Layr-Labs/eigenda/api"
Expand Down Expand Up @@ -145,9 +144,9 @@ func (m *EigenDAClient) GetBlob(ctx context.Context, batchHeaderHash []byte, blo
//
// PutBlob returned errors all implement the ErrorAPI interface, which allows the caller
// to determine whether the error is a client or server fault, as well as get its status code.
// A 503 error returned is used to signify that eigenda is temporarily unavailable,
// An api.ErrorFailover error returned is used to signify that eigenda is temporarily unavailable,
// and suggest to the caller (most likely some rollup batcher via the eigenda-proxy)
// to fallback to ethda for some amount of time. 3 reasons for returning 503:
// to fallback to ethda for some amount of time. 3 reasons for returning api.ErrorFailover:
// 1. Failed to put the blob in the disperser's queue (disperser is down)
// 2. Timed out before getting confirmed onchain (batcher is down)
// 3. Insufficient signatures (eigenda network is down)
Expand All @@ -168,25 +167,26 @@ func (m *EigenDAClient) PutBlob(ctx context.Context, data []byte) (*grpcdisperse
}
}

func (m *EigenDAClient) PutBlobAsync(ctx context.Context, data []byte) (resultChan chan *grpcdisperser.BlobInfo, errChan chan api.ErrorAPI) {
func (m *EigenDAClient) PutBlobAsync(ctx context.Context, data []byte) (resultChan chan *grpcdisperser.BlobInfo, errChan chan error) {
resultChan = make(chan *grpcdisperser.BlobInfo, 1)
errChan = make(chan api.ErrorAPI, 1)
errChan = make(chan error, 1)
go m.putBlob(ctx, data, resultChan, errChan)
return
}

func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan chan *grpcdisperser.BlobInfo, errChan chan api.ErrorAPI) {
func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan chan *grpcdisperser.BlobInfo, errChan chan error) {
m.Log.Info("Attempting to disperse blob to EigenDA")

// encode blob
if m.Codec == nil {
errChan <- api.NewErrorAPIGeneric(http.StatusBadRequest, fmt.Errorf("codec not initialized"))
errChan <- api.NewErrorInternal("codec not initialized")
return
}

data, err := m.Codec.EncodeBlob(rawData)
if err != nil {
errChan <- api.NewErrorAPIGeneric(http.StatusBadRequest, fmt.Errorf("error encoding blob: %w", err))
// Encode can only fail if there is something wrong with the data, so we return a 400 error
errChan <- api.NewErrorInvalidArg(fmt.Sprintf("error encoding blob: %v", err))
return
}

Expand All @@ -198,13 +198,9 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
// TODO: would be nice to add a trace-id key to the context, to be able to follow requests from batcher->proxy->eigenda
_, requestID, err := m.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers)
if err != nil {
errChan <- &api.ErrorAPIGeneric{
Err: fmt.Errorf("error submitting authenticated blob to disperser: %w", err),
// We set to unknown fault b/c disperser client returns a mix of 400s and 500s currently.
// TODO: update disperser client to also return ErrorAPIGeneric errors
Code: 0,
Fault: api.ErrorFaultUnknown,
}
// Disperser-client returned error is already a grpc error which can be a 400 (eg rate limited) or 500,
// so we wrap the error such that clients can still use grpc's status.FromError() function to get the status code.
errChan <- fmt.Errorf("error submitting authenticated blob to disperser: %w", err)
return
}

Expand All @@ -230,24 +226,22 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
// because all other statuses return immediately below.
//
// Assuming that the timeout is correctly set (long enough to both land onchain + finalize),
// 1. means that there is a problem with EigenDA, so we return 503 to let the batcher failover to ethda
// 1. means that there is a problem with EigenDA, so we return an ErrorFailover to let the batcher failover to ethda
// 2. means that there is a problem with Ethereum, so we return 500.
// batcher would most likely resubmit another blob, which is not ideal but there isn't much to be done...
// eigenDA v2 will have idempotency so one can just resubmit the same blob safely.
if latestBlobStatus == grpcdisperser.BlobStatus_PROCESSING || latestBlobStatus == grpcdisperser.BlobStatus_DISPERSING {
errChan <- api.NewErrorAPIGeneric(http.StatusServiceUnavailable,
fmt.Errorf("eigenda might be down. timed out waiting for blob to land onchain (request id=%s): %w", base64RequestID, ctx.Err()))
errChan <- api.NewErrorFailover(fmt.Errorf("eigenda might be down. timed out waiting for blob to land onchain (request id=%s): %w", base64RequestID, ctx.Err()))
} else if latestBlobStatus == grpcdisperser.BlobStatus_CONFIRMED {
// Timing out in the confirmed state means one of two things:
// 1. (if timeout was long enough to finalize in normal conditions): problem with ethereum, so we return 504
// 2. (if timeout was not long enough to finalize in normal conditions): eigenda-client is badly configured, should be a 408 (TODO)
errChan <- api.NewErrorAPIGeneric(http.StatusGatewayTimeout,
fmt.Errorf("timed out waiting for blob that landed onchain to finalize (request id=%s). "+
"Either timeout not long enough, or ethereum might be experiencing difficulties: %w. ", base64RequestID, ctx.Err()))
// 1. (if timeout was long enough to finalize in normal conditions): problem with ethereum, so we return 504 (DeadlineExceeded)
// 2. TODO: (if timeout was not long enough to finalize in normal conditions): eigenda-client is badly configured, should be a 400 (INVALID_ARGUMENT)
errChan <- api.NewErrorDeadlineExceeded(
fmt.Sprintf("timed out waiting for blob that landed onchain to finalize (request id=%s). "+
"Either timeout not long enough, or ethereum might be experiencing difficulties: %v. ", base64RequestID, ctx.Err()))
} else {
// this should not be reachable...
errChan <- api.NewErrorAPIGeneric(http.StatusInternalServerError,
fmt.Errorf("timed out in a state that shouldn't be possible (request id=%s): %w", base64RequestID, ctx.Err()))
errChan <- api.NewErrorInternal(fmt.Sprintf("timed out in a state that shouldn't be possible (request id=%s): %s", base64RequestID, ctx.Err()))
}
return
case <-ticker.C:
Expand All @@ -269,19 +263,19 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
case grpcdisperser.BlobStatus_FAILED:
// This can happen for a few reasons:
// 1. blob has expired, e.g. a client tries to retrieve it after 14 days. This sounds like a 400 error, but it's unclear whether it can happen during dispersal...
// 2. internal logic error while requesting encoding (shouldn't happen), but should probably return 503
// 2. internal logic error while requesting encoding (shouldn't happen), but should probably return api.ErrorFailover
// 3. wait for blob finalization from confirmation and blob retry has exceeded its limit.
// Probably from a chain re-org. See https://github.com/Layr-Labs/eigenda/blob/master/disperser/batcher/finalizer.go#L179-L189.
// So we should be returning 500 to force a blob resubmission (not eigenda's fault but until
// we have idempotency this is unfortunately the only solution)
// TODO: we should create new BlobStatus categories to separate these cases out. For now returning 500 is fine.
errChan <- api.NewErrorAPIGeneric(http.StatusInternalServerError, fmt.Errorf("blob dispersal (requestID=%s) reached failed status. please resubmit the blob.", base64RequestID))
errChan <- api.NewErrorInternal(fmt.Sprintf("blob dispersal (requestID=%s) reached failed status. please resubmit the blob.", base64RequestID))
return
case grpcdisperser.BlobStatus_INSUFFICIENT_SIGNATURES:
// Some quorum failed to sign the blob, indicating that the whole network is having issues.
// We hence return 503 to let the batcher failover to ethda. This could however be a very unlucky
// We hence return api.ErrorFailover to let the batcher failover to ethda. This could however be a very unlucky
// temporary issue, so the caller should retry at least one more time before failing over.
errChan <- api.NewErrorAPIGeneric(http.StatusServiceUnavailable, fmt.Errorf("blob dispersal (requestID=%s) failed with insufficient signatures. eigenda nodes are probably down.", base64RequestID))
errChan <- api.NewErrorFailover(fmt.Errorf("blob dispersal (requestID=%s) failed with insufficient signatures. eigenda nodes are probably down.", base64RequestID))
return
case grpcdisperser.BlobStatus_CONFIRMED:
if m.Config.WaitForFinalization {
Expand All @@ -304,7 +298,7 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
return
default:
// this should never happen. If it does, the blob is in a heisenberg state... it could either eventually get confirmed or fail
errChan <- api.NewErrorAPIGeneric(http.StatusInternalServerError, fmt.Errorf("unknown reply status %d. ask for assistance from EigenDA team, using requestID %s", statusRes.Status, base64RequestID))
errChan <- api.NewErrorInternal(fmt.Sprintf("unknown reply status %d. ask for assistance from EigenDA team, using requestID %s", statusRes.Status, base64RequestID))
return
}
}
Expand Down
130 changes: 41 additions & 89 deletions api/errors.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package api

import (
"fmt"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// The canonical errors from the EigenDA gRPC API endpoints.
//
// Notes:
// - We start with a small (but sufficient) subset of google's error code convention,
// - We start with a small (but sufficient) subset of grpc's error codes,
// and expand when there is an important failure case to separate out. See:
// https://cloud.google.com/apis/design/errors#handling_errors
// https://grpc.io/docs/guides/status-codes/
// - Make sure that internally propagated errors are eventually wrapped in one of the
// user-facing errors defined here, since grpc otherwise returns an UNKNOWN error code,
// which is harder to debug and understand for users.
// - See https://github.com/googleapis/googleapis/blob/ba8ea80f25d19bde8501cd51f314391f8d39bde8/google/rpc/code.proto
// for the mapping of grpc error codes to HTTP status codes.

func newErrorGRPC(code codes.Code, msg string) error {
return status.Error(code, msg)
Expand All @@ -41,104 +41,56 @@ func NewErrorInternal(msg string) error {
return newErrorGRPC(codes.Internal, msg)
}

// NewErrorUnknown wraps msg in a gRPC status error with code codes.Unknown.
//
// HTTP Mapping: 500 Internal Server Error
func NewErrorUnknown(msg string) error {
	unknownErr := newErrorGRPC(codes.Unknown, msg)
	return unknownErr
}

// NewErrorUnimplemented returns a gRPC status error with code
// codes.Unimplemented and the fixed message "not implemented".
//
// HTTP Mapping: 501 Not Implemented
func NewErrorUnimplemented() error {
	const msg = "not implemented"
	return newErrorGRPC(codes.Unimplemented, msg)
}

// ==================================================================
// API CLIENT ERRORS
// Note: These errors are currently used by api/clients.
// Eventually it might be useful to use them across the entire codebase.
// ==================================================================

// Code below is adapted from https://github.com/aws/smithy-go/blob/main/errors.go
// NewErrorDeadlineExceeded returns a gRPC status error with code
// codes.DeadlineExceeded whose message is msg.
//
// HTTP Mapping: 504 Gateway Timeout
func NewErrorDeadlineExceeded(msg string) error {
	// Forward the caller's message. The string literal "msg" used to be passed
	// here by mistake, which discarded the real message (e.g. the request id
	// and the context error supplied by the eigenda client).
	return newErrorGRPC(codes.DeadlineExceeded, msg)
}

// ErrorAPI is the most generic API and protocol agnostic error interface that we eventually
// want every api clients returned errors to implement.
// ErrorFailover is returned by the disperser-client and eigenda-client to signify
// that eigenda is temporarily unavailable, and suggest to the caller
// (most likely some rollup batcher via the eigenda-proxy) to failover
// to ethda for some amount of time.
// See https://github.com/ethereum-optimism/specs/issues/434 for more details.
//
// This way, consumers of the api clients can tell what kind of error they are dealing with,
// most broadly whether it is a client or server fault (see the ErrorFault type).
// Given that both clients already return grpc errors, we could potentially use
// a grpc UNAVAILABLE error instead, but we don't because:
// 1. UNAVAILABLE is typically used to tell the client to retry the request, not failover
// 2. the grpc framework itself also returns UNAVAILABLE errors in some cases, see:
// https://github.com/grpc/grpc-go/blob/192ee33f6fc0f07070eeaaa1d34e41746740e64c/codes/codes.go#L184.
// We could differentiate from those generated by the grpc framework by using error details, like
// https://github.com/grpc/grpc-go/tree/master/examples/features/error_details, but that would complicate things
// and it feels much simpler to just use a custom error type for this specific purpose.
//
// This interface is still a work in progress and might change. For eg,
// we might eventually switch from http error codes to the more precise grpc error codes
// https://cloud.google.com/apis/design/errors#handling_errors
type ErrorAPI interface {
error

// ErrorCode returns the HTTP status code for the API exception.
// See https://www.iana.org/assignments/http-status-codes/http-status-codes.xhtml
//
// One special and very important code is 503,
// which is used to signify that eigenda is temporarily unavailable,
// and suggest to the caller (most likely some rollup batcher via the eigenda-proxy)
// to fallback to ethda for some amount of time.
// See https://github.com/ethereum-optimism/specs/issues/434 for more details.
ErrorCode() int
// ErrorFault returns the fault for the API exception.
ErrorFault() ErrorFault
// 3 reasons for returning api.ErrorFailover:
// 1. Failed to put the blob in the disperser's queue (disperser is down)
// 2. Timed out before getting confirmed onchain (batcher is down)
// 3. Insufficient signatures (eigenda network is down)
type ErrorFailover struct {
// Err is the underlying cause. Error() returns its message verbatim and
// Unwrap() returns it, so errors.Is and errors.As can see through the wrapper.
Err error
}

// ErrorAPIGeneric provides a generic concrete API error type that implements ErrorAPI
// and that clients can use when they don't have a more concrete error type to use.
// It is typically built with NewErrorAPIGeneric, which derives Fault from Code.
type ErrorAPIGeneric struct {
// Err is the wrapped underlying error; it is returned by Unwrap and its
// message is embedded in the string produced by Error.
Err error
// Code is the HTTP status code for the error, as returned by ErrorCode.
Code int
// Fault is the broad client/server/unknown categorization, as returned by ErrorFault.
Fault ErrorFault
}

// NewErrorAPIGeneric creates a new Error that implements the ErrorAPI interface.
//
// eg usage:
//
// NewErrorAPIGeneric(http.StatusNotFound, errors.New("not found"))
func NewErrorAPIGeneric(code int, err error) *ErrorAPIGeneric {
errGeneric := &ErrorAPIGeneric{
Err: err,
Code: code,
Fault: ErrorFaultUnknown,
}
if code >= 400 && code < 500 {
errGeneric.Fault = ErrorFaultClient
} else if code >= 500 && code < 600 {
errGeneric.Fault = ErrorFaultServer
// NewErrorFailover creates a new ErrorFailover with the given underlying error.
// See ErrorFailover for more details.
func NewErrorFailover(err error) *ErrorFailover {
return &ErrorFailover{
Err: err,
}
return errGeneric
}

// ErrorCode reports the status code stored in the error's Code field.
func (e *ErrorAPIGeneric) ErrorCode() int {
	return e.Code
}

// ErrorFault reports the fault category stored in the error's Fault field.
func (e *ErrorAPIGeneric) ErrorFault() ErrorFault {
	return e.Fault
}

func (e *ErrorAPIGeneric) Error() string {
return fmt.Sprintf("api error %d: %s", e.Code, e.Err.Error())
func (e *ErrorFailover) Error() string {
return e.Err.Error()
}

// Unwrap exposes the wrapped error so that errors.Is and errors.As
// can inspect the underlying cause.
func (e *ErrorAPIGeneric) Unwrap() error {
	return e.Err
}

// Compile-time check that *ErrorAPIGeneric satisfies the ErrorAPI interface.
var _ ErrorAPI = (*ErrorAPIGeneric)(nil)

// ErrorFault provides the broadest categorization of an error (client, server, or unknown).
type ErrorFault int

// ErrorFault enumeration values.
const (
// ErrorFaultUnknown is the zero value; NewErrorAPIGeneric uses it for codes
// outside the 400-599 range.
ErrorFaultUnknown ErrorFault = iota
// ErrorFaultServer marks a server-side fault (NewErrorAPIGeneric assigns it to 5xx codes).
ErrorFaultServer
// ErrorFaultClient marks a client-side fault (NewErrorAPIGeneric assigns it to 4xx codes).
ErrorFaultClient
)

func (f ErrorFault) String() string {
switch f {
case ErrorFaultServer:
return "server"
case ErrorFaultClient:
return "client"
default:
return "unknown"
}
func (e *ErrorFailover) Unwrap() error {
return e.Err
}

0 comments on commit 8eeb3e0

Please sign in to comment.