Skip to content

Commit

Permalink
Replace Confirming blob status with Dispersing status (#517)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Apr 26, 2024
1 parent 8f24e8a commit 36307ab
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 49 deletions.
23 changes: 17 additions & 6 deletions api/grpc/disperser/disperser.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 13 additions & 6 deletions api/proto/disperser/disperser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,26 @@ message RetrieveBlobReply {

// Data Types

// BlobStatus represents the status of a blob.
// The status of a blob is updated as the blob is processed by the disperser.
// The status of a blob can be queried by the client using the GetBlobStatus API.
// Intermediate states are states that the blob can be in while being processed, and it can be updated to a differet state:
// - PROCESSING
// - DISPERSING
// - CONFIRMED
// Terminal states are states that will not be updated to a different state:
// - FAILED
// - FINALIZED
// - INSUFFICIENT_SIGNATURES
enum BlobStatus {
UNKNOWN = 0;

// Intermediate states

// PROCESSING means that the blob is currently being processed by the disperser
PROCESSING = 1;
// CONFIRMED means that the blob has been dispersed to DA Nodes and the dispersed
// batch containing the blob has been confirmed onchain
CONFIRMED = 2;

// Terminal states

// FAILED means that the blob has failed permanently (for reasons other than insufficient
// signatures, which is a separate state)
FAILED = 3;
Expand All @@ -150,8 +157,8 @@ enum BlobStatus {
// for at least one quorum.
INSUFFICIENT_SIGNATURES = 5;

// CONFIRMING means that the blob has been dispersed to DA nodes and it's waiting for the confirmation onchain
CONFIRMING = 6;
// DISPERSING means that the blob is currently being dispersed to DA Nodes and being confirmed onchain
DISPERSING = 6;
}

// Types below correspond to the types necessary to verify a blob
Expand Down
2 changes: 1 addition & 1 deletion disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ func (s *DispersalServer) updateQuorumConfig(ctx context.Context) (QuorumConfig,

func getResponseStatus(status disperser.BlobStatus) pb.BlobStatus {
switch status {
case disperser.Confirming, disperser.Processing:
case disperser.Dispersing, disperser.Processing:
return pb.BlobStatus_PROCESSING
case disperser.Confirmed:
return pb.BlobStatus_CONFIRMED
Expand Down
6 changes: 3 additions & 3 deletions disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestGetBlobStatus(t *testing.T) {
assert.Equal(t, reply.GetInfo().GetBlobVerificationProof().GetQuorumIndexes(), quorumIndexes)
}

func TestGetBlobConfirmingStatus(t *testing.T) {
func TestGetBlobDispersingStatus(t *testing.T) {
data := make([]byte, 1024)
_, err := rand.Read(data)
assert.NoError(t, err)
Expand All @@ -306,11 +306,11 @@ func TestGetBlobConfirmingStatus(t *testing.T) {
assert.NotNil(t, requestID)
blobKey, err := disperser.ParseBlobKey(string(requestID))
assert.NoError(t, err)
err = queue.MarkBlobConfirming(context.Background(), blobKey)
err = queue.MarkBlobDispersing(context.Background(), blobKey)
assert.NoError(t, err)
meta, err := queue.GetBlobMetadata(context.Background(), blobKey)
assert.NoError(t, err)
assert.Equal(t, meta.BlobStatus, disperser.Confirming)
assert.Equal(t, meta.BlobStatus, disperser.Dispersing)

reply, err := dispersalServer.GetBlobStatus(context.Background(), &pb.BlobStatusRequest{
RequestId: requestID,
Expand Down
17 changes: 1 addition & 16 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,17 +389,6 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.
return result.ErrorOrNil()
}

func (b *Batcher) transitionBlobToConfirming(ctx context.Context, metadata *disperser.BlobMetadata) error {
err := b.Queue.MarkBlobConfirming(ctx, metadata.GetBlobKey())
if err != nil {
b.logger.Error("error marking blob as confirming", "err", err)
return err
}
// remove encoded blob from storage so we don't disperse it again
b.EncodingStreamer.RemoveEncodedBlob(metadata)
return nil
}

type confirmationMetadata struct {
batchHeader *core.BatchHeader
blobs []*disperser.BlobMetadata
Expand All @@ -421,7 +410,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
defer timer.ObserveDuration()

stageTimer := time.Now()
batch, err := b.EncodingStreamer.CreateBatch()
batch, err := b.EncodingStreamer.CreateBatch(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -504,10 +493,6 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
return fmt.Errorf("HandleSingleBatch: error sending confirmBatch transaction: %w", err)
}

for _, metadata := range batch.BlobMetadata {
_ = b.transitionBlobToConfirming(ctx, metadata)
}

return nil
}

Expand Down
8 changes: 4 additions & 4 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func TestBlobRetry(t *testing.T) {
// ConfirmBatch transaction has been sent. Waiting for transaction to be confirmed onchain
meta, err := blobStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, disperser.Confirming, meta.BlobStatus)
assert.Equal(t, disperser.Dispersing, meta.BlobStatus)
encodedResult, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
assert.ErrorContains(t, err, "no such key")
assert.Nil(t, encodedResult)
Expand All @@ -428,7 +428,7 @@ func TestBlobRetry(t *testing.T) {
t.Fatal("shouldn't have picked up any blobs to encode")
case <-timer.C:
}
batch, err := components.encodingStreamer.CreateBatch()
batch, err := components.encodingStreamer.CreateBatch(context.Background())
assert.ErrorContains(t, err, "no encoded results")
assert.Nil(t, batch)

Expand All @@ -443,13 +443,13 @@ func TestBlobRetry(t *testing.T) {
case <-timer.C:
}

batch, err = components.encodingStreamer.CreateBatch()
batch, err = components.encodingStreamer.CreateBatch(context.Background())
assert.ErrorContains(t, err, "no encoded results")
assert.Nil(t, batch)

meta, err = blobStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, disperser.Confirming, meta.BlobStatus)
assert.Equal(t, disperser.Dispersing, meta.BlobStatus)

// Trigger a retry
confirmationErr := errors.New("error")
Expand Down
18 changes: 17 additions & 1 deletion disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (e *EncodingStreamer) ProcessEncodedBlobs(ctx context.Context, result Encod
// If successful, it returns a batch, and updates the reference block number for next batch to use.
// Otherwise, it returns an error and keeps the blobs in the encoded blob store.
// This function is meant to be called periodically in a single goroutine as it resets the state of the encoded blob store.
func (e *EncodingStreamer) CreateBatch() (*batch, error) {
func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) {
// lock to update e.ReferenceBlockNumber
e.mu.Lock()
defer e.mu.Unlock()
Expand Down Expand Up @@ -531,6 +531,10 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
metadatas := make([]*disperser.BlobMetadata, len(metadataByKey))
i := 0
for key := range metadataByKey {
err := e.transitionBlobToDispersing(ctx, metadataByKey[key])
if err != nil {
continue
}
encodedBlobs[i] = encodedBlobByKey[key]
blobHeaders[i] = blobHeaderByKey[key]
metadatas[i] = metadataByKey[key]
Expand Down Expand Up @@ -568,6 +572,18 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
}, nil
}

func (e *EncodingStreamer) transitionBlobToDispersing(ctx context.Context, metadata *disperser.BlobMetadata) error {
blobKey := metadata.GetBlobKey()
err := e.blobStore.MarkBlobDispersing(ctx, blobKey)
if err != nil {
e.logger.Error("error marking blob as dispersing", "err", err, "blobKey", blobKey.String())
return err
}
// remove encoded blob from storage so we don't disperse it again
e.RemoveEncodedBlob(metadata)
return nil
}

func (e *EncodingStreamer) RemoveEncodedBlob(metadata *disperser.BlobMetadata) {
for _, sp := range metadata.RequestMetadata.SecurityParams {
e.EncodedBlobstore.DeleteEncodingResult(metadata.GetBlobKey(), sp.QuorumID)
Expand Down
19 changes: 17 additions & 2 deletions disperser/batcher/encoding_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func TestPartialBlob(t *testing.T) {

// get batch
assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(10))
batch, err := encodingStreamer.CreateBatch()
batch, err := encodingStreamer.CreateBatch(context.Background())
assert.Nil(t, err)
assert.NotNil(t, batch)
assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(0))
Expand Down Expand Up @@ -644,10 +644,25 @@ func TestGetBatch(t *testing.T) {

// get batch
assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(10))
batch, err := encodingStreamer.CreateBatch()
batch, err := encodingStreamer.CreateBatch(context.Background())
assert.Nil(t, err)
assert.NotNil(t, batch)
assert.Equal(t, encodingStreamer.ReferenceBlockNumber, uint(0))
metadata1, err = c.blobStore.GetBlobMetadata(ctx, metadataKey1)
assert.Nil(t, err)
assert.Equal(t, disperser.Dispersing, metadata1.BlobStatus)
metadata2, err = c.blobStore.GetBlobMetadata(ctx, metadataKey2)
assert.Equal(t, disperser.Dispersing, metadata2.BlobStatus)
assert.Nil(t, err)
res, err := encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey1, core.QuorumID(0))
assert.Nil(t, res)
assert.ErrorContains(t, err, "GetEncodedBlob: no such key")
res, err = encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey1, core.QuorumID(1))
assert.Nil(t, res)
assert.ErrorContains(t, err, "GetEncodedBlob: no such key")
res, err = encodingStreamer.EncodedBlobstore.GetEncodingResult(metadataKey2, core.QuorumID(0))
assert.Nil(t, res)
assert.ErrorContains(t, err, "GetEncodedBlob: no such key")

// Check BatchHeader
assert.NotNil(t, batch.BatchHeader)
Expand Down
4 changes: 2 additions & 2 deletions disperser/common/blobstore/shared_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ func (s *SharedBlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadat
return &newMetadata, s.blobMetadataStore.UpdateBlobMetadata(ctx, existingMetadata.GetBlobKey(), &newMetadata)
}

func (s *SharedBlobStore) MarkBlobConfirming(ctx context.Context, metadataKey disperser.BlobKey) error {
return s.blobMetadataStore.SetBlobStatus(ctx, metadataKey, disperser.Confirming)
func (s *SharedBlobStore) MarkBlobDispersing(ctx context.Context, metadataKey disperser.BlobKey) error {
return s.blobMetadataStore.SetBlobStatus(ctx, metadataKey, disperser.Dispersing)
}

func (s *SharedBlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationInfo *disperser.ConfirmationInfo) (*disperser.BlobMetadata, error) {
Expand Down
4 changes: 2 additions & 2 deletions disperser/common/inmem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func (q *BlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadata *dis
return &newMetadata, nil
}

func (q *BlobStore) MarkBlobConfirming(ctx context.Context, blobKey disperser.BlobKey) error {
func (q *BlobStore) MarkBlobDispersing(ctx context.Context, blobKey disperser.BlobKey) error {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
}
q.Metadata[blobKey].BlobStatus = disperser.Confirming
q.Metadata[blobKey].BlobStatus = disperser.Dispersing
return nil
}

Expand Down
12 changes: 6 additions & 6 deletions disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
Failed
Finalized
InsufficientSignatures
Confirming
Dispersing
)

var enumStrings = map[BlobStatus]string{
Expand All @@ -35,7 +35,7 @@ var enumStrings = map[BlobStatus]string{
Failed: "Failed",
Finalized: "Finalized",
InsufficientSignatures: "InsufficientSignatures",
Confirming: "Confirming",
Dispersing: "Dispersing",
}

func (bs BlobStatus) String() string {
Expand Down Expand Up @@ -142,8 +142,8 @@ type BlobStore interface {
// MarkBlobConfirmed updates blob metadata to Confirmed status with confirmation info
// Returns the updated metadata and error
MarkBlobConfirmed(ctx context.Context, existingMetadata *BlobMetadata, confirmationInfo *ConfirmationInfo) (*BlobMetadata, error)
// MarkBlobConfirming updates blob metadata to Confirming status
MarkBlobConfirming(ctx context.Context, blobKey BlobKey) error
// MarkBlobDispersing updates blob metadata to Dispersing status
MarkBlobDispersing(ctx context.Context, blobKey BlobKey) error
// MarkBlobInsufficientSignatures updates blob metadata to InsufficientSignatures status with confirmation info
// Returns the updated metadata and error
MarkBlobInsufficientSignatures(ctx context.Context, existingMetadata *BlobMetadata, confirmationInfo *ConfirmationInfo) (*BlobMetadata, error)
Expand Down Expand Up @@ -209,8 +209,8 @@ func FromBlobStatusProto(status disperser_rpc.BlobStatus) (*BlobStatus, error) {
case disperser_rpc.BlobStatus_INSUFFICIENT_SIGNATURES:
res = InsufficientSignatures
return &res, nil
case disperser_rpc.BlobStatus_CONFIRMING:
res = Confirming
case disperser_rpc.BlobStatus_DISPERSING:
res = Dispersing
return &res, nil
}

Expand Down

0 comments on commit 36307ab

Please sign in to comment.