diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index b4c90c93de..734d93d8f8 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -44,11 +44,12 @@ var ( queue disperser.BlobStore dispersalServer *apiserver.DispersalServer - dockertestPool *dockertest.Pool - dockertestResource *dockertest.Resource - UUID = uuid.New() - metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) - bucketTableName = fmt.Sprintf("test-BucketStore-%v", UUID) + dockertestPool *dockertest.Pool + dockertestResource *dockertest.Resource + UUID = uuid.New() + metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) + shadowMetadataTableName = fmt.Sprintf("test-BlobMetadata-Shadow-%v", UUID) + bucketTableName = fmt.Sprintf("test-BucketStore-%v", UUID) deployLocalStack bool localStackPort = "4568" @@ -585,7 +586,7 @@ func setup() { } - err = deploy.DeployResources(dockertestPool, localStackPort, metadataTableName, bucketTableName) + err = deploy.DeployResources(dockertestPool, localStackPort, metadataTableName, shadowMetadataTableName, bucketTableName) if err != nil { teardown() panic("failed to deploy AWS resources") @@ -631,7 +632,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { if err != nil { panic("failed to create dynamoDB client") } - blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, time.Hour) + blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, shadowMetadataTableName, time.Hour) globalParams := common.GlobalRateParams{ CountFailed: false, diff --git a/disperser/cmd/apiserver/config.go b/disperser/cmd/apiserver/config.go index 03d592aac8..6ae168f458 100644 --- a/disperser/cmd/apiserver/config.go +++ b/disperser/cmd/apiserver/config.go @@ -22,6 +22,7 @@ type Config struct { RateConfig apiserver.RateConfig EnableRatelimiter bool BucketTableName string + ShadowTableName string BucketStoreSize int EthClientConfig geth.EthClientConfig @@ -53,8 +54,9 @@ func NewConfig(ctx *cli.Context) (Config, error) { GrpcTimeout: ctx.GlobalDuration(flags.GrpcTimeoutFlag.Name), }, BlobstoreConfig: blobstore.Config{ - BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name), - TableName: ctx.GlobalString(flags.DynamoDBTableNameFlag.Name), + BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name), + TableName: ctx.GlobalString(flags.DynamoDBTableNameFlag.Name), + ShadowTableName: ctx.GlobalString(flags.ShadowTableNameFlag.Name), }, LoggerConfig: *loggerConfig, MetricsConfig: disperser.MetricsConfig{ diff --git a/disperser/cmd/apiserver/flags/flags.go b/disperser/cmd/apiserver/flags/flags.go index ebc28b5f0d..36d856628b 100644 --- a/disperser/cmd/apiserver/flags/flags.go +++ b/disperser/cmd/apiserver/flags/flags.go @@ -30,6 +30,13 @@ var ( Required: true, EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMODB_TABLE_NAME"), } + ShadowTableNameFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "shadow-table-name"), + Usage: "Name of the dynamodb table to shadow write blob metadata", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "SHADOW_TABLE_NAME"), + Value: "", + } GrpcPortFlag = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "grpc-port"), Usage: "Port at which disperser listens for grpc calls", @@ -104,6 +111,7 @@ var optionalFlags = []cli.Flag{ EnableRatelimiter, BucketStoreSize, GrpcTimeoutFlag, + ShadowTableNameFlag, } // Flags contains the list of configuration options available to the binary. diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index e6fa92df06..36bf71d7c6 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -89,7 +89,7 @@ func RunDisperserServer(ctx *cli.Context) error { bucketName := config.BlobstoreConfig.BucketName logger.Info("Creating blob store", "bucket", bucketName) - blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) + blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, config.BlobstoreConfig.ShadowTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) blobStore := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger) reg := prometheus.NewRegistry() diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index 3735940fe3..bdbb43aa52 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -192,7 +192,7 @@ func RunBatcher(ctx *cli.Context) error { if err != nil || storeDurationBlocks == 0 { return fmt.Errorf("failed to get STORE_DURATION_BLOCKS: %w", err) } - blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) + blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, config.BlobstoreConfig.ShadowTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) queue := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger) cs := coreeth.NewChainState(tx, client) diff --git a/disperser/cmd/dataapi/main.go b/disperser/cmd/dataapi/main.go index abddf5b1ea..28d475c8c6 100644 --- a/disperser/cmd/dataapi/main.go +++ b/disperser/cmd/dataapi/main.go @@ -89,7 +89,7 @@ func RunDataApi(ctx *cli.Context) error { var ( promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster) - blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0) + blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, config.BlobstoreConfig.ShadowTableName, 0) sharedStorage = blobstore.NewSharedStorage(config.BlobstoreConfig.BucketName, s3Client, blobMetadataStore, logger) subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr) subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger) diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 7ebe9b0b15..4a1ea381d8 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -27,19 +27,24 @@ const ( // - StatusIndex: (Partition Key: Status, Sort Key: RequestedAt) -> Metadata // - BatchIndex: (Partition Key: BatchHeaderHash, Sort Key: BlobIndex) -> Metadata type BlobMetadataStore struct { - dynamoDBClient *commondynamodb.Client - logger logging.Logger - tableName string - ttl time.Duration + dynamoDBClient *commondynamodb.Client + logger logging.Logger + tableName string + shadowTableName string + ttl time.Duration } -func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore { +func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, shadowTableName string, ttl time.Duration) *BlobMetadataStore { logger.Debugf("creating blob metadata store with table %s with TTL: %s", tableName, ttl) + if shadowTableName != "" { + logger.Debugf("shadow blob metadata will be written to table %s with TTL: %s", shadowTableName, ttl) + } return &BlobMetadataStore{ - dynamoDBClient: dynamoDBClient, - logger: logger.With("component", "BlobMetadataStore"), - tableName: tableName, - ttl: ttl, + dynamoDBClient: dynamoDBClient, + logger: logger.With("component", "BlobMetadataStore"), + tableName: tableName, + shadowTableName: shadowTableName, + ttl: ttl, } } @@ -49,6 +54,13 @@ func (s *BlobMetadataStore) QueueNewBlobMetadata(ctx context.Context, blobMetada return err } + if s.shadowTableName != "" && s.shadowTableName != s.tableName { + err = s.dynamoDBClient.PutItem(ctx, s.shadowTableName, item) + if err != nil { + s.logger.Error("failed to put item into shadow table %s : %v", s.shadowTableName, err) + } + } + return s.dynamoDBClient.PutItem(ctx, s.tableName, item) } diff --git a/disperser/common/blobstore/blob_metadata_store_test.go b/disperser/common/blobstore/blob_metadata_store_test.go index eca3e23f88..4624619460 100644 --- a/disperser/common/blobstore/blob_metadata_store_test.go +++ b/disperser/common/blobstore/blob_metadata_store_test.go @@ -9,6 +9,7 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/consensys/gnark-crypto/ecc/bn254/fp" "github.com/ethereum/go-ethereum/common" @@ -197,6 +198,58 @@ func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) { assert.Nil(t, lastEvaluatedKey) } +func TestShadowWriteBlobMetadata(t *testing.T) { + ctx := context.Background() + + blobKey := disperser.BlobKey{ + BlobHash: "shadowblob", + MetadataHash: "shadowhash", + } + metadata := &disperser.BlobMetadata{ + MetadataHash: blobKey.MetadataHash, + BlobHash: blobKey.BlobHash, + BlobStatus: disperser.Processing, + Expiry: 0, + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + + err := shadowBlobMetadataStore.QueueNewBlobMetadata(ctx, metadata) + assert.NoError(t, err) + assert.NoError(t, err) + err = blobMetadataStore.SetBlobStatus(context.Background(), blobKey, disperser.Dispersing) + assert.NoError(t, err) + primaryMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey) + assert.NoError(t, err) + assert.Equal(t, disperser.Dispersing, primaryMetadata.BlobStatus) + + // Check that the shadow metadata exists but status has NOT been updated + shadowMetadataItem, err := dynamoClient.GetItem(ctx, shadowMetadataTableName, map[string]types.AttributeValue{ + "MetadataHash": &types.AttributeValueMemberS{ + Value: blobKey.MetadataHash, + }, + "BlobHash": &types.AttributeValueMemberS{ + Value: blobKey.BlobHash, + }, + }) + assert.NoError(t, err) + shadowMetadata := disperser.BlobMetadata{} + err = attributevalue.UnmarshalMap(shadowMetadataItem, &shadowMetadata) + assert.NoError(t, err) + assert.Equal(t, disperser.Processing, shadowMetadata.BlobStatus) + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash}, + }, + }) +} + func deleteItems(t *testing.T, keys []commondynamodb.Key) { _, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) assert.NoError(t, err) diff --git a/disperser/common/blobstore/blobstore_test.go b/disperser/common/blobstore/blobstore_test.go index f07b3316df..58fb4ea4c6 100644 --- a/disperser/common/blobstore/blobstore_test.go +++ b/disperser/common/blobstore/blobstore_test.go @@ -45,12 +45,14 @@ var ( deployLocalStack bool localStackPort = "4569" - dynamoClient *dynamodb.Client - blobMetadataStore *blobstore.BlobMetadataStore - sharedStorage *blobstore.SharedBlobStore - - UUID = uuid.New() - metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) + dynamoClient *dynamodb.Client + blobMetadataStore *blobstore.BlobMetadataStore + shadowBlobMetadataStore *blobstore.BlobMetadataStore + sharedStorage *blobstore.SharedBlobStore + + UUID = uuid.New() + metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) + shadowMetadataTableName = fmt.Sprintf("test-BlobMetadata-Shadow-%v", UUID) ) func TestMain(m *testing.M) { @@ -90,13 +92,22 @@ func setup(m *testing.M) { panic("failed to create dynamodb table: " + err.Error()) } + if shadowMetadataTableName != "" { + _, err = test_utils.CreateTable(context.Background(), cfg, shadowMetadataTableName, blobstore.GenerateTableSchema(shadowMetadataTableName, 10, 10)) + if err != nil { + teardown() + panic("failed to create shadow dynamodb table: " + err.Error()) + } + } + dynamoClient, err = dynamodb.NewClient(cfg, logger) if err != nil { teardown() panic("failed to create dynamodb client: " + err.Error()) } - blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, time.Hour) + blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, metadataTableName, time.Hour) + shadowBlobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, shadowMetadataTableName, time.Hour) sharedStorage = blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger) } diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index ab2d0acd21..1e8b7a45f2 100644 --- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -46,8 +46,9 @@ type SharedBlobStore struct { } type Config struct { - BucketName string - TableName string + BucketName string + TableName string + ShadowTableName string } // This represents the s3 fetch result for a blob. diff --git a/inabox/deploy/cmd/main.go b/inabox/deploy/cmd/main.go index 2f519b1b91..4a50e3da84 100644 --- a/inabox/deploy/cmd/main.go +++ b/inabox/deploy/cmd/main.go @@ -16,8 +16,9 @@ var ( localstackFlagName = "localstack-port" deployResourcesFlagName = "deploy-resources" - metadataTableName = "test-BlobMetadata" - bucketTableName = "test-BucketStore" + metadataTableName = "test-BlobMetadata" + shadowMetadataTableName = "" // not used + bucketTableName = "test-BucketStore" chainCmdName = "chain" localstackCmdName = "localstack" @@ -137,7 +138,7 @@ func localstack(ctx *cli.Context) error { } if ctx.Bool(deployResourcesFlagName) { - return deploy.DeployResources(pool, ctx.String(localstackFlagName), metadataTableName, bucketTableName) + return deploy.DeployResources(pool, ctx.String(localstackFlagName), metadataTableName, shadowMetadataTableName, bucketTableName) } return nil diff --git a/inabox/deploy/localstack.go b/inabox/deploy/localstack.go index 020f807b65..ca3277c4ad 100644 --- a/inabox/deploy/localstack.go +++ b/inabox/deploy/localstack.go @@ -86,7 +86,7 @@ func StartDockertestWithLocalstackContainer(localStackPort string) (*dockertest. return pool, resource, nil } -func DeployResources(pool *dockertest.Pool, localStackPort, metadataTableName, bucketTableName string) error { +func DeployResources(pool *dockertest.Pool, localStackPort, metadataTableName, shadowTableName, bucketTableName string) error { if pool == nil { var err error diff --git a/inabox/tests/integration_suite_test.go b/inabox/tests/integration_suite_test.go index c3ec09703f..692d582c32 100644 --- a/inabox/tests/integration_suite_test.go +++ b/inabox/tests/integration_suite_test.go @@ -39,15 +39,16 @@ var ( dockertestResource *dockertest.Resource localStackPort string - metadataTableName = "test-BlobMetadata" - bucketTableName = "test-BucketStore" - logger logging.Logger - ethClient common.EthClient - rpcClient common.RPCEthClient - mockRollup *rollupbindings.ContractMockRollup - retrievalClient clients.RetrievalClient - numConfirmations int = 3 - numRetries = 0 + metadataTableName = "test-BlobMetadata" + shadowMetadataTableName = "" + bucketTableName = "test-BucketStore" + logger logging.Logger + ethClient common.EthClient + rpcClient common.RPCEthClient + mockRollup *rollupbindings.ContractMockRollup + retrievalClient clients.RetrievalClient + numConfirmations int = 3 + numRetries = 0 cancel context.CancelFunc ) @@ -91,7 +92,7 @@ var _ = BeforeSuite(func() { dockertestPool = pool dockertestResource = resource - err = deploy.DeployResources(pool, localStackPort, metadataTableName, bucketTableName) + err = deploy.DeployResources(pool, localStackPort, metadataTableName, shadowMetadataTableName, bucketTableName) Expect(err).To(BeNil()) } else {