From a0c28c792c23f59c381ae7206294883d9380a9f1 Mon Sep 17 00:00:00 2001 From: Ian Shim <100327837+ian-shim@users.noreply.github.com> Date: Thu, 18 Apr 2024 09:57:12 -0700 Subject: [PATCH] Add `Confirming` state for blobs (#466) --- api/grpc/disperser/disperser.pb.go | 80 +++++++++++--------- api/grpc/node/node.pb.go | 2 + api/proto/disperser/disperser.proto | 3 + disperser/apiserver/server.go | 2 +- disperser/apiserver/server_test.go | 25 ++++++ disperser/batcher/batcher.go | 26 ++++--- disperser/batcher/batcher_test.go | 19 +++-- disperser/batcher/encoded_blob_store.go | 28 +------ disperser/batcher/encoding_streamer.go | 13 +--- disperser/batcher/finalizer.go | 3 +- disperser/batcher/finalizer_test.go | 20 +---- disperser/common/blobstore/shared_storage.go | 7 ++ disperser/common/inmem/store.go | 13 ++++ disperser/disperser.go | 10 +++ 14 files changed, 140 insertions(+), 111 deletions(-) diff --git a/api/grpc/disperser/disperser.pb.go b/api/grpc/disperser/disperser.pb.go index 4a94490a67..8756a44996 100644 --- a/api/grpc/disperser/disperser.pb.go +++ b/api/grpc/disperser/disperser.pb.go @@ -38,6 +38,8 @@ const ( // INSUFFICIENT_SIGNATURES means that the confirmation threshold for the blob was not met // for at least one quorum. BlobStatus_INSUFFICIENT_SIGNATURES BlobStatus = 5 + // CONFIRMING means that the blob has been dispersed to DA nodes and it's waiting for the confirmation onchain + BlobStatus_CONFIRMING BlobStatus = 6 ) // Enum value maps for BlobStatus. @@ -49,6 +51,7 @@ var ( 3: "FAILED", 4: "FINALIZED", 5: "INSUFFICIENT_SIGNATURES", + 6: "CONFIRMING", } BlobStatus_value = map[string]int32{ "UNKNOWN": 0, @@ -57,6 +60,7 @@ var ( "FAILED": 3, "FINALIZED": 4, "INSUFFICIENT_SIGNATURES": 5, + "CONFIRMING": 6, } ) @@ -358,7 +362,12 @@ type DisperseBlobRequest struct { unknownFields protoimpl.UnknownFields // The data to be dispersed. - // The size of data must be <= 2MiB. + // The size of data must be <= 2MiB. Every 32 bytes of data chunk is interpreted as an integer in big endian format + // where the lower address has more significant bits. The integer must stay in the valid range to be interpreted + // as a field element on the bn254 curve. The valid range is + // 0 <= x < 21888242871839275222246405745257275088548364400416034343698204186575808495617 + // containing slightly less than 254 bits and more than 253 bits. If any one of the 32 bytes chunk is outside the range, + // the whole request is deemed as invalid, and rejected. Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` // The quorums to which the blob will be sent, in addition to the required quorums which are configured // on the EigenDA smart contract. If required quorums are included here, an error will be returned. @@ -759,7 +768,7 @@ type BlobHeader struct { // KZG commitment of the blob. Commitment *common.G1Commitment `protobuf:"bytes,1,opt,name=commitment,proto3" json:"commitment,omitempty"` - // The length of the blob in symbols (each symbol is 31 bytes). + // The length of the blob in symbols (each symbol is 32 bytes). DataLength uint32 `protobuf:"varint,2,opt,name=data_length,json=dataLength,proto3" json:"data_length,omitempty"` // The params of the quorums that this blob participates in. BlobQuorumParams []*BlobQuorumParam `protobuf:"bytes,3,rep,name=blob_quorum_params,json=blobQuorumParams,proto3" json:"blob_quorum_params,omitempty"` @@ -1311,40 +1320,41 @@ var file_disperser_disperser_proto_rawDesc = []byte{ 0x65, 0x73, 0x12, 0x34, 0x0a, 0x16, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x14, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x42, 0x6c, 0x6f, - 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x2a, 0x70, 0x0a, 0x0a, 0x42, 0x6c, 0x6f, 0x62, - 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, - 0x4e, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x50, 0x52, 0x4f, 0x43, 0x45, 0x53, 0x53, 0x49, 0x4e, - 0x47, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x4f, 0x4e, 0x46, 0x49, 0x52, 0x4d, 0x45, 0x44, - 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0d, - 0x0a, 0x09, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, 0x45, 0x44, 0x10, 0x04, 0x12, 0x1b, 0x0a, - 0x17, 0x49, 0x4e, 0x53, 0x55, 0x46, 0x46, 0x49, 0x43, 0x49, 0x45, 0x4e, 0x54, 0x5f, 0x53, 0x49, - 0x47, 0x4e, 0x41, 0x54, 0x55, 0x52, 0x45, 0x53, 0x10, 0x05, 0x32, 0xd9, 0x02, 0x0a, 0x09, 0x44, - 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x12, 0x4e, 0x0a, 0x0c, 0x44, 0x69, 0x73, 0x70, - 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, - 0x72, 0x73, 0x65, 0x72, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, - 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, - 0x72, 0x73, 0x65, 0x72, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, - 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x5f, 0x0a, 0x19, 0x44, 0x69, 0x73, 0x70, - 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, - 0x63, 0x61, 0x74, 0x65, 0x64, 0x12, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, - 0x72, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, + 0x63, 0x6b, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x2a, 0x80, 0x01, 0x0a, 0x0a, 0x42, 0x6c, 0x6f, + 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, + 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x50, 0x52, 0x4f, 0x43, 0x45, 0x53, 0x53, 0x49, + 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x4f, 0x4e, 0x46, 0x49, 0x52, 0x4d, 0x45, + 0x44, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x12, + 0x0d, 0x0a, 0x09, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, 0x45, 0x44, 0x10, 0x04, 0x12, 0x1b, + 0x0a, 0x17, 0x49, 0x4e, 0x53, 0x55, 0x46, 0x46, 0x49, 0x43, 0x49, 0x45, 0x4e, 0x54, 0x5f, 0x53, + 0x49, 0x47, 0x4e, 0x41, 0x54, 0x55, 0x52, 0x45, 0x53, 0x10, 0x05, 0x12, 0x0e, 0x0a, 0x0a, 0x43, + 0x4f, 0x4e, 0x46, 0x49, 0x52, 0x4d, 0x49, 0x4e, 0x47, 0x10, 0x06, 0x32, 0xd9, 0x02, 0x0a, 0x09, + 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x12, 0x4e, 0x0a, 0x0c, 0x44, 0x69, 0x73, + 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x70, + 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, + 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, + 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x44, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, + 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x5f, 0x0a, 0x19, 0x44, 0x69, 0x73, + 0x70, 0x65, 0x72, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, + 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x12, 0x1f, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, - 0x52, 0x65, 0x70, 0x6c, 0x79, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4b, 0x0a, 0x0d, 0x47, 0x65, 0x74, - 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1c, 0x2e, 0x64, 0x69, 0x73, - 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, - 0x72, 0x73, 0x65, 0x72, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, - 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x0c, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, - 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, - 0x65, 0x72, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, - 0x65, 0x72, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x52, - 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, - 0x69, 0x67, 0x65, 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, - 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, + 0x73, 0x65, 0x72, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, + 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4b, 0x0a, 0x0d, 0x47, 0x65, + 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1c, 0x2e, 0x64, 0x69, + 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x64, 0x69, 0x73, 0x70, + 0x65, 0x72, 0x73, 0x65, 0x72, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x0c, 0x52, 0x65, 0x74, 0x72, 0x69, + 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, + 0x73, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, + 0x73, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x74, 0x72, 0x69, 0x65, 0x76, 0x65, 0x42, 0x6c, 0x6f, 0x62, + 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, + 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, + 0x2f, 0x64, 0x69, 0x73, 0x70, 0x65, 0x72, 0x73, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/api/grpc/node/node.pb.go b/api/grpc/node/node.pb.go index 67f330d032..ebfd8c58eb 100644 --- a/api/grpc/node/node.pb.go +++ b/api/grpc/node/node.pb.go @@ -441,6 +441,8 @@ type Blob struct { // Each bundle contains all chunks for a single quorum of the blob. // The number of bundles must be equal to the total number of quorums associated // with the blob, and the ordering must be the same as BlobHeader.quorum_headers. + // Note: an operator may be in some but not all of the quorums; in that case the + // bundle corresponding to that quorum will be empty. Bundles []*Bundle `protobuf:"bytes,2,rep,name=bundles,proto3" json:"bundles,omitempty"` } diff --git a/api/proto/disperser/disperser.proto b/api/proto/disperser/disperser.proto index cd77cdce20..f42b0c972d 100644 --- a/api/proto/disperser/disperser.proto +++ b/api/proto/disperser/disperser.proto @@ -149,6 +149,9 @@ enum BlobStatus { // INSUFFICIENT_SIGNATURES means that the confirmation threshold for the blob was not met // for at least one quorum. INSUFFICIENT_SIGNATURES = 5; + + // CONFIRMING means that the blob has been dispersed to DA nodes and it's waiting for the confirmation onchain + CONFIRMING = 6; } // Types below correspond to the types necessary to verify a blob diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index ecea41788e..4a04a939bf 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -839,7 +839,7 @@ func (s *DispersalServer) updateQuorumConfig(ctx context.Context) (QuorumConfig, func getResponseStatus(status disperser.BlobStatus) pb.BlobStatus { switch status { - case disperser.Processing: + case disperser.Confirming, disperser.Processing: return pb.BlobStatus_PROCESSING case disperser.Confirmed: return pb.BlobStatus_CONFIRMED diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index f20de0ae97..7306a50c82 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -292,6 +292,31 @@ func TestGetBlobStatus(t *testing.T) { assert.Equal(t, reply.GetInfo().GetBlobVerificationProof().GetQuorumIndexes(), quorumIndexes) } +func TestGetBlobConfirmingStatus(t *testing.T) { + data := make([]byte, 1024) + _, err := rand.Read(data) + assert.NoError(t, err) + + data = codec.ConvertByPaddingEmptyByte(data) + + status, _, requestID := disperseBlob(t, dispersalServer, data) + assert.Equal(t, status, pb.BlobStatus_PROCESSING) + assert.NotNil(t, requestID) + blobKey, err := disperser.ParseBlobKey(string(requestID)) + assert.NoError(t, err) + err = queue.MarkBlobConfirming(context.Background(), blobKey) + assert.NoError(t, err) + meta, err := queue.GetBlobMetadata(context.Background(), blobKey) + assert.NoError(t, err) + assert.Equal(t, meta.BlobStatus, disperser.Confirming) + + reply, err := dispersalServer.GetBlobStatus(context.Background(), &pb.BlobStatusRequest{ + RequestId: requestID, + }) + assert.NoError(t, err) + assert.Equal(t, reply.GetStatus(), pb.BlobStatus_PROCESSING) +} + func TestRetrieveBlob(t *testing.T) { for i := 0; i < 3; i++ { diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 28403f494e..e6f6abb899 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -299,14 +299,10 @@ func (b *Batcher) updateConfirmationInfo( if status == disperser.Confirmed { if _, updateConfirmationInfoErr = b.Queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo); updateConfirmationInfoErr == nil { b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Confirmed) - // remove encoded blob from storage so we don't disperse it again - b.EncodingStreamer.RemoveEncodedBlob(metadata) } } else if status == disperser.InsufficientSignatures { if _, updateConfirmationInfoErr = b.Queue.MarkBlobInsufficientSignatures(ctx, metadata, confirmationInfo); updateConfirmationInfoErr == nil { b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures) - // remove encoded blob from storage so we don't disperse it again - b.EncodingStreamer.RemoveEncodedBlob(metadata) } } else { updateConfirmationInfoErr = fmt.Errorf("HandleSingleBatch: trying to update confirmation info for blob in status other than confirmed or insufficient signatures: %s", status.String()) @@ -392,6 +388,17 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser. return result.ErrorOrNil() } +func (b *Batcher) transitionBlobToConfirming(ctx context.Context, metadata *disperser.BlobMetadata) error { + err := b.Queue.MarkBlobConfirming(ctx, metadata.GetBlobKey()) + if err != nil { + b.logger.Error("error marking blob as confirming", "err", err) + return err + } + // remove encoded blob from storage so we don't disperse it again + b.EncodingStreamer.RemoveEncodedBlob(metadata) + return nil +} + type confirmationMetadata struct { batchHeader *core.BatchHeader blobs []*disperser.BlobMetadata @@ -494,13 +501,10 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { if err != nil { _ = b.handleFailure(ctx, batch.BlobMetadata, FailConfirmBatch) return fmt.Errorf("HandleSingleBatch: error sending confirmBatch transaction: %w", err) - } else { - for _, metadata := range batch.BlobMetadata { - err = b.EncodingStreamer.MarkBlobPendingConfirmation(metadata) - if err != nil { - log.Error("HandleSingleBatch: error marking blob as pending confirmation", "err", err) - } - } + } + + for _, metadata := range batch.BlobMetadata { + _ = b.transitionBlobToConfirming(ctx, metadata) } return nil diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index a42fe35f0d..b15de072b6 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -402,7 +402,7 @@ func TestBlobRetry(t *testing.T) { encodedResult, err := components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0) assert.NoError(t, err) - assert.Equal(t, encodedResult.Status, bat.PendingDispersal) + assert.NotNil(t, encodedResult) txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil) components.transactor.On("BuildConfirmBatchTxn").Return(txn, nil) @@ -411,12 +411,13 @@ func TestBlobRetry(t *testing.T) { err = batcher.HandleSingleBatch(ctx) assert.NoError(t, err) + // ConfirmBatch transaction has been sent. Waiting for transaction to be confirmed onchain meta, err := blobStore.GetBlobMetadata(ctx, blobKey) assert.NoError(t, err) - assert.Equal(t, disperser.Processing, meta.BlobStatus) + assert.Equal(t, disperser.Confirming, meta.BlobStatus) encodedResult, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0) - assert.NoError(t, err) - assert.Equal(t, encodedResult.Status, bat.PendingConfirmation) + assert.ErrorContains(t, err, "no such key") + assert.Nil(t, encodedResult) err = components.encodingStreamer.RequestEncoding(ctx, out) assert.NoError(t, err) @@ -444,12 +445,10 @@ func TestBlobRetry(t *testing.T) { batch, err = components.encodingStreamer.CreateBatch() assert.ErrorContains(t, err, "no encoded results") assert.Nil(t, batch) - _, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0) - assert.NoError(t, err) meta, err = blobStore.GetBlobMetadata(ctx, blobKey) assert.NoError(t, err) - assert.Equal(t, disperser.Processing, meta.BlobStatus) + assert.Equal(t, disperser.Confirming, meta.BlobStatus) // Trigger a retry confirmationErr := errors.New("error") @@ -459,6 +458,10 @@ func TestBlobRetry(t *testing.T) { Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata, }) assert.ErrorIs(t, err, confirmationErr) + meta, err = blobStore.GetBlobMetadata(ctx, blobKey) + assert.NoError(t, err) + assert.Equal(t, disperser.Processing, meta.BlobStatus) + assert.Equal(t, uint(1), meta.NumRetries) components.encodingStreamer.ReferenceBlockNumber = 14 // Should pick up the blob to encode @@ -475,7 +478,7 @@ func TestBlobRetry(t *testing.T) { assert.NoError(t, err) encodedResult, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0) assert.NoError(t, err) - assert.Equal(t, encodedResult.Status, bat.PendingDispersal) + assert.NotNil(t, encodedResult) } func TestRetryTxnReceipt(t *testing.T) { diff --git a/disperser/batcher/encoded_blob_store.go b/disperser/batcher/encoded_blob_store.go index 9e9c159d18..f2e2f8b8a3 100644 --- a/disperser/batcher/encoded_blob_store.go +++ b/disperser/batcher/encoded_blob_store.go @@ -11,12 +11,6 @@ import ( ) type requestID string -type status uint - -const ( - PendingDispersal status = iota - PendingConfirmation -) type encodedBlobStore struct { mu sync.RWMutex @@ -37,7 +31,6 @@ type EncodingResult struct { Commitment *encoding.BlobCommitments Chunks []*encoding.Frame Assignments map[core.OperatorID]core.Assignment - Status status } // EncodingResultOrStatus is a wrapper for EncodingResult that also contains an error @@ -74,7 +67,7 @@ func (e *encodedBlobStore) HasEncodingRequested(blobKey disperser.BlobKey, quoru } res, ok := e.encoded[requestID] - if ok && (res.Status == PendingConfirmation || res.ReferenceBlockNumber == referenceBlockNumber) { + if ok && res.ReferenceBlockNumber == referenceBlockNumber { return true } return false @@ -148,9 +141,7 @@ func (e *encodedBlobStore) GetNewAndDeleteStaleEncodingResults(blockNumber uint) staleCount := 0 pendingConfirmation := 0 for k, encodedResult := range e.encoded { - if encodedResult.Status == PendingConfirmation { - pendingConfirmation++ - } else if encodedResult.ReferenceBlockNumber == blockNumber { + if encodedResult.ReferenceBlockNumber == blockNumber { fetched = append(fetched, encodedResult) } else if encodedResult.ReferenceBlockNumber < blockNumber { // this is safe: https://go.dev/doc/effective_go#for @@ -158,7 +149,7 @@ func (e *encodedBlobStore) GetNewAndDeleteStaleEncodingResults(blockNumber uint) staleCount++ e.encodedResultSize -= getChunksSize(encodedResult) } else { - e.logger.Error("GetNewAndDeleteStaleEncodingResults: unexpected case", "refBlockNumber", encodedResult.ReferenceBlockNumber, "blockNumber", blockNumber, "status", encodedResult.Status) + e.logger.Error("unexpected case", "refBlockNumber", encodedResult.ReferenceBlockNumber, "blockNumber", blockNumber) } } e.logger.Debug("consumed encoded results", "fetched", len(fetched), "stale", staleCount, "pendingConfirmation", pendingConfirmation, "blockNumber", blockNumber, "encodedSize", e.encodedResultSize) @@ -174,19 +165,6 @@ func (e *encodedBlobStore) GetEncodedResultSize() (int, uint64) { return len(e.encoded), e.encodedResultSize } -func (e *encodedBlobStore) MarkEncodedResultPendingConfirmation(blobKey disperser.BlobKey, quorumID core.QuorumID) error { - e.mu.Lock() - defer e.mu.Unlock() - - requestID := getRequestID(blobKey, quorumID) - if _, ok := e.encoded[requestID]; !ok { - return fmt.Errorf("MarkEncodedBlobPendingConfirmation: no such key (%s) in encoded set", requestID) - } - - e.encoded[requestID].Status = PendingConfirmation - return nil -} - func getRequestID(key disperser.BlobKey, quorumID core.QuorumID) requestID { return requestID(fmt.Sprintf("%s-%d", key.String(), quorumID)) } diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index c9b4fdcfb5..2dea5220e2 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -382,7 +382,6 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata Commitment: commits, Chunks: chunks, Assignments: res.Assignments, - Status: PendingDispersal, }, Err: nil, } @@ -575,16 +574,6 @@ func (e *EncodingStreamer) RemoveEncodedBlob(metadata *disperser.BlobMetadata) { } } -func (e *EncodingStreamer) MarkBlobPendingConfirmation(metadata *disperser.BlobMetadata) error { - for _, sp := range metadata.RequestMetadata.SecurityParams { - err := e.EncodedBlobstore.MarkEncodedResultPendingConfirmation(metadata.GetBlobKey(), sp.QuorumID) - if err != nil { - return fmt.Errorf("error marking blob pending confirmation: %w", err) - } - } - return nil -} - // getOperatorState returns the operator state for the blobs that have valid quorums func (e *EncodingStreamer) getOperatorState(ctx context.Context, metadatas []*disperser.BlobMetadata, blockNumber uint) (*core.IndexedOperatorState, error) { @@ -617,7 +606,7 @@ func (e *EncodingStreamer) validateMetadataQuorums(metadatas []*disperser.BlobMe valid := true for _, quorum := range metadata.RequestMetadata.SecurityParams { if aggKey, ok := state.AggKeys[quorum.QuorumID]; !ok || aggKey == nil { - e.logger.Warn("got blob with a quorum without APK. Will skip.", "quorum", quorum.QuorumID) + e.logger.Warn("got blob with a quorum without APK. Will skip.", "blobKey", metadata.GetBlobKey(), "quorum", quorum.QuorumID) valid = false } } diff --git a/disperser/batcher/finalizer.go b/disperser/batcher/finalizer.go index 2e4e76cde7..c7ca483c8f 100644 --- a/disperser/batcher/finalizer.go +++ b/disperser/batcher/finalizer.go @@ -179,7 +179,8 @@ func (f *finalizer) updateBlobs(ctx context.Context, metadatas []*disperser.Blob confirmationBlockNumber, err := f.getTransactionBlockNumber(ctx, confirmationMetadata.ConfirmationInfo.ConfirmationTxnHash) if errors.Is(err, ethereum.NotFound) { // The confirmed block is finalized, but the transaction is not found. It means the transaction should be considered forked/invalid and the blob should be considered as failed. - _, err := f.blobStore.HandleBlobFailure(ctx, m, f.maxNumRetriesPerBlob) + f.logger.Warn("confirmed transaction not found", "blobKey", blobKey.String(), "confirmationTxnHash", confirmationMetadata.ConfirmationInfo.ConfirmationTxnHash.Hex(), "confirmationBlockNumber", confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber) + err := f.blobStore.MarkBlobFailed(ctx, m.GetBlobKey()) if err != nil { f.logger.Error("error marking blob as failed", "blobKey", blobKey.String(), "err", err) } diff --git a/disperser/batcher/finalizer_test.go b/disperser/batcher/finalizer_test.go index f1090e50fc..ff1c4e24b4 100644 --- a/disperser/batcher/finalizer_test.go +++ b/disperser/batcher/finalizer_test.go @@ -260,27 +260,11 @@ func TestNoReceipt(t *testing.T) { assert.Len(t, metadatas, 0) metadatas, err = queue.GetBlobMetadataByStatus(ctx, disperser.Failed) assert.NoError(t, err) - assert.Len(t, metadatas, 0) - metadatas, err = queue.GetBlobMetadataByStatus(ctx, disperser.Confirmed) - assert.NoError(t, err) assert.Len(t, metadatas, 1) - // num retries should be incremented - assert.Equal(t, metadatas[0].NumRetries, uint(1)) - - // try again - err = finalizer.FinalizeBlobs(context.Background()) - assert.NoError(t, err) - - // status should be transitioned to failed - metadatas, err = queue.GetBlobMetadataByStatus(ctx, disperser.Finalized) - assert.NoError(t, err) - assert.Len(t, metadatas, 0) metadatas, err = queue.GetBlobMetadataByStatus(ctx, disperser.Confirmed) assert.NoError(t, err) assert.Len(t, metadatas, 0) - metadatas, err = queue.GetBlobMetadataByStatus(ctx, disperser.Failed) + metadatas, err = queue.GetBlobMetadataByStatus(ctx, disperser.Processing) assert.NoError(t, err) - assert.Len(t, metadatas, 1) - // num retries should be the same - assert.Equal(t, metadatas[0].NumRetries, uint(1)) + assert.Len(t, metadatas, 0) } diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index e107df6c78..3dccd8dc47 100644 --- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -156,6 +156,10 @@ func (s *SharedBlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadat return &newMetadata, s.blobMetadataStore.UpdateBlobMetadata(ctx, existingMetadata.GetBlobKey(), &newMetadata) } +func (s *SharedBlobStore) MarkBlobConfirming(ctx context.Context, metadataKey disperser.BlobKey) error { + return s.blobMetadataStore.SetBlobStatus(ctx, metadataKey, disperser.Confirming) +} + func (s *SharedBlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationInfo *disperser.ConfirmationInfo) (*disperser.BlobMetadata, error) { newMetadata := *existingMetadata newMetadata.BlobStatus = disperser.InsufficientSignatures @@ -235,6 +239,9 @@ func (s *SharedBlobStore) GetBlobMetadata(ctx context.Context, metadataKey dispe func (s *SharedBlobStore) HandleBlobFailure(ctx context.Context, metadata *disperser.BlobMetadata, maxRetry uint) (bool, error) { if metadata.NumRetries < maxRetry { + if err := s.MarkBlobProcessing(ctx, metadata.GetBlobKey()); err != nil { + return true, err + } return true, s.IncrementBlobRetryCount(ctx, metadata) } else { return false, s.MarkBlobFailed(ctx, metadata.GetBlobKey()) diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go index d3d6c4e2e2..6caf45cfbb 100644 --- a/disperser/common/inmem/store.go +++ b/disperser/common/inmem/store.go @@ -99,6 +99,16 @@ func (q *BlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadata *dis return &newMetadata, nil } +func (q *BlobStore) MarkBlobConfirming(ctx context.Context, blobKey disperser.BlobKey) error { + q.mu.Lock() + defer q.mu.Unlock() + if _, ok := q.Metadata[blobKey]; !ok { + return disperser.ErrBlobNotFound + } + q.Metadata[blobKey].BlobStatus = disperser.Confirming + return nil +} + func (q *BlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationInfo *disperser.ConfirmationInfo) (*disperser.BlobMetadata, error) { q.mu.Lock() defer q.mu.Unlock() @@ -262,6 +272,9 @@ func (q *BlobStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobK func (q *BlobStore) HandleBlobFailure(ctx context.Context, metadata *disperser.BlobMetadata, maxRetry uint) (bool, error) { if metadata.NumRetries < maxRetry { + if err := q.MarkBlobProcessing(ctx, metadata.GetBlobKey()); err != nil { + return true, err + } return true, q.IncrementBlobRetryCount(ctx, metadata) } else { return false, q.MarkBlobFailed(ctx, metadata.GetBlobKey()) diff --git a/disperser/disperser.go b/disperser/disperser.go index 1819267b42..579710117b 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -26,6 +26,7 @@ const ( Failed Finalized InsufficientSignatures + Confirming ) var enumStrings = map[BlobStatus]string{ @@ -34,6 +35,7 @@ var enumStrings = map[BlobStatus]string{ Failed: "Failed", Finalized: "Finalized", InsufficientSignatures: "InsufficientSignatures", + Confirming: "Confirming", } func (bs BlobStatus) String() string { @@ -140,6 +142,8 @@ type BlobStore interface { // MarkBlobConfirmed updates blob metadata to Confirmed status with confirmation info // Returns the updated metadata and error MarkBlobConfirmed(ctx context.Context, existingMetadata *BlobMetadata, confirmationInfo *ConfirmationInfo) (*BlobMetadata, error) + // MarkBlobConfirming updates blob metadata to Confirming status + MarkBlobConfirming(ctx context.Context, blobKey BlobKey) error // MarkBlobInsufficientSignatures updates blob metadata to InsufficientSignatures status with confirmation info // Returns the updated metadata and error MarkBlobInsufficientSignatures(ctx context.Context, existingMetadata *BlobMetadata, confirmationInfo *ConfirmationInfo) (*BlobMetadata, error) @@ -202,6 +206,12 @@ func FromBlobStatusProto(status disperser_rpc.BlobStatus) (*BlobStatus, error) { case disperser_rpc.BlobStatus_FINALIZED: res = Finalized return &res, nil + case disperser_rpc.BlobStatus_INSUFFICIENT_SIGNATURES: + res = InsufficientSignatures + return &res, nil + case disperser_rpc.BlobStatus_CONFIRMING: + res = Confirming + return &res, nil } return nil, fmt.Errorf("unknown blob status: %v", status)