Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AlreadyExists error from disperser #1026

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func NewErrorCanceled(msg string) error {
return newErrorGRPC(codes.Canceled, msg)
}

func NewErrorAlreadyExists(msg string) error {
return newErrorGRPC(codes.AlreadyExists, msg)
}

// ErrorFailover is returned by the disperser-client and eigenda-client to signify
// that eigenda is temporarily unavailable, and suggest to the caller
// (most likely some rollup batcher via the eigenda-proxy) to failover
Expand Down
15 changes: 15 additions & 0 deletions disperser/apiserver/disperse_blob_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package apiserver

import (
"context"
"errors"
"fmt"
"time"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/disperser/common"
dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/rs"
Expand Down Expand Up @@ -60,6 +62,11 @@ func (s *DispersalServerV2) StoreBlob(ctx context.Context, data []byte, blobHead
}

if err := s.blobStore.StoreBlob(ctx, blobKey, data); err != nil {
s.logger.Warn("failed to store blob", "err", err, "blobKey", blobKey.Hex())
if errors.Is(err, common.ErrAlreadyExists) {
return corev2.BlobKey{}, api.NewErrorAlreadyExists(fmt.Sprintf("blob already exists: %s", blobKey.Hex()))
}

return corev2.BlobKey{}, api.NewErrorInternal(fmt.Sprintf("failed to store blob: %v", err))
}

Expand All @@ -73,6 +80,14 @@ func (s *DispersalServerV2) StoreBlob(ctx context.Context, data []byte, blobHead
UpdatedAt: uint64(requestedAt.UnixNano()),
}
err = s.blobMetadataStore.PutBlobMetadata(ctx, blobMetadata)
if err != nil {
s.logger.Warn("failed to store blob metadata", "err", err, "blobKey", blobKey.Hex())
if errors.Is(err, common.ErrAlreadyExists) {
return corev2.BlobKey{}, api.NewErrorAlreadyExists(fmt.Sprintf("blob metadata already exists: %s", blobKey.Hex()))
}

return corev2.BlobKey{}, api.NewErrorInternal(fmt.Sprintf("failed to store blob metadata: %v", err))
}
return blobKey, err
}

Expand Down
8 changes: 8 additions & 0 deletions disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ func TestV2DisperseBlob(t *testing.T) {
assert.Greater(t, blobMetadata.Expiry, uint64(now.Unix()))
assert.Greater(t, blobMetadata.RequestedAt, uint64(now.UnixNano()))
assert.Equal(t, blobMetadata.RequestedAt, blobMetadata.UpdatedAt)

// Try dispersing the same blob
reply, err = c.DispersalServerV2.DisperseBlob(ctx, &pbv2.DisperseBlobRequest{
Data: data,
BlobHeader: blobHeaderProto,
})
assert.Nil(t, reply)
assert.ErrorContains(t, err, "blob already exists")
}

func TestV2DisperseBlobRequestValidation(t *testing.T) {
Expand Down
8 changes: 7 additions & 1 deletion disperser/common/v2/blobstore/s3_blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ func NewBlobStore(s3BucketName string, s3Client s3.Client, logger logging.Logger

// StoreBlob adds a blob to the blob store
func (b *BlobStore) StoreBlob(ctx context.Context, key corev2.BlobKey, data []byte) error {
err := b.s3Client.UploadObject(ctx, b.bucketName, s3.ScopedBlobKey(key), data)
_, err := b.s3Client.HeadObject(ctx, b.bucketName, s3.ScopedBlobKey(key))
if err == nil {
b.logger.Warnf("blob already exists in bucket %s: %s", b.bucketName, key)
return common.ErrAlreadyExists
}

err = b.s3Client.UploadObject(ctx, b.bucketName, s3.ScopedBlobKey(key), data)
if err != nil {
b.logger.Errorf("failed to upload blob in bucket %s: %v", b.bucketName, err)
return err
Expand Down
Loading