Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail v2 blobs for no attestations #1043

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion api/docs/disperser_v2.html
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ <h3 id="disperser.v2.SignedBatch">SignedBatch</h3>


<h3 id="disperser.v2.BlobStatus">BlobStatus</h3>
<p>BlobStatus represents the status of a blob.</p><p>The status of a blob is updated as the blob is processed by the disperser.</p><p>The status of a blob can be queried by the client using the GetBlobStatus API.</p><p>Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:</p><p>- QUEUED</p><p>- ENCODED</p><p>Terminal states are states that will not be updated to a different state:</p><p>- CERTIFIED</p><p>- FAILED</p>
<p>BlobStatus represents the status of a blob.</p><p>The status of a blob is updated as the blob is processed by the disperser.</p><p>The status of a blob can be queried by the client using the GetBlobStatus API.</p><p>Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:</p><p>- QUEUED</p><p>- ENCODED</p><p>Terminal states are states that will not be updated to a different state:</p><p>- CERTIFIED</p><p>- FAILED</p><p>- INSUFFICIENT_SIGNATURES</p>
<table class="enum-table">
<thead>
<tr><td>Name</td><td>Number</td><td>Description</td></tr>
Expand Down Expand Up @@ -826,6 +826,12 @@ <h3 id="disperser.v2.BlobStatus">BlobStatus</h3>
<td><p>FAILED means that the blob has failed permanently</p></td>
</tr>

<tr>
<td>INSUFFICIENT_SIGNATURES</td>
<td>5</td>
<td><p>INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation</p></td>
</tr>

</tbody>
</table>

Expand Down
2 changes: 2 additions & 0 deletions api/docs/disperser_v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ Intermediate states are states that the blob can be in while being processed, an
Terminal states are states that will not be updated to a different state:
- CERTIFIED
- FAILED
- INSUFFICIENT_SIGNATURES

| Name | Number | Description |
| ---- | ------ | ----------- |
Expand All @@ -294,6 +295,7 @@ Terminal states are states that will not be updated to a different state:
| ENCODED | 2 | ENCODED means that the blob has been encoded and is ready to be dispersed to DA Nodes |
| CERTIFIED | 3 | CERTIFIED means the blob has been dispersed and attested by the DA nodes |
| FAILED | 4 | FAILED means that the blob has failed permanently |
| INSUFFICIENT_SIGNATURES | 5 | INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation |



Expand Down
8 changes: 7 additions & 1 deletion api/docs/eigenda-protos.html
Original file line number Diff line number Diff line change
Expand Up @@ -2460,7 +2460,7 @@ <h3 id="disperser.v2.SignedBatch">SignedBatch</h3>


<h3 id="disperser.v2.BlobStatus">BlobStatus</h3>
<p>BlobStatus represents the status of a blob.</p><p>The status of a blob is updated as the blob is processed by the disperser.</p><p>The status of a blob can be queried by the client using the GetBlobStatus API.</p><p>Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:</p><p>- QUEUED</p><p>- ENCODED</p><p>Terminal states are states that will not be updated to a different state:</p><p>- CERTIFIED</p><p>- FAILED</p>
<p>BlobStatus represents the status of a blob.</p><p>The status of a blob is updated as the blob is processed by the disperser.</p><p>The status of a blob can be queried by the client using the GetBlobStatus API.</p><p>Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:</p><p>- QUEUED</p><p>- ENCODED</p><p>Terminal states are states that will not be updated to a different state:</p><p>- CERTIFIED</p><p>- FAILED</p><p>- INSUFFICIENT_SIGNATURES</p>
<table class="enum-table">
<thead>
<tr><td>Name</td><td>Number</td><td>Description</td></tr>
Expand Down Expand Up @@ -2497,6 +2497,12 @@ <h3 id="disperser.v2.BlobStatus">BlobStatus</h3>
<td><p>FAILED means that the blob has failed permanently</p></td>
</tr>

<tr>
<td>INSUFFICIENT_SIGNATURES</td>
<td>5</td>
<td><p>INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation</p></td>
</tr>

</tbody>
</table>

Expand Down
2 changes: 2 additions & 0 deletions api/docs/eigenda-protos.md
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,7 @@ Intermediate states are states that the blob can be in while being processed, an
Terminal states are states that will not be updated to a different state:
- CERTIFIED
- FAILED
- INSUFFICIENT_SIGNATURES

| Name | Number | Description |
| ---- | ------ | ----------- |
Expand All @@ -996,6 +997,7 @@ Terminal states are states that will not be updated to a different state:
| ENCODED | 2 | ENCODED means that the blob has been encoded and is ready to be dispersed to DA Nodes |
| CERTIFIED | 3 | CERTIFIED means the blob has been dispersed and attested by the DA nodes |
| FAILED | 4 | FAILED means that the blob has failed permanently |
| INSUFFICIENT_SIGNATURES | 5 | INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation |



Expand Down
74 changes: 40 additions & 34 deletions api/grpc/disperser/v2/disperser_v2.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/proto/disperser/v2/disperser_v2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ message GetPaymentStateReply {
// Terminal states are states that will not be updated to a different state:
// - CERTIFIED
// - FAILED
// - INSUFFICIENT_SIGNATURES
enum BlobStatus {
UNKNOWN = 0;

Expand All @@ -113,6 +114,9 @@ enum BlobStatus {

// FAILED means that the blob has failed permanently
FAILED = 4;

// INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation
INSUFFICIENT_SIGNATURES = 5;
}

// SignedBatch is a batch of blobs with a signature.
Expand Down
7 changes: 7 additions & 0 deletions disperser/common/v2/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
Encoded
Certified
Failed
INSUFFICIENT_SIGNATURES
)

func (s BlobStatus) String() string {
Expand All @@ -27,6 +28,8 @@ func (s BlobStatus) String() string {
return "Certified"
case Failed:
return "Failed"
case INSUFFICIENT_SIGNATURES:
return "Insufficient Signatures"
default:
return "Unknown"
}
Expand All @@ -42,6 +45,8 @@ func (s BlobStatus) ToProfobuf() pb.BlobStatus {
return pb.BlobStatus_CERTIFIED
case Failed:
return pb.BlobStatus_FAILED
case INSUFFICIENT_SIGNATURES:
return pb.BlobStatus_INSUFFICIENT_SIGNATURES
default:
return pb.BlobStatus_UNKNOWN
}
Expand All @@ -57,6 +62,8 @@ func BlobStatusFromProtobuf(s pb.BlobStatus) (BlobStatus, error) {
return Certified, nil
case pb.BlobStatus_FAILED:
return Failed, nil
case pb.BlobStatus_INSUFFICIENT_SIGNATURES:
return INSUFFICIENT_SIGNATURES, nil
default:
return 0, fmt.Errorf("unknown blob status: %v", s)
}
Expand Down
9 changes: 5 additions & 4 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ const (

var (
statusUpdatePrecondition = map[v2.BlobStatus][]v2.BlobStatus{
v2.Queued: {},
v2.Encoded: {v2.Queued},
v2.Certified: {v2.Encoded},
v2.Failed: {v2.Queued, v2.Encoded},
v2.Queued: {},
v2.Encoded: {v2.Queued},
v2.Certified: {v2.Encoded},
v2.Failed: {v2.Queued, v2.Encoded},
v2.INSUFFICIENT_SIGNATURES: {v2.Encoded},
}
ErrInvalidStateTransition = errors.New("invalid state transition")
)
Expand Down
4 changes: 2 additions & 2 deletions disperser/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func teardown() {
}
}

func newBlob(t *testing.T) (corev2.BlobKey, *corev2.BlobHeader) {
func newBlob(t *testing.T, quorumNumbers []core.QuorumID) (corev2.BlobKey, *corev2.BlobHeader) {
accountBytes := make([]byte, 32)
_, err := rand.Read(accountBytes)
require.NoError(t, err)
Expand All @@ -168,7 +168,7 @@ func newBlob(t *testing.T) (corev2.BlobKey, *corev2.BlobHeader) {
require.NoError(t, err)
bh := &corev2.BlobHeader{
BlobVersion: 0,
QuorumNumbers: []core.QuorumID{0, 1},
QuorumNumbers: quorumNumbers,
BlobCommitments: mockCommitment,
PaymentMetadata: core.PaymentMetadata{
AccountID: accountID,
Expand Down
63 changes: 47 additions & 16 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -259,9 +260,13 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart))

nonZeroQuorums := make([]core.QuorumID, 0)
for quorumID := range quorumAttestation.QuorumResults {
d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumAttestation.QuorumResults[quorumID])
nonZeroQuorums = append(nonZeroQuorums, quorumID)
quorumResults := make(map[core.QuorumID]uint8)
for quorumID, quorumResult := range quorumAttestation.QuorumResults {
d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumResult)
if quorumResult.PercentSigned > 0 {
nonZeroQuorums = append(nonZeroQuorums, quorumID)
quorumResults[quorumID] = quorumResult.PercentSigned
}
}
if len(nonZeroQuorums) == 0 {
return fmt.Errorf("all quorums received no attestation for batch %s", batchHeaderHash)
Expand All @@ -274,11 +279,7 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
return fmt.Errorf("failed to aggregate signatures for batch %s: %w", batchHeaderHash, err)
}

quorumResults := make(map[core.QuorumID]uint8)
for quorumID, result := range quorumAttestation.QuorumResults {
quorumResults[quorumID] = result.PercentSigned
}
err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{
attestation := &corev2.Attestation{
BatchHeader: batchData.Batch.BatchHeader,
AttestedAt: uint64(time.Now().UnixNano()),
NonSignerPubKeys: aggSig.NonSigners,
Expand All @@ -287,18 +288,19 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
Sigma: aggSig.AggSignature,
QuorumNumbers: nonZeroQuorums,
QuorumResults: quorumResults,
})
}
err = d.blobMetadataStore.PutAttestation(ctx, attestation)
putAttestationFinished := time.Now()
d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to put attestation for batch %s: %w", batchHeaderHash, err)
}

err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified)
err = d.updateBatchStatus(ctx, batchData, attestation)
updateBatchStatusFinished := time.Now()
d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished))
if err != nil {
return fmt.Errorf("failed to mark blobs as certified for batch %s: %w", batchHeaderHash, err)
return fmt.Errorf("failed to update blob statuses for batch %s: %w", batchHeaderHash, err)
}

d.logger.Debug("successfully processed batch", "batchHeader", batchHeaderHash)
Expand Down Expand Up @@ -489,12 +491,41 @@ func (d *Dispatcher) sendChunks(ctx context.Context, client clients.NodeClient,
return sig, nil
}

func (d *Dispatcher) updateBatchStatus(ctx context.Context, keys []corev2.BlobKey, status v2.BlobStatus) error {
for _, key := range keys {
err := d.blobMetadataStore.UpdateBlobStatus(ctx, key, status)
func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, attestation *corev2.Attestation) error {
var multierr error
for i, cert := range batch.Batch.BlobCertificates {
blobKey := batch.BlobKeys[i]
if cert == nil || cert.BlobHeader == nil {
d.logger.Error("invalid blob certificate in batch")
err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Failed)
if err != nil {
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err))
}
continue
}

failed := false
for _, q := range cert.BlobHeader.QuorumNumbers {
if res, ok := attestation.QuorumResults[q]; !ok || res == 0 {
d.logger.Error("quorum result not found", "quorumID", q, "blobKey", blobKey.Hex())
failed = true
break
}
}

if failed {
err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.INSUFFICIENT_SIGNATURES)
Copy link
Contributor

@dmanc dmanc Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So right now the insufficient signature state is for when a quorum has no attestations? Would we ever use that state for doing something like a quorum only recieved X% signatures?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah
We can improve this by either considering the coding rate or the default confirmation threshold onchain, but this is a quick fix before further steps

if err != nil {
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err))
}
continue
}

err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Certified)
if err != nil {
d.logger.Error("failed to update blob status", "blobKey", key.Hex(), "status", status.String(), "err", err)
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to certified: %w", blobKey.Hex(), err))
}
}
return nil

return multierr
}
Loading
Loading