Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle signature aggregation failure in dispatcher #1045

Merged
merged 1 commit into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions disperser/common/v2/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
Encoded
Certified
Failed
INSUFFICIENT_SIGNATURES
InsufficientSignatures
)

func (s BlobStatus) String() string {
Expand All @@ -28,7 +28,7 @@ func (s BlobStatus) String() string {
return "Certified"
case Failed:
return "Failed"
case INSUFFICIENT_SIGNATURES:
case InsufficientSignatures:
return "Insufficient Signatures"
default:
return "Unknown"
Expand All @@ -45,7 +45,7 @@ func (s BlobStatus) ToProfobuf() pb.BlobStatus {
return pb.BlobStatus_CERTIFIED
case Failed:
return pb.BlobStatus_FAILED
case INSUFFICIENT_SIGNATURES:
case InsufficientSignatures:
return pb.BlobStatus_INSUFFICIENT_SIGNATURES
default:
return pb.BlobStatus_UNKNOWN
Expand All @@ -63,7 +63,7 @@ func BlobStatusFromProtobuf(s pb.BlobStatus) (BlobStatus, error) {
case pb.BlobStatus_FAILED:
return Failed, nil
case pb.BlobStatus_INSUFFICIENT_SIGNATURES:
return INSUFFICIENT_SIGNATURES, nil
return InsufficientSignatures, nil
default:
return 0, fmt.Errorf("unknown blob status: %v", s)
}
Expand Down
10 changes: 5 additions & 5 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ const (

var (
statusUpdatePrecondition = map[v2.BlobStatus][]v2.BlobStatus{
v2.Queued: {},
v2.Encoded: {v2.Queued},
v2.Certified: {v2.Encoded},
v2.Failed: {v2.Queued, v2.Encoded},
v2.INSUFFICIENT_SIGNATURES: {v2.Encoded},
v2.Queued: {},
v2.Encoded: {v2.Queued},
v2.Certified: {v2.Encoded},
v2.Failed: {v2.Queued, v2.Encoded},
v2.InsufficientSignatures: {v2.Encoded},
}
ErrInvalidStateTransition = errors.New("invalid state transition")
)
Expand Down
36 changes: 32 additions & 4 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
batchHeaderHash := hex.EncodeToString(batchData.BatchHeaderHash[:])
quorumAttestation, err := d.aggregator.ReceiveSignatures(ctx, batchData.OperatorState, batchData.BatchHeaderHash, sigChan)
if err != nil {
dbErr := d.failBatch(ctx, batchData)
if dbErr != nil {
return fmt.Errorf("failed to update blob statuses for batch %s to failed: %w", batchHeaderHash, dbErr)
}
return fmt.Errorf("failed to receive and validate signatures for batch %s: %w", batchHeaderHash, err)
}
receiveSignaturesFinished := time.Now()
Expand All @@ -269,13 +273,21 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
}
}
if len(nonZeroQuorums) == 0 {
err = d.updateBatchStatus(ctx, batchData, quorumResults)
if err != nil {
return fmt.Errorf("failed to update blob statuses for batch %s: %w", batchHeaderHash, err)
}
return fmt.Errorf("all quorums received no attestation for batch %s", batchHeaderHash)
}

aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, nonZeroQuorums)
aggregateSignaturesFinished := time.Now()
d.metrics.reportAggregateSignaturesLatency(aggregateSignaturesFinished.Sub(receiveSignaturesFinished))
if err != nil {
dbErr := d.failBatch(ctx, batchData)
if dbErr != nil {
return fmt.Errorf("failed to update blob statuses for batch %s to failed: %w", batchHeaderHash, dbErr)
}
return fmt.Errorf("failed to aggregate signatures for batch %s: %w", batchHeaderHash, err)
}

Expand All @@ -293,10 +305,14 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
putAttestationFinished := time.Now()
d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished))
if err != nil {
dbErr := d.failBatch(ctx, batchData)
if dbErr != nil {
return fmt.Errorf("failed to update blob statuses for batch %s to failed: %w", batchHeaderHash, dbErr)
}
return fmt.Errorf("failed to put attestation for batch %s: %w", batchHeaderHash, err)
}

err = d.updateBatchStatus(ctx, batchData, attestation)
err = d.updateBatchStatus(ctx, batchData, attestation.QuorumResults)
updateBatchStatusFinished := time.Now()
d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished))
if err != nil {
Expand Down Expand Up @@ -491,7 +507,7 @@ func (d *Dispatcher) sendChunks(ctx context.Context, client clients.NodeClient,
return sig, nil
}

func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, attestation *corev2.Attestation) error {
func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, quorumResults map[core.QuorumID]uint8) error {
var multierr error
for i, cert := range batch.Batch.BlobCertificates {
blobKey := batch.BlobKeys[i]
Expand All @@ -506,15 +522,15 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, at

failed := false
for _, q := range cert.BlobHeader.QuorumNumbers {
if res, ok := attestation.QuorumResults[q]; !ok || res == 0 {
if res, ok := quorumResults[q]; !ok || res == 0 {
d.logger.Error("quorum result not found", "quorumID", q, "blobKey", blobKey.Hex())
failed = true
break
}
}

if failed {
err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.INSUFFICIENT_SIGNATURES)
err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.InsufficientSignatures)
if err != nil {
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err))
}
Expand All @@ -529,3 +545,15 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, at

return multierr
}

func (d *Dispatcher) failBatch(ctx context.Context, batch *batchData) error {
var multierr error
for _, blobKey := range batch.BlobKeys {
err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Failed)
if err != nil {
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err))
}
}

return multierr
}
66 changes: 65 additions & 1 deletion disperser/controller/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestDispatcherInsufficientSignatures(t *testing.T) {
for _, blobKey := range failedObjs.blobKeys {
bm, err := components.BlobMetadataStore.GetBlobMetadata(ctx, blobKey)
require.NoError(t, err)
require.Equal(t, v2.INSUFFICIENT_SIGNATURES, bm.BlobStatus)
require.Equal(t, v2.InsufficientSignatures, bm.BlobStatus)
}
for _, blobKey := range successfulObjs.blobKeys {
bm, err := components.BlobMetadataStore.GetBlobMetadata(ctx, blobKey)
Expand Down Expand Up @@ -205,6 +205,70 @@ func TestDispatcherInsufficientSignatures(t *testing.T) {
deleteBlobs(t, components.BlobMetadataStore, successfulObjs.blobKeys, [][32]byte{bhh})
}

func TestDispatcherInsufficientSignatures2(t *testing.T) {
components := newDispatcherComponents(t)
objsInBothQuorum := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{0, 1}, 2)
objsInQuorum1 := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{1}, 1)
ctx := context.Background()

// Get batch header hash to mock signatures
certs := make([]*corev2.BlobCertificate, 0, len(objsInBothQuorum.blobCerts)+len(objsInQuorum1.blobCerts))
certs = append(certs, objsInBothQuorum.blobCerts...)
certs = append(certs, objsInQuorum1.blobCerts...)
merkleTree, err := corev2.BuildMerkleTree(certs)
require.NoError(t, err)
require.NotNil(t, merkleTree)
require.NotNil(t, merkleTree.Root())

// no operators sign, all blobs will have insufficient signatures
mockClient0 := clientsmock.NewNodeClient()
mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure"))
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort
require.NotEqual(t, op0Port, op1Port)
require.NotEqual(t, op0Port, op2Port)
components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil)
mockClient1 := clientsmock.NewNodeClient()
mockClient1.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure"))
components.NodeClientManager.On("GetClient", mock.Anything, op1Port).Return(mockClient1, nil)
mockClient2 := clientsmock.NewNodeClient()
mockClient2.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure"))
components.NodeClientManager.On("GetClient", mock.Anything, op2Port).Return(mockClient2, nil)

sigChan, batchData, err := components.Dispatcher.HandleBatch(ctx)
require.NoError(t, err)
err = components.Dispatcher.HandleSignatures(ctx, batchData, sigChan)
require.ErrorContains(t, err, "all quorums received no attestation")

// Test that the blob metadata status are updated
for _, blobKey := range objsInBothQuorum.blobKeys {
bm, err := components.BlobMetadataStore.GetBlobMetadata(ctx, blobKey)
require.NoError(t, err)
require.Equal(t, v2.InsufficientSignatures, bm.BlobStatus)
}
for _, blobKey := range objsInQuorum1.blobKeys {
bm, err := components.BlobMetadataStore.GetBlobMetadata(ctx, blobKey)
require.NoError(t, err)
require.Equal(t, v2.InsufficientSignatures, bm.BlobStatus)
}

// Get batch header
vis, err := components.BlobMetadataStore.GetBlobVerificationInfos(ctx, objsInBothQuorum.blobKeys[0])
require.NoError(t, err)
require.Len(t, vis, 1)
bhh, err := vis[0].BatchHeader.Hash()
require.NoError(t, err)

// Test that attestation is written
att, err := components.BlobMetadataStore.GetAttestation(ctx, bhh)
require.Error(t, err)
require.Nil(t, att)

deleteBlobs(t, components.BlobMetadataStore, objsInBothQuorum.blobKeys, [][32]byte{bhh})
deleteBlobs(t, components.BlobMetadataStore, objsInQuorum1.blobKeys, [][32]byte{bhh})
}

func TestDispatcherMaxBatchSize(t *testing.T) {
components := newDispatcherComponents(t)
numBlobs := 12
Expand Down
Loading