Skip to content

Commit

Permalink
chore: small edits, remove whitespace, add comments
Browse files Browse the repository at this point in the history
docs: better documentation for BlobStatus in disperser.proto and disperser.go

feat: eigenda-client returns 503 errors for batcher failover

Update disperser/disperser.go

Co-authored-by: Ethen <[email protected]>

Update api/clients/eigenda_client.go

Co-authored-by: Ethen <[email protected]>

fix(lint): ErrorAPIGeneric.Error() had an infinite recursive call

proto: make protoc

refactor: use http status codes instead of our own enum in api errors.go

docs: remove deprecated notice on eigendaClient.GetCodec()

docs: make more precise DISPERSING blob status comment

In both disperser.proto and disperser.go

feat(503): return 503 for insufficient signatures (was returning 500 before)

docs: make more precise comments for why we return 503 on blobstatus_failed

docs: better comment explaining timeout while in confirmed blob status

chore: update comments and asserts to reflect that disperseBlob only ever returns PROCESSING status

style: clearer docstring for blobStatus_DISPERSING in disperser.proto

refactor(eigenda-client): return grpc errors + one special ErrorFailover error (#843)
  • Loading branch information
samlaf committed Oct 29, 2024
1 parent bee55ed commit b5efda8
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 28 deletions.
2 changes: 2 additions & 0 deletions api/clients/codecs/default_blob_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func NewDefaultBlobCodec() DefaultBlobCodec {
return DefaultBlobCodec{}
}

// EncodeBlob can never return an error, but to maintain the interface it is included
// so that it can be swapped for the IFFTCodec without changing the interface
func (v DefaultBlobCodec) EncodeBlob(rawData []byte) ([]byte, error) {
codecBlobHeader := make([]byte, 32)
// first byte is always 0 to ensure the codecBlobHeader is a valid bn254 element
Expand Down
1 change: 1 addition & 0 deletions api/clients/codecs/ifft_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func (v IFFTCodec) EncodeBlob(data []byte) ([]byte, error) {
var err error
data, err = v.writeCodec.EncodeBlob(data)
if err != nil {
// this cannot happen, because EncodeBlob never returns an error
return nil, fmt.Errorf("error encoding data: %w", err)
}

Expand Down
8 changes: 7 additions & 1 deletion api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func NewConfig(hostname, port string, timeout time.Duration, useSecureGrpcFlag b
type DisperserClient interface {
Close() error
DisperseBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
// DisperseBlobAuthenticated disperses a blob with an authenticated request.
// The BlobStatus returned will always be PROCESSSING if error is nil.
DisperseBlobAuthenticated(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
DispersePaidBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error)
Expand Down Expand Up @@ -197,7 +199,6 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
err = stream.Send(&disperser_rpc.AuthenticatedRequest{Payload: &disperser_rpc.AuthenticatedRequest_DisperseRequest{
DisperseRequest: request,
}})

if err != nil {
return nil, nil, fmt.Errorf("failed to send request: %w", err)
}
Expand Down Expand Up @@ -247,6 +248,11 @@ func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []
return nil, nil, err
}

// Assert: only status that makes sense is processing. Anything else is a bug on disperser side.
if *blobStatus != disperser.Processing {
return nil, nil, fmt.Errorf("expected status to be Processing, got %v", *blobStatus)
}

return blobStatus, disperseReply.DisperseReply.GetRequestId(), nil
}

Expand Down
93 changes: 75 additions & 18 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"

"github.com/Layr-Labs/eigenda/api"
"github.com/Layr-Labs/eigenda/api/clients/codecs"
grpcdisperser "github.com/Layr-Labs/eigenda/api/grpc/disperser"
edasm "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/disperser"
)

// IEigenDAClient is a wrapper around the DisperserClient interface which
Expand Down Expand Up @@ -162,11 +162,37 @@ func (m *EigenDAClient) GetBlob(ctx context.Context, batchHeaderHash []byte, blo

// PutBlob encodes and writes a blob to EigenDA, waiting for a desired blob status
// to be reached (guarded by WaitForFinalization config param) before returning.
// This function is resilient to transient failures and timeouts.
//
// TODO: describe retry/timeout behavior
//
// Upon return the blob is guaranteed to be:
// - finalized onchain (if Config.WaitForFinalization is true), or
// - confirmed at a certain depth (if Config.WaitForFinalization is false, in which case Config.WaitForConfirmationDepth specifies the depth).
// - confirmed at a certain depth (if Config.WaitForFinalization is false,
// in which case Config.WaitForConfirmationDepth specifies the depth).
//
// Errors returned all either grpc errors, or api.ErrorFailover, for eg:
//
// blobInfo, err := client.PutBlob(ctx, blobData)
// if err != nil {
// if errors.Is(err, api.ErrorFailover) {
// // failover to ethda
// }
// st, isGRPCError := status.FromError(err)
// if isGRPCError {
// // use st.Code() and st.Message()
// } else {
// // assert this shouldn't happen
// }
// }
//
// An api.ErrorFailover error returned is used to signify that eigenda is temporarily unavailable,
// and suggest to the caller (most likely some rollup batcher via the eigenda-proxy)
// to fallback to ethda for some amount of time. Three reasons for returning api.ErrorFailover:
// 1. Failed to put the blob in the disperser's queue (disperser is down)
// 2. Timed out before getting confirmed onchain (batcher is down)
// 3. Insufficient signatures (eigenda network is down)
//
// See https://github.com/ethereum-optimism/specs/issues/434 for more details.
func (m *EigenDAClient) PutBlob(ctx context.Context, data []byte) (*grpcdisperser.BlobInfo, error) {
resultChan, errorChan := m.PutBlobAsync(ctx, data)
select { // no timeout here because we depend on the configured timeout in PutBlobAsync
Expand All @@ -189,13 +215,14 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan

// encode blob
if m.Codec == nil {
errChan <- fmt.Errorf("Codec cannot be nil")
errChan <- api.NewErrorInternal("codec not initialized")
return
}

data, err := m.Codec.EncodeBlob(rawData)
if err != nil {
errChan <- fmt.Errorf("error encoding blob: %w", err)
// Encode can only fail if there is something wrong with the data, so we return a 400 error
errChan <- api.NewErrorInvalidArg(fmt.Sprintf("error encoding blob: %v", err))
return
}

Expand All @@ -205,15 +232,11 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
}
// disperse blob
// TODO: would be nice to add a trace-id key to the context, to be able to follow requests from batcher->proxy->eigenda
blobStatus, requestID, err := m.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers)
_, requestID, err := m.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers)
if err != nil {
errChan <- fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err)
return
}

// process response
if *blobStatus == disperser.Failed {
errChan <- fmt.Errorf("unable to disperse blob to eigenda (reply status %d): %w", blobStatus, err)
// Disperser-client returned error is already a grpc error which can be a 400 (eg rate limited) or 500,
// so we wrap the error such that clients can still use grpc's status.FromError() function to get the status code.
errChan <- fmt.Errorf("error submitting authenticated blob to disperser: %w", err)
return
}

Expand All @@ -229,18 +252,41 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan

alreadyWaitingForDispersal := false
alreadyWaitingForConfirmationOrFinality := false
var latestBlobStatus grpcdisperser.BlobStatus
for {
select {
case <-ctx.Done():
errChan <- fmt.Errorf("timed out waiting for EigenDA blob to confirm blob with request id=%s: %w", base64RequestID, ctx.Err())
// We can only land here if blob is still in
// 1. processing or dispersing status: waiting to land onchain
// 2. or confirmed status: landed onchain, waiting for finalization
// because all other statuses return immediately below.
//
// Assuming that the timeout is correctly set (long enough to both land onchain + finalize),
// 1. means that there is a problem with EigenDA, so we return an ErrorFailover to let the batcher failover to ethda
// 2. means that there is a problem with Ethereum, so we return 500.
// batcher would most likely resubmit another blob, which is not ideal but there isn't much to be done...
// eigenDA v2 will have idempotency so one can just resubmit the same blob safely.
if latestBlobStatus == grpcdisperser.BlobStatus_PROCESSING || latestBlobStatus == grpcdisperser.BlobStatus_DISPERSING {
errChan <- api.NewErrorFailover(fmt.Errorf("eigenda might be down. timed out waiting for blob to land onchain (request id=%s): %w", base64RequestID, ctx.Err()))
} else if latestBlobStatus == grpcdisperser.BlobStatus_CONFIRMED {
// Timeout'ing in confirmed state means one of two things:
// 1. (if timeout was long enough to finalize in normal conditions): problem with ethereum, so we return 504 (DeadlineExceeded)
// 2. TODO: (if timeout was not long enough to finalize in normal conditions): eigenda-client is badly configured, should be a 400 (INVALID_ARGUMENT)
errChan <- api.NewErrorDeadlineExceeded(
fmt.Sprintf("timed out waiting for blob that landed onchain to finalize (request id=%s). "+
"Either timeout not long enough, or ethereum might be experiencing difficulties: %v. ", base64RequestID, ctx.Err()))
} else {
// this should not be reachable...
errChan <- api.NewErrorInternal(fmt.Sprintf("timed out in a state that shouldn't be possible (request id=%s): %s", base64RequestID, ctx.Err()))
}
return
case <-ticker.C:
statusRes, err := m.Client.GetBlobStatus(ctx, requestID)
if err != nil {
m.Log.Warn("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
continue
}

latestBlobStatus = statusRes.Status
switch statusRes.Status {
case grpcdisperser.BlobStatus_PROCESSING, grpcdisperser.BlobStatus_DISPERSING:
// to prevent log clutter, we only log at info level once
Expand All @@ -251,10 +297,21 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
alreadyWaitingForDispersal = true
}
case grpcdisperser.BlobStatus_FAILED:
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing, requestID=%s: %w", base64RequestID, err)
// This can happen for a few reasons:
// 1. blob has expired, a client retrieve after 14 days. Sounds like 400 errors, but not sure this can happen during dispersal...
// 2. internal logic error while requesting encoding (shouldn't happen), but should probably return api.ErrorFailover
// 3. wait for blob finalization from confirmation and blob retry has exceeded its limit.
// Probably from a chain re-org. See https://github.com/Layr-Labs/eigenda/blob/master/disperser/batcher/finalizer.go#L179-L189.
// So we should be returning 500 to force a blob resubmission (not eigenda's fault but until
// we have idempotency this is unfortunately the only solution)
// TODO: we should create new BlobStatus categories to separate these cases out. For now returning 500 is fine.
errChan <- api.NewErrorInternal(fmt.Sprintf("blob dispersal (requestID=%s) reached failed status. please resubmit the blob.", base64RequestID))
return
case grpcdisperser.BlobStatus_INSUFFICIENT_SIGNATURES:
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with insufficient signatures, requestID=%s: %w", base64RequestID, err)
// Some quorum failed to sign the blob, indicating that the whole network is having issues.
// We hence return api.ErrorFailover to let the batcher failover to ethda. This could however be a very unlucky
// temporary issue, so the caller should retry at least one more time before failing over.
errChan <- api.NewErrorFailover(fmt.Errorf("blob dispersal (requestID=%s) failed with insufficient signatures. eigenda nodes are probably down.", base64RequestID))
return
case grpcdisperser.BlobStatus_CONFIRMED:
if m.Config.WaitForFinalization {
Expand Down Expand Up @@ -291,7 +348,7 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan
return
default:
// this should never happen. If it does, the blob is in a heisenberg state... it could either eventually get confirmed or fail
errChan <- fmt.Errorf("unknown reply status %d. ask for assistance from EigenDA team, using requestID %s", statusRes.Status, base64RequestID)
errChan <- api.NewErrorInternal(fmt.Sprintf("unknown reply status %d. ask for assistance from EigenDA team, using requestID %s", statusRes.Status, base64RequestID))
return
}
}
Expand Down
55 changes: 53 additions & 2 deletions api/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
// The canonical errors from the EigenDA gRPC API endpoints.
//
// Notes:
// - We start with a small (but sufficient) subset of google's error code convention,
// - We start with a small (but sufficient) subset of grpc's error codes,
// and expand when there is an important failure case to separate out. See:
// https://cloud.google.com/apis/design/errors#handling_errors
// https://grpc.io/docs/guides/status-codes/
// - Make sure that internally propagated errors are eventually wrapped in one of the
// user-facing errors defined here, since grpc otherwise returns an UNKNOWN error code,
// which is harder to debug and understand for users.
// - See https://github.com/googleapis/googleapis/blob/ba8ea80f25d19bde8501cd51f314391f8d39bde8/google/rpc/code.proto
// for the mapping of grpc error codes to HTTP status codes.

func newErrorGRPC(code codes.Code, msg string) error {
return status.Error(code, msg)
Expand All @@ -39,7 +41,56 @@ func NewErrorInternal(msg string) error {
return newErrorGRPC(codes.Internal, msg)
}

// HTTP Mapping: 500 Internal Server Error
func NewErrorUnknown(msg string) error {
return newErrorGRPC(codes.Unknown, msg)
}

// HTTP Mapping: 501 Not Implemented
func NewErrorUnimplemented() error {
return newErrorGRPC(codes.Unimplemented, "not implemented")
}

// HTTP Mapping: 504 Gateway Timeout
func NewErrorDeadlineExceeded(msg string) error {
return newErrorGRPC(codes.DeadlineExceeded, "msg")
}

// ErrorFailover is returned by the disperser-client and eigenda-client to signify
// that eigenda is temporarily unavailable, and suggest to the caller
// (most likely some rollup batcher via the eigenda-proxy) to failover
// to ethda for some amount of time.
// See https://github.com/ethereum-optimism/specs/issues/434 for more details.
//
// Given that both clients already return grpc errors, we could potentially use
// a grpc UNAVAILABLE error instead, but we don't because:
// 1. UNAVAILABLE is typically used to tell the client to retry the request, not failover
// 2. the grpc framework itself also returns UNAVAILABLE errors in some cases, see:
// https://github.com/grpc/grpc-go/blob/192ee33f6fc0f07070eeaaa1d34e41746740e64c/codes/codes.go#L184.
// We could differentiate from those generated by the grpc framework by using error details, like
// https://github.com/grpc/grpc-go/tree/master/examples/features/error_details, but that would complicate things
// and it feels much simpler to just use a custom error type for this specific purpose.
//
// 3 reasons for returning api.ErrorFailover:
// 1. Failed to put the blob in the disperser's queue (disperser is down)
// 2. Timed out before getting confirmed onchain (batcher is down)
// 3. Insufficient signatures (eigenda network is down)
type ErrorFailover struct {
Err error
}

// NewErrorFailover creates a new ErrorFailover with the given underlying error.
// See ErrorFailover for more details.
func NewErrorFailover(err error) *ErrorFailover {
return &ErrorFailover{
Err: err,
}
}

func (e *ErrorFailover) Error() string {
return e.Err.Error()
}

func (e *ErrorFailover) Unwrap() error {
return e.Err
}
13 changes: 10 additions & 3 deletions api/grpc/disperser/disperser.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions api/proto/disperser/disperser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ message DispersePaidBlobRequest {
}

message DisperseBlobReply {
// The status of the blob associated with the request_id.
// The status of the blob associated with the request_id. Will always be PROCESSING.
BlobStatus result = 1;
// The request ID generated by the disperser.
// Once a request is accepted (although not processed), a unique request ID will be
Expand Down Expand Up @@ -184,15 +184,21 @@ enum BlobStatus {
CONFIRMED = 2;

// FAILED means that the blob has failed permanently (for reasons other than insufficient
// signatures, which is a separate state)
// signatures, which is a separate state). This status is somewhat of a catch-all category,
// containg (but not necessarily exclusively as errors can be added in the future):
// - blob has expired
// - internal logic error while requesting encoding
// - blob retry has exceeded its limit while waiting for blob finalization after confirmation.
// Most likely triggered by a chain reorg: see https://github.com/Layr-Labs/eigenda/blob/master/disperser/batcher/finalizer.go#L179-L189.
FAILED = 3;
// FINALIZED means that the block containing the blob's confirmation transaction has been finalized on Ethereum
FINALIZED = 4;
// INSUFFICIENT_SIGNATURES means that the confirmation threshold for the blob was not met
// for at least one quorum.
INSUFFICIENT_SIGNATURES = 5;

// DISPERSING means that the blob is currently being dispersed to DA Nodes and being confirmed onchain
// The DISPERSING state is comprised of two separate phases:
// - Dispersing to DA nodes and collecting signature
// - Submitting the transaction on chain and waiting for tx receipt
DISPERSING = 6;
}

Expand Down
Loading

0 comments on commit b5efda8

Please sign in to comment.