diff --git a/api/docs/disperser_v2.html b/api/docs/disperser_v2.html
index 41d63829c..50b3685d7 100644
--- a/api/docs/disperser_v2.html
+++ b/api/docs/disperser_v2.html
@@ -789,7 +789,7 @@
SignedBatch
BlobStatus
- BlobStatus represents the status of a blob.
The status of a blob is updated as the blob is processed by the disperser.
The status of a blob can be queried by the client using the GetBlobStatus API.
Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:
- QUEUED
- ENCODED
Terminal states are states that will not be updated to a different state:
- CERTIFIED
- FAILED
+ BlobStatus represents the status of a blob.
The status of a blob is updated as the blob is processed by the disperser.
The status of a blob can be queried by the client using the GetBlobStatus API.
Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:
- QUEUED
- ENCODED
Terminal states are states that will not be updated to a different state:
- CERTIFIED
- FAILED
- INSUFFICIENT_SIGNATURES
Name | Number | Description |
@@ -826,6 +826,12 @@ BlobStatus
FAILED means that the blob has failed permanently |
+
+ INSUFFICIENT_SIGNATURES |
+ 5 |
+ INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation |
+
+
diff --git a/api/docs/disperser_v2.md b/api/docs/disperser_v2.md
index 622c24f4e..0ca84f3b0 100644
--- a/api/docs/disperser_v2.md
+++ b/api/docs/disperser_v2.md
@@ -286,6 +286,7 @@ Intermediate states are states that the blob can be in while being processed, an
Terminal states are states that will not be updated to a different state:
- CERTIFIED
- FAILED
+- INSUFFICIENT_SIGNATURES
| Name | Number | Description |
| ---- | ------ | ----------- |
@@ -294,6 +295,7 @@ Terminal states are states that will not be updated to a different state:
| ENCODED | 2 | ENCODED means that the blob has been encoded and is ready to be dispersed to DA Nodes |
| CERTIFIED | 3 | CERTIFIED means the blob has been dispersed and attested by the DA nodes |
| FAILED | 4 | FAILED means that the blob has failed permanently |
+| INSUFFICIENT_SIGNATURES | 5 | INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation |
diff --git a/api/docs/eigenda-protos.html b/api/docs/eigenda-protos.html
index b2c637223..44474db3c 100644
--- a/api/docs/eigenda-protos.html
+++ b/api/docs/eigenda-protos.html
@@ -2460,7 +2460,7 @@ SignedBatch
BlobStatus
- BlobStatus represents the status of a blob.
The status of a blob is updated as the blob is processed by the disperser.
The status of a blob can be queried by the client using the GetBlobStatus API.
Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:
- QUEUED
- ENCODED
Terminal states are states that will not be updated to a different state:
- CERTIFIED
- FAILED
+ BlobStatus represents the status of a blob.
The status of a blob is updated as the blob is processed by the disperser.
The status of a blob can be queried by the client using the GetBlobStatus API.
Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:
- QUEUED
- ENCODED
Terminal states are states that will not be updated to a different state:
- CERTIFIED
- FAILED
- INSUFFICIENT_SIGNATURES
Name | Number | Description |
@@ -2497,6 +2497,12 @@ BlobStatus
FAILED means that the blob has failed permanently |
+
+ INSUFFICIENT_SIGNATURES |
+ 5 |
+ INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation |
+
+
diff --git a/api/docs/eigenda-protos.md b/api/docs/eigenda-protos.md
index 027e2ffb0..60d2a21c9 100644
--- a/api/docs/eigenda-protos.md
+++ b/api/docs/eigenda-protos.md
@@ -988,6 +988,7 @@ Intermediate states are states that the blob can be in while being processed, an
Terminal states are states that will not be updated to a different state:
- CERTIFIED
- FAILED
+- INSUFFICIENT_SIGNATURES
| Name | Number | Description |
| ---- | ------ | ----------- |
@@ -996,6 +997,7 @@ Terminal states are states that will not be updated to a different state:
| ENCODED | 2 | ENCODED means that the blob has been encoded and is ready to be dispersed to DA Nodes |
| CERTIFIED | 3 | CERTIFIED means the blob has been dispersed and attested by the DA nodes |
| FAILED | 4 | FAILED means that the blob has failed permanently |
+| INSUFFICIENT_SIGNATURES | 5 | INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation |
diff --git a/api/grpc/disperser/v2/disperser_v2.pb.go b/api/grpc/disperser/v2/disperser_v2.pb.go
index 80d0814fe..4fb9718f6 100644
--- a/api/grpc/disperser/v2/disperser_v2.pb.go
+++ b/api/grpc/disperser/v2/disperser_v2.pb.go
@@ -31,6 +31,7 @@ const (
// Terminal states are states that will not be updated to a different state:
// - CERTIFIED
// - FAILED
+// - INSUFFICIENT_SIGNATURES
type BlobStatus int32
const (
@@ -43,6 +44,8 @@ const (
BlobStatus_CERTIFIED BlobStatus = 3
// FAILED means that the blob has failed permanently
BlobStatus_FAILED BlobStatus = 4
+ // INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation
+ BlobStatus_INSUFFICIENT_SIGNATURES BlobStatus = 5
)
// Enum value maps for BlobStatus.
@@ -53,13 +56,15 @@ var (
2: "ENCODED",
3: "CERTIFIED",
4: "FAILED",
+ 5: "INSUFFICIENT_SIGNATURES",
}
BlobStatus_value = map[string]int32{
- "UNKNOWN": 0,
- "QUEUED": 1,
- "ENCODED": 2,
- "CERTIFIED": 3,
- "FAILED": 4,
+ "UNKNOWN": 0,
+ "QUEUED": 1,
+ "ENCODED": 2,
+ "CERTIFIED": 3,
+ "FAILED": 4,
+ "INSUFFICIENT_SIGNATURES": 5,
}
)
@@ -1133,39 +1138,40 @@ var file_disperser_v2_disperser_v2_proto_rawDesc = []byte{
0x63, 0x6f, 0x72, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0d, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x73,
0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x75, 0x73, 0x61, 0x67, 0x65,
- 0x2a, 0x4d, 0x0a, 0x0a, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b,
+ 0x2a, 0x6a, 0x0a, 0x0a, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b,
0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x51,
0x55, 0x45, 0x55, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x45, 0x4e, 0x43, 0x4f, 0x44,
0x45, 0x44, 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x45, 0x52, 0x54, 0x49, 0x46, 0x49, 0x45,
- 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x32,
- 0xf2, 0x02, 0x0a, 0x09, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x12, 0x54, 0x0a,
- 0x0c, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x21, 0x2e,
+ 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x12,
+ 0x1b, 0x0a, 0x17, 0x49, 0x4e, 0x53, 0x55, 0x46, 0x46, 0x49, 0x43, 0x49, 0x45, 0x4e, 0x54, 0x5f,
+ 0x53, 0x49, 0x47, 0x4e, 0x41, 0x54, 0x55, 0x52, 0x45, 0x53, 0x10, 0x05, 0x32, 0xf2, 0x02, 0x0a,
+ 0x09, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x12, 0x54, 0x0a, 0x0c, 0x44, 0x69,
+ 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x21, 0x2e, 0x64, 0x69, 0x73,
+ 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72,
+ 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e,
0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x69, 0x73,
- 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
- 0x1a, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e,
- 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c,
- 0x79, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74,
- 0x61, 0x74, 0x75, 0x73, 0x12, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72,
- 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65,
- 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65,
- 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52,
- 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f,
- 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x23, 0x2e, 0x64, 0x69,
- 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x43,
- 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
- 0x1a, 0x21, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e,
- 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65,
- 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d,
- 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x24, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65,
- 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65,
- 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22,
- 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65,
- 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x70,
- 0x6c, 0x79, 0x22, 0x00, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
- 0x6f, 0x6d, 0x2f, 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67,
- 0x65, 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x69,
- 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
- 0x6f, 0x33,
+ 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00,
+ 0x12, 0x51, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75,
+ 0x73, 0x12, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32,
+ 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65,
+ 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76,
+ 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x70, 0x6c,
+ 0x79, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x6f,
+ 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x23, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65,
+ 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x6f, 0x6d, 0x6d,
+ 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e,
+ 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x6c, 0x6f,
+ 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x79,
+ 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74,
+ 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x24, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65,
+ 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x53,
+ 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x64, 0x69,
+ 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x61,
+ 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22,
+ 0x00, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
+ 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64,
+ 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x69, 0x73, 0x70, 0x65,
+ 0x72, 0x73, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
diff --git a/api/proto/disperser/v2/disperser_v2.proto b/api/proto/disperser/v2/disperser_v2.proto
index 2e26e61c4..0cae0f0d1 100644
--- a/api/proto/disperser/v2/disperser_v2.proto
+++ b/api/proto/disperser/v2/disperser_v2.proto
@@ -99,6 +99,7 @@ message GetPaymentStateReply {
// Terminal states are states that will not be updated to a different state:
// - CERTIFIED
// - FAILED
+// - INSUFFICIENT_SIGNATURES
enum BlobStatus {
UNKNOWN = 0;
@@ -113,6 +114,9 @@ enum BlobStatus {
// FAILED means that the blob has failed permanently
FAILED = 4;
+
+ // INSUFFICIENT_SIGNATURES means that the blob has failed to gather sufficient attestation
+ INSUFFICIENT_SIGNATURES = 5;
}
// SignedBatch is a batch of blobs with a signature.
diff --git a/disperser/common/v2/blob.go b/disperser/common/v2/blob.go
index b87cca90c..151c85837 100644
--- a/disperser/common/v2/blob.go
+++ b/disperser/common/v2/blob.go
@@ -15,6 +15,7 @@ const (
Encoded
Certified
Failed
+ INSUFFICIENT_SIGNATURES
)
func (s BlobStatus) String() string {
@@ -27,6 +28,8 @@ func (s BlobStatus) String() string {
return "Certified"
case Failed:
return "Failed"
+ case INSUFFICIENT_SIGNATURES:
+ return "Insufficient Signatures"
default:
return "Unknown"
}
@@ -42,6 +45,8 @@ func (s BlobStatus) ToProfobuf() pb.BlobStatus {
return pb.BlobStatus_CERTIFIED
case Failed:
return pb.BlobStatus_FAILED
+ case INSUFFICIENT_SIGNATURES:
+ return pb.BlobStatus_INSUFFICIENT_SIGNATURES
default:
return pb.BlobStatus_UNKNOWN
}
@@ -57,6 +62,8 @@ func BlobStatusFromProtobuf(s pb.BlobStatus) (BlobStatus, error) {
return Certified, nil
case pb.BlobStatus_FAILED:
return Failed, nil
+ case pb.BlobStatus_INSUFFICIENT_SIGNATURES:
+ return INSUFFICIENT_SIGNATURES, nil
default:
return 0, fmt.Errorf("unknown blob status: %v", s)
}
diff --git a/disperser/controller/controller_test.go b/disperser/controller/controller_test.go
index 46954cec9..50b4145cc 100644
--- a/disperser/controller/controller_test.go
+++ b/disperser/controller/controller_test.go
@@ -154,7 +154,7 @@ func teardown() {
}
}
-func newBlob(t *testing.T) (corev2.BlobKey, *corev2.BlobHeader) {
+func newBlob(t *testing.T, quorumNumbers []core.QuorumID) (corev2.BlobKey, *corev2.BlobHeader) {
accountBytes := make([]byte, 32)
_, err := rand.Read(accountBytes)
require.NoError(t, err)
@@ -168,7 +168,7 @@ func newBlob(t *testing.T) (corev2.BlobKey, *corev2.BlobHeader) {
require.NoError(t, err)
bh := &corev2.BlobHeader{
BlobVersion: 0,
- QuorumNumbers: []core.QuorumID{0, 1},
+ QuorumNumbers: quorumNumbers,
BlobCommitments: mockCommitment,
PaymentMetadata: core.PaymentMetadata{
AccountID: accountID,
diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go
index 594957897..031eef8fa 100644
--- a/disperser/controller/dispatcher.go
+++ b/disperser/controller/dispatcher.go
@@ -16,6 +16,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
+ "github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
)
@@ -259,9 +260,13 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart))
nonZeroQuorums := make([]core.QuorumID, 0)
- for quorumID := range quorumAttestation.QuorumResults {
- d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumAttestation.QuorumResults[quorumID])
- nonZeroQuorums = append(nonZeroQuorums, quorumID)
+ quorumResults := make(map[core.QuorumID]uint8)
+ for quorumID, quorumResult := range quorumAttestation.QuorumResults {
+ d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumResult)
+ if quorumResult.PercentSigned > 0 {
+ nonZeroQuorums = append(nonZeroQuorums, quorumID)
+ quorumResults[quorumID] = quorumResult.PercentSigned
+ }
}
if len(nonZeroQuorums) == 0 {
return fmt.Errorf("all quorums received no attestation for batch %s", batchHeaderHash)
@@ -274,11 +279,7 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
return fmt.Errorf("failed to aggregate signatures for batch %s: %w", batchHeaderHash, err)
}
- quorumResults := make(map[core.QuorumID]uint8)
- for quorumID, result := range quorumAttestation.QuorumResults {
- quorumResults[quorumID] = result.PercentSigned
- }
- err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{
+ attestation := &corev2.Attestation{
BatchHeader: batchData.Batch.BatchHeader,
AttestedAt: uint64(time.Now().UnixNano()),
NonSignerPubKeys: aggSig.NonSigners,
@@ -287,18 +288,19 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
Sigma: aggSig.AggSignature,
QuorumNumbers: nonZeroQuorums,
QuorumResults: quorumResults,
- })
+ }
+ err = d.blobMetadataStore.PutAttestation(ctx, attestation)
putAttestationFinished := time.Now()
d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to put attestation for batch %s: %w", batchHeaderHash, err)
}
- err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified)
+ err = d.updateBatchStatus(ctx, batchData, attestation)
updateBatchStatusFinished := time.Now()
d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished))
if err != nil {
- return fmt.Errorf("failed to mark blobs as certified for batch %s: %w", batchHeaderHash, err)
+ return fmt.Errorf("failed to update blob statuses for batch %s: %w", batchHeaderHash, err)
}
d.logger.Debug("successfully processed batch", "batchHeader", batchHeaderHash)
@@ -489,12 +491,41 @@ func (d *Dispatcher) sendChunks(ctx context.Context, client clients.NodeClient,
return sig, nil
}
-func (d *Dispatcher) updateBatchStatus(ctx context.Context, keys []corev2.BlobKey, status v2.BlobStatus) error {
- for _, key := range keys {
- err := d.blobMetadataStore.UpdateBlobStatus(ctx, key, status)
+func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, attestation *corev2.Attestation) error {
+ var multierr error
+ for i, cert := range batch.Batch.BlobCertificates {
+ blobKey := batch.BlobKeys[i]
+ if cert == nil || cert.BlobHeader == nil {
+ d.logger.Error("invalid blob certificate in batch")
+ err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Failed)
+ if err != nil {
+ multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err))
+ }
+ continue
+ }
+
+ failed := false
+ for _, q := range cert.BlobHeader.QuorumNumbers {
+ if res, ok := attestation.QuorumResults[q]; !ok || res == 0 {
+ d.logger.Error("quorum result not found", "quorumID", q, "blobKey", blobKey.Hex())
+ failed = true
+ break
+ }
+ }
+
+ if failed {
+ err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.INSUFFICIENT_SIGNATURES)
+ if err != nil {
+ multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err))
+ }
+ continue
+ }
+
+ err := d.blobMetadataStore.UpdateBlobStatus(ctx, blobKey, v2.Certified)
if err != nil {
- d.logger.Error("failed to update blob status", "blobKey", key.Hex(), "status", status.String(), "err", err)
+ multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to certified: %w", blobKey.Hex(), err))
}
}
- return nil
+
+ return multierr
}
diff --git a/disperser/controller/dispatcher_test.go b/disperser/controller/dispatcher_test.go
index d99580dd7..9f9378788 100644
--- a/disperser/controller/dispatcher_test.go
+++ b/disperser/controller/dispatcher_test.go
@@ -2,6 +2,7 @@ package controller_test
import (
"context"
+ "errors"
"math/big"
"testing"
"time"
@@ -28,6 +29,7 @@ import (
var (
opId0, _ = core.OperatorIDFromHex("e22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311")
opId1, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312")
+ opId2, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568313")
mockChainState, _ = coremock.NewChainDataMock(map[uint8]map[core.OperatorID]int{
0: {
opId0: 1,
@@ -36,6 +38,7 @@ var (
1: {
opId0: 1,
opId1: 3,
+ opId2: 1,
},
})
finalizationBlockDelay = uint64(10)
@@ -54,7 +57,7 @@ type dispatcherComponents struct {
func TestDispatcherHandleBatch(t *testing.T) {
components := newDispatcherComponents(t)
- objs := setupBlobCerts(t, components.BlobMetadataStore, 2)
+ objs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{0, 1}, 2)
ctx := context.Background()
// Get batch header hash to mock signatures
@@ -74,12 +77,18 @@ func TestDispatcherHandleBatch(t *testing.T) {
mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(sig0, nil)
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort
+ op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort
require.NotEqual(t, op0Port, op1Port)
+ require.NotEqual(t, op0Port, op2Port)
components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil)
mockClient1 := clientsmock.NewNodeClient()
sig1 := mockChainState.KeyPairs[opId1].SignMessage(bhh)
mockClient1.On("StoreChunks", mock.Anything, mock.Anything).Return(sig1, nil)
components.NodeClientManager.On("GetClient", mock.Anything, op1Port).Return(mockClient1, nil)
+ mockClient2 := clientsmock.NewNodeClient()
+ sig2 := mockChainState.KeyPairs[opId2].SignMessage(bhh)
+ mockClient2.On("StoreChunks", mock.Anything, mock.Anything).Return(sig2, nil)
+ components.NodeClientManager.On("GetClient", mock.Anything, op2Port).Return(mockClient2, nil)
sigChan, batchData, err := components.Dispatcher.HandleBatch(ctx)
require.NoError(t, err)
@@ -117,10 +126,89 @@ func TestDispatcherHandleBatch(t *testing.T) {
deleteBlobs(t, components.BlobMetadataStore, objs.blobKeys, [][32]byte{bhh})
}
+func TestInsufficientSignatures(t *testing.T) {
+ components := newDispatcherComponents(t)
+ failedObjs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{0, 1}, 2)
+ successfulObjs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{1}, 1)
+ ctx := context.Background()
+
+ // Get batch header hash to mock signatures
+ certs := make([]*corev2.BlobCertificate, 0, len(failedObjs.blobCerts)+len(successfulObjs.blobCerts))
+ certs = append(certs, failedObjs.blobCerts...)
+ certs = append(certs, successfulObjs.blobCerts...)
+ merkleTree, err := corev2.BuildMerkleTree(certs)
+ require.NoError(t, err)
+ require.NotNil(t, merkleTree)
+ require.NotNil(t, merkleTree.Root())
+ batchHeader := &corev2.BatchHeader{
+ ReferenceBlockNumber: blockNumber - finalizationBlockDelay,
+ }
+ copy(batchHeader.BatchRoot[:], merkleTree.Root())
+ bhh, err := batchHeader.Hash()
+ require.NoError(t, err)
+
+ // only op2 signs - quorum 0 will have 0 signing rate, quorum 1 will have 20%
+ mockClient0 := clientsmock.NewNodeClient()
+ mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure"))
+ op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort
+ op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort
+ op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort
+ require.NotEqual(t, op0Port, op1Port)
+ require.NotEqual(t, op0Port, op2Port)
+ components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil)
+ mockClient1 := clientsmock.NewNodeClient()
+ mockClient1.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure"))
+ components.NodeClientManager.On("GetClient", mock.Anything, op1Port).Return(mockClient1, nil)
+ mockClient2 := clientsmock.NewNodeClient()
+ sig := mockChainState.KeyPairs[opId2].SignMessage(bhh)
+ mockClient2.On("StoreChunks", mock.Anything, mock.Anything).Return(sig, nil)
+ components.NodeClientManager.On("GetClient", mock.Anything, op2Port).Return(mockClient2, nil)
+
+ sigChan, batchData, err := components.Dispatcher.HandleBatch(ctx)
+ require.NoError(t, err)
+ err = components.Dispatcher.HandleSignatures(ctx, batchData, sigChan)
+ require.NoError(t, err)
+
+ // Test that the blob metadata status are updated
+ for _, blobKey := range failedObjs.blobKeys {
+ bm, err := components.BlobMetadataStore.GetBlobMetadata(ctx, blobKey)
+ require.NoError(t, err)
+ require.Equal(t, v2.Failed, bm.BlobStatus)
+ }
+ for _, blobKey := range successfulObjs.blobKeys {
+ bm, err := components.BlobMetadataStore.GetBlobMetadata(ctx, blobKey)
+ require.NoError(t, err)
+ require.Equal(t, v2.Certified, bm.BlobStatus)
+ }
+
+ // Get batch header
+ vis, err := components.BlobMetadataStore.GetBlobVerificationInfos(ctx, failedObjs.blobKeys[0])
+ require.NoError(t, err)
+ require.Len(t, vis, 1)
+ bhh, err = vis[0].BatchHeader.Hash()
+ require.NoError(t, err)
+
+ // Test that attestation is written
+ att, err := components.BlobMetadataStore.GetAttestation(ctx, bhh)
+ require.NoError(t, err)
+ require.NotNil(t, att)
+ require.Equal(t, vis[0].BatchHeader, att.BatchHeader)
+ require.Greater(t, att.AttestedAt, uint64(0))
+ require.Len(t, att.NonSignerPubKeys, 2)
+ require.NotNil(t, att.APKG2)
+ require.Len(t, att.QuorumAPKs, 1)
+ require.NotNil(t, att.Sigma)
+ require.ElementsMatch(t, att.QuorumNumbers, []core.QuorumID{1})
+ require.InDeltaMapValues(t, map[core.QuorumID]uint8{1: 20}, att.QuorumResults, 0)
+
+ deleteBlobs(t, components.BlobMetadataStore, failedObjs.blobKeys, [][32]byte{bhh})
+ deleteBlobs(t, components.BlobMetadataStore, successfulObjs.blobKeys, [][32]byte{bhh})
+}
+
func TestDispatcherMaxBatchSize(t *testing.T) {
components := newDispatcherComponents(t)
numBlobs := 12
- objs := setupBlobCerts(t, components.BlobMetadataStore, numBlobs)
+ objs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{0, 1}, numBlobs)
ctx := context.Background()
expectedNumBatches := (numBlobs + int(maxBatchSize) - 1) / int(maxBatchSize)
for i := 0; i < expectedNumBatches; i++ {
@@ -140,7 +228,7 @@ func TestDispatcherMaxBatchSize(t *testing.T) {
func TestDispatcherNewBatch(t *testing.T) {
components := newDispatcherComponents(t)
- objs := setupBlobCerts(t, components.BlobMetadataStore, 2)
+ objs := setupBlobCerts(t, components.BlobMetadataStore, []core.QuorumID{0, 1}, 2)
require.Len(t, objs.blobHedaers, 2)
require.Len(t, objs.blobKeys, 2)
require.Len(t, objs.blobMetadatas, 2)
@@ -258,14 +346,14 @@ type testObjects struct {
blobCerts []*corev2.BlobCertificate
}
-func setupBlobCerts(t *testing.T, blobMetadataStore *blobstore.BlobMetadataStore, numObjects int) *testObjects {
+func setupBlobCerts(t *testing.T, blobMetadataStore *blobstore.BlobMetadataStore, quorumNumbers []core.QuorumID, numObjects int) *testObjects {
ctx := context.Background()
headers := make([]*corev2.BlobHeader, numObjects)
keys := make([]corev2.BlobKey, numObjects)
metadatas := make([]*v2.BlobMetadata, numObjects)
certs := make([]*corev2.BlobCertificate, numObjects)
for i := 0; i < numObjects; i++ {
- keys[i], headers[i] = newBlob(t)
+ keys[i], headers[i] = newBlob(t, quorumNumbers)
now := time.Now()
metadatas[i] = &v2.BlobMetadata{
BlobHeader: headers[i],
diff --git a/disperser/controller/encoding_manager_test.go b/disperser/controller/encoding_manager_test.go
index e72d5f67e..129da5964 100644
--- a/disperser/controller/encoding_manager_test.go
+++ b/disperser/controller/encoding_manager_test.go
@@ -2,7 +2,6 @@ package controller_test
import (
"context"
- "github.com/prometheus/client_golang/prometheus"
"testing"
"time"
@@ -19,6 +18,7 @@ import (
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/gammazero/workerpool"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -97,7 +97,7 @@ func TestGetRelayKeys(t *testing.T) {
func TestEncodingManagerHandleBatch(t *testing.T) {
ctx := context.Background()
- blobKey1, blobHeader1 := newBlob(t)
+ blobKey1, blobHeader1 := newBlob(t, []core.QuorumID{0, 1})
now := time.Now()
metadata1 := &commonv2.BlobMetadata{
BlobHeader: blobHeader1,
@@ -143,7 +143,7 @@ func TestEncodingManagerHandleManyBatches(t *testing.T) {
headers := make([]*corev2.BlobHeader, numBlobs)
metadata := make([]*commonv2.BlobMetadata, numBlobs)
for i := 0; i < numBlobs; i++ {
- keys[i], headers[i] = newBlob(t)
+ keys[i], headers[i] = newBlob(t, []core.QuorumID{0, 1})
now := time.Now()
metadata[i] = &commonv2.BlobMetadata{
BlobHeader: headers[i],
@@ -176,7 +176,7 @@ func TestEncodingManagerHandleManyBatches(t *testing.T) {
require.ErrorContains(t, err, "no blobs to encode")
// new record
- key, header := newBlob(t)
+ key, header := newBlob(t, []core.QuorumID{0, 1})
now := time.Now()
meta := &commonv2.BlobMetadata{
BlobHeader: header,
@@ -205,7 +205,7 @@ func TestEncodingManagerHandleBatchNoBlobs(t *testing.T) {
func TestEncodingManagerHandleBatchRetrySuccess(t *testing.T) {
ctx := context.Background()
- blobKey1, blobHeader1 := newBlob(t)
+ blobKey1, blobHeader1 := newBlob(t, []core.QuorumID{0, 1})
now := time.Now()
metadata1 := &commonv2.BlobMetadata{
BlobHeader: blobHeader1,
@@ -248,7 +248,7 @@ func TestEncodingManagerHandleBatchRetrySuccess(t *testing.T) {
func TestEncodingManagerHandleBatchRetryFailure(t *testing.T) {
ctx := context.Background()
- blobKey1, blobHeader1 := newBlob(t)
+ blobKey1, blobHeader1 := newBlob(t, []core.QuorumID{0, 1})
now := time.Now()
metadata1 := &commonv2.BlobMetadata{
BlobHeader: blobHeader1,