From a84c241e620f860424330b37e034e546ddd1f0bf Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Thu, 19 Dec 2024 13:45:24 -0800 Subject: [PATCH] fail v2 blobs for no attestations --- api/docs/disperser_v2.html | 8 +- api/docs/disperser_v2.md | 2 + api/docs/eigenda-protos.html | 8 +- api/docs/eigenda-protos.md | 2 + api/grpc/disperser/v2/disperser_v2.pb.go | 74 ++++++++------- api/proto/disperser/v2/disperser_v2.proto | 4 + disperser/common/v2/blob.go | 7 ++ disperser/controller/controller_test.go | 4 +- disperser/controller/dispatcher.go | 63 +++++++++---- disperser/controller/dispatcher_test.go | 92 ++++++++++++++++++- disperser/controller/encoding_manager_test.go | 13 +-- 11 files changed, 212 insertions(+), 65 deletions(-) diff --git a/api/docs/disperser_v2.html b/api/docs/disperser_v2.html index 41d63829c7..50b3685d72 100644 --- a/api/docs/disperser_v2.html +++ b/api/docs/disperser_v2.html @@ -789,7 +789,7 @@

SignedBatch

BlobStatus

-

BlobStatus represents the status of a blob.

The status of a blob is updated as the blob is processed by the disperser.

The status of a blob can be queried by the client using the GetBlobStatus API.

Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:

- QUEUED

- ENCODED

Terminal states are states that will not be updated to a different state:

- CERTIFIED

- FAILED

+

BlobStatus represents the status of a blob.

The status of a blob is updated as the blob is processed by the disperser.

The status of a blob can be queried by the client using the GetBlobStatus API.

Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:

- QUEUED

- ENCODED

Terminal states are states that will not be updated to a different state:

- CERTIFIED

- FAILED

- INSUFFICIENT_SIGNATURES

@@ -826,6 +826,12 @@

BlobStatus

+ + + + + +
NameNumberDescription

FAILED means that the blob has failed permanently

INSUFFICIENT_SIGNATURES5

INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation

diff --git a/api/docs/disperser_v2.md b/api/docs/disperser_v2.md index 622c24f4eb..0ca84f3b0d 100644 --- a/api/docs/disperser_v2.md +++ b/api/docs/disperser_v2.md @@ -286,6 +286,7 @@ Intermediate states are states that the blob can be in while being processed, an Terminal states are states that will not be updated to a different state: - CERTIFIED - FAILED +- INSUFFICIENT_SIGNATURES | Name | Number | Description | | ---- | ------ | ----------- | @@ -294,6 +295,7 @@ Terminal states are states that will not be updated to a different state: | ENCODED | 2 | ENCODED means that the blob has been encoded and is ready to be dispersed to DA Nodes | | CERTIFIED | 3 | CERTIFIED means the blob has been dispersed and attested by the DA nodes | | FAILED | 4 | FAILED means that the blob has failed permanently | +| INSUFFICIENT_SIGNATURES | 5 | INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation | diff --git a/api/docs/eigenda-protos.html b/api/docs/eigenda-protos.html index b2c637223f..44474db3c8 100644 --- a/api/docs/eigenda-protos.html +++ b/api/docs/eigenda-protos.html @@ -2460,7 +2460,7 @@

SignedBatch

BlobStatus

-

BlobStatus represents the status of a blob.

The status of a blob is updated as the blob is processed by the disperser.

The status of a blob can be queried by the client using the GetBlobStatus API.

Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:

- QUEUED

- ENCODED

Terminal states are states that will not be updated to a different state:

- CERTIFIED

- FAILED

+

BlobStatus represents the status of a blob.

The status of a blob is updated as the blob is processed by the disperser.

The status of a blob can be queried by the client using the GetBlobStatus API.

Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:

- QUEUED

- ENCODED

Terminal states are states that will not be updated to a different state:

- CERTIFIED

- FAILED

- INSUFFICIENT_SIGNATURES

@@ -2497,6 +2497,12 @@

BlobStatus

+ + + + + +
NameNumberDescription

FAILED means that the blob has failed permanently

INSUFFICIENT_SIGNATURES5

INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation

diff --git a/api/docs/eigenda-protos.md b/api/docs/eigenda-protos.md index 027e2ffb07..60d2a21c90 100644 --- a/api/docs/eigenda-protos.md +++ b/api/docs/eigenda-protos.md @@ -988,6 +988,7 @@ Intermediate states are states that the blob can be in while being processed, an Terminal states are states that will not be updated to a different state: - CERTIFIED - FAILED +- INSUFFICIENT_SIGNATURES | Name | Number | Description | | ---- | ------ | ----------- | @@ -996,6 +997,7 @@ Terminal states are states that will not be updated to a different state: | ENCODED | 2 | ENCODED means that the blob has been encoded and is ready to be dispersed to DA Nodes | | CERTIFIED | 3 | CERTIFIED means the blob has been dispersed and attested by the DA nodes | | FAILED | 4 | FAILED means that the blob has failed permanently | +| INSUFFICIENT_SIGNATURES | 5 | INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation | diff --git a/api/grpc/disperser/v2/disperser_v2.pb.go b/api/grpc/disperser/v2/disperser_v2.pb.go index 80d0814fee..4fb9718f65 100644 --- a/api/grpc/disperser/v2/disperser_v2.pb.go +++ b/api/grpc/disperser/v2/disperser_v2.pb.go @@ -31,6 +31,7 @@ const ( // Terminal states are states that will not be updated to a different state: // - CERTIFIED // - FAILED +// - INSUFFICIENT_SIGNATURES type BlobStatus int32 const ( @@ -43,6 +44,8 @@ const ( BlobStatus_CERTIFIED BlobStatus = 3 // FAILED means that the blob has failed permanently BlobStatus_FAILED BlobStatus = 4 + // INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation + BlobStatus_INSUFFICIENT_SIGNATURES BlobStatus = 5 ) // Enum value maps for BlobStatus. @@ -53,13 +56,15 @@ var ( 2: "ENCODED", 3: "CERTIFIED", 4: "FAILED", + 5: "INSUFFICIENT_SIGNATURES", } BlobStatus_value = map[string]int32{ - "UNKNOWN": 0, - "QUEUED": 1, - "ENCODED": 2, - "CERTIFIED": 3, - "FAILED": 4, + "UNKNOWN": 0, + "QUEUED": 1, + "ENCODED": 2, + "CERTIFIED": 3, + "FAILED": 4, + "INSUFFICIENT_SIGNATURES": 5, } ) @@ -1133,39 +1138,40 @@ var file_disperser_v2_disperser_v2_proto_rawDesc = []byte{ 0x63, 0x6f, 0x72, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x75, 0x73, 0x61, 0x67, 0x65, - 0x2a, 0x4d, 0x0a, 0x0a, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, + 0x2a, 0x6a, 0x0a, 0x0a, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x51, 0x55, 0x45, 0x55, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x45, 0x4e, 0x43, 0x4f, 0x44, 0x45, 0x44, 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x45, - 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x32, - 0xf2, 0x02, 0x0a, 0x09, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x12, 0x54, 0x0a, - 0x0c, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x21, 0x2e, + 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x12, + 0x1b, 0x0a, 0x17, 0x49, 0x4e, 0x53, 0x55, 0x46, 0x46, 0x49, 0x43, 0x49, 0x45, 0x4e, 0x54, 0x5f, + 0x53, 0x49, 0x47, 0x4e, 0x41, 0x54, 0x55, 0x52, 0x45, 0x53, 0x10, 0x05, 0x32, 0xf2, 0x02, 0x0a, + 0x09, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x12, 0x54, 0x0a, 0x0c, 0x44, 0x69, + 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x21, 0x2e, 0x64, 0x69, 0x73, + 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, + 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x69, 0x73, - 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, - 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, - 0x79, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x12, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, - 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, - 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, - 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, - 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x23, 0x2e, 0x64, 0x69, - 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x43, - 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x21, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, - 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, - 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d, - 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x24, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, - 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65, - 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, - 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, - 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x70, - 0x6c, 0x79, 0x22, 0x00, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, - 0x65, 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x69, - 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, + 0x12, 0x51, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x12, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, + 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, + 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x70, 0x6c, + 0x79, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x6f, + 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x23, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, + 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x6f, 0x6d, 0x6d, + 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, + 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, + 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x79, + 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, + 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x24, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, + 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x53, + 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x64, 0x69, + 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, + 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, + 0x00, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, + 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x69, 0x73, 0x70, 0x65, + 0x72, 0x73, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/api/proto/disperser/v2/disperser_v2.proto b/api/proto/disperser/v2/disperser_v2.proto index 2e26e61c47..0cae0f0d1b 100644 --- a/api/proto/disperser/v2/disperser_v2.proto +++ b/api/proto/disperser/v2/disperser_v2.proto @@ -99,6 +99,7 @@ message GetPaymentStateReply { // Terminal states are states that will not be updated to a different state: // - CERTIFIED // - FAILED +// - INSUFFICIENT_SIGNATURES enum BlobStatus { UNKNOWN = 0; @@ -113,6 +114,9 @@ enum BlobStatus { // FAILED means that the blob has failed permanently FAILED = 4; + + // INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation + INSUFFICIENT_SIGNATURES = 5; } // SignedBatch is a batch of blobs with a signature. diff --git a/disperser/common/v2/blob.go b/disperser/common/v2/blob.go index b87cca90cc..151c85837f 100644 --- a/disperser/common/v2/blob.go +++ b/disperser/common/v2/blob.go @@ -15,6 +15,7 @@ const ( Encoded Certified Failed + INSUFFICIENT_SIGNATURES ) func (s BlobStatus) String() string { @@ -27,6 +28,8 @@ func (s BlobStatus) String() string { return "Certified" case Failed: return "Failed" + case INSUFFICIENT_SIGNATURES: + return "Insufficient Signatures" default: return "Unknown" } @@ -42,6 +45,8 @@ func (s BlobStatus) ToProfobuf() pb.BlobStatus { return pb.BlobStatus_CERTIFIED case Failed: return pb.BlobStatus_FAILED + case INSUFFICIENT_SIGNATURES: + return pb.BlobStatus_INSUFFICIENT_SIGNATURES default: return pb.BlobStatus_UNKNOWN } @@ -57,6 +62,8 @@ func BlobStatusFromProtobuf(s pb.BlobStatus) (BlobStatus, error) { return Certified, nil case pb.BlobStatus_FAILED: return Failed, nil + case pb.BlobStatus_INSUFFICIENT_SIGNATURES: + return INSUFFICIENT_SIGNATURES, nil default: return 0, fmt.Errorf("unknown blob status: %v", s) } diff --git a/disperser/controller/controller_test.go b/disperser/controller/controller_test.go index 46954cec96..50b4145ccd 100644 --- a/disperser/controller/controller_test.go +++ b/disperser/controller/controller_test.go @@ -154,7 +154,7 @@ func teardown() { } } -func newBlob(t *testing.T) (corev2.BlobKey, *corev2.BlobHeader) { +func newBlob(t *testing.T, quorumNumbers []core.QuorumID) (corev2.BlobKey, *corev2.BlobHeader) { accountBytes := make([]byte, 32) _, err := rand.Read(accountBytes) require.NoError(t, err) @@ -168,7 +168,7 @@ func newBlob(t *testing.T) (corev2.BlobKey, *corev2.BlobHeader) { require.NoError(t, err) bh := &corev2.BlobHeader{ BlobVersion: 0, - QuorumNumbers: []core.QuorumID{0, 1}, + QuorumNumbers: quorumNumbers, BlobCommitments: mockCommitment, PaymentMetadata: core.PaymentMetadata{ AccountID: accountID, diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go index 5949578978..031eef8faf 100644 --- a/disperser/controller/dispatcher.go +++ b/disperser/controller/dispatcher.go @@ -16,6 +16,7 @@ import ( "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigensdk-go/logging" gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/hashicorp/go-multierror" "github.com/prometheus/client_golang/prometheus" ) @@ -259,9 +260,13 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart)) nonZeroQuorums := make([]core.QuorumID, 0) - for quorumID := range quorumAttestation.QuorumResults { - d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumAttestation.QuorumResults[quorumID]) - nonZeroQuorums = append(nonZeroQuorums, quorumID) + quorumResults := make(map[core.QuorumID]uint8) + for quorumID, quorumResult := range quorumAttestation.QuorumResults { + d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumResult) + if quorumResult.PercentSigned > 0 { + nonZeroQuorums = append(nonZeroQuorums, quorumID) + quorumResults[quorumID] = quorumResult.PercentSigned + } } if len(nonZeroQuorums) == 0 { return fmt.Errorf("all quorums received no attestation for batch %s", batchHeaderHash) @@ -274,11 +279,7 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, return fmt.Errorf("failed to aggregate signatures for batch %s: %w", batchHeaderHash, err) } - quorumResults := make(map[core.QuorumID]uint8) - for quorumID, result := range quorumAttestation.QuorumResults { - quorumResults[quorumID] = result.PercentSigned - } - err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{ + attestation := &corev2.Attestation{ BatchHeader: batchData.Batch.BatchHeader, AttestedAt: uint64(time.Now().UnixNano()), NonSignerPubKeys: aggSig.NonSigners, @@ -287,18 +288,19 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, Sigma: aggSig.AggSignature, QuorumNumbers: nonZeroQuorums, QuorumResults: quorumResults, - }) + } + err = d.blobMetadataStore.PutAttestation(ctx, attestation) putAttestationFinished := time.Now() d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished)) if err != nil { return fmt.Errorf("failed to put attestation for batch %s: %w", batchHeaderHash, err) } - err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified) + err = d.updateBatchStatus(ctx, batchData, attestation) updateBatchStatusFinished := time.Now() d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished)) if err != nil { - return fmt.Errorf("failed to mark blobs as certified for batch %s: %w", batchHeaderHash, err) + return fmt.Errorf("failed to update blob statuses for batch %s: %w", batchHeaderHash, err) } d.logger.Debug("successfully processed batch", "batchHeader", batchHeaderHash) @@ -489,12 +491,41 @@ func (d *Dispatcher) sendChunks(ctx context.Context, client clients.NodeClient, return sig, nil } -func (d *Dispatcher) updateBatchStatus(ctx context.Context, keys []corev2.BlobKey, status v2.BlobStatus) error { - for _, key := range keys { - err := d.blobMetadataStore.UpdateBlobStatus(ctx, key, status) +func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, attestation *corev2.Attestation) error { + var multierr error + for i, cert := range batch.Batch.BlobCertificates { + blobKey := batch.BlobKeys[i] + if cert == nil || cert.BlobHeader == nil { + d.logger.Error("invalid blob certificate in batch") + err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Failed) + if err != nil { + multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err)) + } + continue + } + + failed := false + for _, q := range cert.BlobHeader.QuorumNumbers { + if res, ok := attestation.QuorumResults[q]; !ok || res == 0 { + d.logger.Error("quorum result not found", "quorumID", q, "blobKey", blobKey.Hex()) + failed = true + break + } + } + + if failed { + err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.INSUFFICIENT_SIGNATURES) + if err != nil { + multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err)) + } + continue + } + + err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Certified) if err != nil { - d.logger.Error("failed to update blob status", "blobKey", key.Hex(), "status", status.String(), "err", err) + multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to certified: %w", blobKey.Hex(), err)) } } - return nil + + return multierr } diff --git a/disperser/controller/dispatcher_test.go b/disperser/controller/dispatcher_test.go index d99580dd78..90efab4e9b 100644 --- a/disperser/controller/dispatcher_test.go +++ b/disperser/controller/dispatcher_test.go @@ -2,6 +2,7 @@ package controller_test import ( "context" + "errors" "math/big" "testing" "time" @@ -28,6 +29,7 @@ import ( var ( opId0, _ = core.OperatorIDFromHex("e22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311") opId1, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312") + opId2, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568313") mockChainState, _ = coremock.NewChainDataMock(map[uint8]map[core.OperatorID]int{ 0: { opId0: 1, @@ -36,6 +38,7 @@ var ( 1: { opId0: 1, opId1: 3, + opId2: 1, }, }) finalizationBlockDelay = uint64(10) @@ -54,7 +57,7 @@ type dispatcherComponents struct { func TestDispatcherHandleBatch(t *testing.T) { components := newDispatcherComponents(t) - objs := setupBlobCerts(t, components.BlobMetadataStore, 2) + objs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{0, 1}, 2) ctx := context.Background() // Get batch header hash to mock signatures @@ -117,10 +120,89 @@ func TestDispatcherHandleBatch(t *testing.T) { deleteBlobs(t, components.BlobMetadataStore, objs.blobKeys, [][32]byte{bhh}) } +func TestInsufficientSignatures(t *testing.T) { + components := newDispatcherComponents(t) + failedObjs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{0, 1}, 2) + successfulObjs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{1}, 1) + ctx := context.Background() + + // Get batch header hash to mock signatures + certs := make([]*corev2.BlobCertificate, 0, len(failedObjs.blobCerts)+len(successfulObjs.blobCerts)) + certs = append(certs, failedObjs.blobCerts...) + certs = append(certs, successfulObjs.blobCerts...) + merkleTree, err := corev2.BuildMerkleTree(certs) + require.NoError(t, err) + require.NotNil(t, merkleTree) + require.NotNil(t, merkleTree.Root()) + batchHeader := &corev2.BatchHeader{ + ReferenceBlockNumber: blockNumber - finalizationBlockDelay, + } + copy(batchHeader.BatchRoot[:], merkleTree.Root()) + bhh, err := batchHeader.Hash() + require.NoError(t, err) + + // only op2 signs - quorum 0 will have 0 signing rate, quorum 1 will have 20% + mockClient0 := clientsmock.NewNodeClient() + mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure")) + op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort + op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort + op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort + require.NotEqual(t, op0Port, op1Port) + require.NotEqual(t, op0Port, op2Port) + components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil) + mockClient1 := clientsmock.NewNodeClient() + mockClient1.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure")) + components.NodeClientManager.On("GetClient", mock.Anything, op1Port).Return(mockClient1, nil) + mockClient2 := clientsmock.NewNodeClient() + sig := mockChainState.KeyPairs[opId2].SignMessage(bhh) + mockClient2.On("StoreChunks", mock.Anything, mock.Anything).Return(sig, nil) + components.NodeClientManager.On("GetClient", mock.Anything, op2Port).Return(mockClient2, nil) + + sigChan, batchData, err := components.Dispatcher.HandleBatch(ctx) + require.NoError(t, err) + err = components.Dispatcher.HandleSignatures(ctx, batchData, sigChan) + require.NoError(t, err) + + // Test that the blob metadata status are updated + for _, blobKey := range failedObjs.blobKeys { + bm, err := components.BlobMetadataStore.GetBlobMetadata(ctx, blobKey) + require.NoError(t, err) + require.Equal(t, v2.Failed, bm.BlobStatus) + } + for _, blobKey := range successfulObjs.blobKeys { + bm, err := components.BlobMetadataStore.GetBlobMetadata(ctx, blobKey) + require.NoError(t, err) + require.Equal(t, v2.Certified, bm.BlobStatus) + } + + // Get batch header + vis, err := components.BlobMetadataStore.GetBlobVerificationInfos(ctx, failedObjs.blobKeys[0]) + require.NoError(t, err) + require.Len(t, vis, 1) + bhh, err = vis[0].BatchHeader.Hash() + require.NoError(t, err) + + // Test that attestation is written + att, err := components.BlobMetadataStore.GetAttestation(ctx, bhh) + require.NoError(t, err) + require.NotNil(t, att) + require.Equal(t, vis[0].BatchHeader, att.BatchHeader) + require.Greater(t, att.AttestedAt, uint64(0)) + require.Len(t, att.NonSignerPubKeys, 2) + require.NotNil(t, att.APKG2) + require.Len(t, att.QuorumAPKs, 1) + require.NotNil(t, att.Sigma) + require.ElementsMatch(t, att.QuorumNumbers, []core.QuorumID{1}) + require.InDeltaMapValues(t, map[core.QuorumID]uint8{1: 20}, att.QuorumResults, 0) + + deleteBlobs(t, components.BlobMetadataStore, failedObjs.blobKeys, [][32]byte{bhh}) + deleteBlobs(t, components.BlobMetadataStore, successfulObjs.blobKeys, [][32]byte{bhh}) +} + func TestDispatcherMaxBatchSize(t *testing.T) { components := newDispatcherComponents(t) numBlobs := 12 - objs := setupBlobCerts(t, components.BlobMetadataStore, numBlobs) + objs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{0, 1}, numBlobs) ctx := context.Background() expectedNumBatches := (numBlobs + int(maxBatchSize) - 1) / int(maxBatchSize) for i := 0; i < expectedNumBatches; i++ { @@ -140,7 +222,7 @@ func TestDispatcherMaxBatchSize(t *testing.T) { func TestDispatcherNewBatch(t *testing.T) { components := newDispatcherComponents(t) - objs := setupBlobCerts(t, components.BlobMetadataStore, 2) + objs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{0, 1}, 2) require.Len(t, objs.blobHedaers, 2) require.Len(t, objs.blobKeys, 2) require.Len(t, objs.blobMetadatas, 2) @@ -258,14 +340,14 @@ type testObjects struct { blobCerts []*corev2.BlobCertificate } -func setupBlobCerts(t *testing.T, blobMetadataStore *blobstore.BlobMetadataStore, numObjects int) *testObjects { +func setupBlobCerts(t *testing.T, blobMetadataStore *blobstore.BlobMetadataStore, quorumNumbers []core.QuorumID, numObjects int) *testObjects { ctx := context.Background() headers := make([]*corev2.BlobHeader, numObjects) keys := make([]corev2.BlobKey, numObjects) metadatas := make([]*v2.BlobMetadata, numObjects) certs := make([]*corev2.BlobCertificate, numObjects) for i := 0; i < numObjects; i++ { - keys[i], headers[i] = newBlob(t) + keys[i], headers[i] = newBlob(t, quorumNumbers) now := time.Now() metadatas[i] = &v2.BlobMetadata{ BlobHeader: headers[i], diff --git a/disperser/controller/encoding_manager_test.go b/disperser/controller/encoding_manager_test.go index e72d5f67e8..57bf27ebe6 100644 --- a/disperser/controller/encoding_manager_test.go +++ b/disperser/controller/encoding_manager_test.go @@ -2,10 +2,11 @@ package controller_test import ( "context" - "github.com/prometheus/client_golang/prometheus" "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/Layr-Labs/eigenda/common" commonmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" @@ -97,7 +98,7 @@ func TestGetRelayKeys(t *testing.T) { func TestEncodingManagerHandleBatch(t *testing.T) { ctx := context.Background() - blobKey1, blobHeader1 := newBlob(t) + blobKey1, blobHeader1 := newBlob(t, []core.QuorumID{0, 1}) now := time.Now() metadata1 := &commonv2.BlobMetadata{ BlobHeader: blobHeader1, @@ -143,7 +144,7 @@ func TestEncodingManagerHandleManyBatches(t *testing.T) { headers := make([]*corev2.BlobHeader, numBlobs) metadata := make([]*commonv2.BlobMetadata, numBlobs) for i := 0; i < numBlobs; i++ { - keys[i], headers[i] = newBlob(t) + keys[i], headers[i] = newBlob(t, []core.QuorumID{0, 1}) now := time.Now() metadata[i] = &commonv2.BlobMetadata{ BlobHeader: headers[i], @@ -176,7 +177,7 @@ func TestEncodingManagerHandleManyBatches(t *testing.T) { require.ErrorContains(t, err, "no blobs to encode") // new record - key, header := newBlob(t) + key, header := newBlob(t, []core.QuorumID{0, 1}) now := time.Now() meta := &commonv2.BlobMetadata{ BlobHeader: header, @@ -205,7 +206,7 @@ func TestEncodingManagerHandleBatchNoBlobs(t *testing.T) { func TestEncodingManagerHandleBatchRetrySuccess(t *testing.T) { ctx := context.Background() - blobKey1, blobHeader1 := newBlob(t) + blobKey1, blobHeader1 := newBlob(t, []core.QuorumID{0, 1}) now := time.Now() metadata1 := &commonv2.BlobMetadata{ BlobHeader: blobHeader1, @@ -248,7 +249,7 @@ func TestEncodingManagerHandleBatchRetrySuccess(t *testing.T) { func TestEncodingManagerHandleBatchRetryFailure(t *testing.T) { ctx := context.Background() - blobKey1, blobHeader1 := newBlob(t) + blobKey1, blobHeader1 := newBlob(t, []core.QuorumID{0, 1}) now := time.Now() metadata1 := &commonv2.BlobMetadata{ BlobHeader: blobHeader1,