Skip to content

Commit

Permalink
update confirmation block number even when not finalized
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Jun 7, 2024
1 parent 2814241 commit e9cdadb
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 24 deletions.
12 changes: 11 additions & 1 deletion disperser/batcher/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,22 @@ func (f *finalizer) updateBlobs(ctx context.Context, metadatas []*disperser.Blob
continue
}

if confirmationBlockNumber != uint64(confirmationMetadata.ConfirmationInfo.ConfirmationBlockNumber) {
// Confirmation block number has changed due to reorg. Update the confirmation block number in the metadata
err := f.blobStore.UpdateConfirmationBlockNumber(ctx, m, uint32(confirmationBlockNumber))
if err != nil {
f.logger.Error("error updating confirmation block number", "blobKey", blobKey.String(), "err", err)
f.metrics.IncrementNumBlobs("failed")
continue
}
}

// Leave as confirmed if the reorged confirmation block is after the latest finalized block (not yet finalized)
if uint64(confirmationBlockNumber) > lastFinalBlock {
continue
}

_, err = f.blobStore.MarkBlobFinalized(ctx, confirmationMetadata, confirmationBlockNumber)
err = f.blobStore.MarkBlobFinalized(ctx, blobKey)
if err != nil {
f.logger.Error("error marking blob as finalized", "blobKey", blobKey.String(), "err", err)
f.metrics.IncrementNumBlobs("failed")
Expand Down
24 changes: 24 additions & 0 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,30 @@ func (s *BlobMetadataStore) IncrementNumRetries(ctx context.Context, existingMet
return err
}

func (s *BlobMetadataStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint32) error {
updated := *existingMetadata
if updated.ConfirmationInfo == nil {
return fmt.Errorf("failed to update confirmation block number because confirmation info is missing for metadata %s", existingMetadata.GetBlobKey().String())
}

updated.ConfirmationInfo.ConfirmationBlockNumber = confirmationBlockNumber
item, err := MarshalBlobMetadata(&updated)
if err != nil {
return err
}

_, err = s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{
"BlobHash": &types.AttributeValueMemberS{
Value: existingMetadata.BlobHash,
},
"MetadataHash": &types.AttributeValueMemberS{
Value: existingMetadata.MetadataHash,
},
}, item)

return err
}

func (s *BlobMetadataStore) UpdateBlobMetadata(ctx context.Context, metadataKey disperser.BlobKey, updated *disperser.BlobMetadata) error {
item, err := MarshalBlobMetadata(updated)
if err != nil {
Expand Down
17 changes: 6 additions & 11 deletions disperser/common/blobstore/shared_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,8 @@ func (s *SharedBlobStore) MarkBlobInsufficientSignatures(ctx context.Context, ex
return &newMetadata, s.blobMetadataStore.UpdateBlobMetadata(ctx, existingMetadata.GetBlobKey(), &newMetadata)
}

func (s *SharedBlobStore) MarkBlobFinalized(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint64) (*disperser.BlobMetadata, error) {
if existingMetadata == nil {
return nil, errors.New("metadata is nil")
}
newMetadata := *existingMetadata
newMetadata.BlobStatus = disperser.Finalized
if confirmationBlockNumber > 0 {
newMetadata.ConfirmationInfo.ConfirmationBlockNumber = uint32(confirmationBlockNumber)
}

return &newMetadata, s.blobMetadataStore.UpdateBlobMetadata(ctx, existingMetadata.GetBlobKey(), &newMetadata)
func (s *SharedBlobStore) MarkBlobFinalized(ctx context.Context, blobKey disperser.BlobKey) error {
return s.blobMetadataStore.SetBlobStatus(ctx, blobKey, disperser.Finalized)
}

func (s *SharedBlobStore) MarkBlobProcessing(ctx context.Context, metadataKey disperser.BlobKey) error {
Expand All @@ -199,6 +190,10 @@ func (s *SharedBlobStore) IncrementBlobRetryCount(ctx context.Context, existingM
return s.blobMetadataStore.IncrementNumRetries(ctx, existingMetadata)
}

func (s *SharedBlobStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint32) error {
return s.blobMetadataStore.UpdateConfirmationBlockNumber(ctx, existingMetadata, confirmationBlockNumber)
}

func (s *SharedBlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperser.BlobMetadata) (map[disperser.BlobKey]*core.Blob, error) {
pool := workerpool.New(maxS3BlobFetchWorkers)
resultChan := make(chan blobResultOrError, len(metadata))
Expand Down
13 changes: 10 additions & 3 deletions disperser/common/blobstore/shared_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,17 @@ func TestSharedBlobStore(t *testing.T) {
assert.Nil(t, err)
assertMetadata(t, blobKey, blobSize, requestedAt, disperser.Confirmed, metadata1)

updatedMetadata, err = sharedStorage.MarkBlobFinalized(ctx, metadata1, 151)
err = sharedStorage.UpdateConfirmationBlockNumber(ctx, metadata1, 151)
assert.Nil(t, err)
assert.Equal(t, disperser.Finalized, updatedMetadata.BlobStatus)
assert.Equal(t, uint32(151), updatedMetadata.ConfirmationInfo.ConfirmationBlockNumber)
metadata1, err = sharedStorage.GetBlobMetadata(ctx, blobKey)
assert.Nil(t, err)
assert.Equal(t, uint32(151), metadata1.ConfirmationInfo.ConfirmationBlockNumber)

err = sharedStorage.MarkBlobFinalized(ctx, blobKey)
assert.Nil(t, err)
metadata1, err = sharedStorage.GetBlobMetadata(ctx, blobKey)
assert.Nil(t, err)
assert.Equal(t, disperser.Finalized, metadata1.BlobStatus)

metadata1, err = sharedStorage.GetBlobMetadata(ctx, blobKey)
assert.Nil(t, err)
Expand Down
28 changes: 20 additions & 8 deletions disperser/common/inmem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -123,19 +124,15 @@ func (q *BlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existing
return &newMetadata, nil
}

func (q *BlobStore) MarkBlobFinalized(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint64) (*disperser.BlobMetadata, error) {
func (q *BlobStore) MarkBlobFinalized(ctx context.Context, blobKey disperser.BlobKey) error {
q.mu.Lock()
defer q.mu.Unlock()
blobKey := existingMetadata.GetBlobKey()
if _, ok := q.Metadata[blobKey]; !ok {
return nil, disperser.ErrBlobNotFound
return disperser.ErrBlobNotFound
}

newMetadata := *existingMetadata
newMetadata.BlobStatus = disperser.Finalized
newMetadata.ConfirmationInfo.ConfirmationBlockNumber = uint32(confirmationBlockNumber)
q.Metadata[blobKey] = &newMetadata
return &newMetadata, nil
q.Metadata[blobKey].BlobStatus = disperser.Finalized
return nil
}

func (q *BlobStore) MarkBlobProcessing(ctx context.Context, blobKey disperser.BlobKey) error {
Expand Down Expand Up @@ -171,6 +168,21 @@ func (q *BlobStore) IncrementBlobRetryCount(ctx context.Context, existingMetadat
return nil
}

func (q *BlobStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationBlockNumber uint32) error {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[existingMetadata.GetBlobKey()]; !ok {
return disperser.ErrBlobNotFound
}

if q.Metadata[existingMetadata.GetBlobKey()].ConfirmationInfo == nil {
return fmt.Errorf("cannot update confirmation block number for blob without confirmation info: %s", existingMetadata.GetBlobKey().String())
}

q.Metadata[existingMetadata.GetBlobKey()].ConfirmationInfo.ConfirmationBlockNumber = confirmationBlockNumber
return nil
}

func (q *BlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperser.BlobMetadata) (map[disperser.BlobKey]*core.Blob, error) {
q.mu.RLock()
defer q.mu.RUnlock()
Expand Down
5 changes: 5 additions & 0 deletions disperser/common/inmem/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,14 @@ func TestBlobStore(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, disperser.Confirmed, updated.BlobStatus)

err = bs.UpdateConfirmationBlockNumber(ctx, updated, 151)
assert.Nil(t, err)

meta2, err = bs.GetBlobMetadata(ctx, blobKey2)
assert.Nil(t, err)
assert.Equal(t, meta2.BlobStatus, disperser.Confirmed)
assert.Equal(t, uint32(151), meta2.ConfirmationInfo.ConfirmationBlockNumber)

meta1, err = bs.GetBlobMetadata(ctx, blobKey1)
assert.Nil(t, err)
assert.Equal(t, meta1.BlobStatus, disperser.Processing)
Expand Down
4 changes: 3 additions & 1 deletion disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@ type BlobStore interface {
// Returns the updated metadata and error
MarkBlobInsufficientSignatures(ctx context.Context, existingMetadata *BlobMetadata, confirmationInfo *ConfirmationInfo) (*BlobMetadata, error)
// MarkBlobFinalized marks a blob as finalized
MarkBlobFinalized(ctx context.Context, existingMetadata *BlobMetadata, confirmationBlockNumber uint64) (*BlobMetadata, error)
MarkBlobFinalized(ctx context.Context, blobKey BlobKey) error
// MarkBlobProcessing marks a blob as processing
MarkBlobProcessing(ctx context.Context, blobKey BlobKey) error
// MarkBlobFailed marks a blob as failed
MarkBlobFailed(ctx context.Context, blobKey BlobKey) error
// IncrementBlobRetryCount increments the retry count of a blob
IncrementBlobRetryCount(ctx context.Context, existingMetadata *BlobMetadata) error
// UpdateConfirmationBlockNumber updates the confirmation block number of a blob
UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *BlobMetadata, confirmationBlockNumber uint32) error
// GetBlobsByMetadata retrieves a list of blobs given a list of metadata
GetBlobsByMetadata(ctx context.Context, metadata []*BlobMetadata) (map[BlobKey]*core.Blob, error)
// GetBlobMetadataByStatus returns a list of blob metadata for blobs with the given status
Expand Down

0 comments on commit e9cdadb

Please sign in to comment.