diff --git a/api/errors.go b/api/errors.go index 42632da105..de026f1655 100644 --- a/api/errors.go +++ b/api/errors.go @@ -62,6 +62,10 @@ func NewErrorCanceled(msg string) error { return newErrorGRPC(codes.Canceled, msg) } +func NewErrorAlreadyExists(msg string) error { + return newErrorGRPC(codes.AlreadyExists, msg) +} + // ErrorFailover is returned by the disperser-client and eigenda-client to signify // that eigenda is temporarily unavailable, and suggest to the caller // (most likely some rollup batcher via the eigenda-proxy) to failover diff --git a/disperser/apiserver/disperse_blob_v2.go b/disperser/apiserver/disperse_blob_v2.go index bffb62ad1b..c53c1bc66e 100644 --- a/disperser/apiserver/disperse_blob_v2.go +++ b/disperser/apiserver/disperse_blob_v2.go @@ -2,12 +2,14 @@ package apiserver import ( "context" + "errors" "fmt" "time" "github.com/Layr-Labs/eigenda/api" pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2" corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/disperser/common" dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/encoding/rs" @@ -60,6 +62,11 @@ func (s *DispersalServerV2) StoreBlob(ctx context.Context, data []byte, blobHead } if err := s.blobStore.StoreBlob(ctx, blobKey, data); err != nil { + s.logger.Warn("failed to store blob", "err", err, "blobKey", blobKey.Hex()) + if errors.Is(err, common.ErrAlreadyExists) { + return corev2.BlobKey{}, api.NewErrorAlreadyExists(fmt.Sprintf("blob already exists: %s", blobKey.Hex())) + } + return corev2.BlobKey{}, api.NewErrorInternal(fmt.Sprintf("failed to store blob: %v", err)) } @@ -73,6 +80,14 @@ func (s *DispersalServerV2) StoreBlob(ctx context.Context, data []byte, blobHead UpdatedAt: uint64(requestedAt.UnixNano()), } err = s.blobMetadataStore.PutBlobMetadata(ctx, blobMetadata) + if err != nil { + s.logger.Warn("failed to store blob metadata", "err", err, "blobKey", blobKey.Hex()) + if errors.Is(err, common.ErrAlreadyExists) { + return corev2.BlobKey{}, api.NewErrorAlreadyExists(fmt.Sprintf("blob metadata already exists: %s", blobKey.Hex())) + } + + return corev2.BlobKey{}, api.NewErrorInternal(fmt.Sprintf("failed to store blob metadata: %v", err)) + } return blobKey, err } diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go index a5decc958b..c26260ec66 100644 --- a/disperser/apiserver/server_v2_test.go +++ b/disperser/apiserver/server_v2_test.go @@ -106,6 +106,14 @@ func TestV2DisperseBlob(t *testing.T) { assert.Greater(t, blobMetadata.Expiry, uint64(now.Unix())) assert.Greater(t, blobMetadata.RequestedAt, uint64(now.UnixNano())) assert.Equal(t, blobMetadata.RequestedAt, blobMetadata.UpdatedAt) + + // Try dispersing the same blob + reply, err = c.DispersalServerV2.DisperseBlob(ctx, &pbv2.DisperseBlobRequest{ + Data: data, + BlobHeader: blobHeaderProto, + }) + assert.Nil(t, reply) + assert.ErrorContains(t, err, "blob already exists") } func TestV2DisperseBlobRequestValidation(t *testing.T) { diff --git a/disperser/common/v2/blobstore/s3_blob_store.go b/disperser/common/v2/blobstore/s3_blob_store.go index 5bcfbf1176..433af0a765 100644 --- a/disperser/common/v2/blobstore/s3_blob_store.go +++ b/disperser/common/v2/blobstore/s3_blob_store.go @@ -26,7 +26,13 @@ func NewBlobStore(s3BucketName string, s3Client s3.Client, logger logging.Logger // StoreBlob adds a blob to the blob store func (b *BlobStore) StoreBlob(ctx context.Context, key corev2.BlobKey, data []byte) error { - err := b.s3Client.UploadObject(ctx, b.bucketName, s3.ScopedBlobKey(key), data) + _, err := b.s3Client.HeadObject(ctx, b.bucketName, s3.ScopedBlobKey(key)) + if err == nil { + b.logger.Warnf("blob already exists in bucket %s: %s", b.bucketName, key) + return common.ErrAlreadyExists + } + + err = b.s3Client.UploadObject(ctx, b.bucketName, s3.ScopedBlobKey(key), data) if err != nil { b.logger.Errorf("failed to upload blob in bucket %s: %v", b.bucketName, err) return err