Skip to content

Commit

Permalink
Fail v2 blobs for no attestations (#1043)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Dec 19, 2024
1 parent 45be21e commit 449e992
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 69 deletions.
8 changes: 7 additions & 1 deletion api/docs/disperser_v2.html
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ <h3 id="disperser.v2.SignedBatch">SignedBatch</h3>


<h3 id="disperser.v2.BlobStatus">BlobStatus</h3>
<p>BlobStatus represents the status of a blob.</p><p>The status of a blob is updated as the blob is processed by the disperser.</p><p>The status of a blob can be queried by the client using the GetBlobStatus API.</p><p>Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:</p><p>- QUEUED</p><p>- ENCODED</p><p>Terminal states are states that will not be updated to a different state:</p><p>- CERTIFIED</p><p>- FAILED</p>
<p>BlobStatus represents the status of a blob.</p><p>The status of a blob is updated as the blob is processed by the disperser.</p><p>The status of a blob can be queried by the client using the GetBlobStatus API.</p><p>Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:</p><p>- QUEUED</p><p>- ENCODED</p><p>Terminal states are states that will not be updated to a different state:</p><p>- CERTIFIED</p><p>- FAILED</p><p>- INSUFFICIENT_SIGNATURES</p>
<table class="enum-table">
<thead>
<tr><td>Name</td><td>Number</td><td>Description</td></tr>
Expand Down Expand Up @@ -826,6 +826,12 @@ <h3 id="disperser.v2.BlobStatus">BlobStatus</h3>
<td><p>FAILED means that the blob has failed permanently</p></td>
</tr>

<tr>
<td>INSUFFICIENT_SIGNATURES</td>
<td>5</td>
<td><p>INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation</p></td>
</tr>

</tbody>
</table>

Expand Down
2 changes: 2 additions & 0 deletions api/docs/disperser_v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ Intermediate states are states that the blob can be in while being processed, an
Terminal states are states that will not be updated to a different state:
- CERTIFIED
- FAILED
- INSUFFICIENT_SIGNATURES

| Name | Number | Description |
| ---- | ------ | ----------- |
Expand All @@ -294,6 +295,7 @@ Terminal states are states that will not be updated to a different state:
| ENCODED | 2 | ENCODED means that the blob has been encoded and is ready to be dispersed to DA Nodes |
| CERTIFIED | 3 | CERTIFIED means the blob has been dispersed and attested by the DA nodes |
| FAILED | 4 | FAILED means that the blob has failed permanently |
| INSUFFICIENT_SIGNATURES | 5 | INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation |



Expand Down
8 changes: 7 additions & 1 deletion api/docs/eigenda-protos.html
Original file line number Diff line number Diff line change
Expand Up @@ -2460,7 +2460,7 @@ <h3 id="disperser.v2.SignedBatch">SignedBatch</h3>


<h3 id="disperser.v2.BlobStatus">BlobStatus</h3>
<p>BlobStatus represents the status of a blob.</p><p>The status of a blob is updated as the blob is processed by the disperser.</p><p>The status of a blob can be queried by the client using the GetBlobStatus API.</p><p>Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:</p><p>- QUEUED</p><p>- ENCODED</p><p>Terminal states are states that will not be updated to a different state:</p><p>- CERTIFIED</p><p>- FAILED</p>
<p>BlobStatus represents the status of a blob.</p><p>The status of a blob is updated as the blob is processed by the disperser.</p><p>The status of a blob can be queried by the client using the GetBlobStatus API.</p><p>Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:</p><p>- QUEUED</p><p>- ENCODED</p><p>Terminal states are states that will not be updated to a different state:</p><p>- CERTIFIED</p><p>- FAILED</p><p>- INSUFFICIENT_SIGNATURES</p>
<table class="enum-table">
<thead>
<tr><td>Name</td><td>Number</td><td>Description</td></tr>
Expand Down Expand Up @@ -2497,6 +2497,12 @@ <h3 id="disperser.v2.BlobStatus">BlobStatus</h3>
<td><p>FAILED means that the blob has failed permanently</p></td>
</tr>

<tr>
<td>INSUFFICIENT_SIGNATURES</td>
<td>5</td>
<td><p>INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation</p></td>
</tr>

</tbody>
</table>

Expand Down
2 changes: 2 additions & 0 deletions api/docs/eigenda-protos.md
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,7 @@ Intermediate states are states that the blob can be in while being processed, an
Terminal states are states that will not be updated to a different state:
- CERTIFIED
- FAILED
- INSUFFICIENT_SIGNATURES

| Name | Number | Description |
| ---- | ------ | ----------- |
Expand All @@ -996,6 +997,7 @@ Terminal states are states that will not be updated to a different state:
| ENCODED | 2 | ENCODED means that the blob has been encoded and is ready to be dispersed to DA Nodes |
| CERTIFIED | 3 | CERTIFIED means the blob has been dispersed and attested by the DA nodes |
| FAILED | 4 | FAILED means that the blob has failed permanently |
| INSUFFICIENT_SIGNATURES | 5 | INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation |



Expand Down
74 changes: 40 additions & 34 deletions api/grpc/disperser/v2/disperser_v2.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/proto/disperser/v2/disperser_v2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ message GetPaymentStateReply {
// Terminal states are states that will not be updated to a different state:
// - CERTIFIED
// - FAILED
// - INSUFFICIENT_SIGNATURES
enum BlobStatus {
UNKNOWN = 0;

Expand All @@ -113,6 +114,9 @@ enum BlobStatus {

// FAILED means that the blob has failed permanently
FAILED = 4;

// INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation
INSUFFICIENT_SIGNATURES = 5;
}

// SignedBatch is a batch of blobs with a signature.
Expand Down
7 changes: 7 additions & 0 deletions disperser/common/v2/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
Encoded
Certified
Failed
INSUFFICIENT_SIGNATURES
)

func (s BlobStatus) String() string {
Expand All @@ -27,6 +28,8 @@ func (s BlobStatus) String() string {
return "Certified"
case Failed:
return "Failed"
case INSUFFICIENT_SIGNATURES:
return "Insufficient Signatures"
default:
return "Unknown"
}
Expand All @@ -42,6 +45,8 @@ func (s BlobStatus) ToProfobuf() pb.BlobStatus {
return pb.BlobStatus_CERTIFIED
case Failed:
return pb.BlobStatus_FAILED
case INSUFFICIENT_SIGNATURES:
return pb.BlobStatus_INSUFFICIENT_SIGNATURES
default:
return pb.BlobStatus_UNKNOWN
}
Expand All @@ -57,6 +62,8 @@ func BlobStatusFromProtobuf(s pb.BlobStatus) (BlobStatus, error) {
return Certified, nil
case pb.BlobStatus_FAILED:
return Failed, nil
case pb.BlobStatus_INSUFFICIENT_SIGNATURES:
return INSUFFICIENT_SIGNATURES, nil
default:
return 0, fmt.Errorf("unknown blob status: %v", s)
}
Expand Down
9 changes: 5 additions & 4 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ const (

var (
statusUpdatePrecondition = map[v2.BlobStatus][]v2.BlobStatus{
v2.Queued: {},
v2.Encoded: {v2.Queued},
v2.Certified: {v2.Encoded},
v2.Failed: {v2.Queued, v2.Encoded},
v2.Queued: {},
v2.Encoded: {v2.Queued},
v2.Certified: {v2.Encoded},
v2.Failed: {v2.Queued, v2.Encoded},
v2.INSUFFICIENT_SIGNATURES: {v2.Encoded},
}
ErrInvalidStateTransition = errors.New("invalid state transition")
)
Expand Down
4 changes: 2 additions & 2 deletions disperser/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func teardown() {
}
}

func newBlob(t *testing.T) (corev2.BlobKey, *corev2.BlobHeader) {
func newBlob(t *testing.T, quorumNumbers []core.QuorumID) (corev2.BlobKey, *corev2.BlobHeader) {
accountBytes := make([]byte, 32)
_, err := rand.Read(accountBytes)
require.NoError(t, err)
Expand All @@ -168,7 +168,7 @@ func newBlob(t *testing.T) (corev2.BlobKey, *corev2.BlobHeader) {
require.NoError(t, err)
bh := &corev2.BlobHeader{
BlobVersion: 0,
QuorumNumbers: []core.QuorumID{0, 1},
QuorumNumbers: quorumNumbers,
BlobCommitments: mockCommitment,
PaymentMetadata: core.PaymentMetadata{
AccountID: accountID,
Expand Down
63 changes: 47 additions & 16 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -259,9 +260,13 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart))

nonZeroQuorums := make([]core.QuorumID, 0)
for quorumID := range quorumAttestation.QuorumResults {
d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumAttestation.QuorumResults[quorumID])
nonZeroQuorums = append(nonZeroQuorums, quorumID)
quorumResults := make(map[core.QuorumID]uint8)
for quorumID, quorumResult := range quorumAttestation.QuorumResults {
d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumResult)
if quorumResult.PercentSigned > 0 {
nonZeroQuorums = append(nonZeroQuorums, quorumID)
quorumResults[quorumID] = quorumResult.PercentSigned
}
}
if len(nonZeroQuorums) == 0 {
return fmt.Errorf("all quorums received no attestation for batch %s", batchHeaderHash)
Expand All @@ -274,11 +279,7 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
return fmt.Errorf("failed to aggregate signatures for batch %s: %w", batchHeaderHash, err)
}

quorumResults := make(map[core.QuorumID]uint8)
for quorumID, result := range quorumAttestation.QuorumResults {
quorumResults[quorumID] = result.PercentSigned
}
err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{
attestation := &corev2.Attestation{
BatchHeader: batchData.Batch.BatchHeader,
AttestedAt: uint64(time.Now().UnixNano()),
NonSignerPubKeys: aggSig.NonSigners,
Expand All @@ -287,18 +288,19 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
Sigma: aggSig.AggSignature,
QuorumNumbers: nonZeroQuorums,
QuorumResults: quorumResults,
})
}
err = d.blobMetadataStore.PutAttestation(ctx, attestation)
putAttestationFinished := time.Now()
d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to put attestation for batch %s: %w", batchHeaderHash, err)
}

err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified)
err = d.updateBatchStatus(ctx, batchData, attestation)
updateBatchStatusFinished := time.Now()
d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished))
if err != nil {
return fmt.Errorf("failed to mark blobs as certified for batch %s: %w", batchHeaderHash, err)
return fmt.Errorf("failed to update blob statuses for batch %s: %w", batchHeaderHash, err)
}

d.logger.Debug("successfully processed batch", "batchHeader", batchHeaderHash)
Expand Down Expand Up @@ -489,12 +491,41 @@ func (d *Dispatcher) sendChunks(ctx context.Context, client clients.NodeClient,
return sig, nil
}

func (d *Dispatcher) updateBatchStatus(ctx context.Context, keys []corev2.BlobKey, status v2.BlobStatus) error {
for _, key := range keys {
err := d.blobMetadataStore.UpdateBlobStatus(ctx, key, status)
func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, attestation *corev2.Attestation) error {
var multierr error
for i, cert := range batch.Batch.BlobCertificates {
blobKey := batch.BlobKeys[i]
if cert == nil || cert.BlobHeader == nil {
d.logger.Error("invalid blob certificate in batch")
err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Failed)
if err != nil {
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err))
}
continue
}

failed := false
for _, q := range cert.BlobHeader.QuorumNumbers {
if res, ok := attestation.QuorumResults[q]; !ok || res == 0 {
d.logger.Error("quorum result not found", "quorumID", q, "blobKey", blobKey.Hex())
failed = true
break
}
}

if failed {
err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.INSUFFICIENT_SIGNATURES)
if err != nil {
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err))
}
continue
}

err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Certified)
if err != nil {
d.logger.Error("failed to update blob status", "blobKey", key.Hex(), "status", status.String(), "err", err)
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to certified: %w", blobKey.Hex(), err))
}
}
return nil

return multierr
}
Loading

0 comments on commit 449e992

Please sign in to comment.