Skip to content

Commit

Permalink
Enable shadow PutItem blob metadata requests to separate dynamo table…
Browse files Browse the repository at this point in the history
… for minibatch preprod testing (#669)
  • Loading branch information
pschork authored Jul 30, 2024
1 parent b2c0bc3 commit 7fdcf5a
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 44 deletions.
15 changes: 8 additions & 7 deletions disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ var (
queue disperser.BlobStore
dispersalServer *apiserver.DispersalServer

dockertestPool *dockertest.Pool
dockertestResource *dockertest.Resource
UUID = uuid.New()
metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID)
bucketTableName = fmt.Sprintf("test-BucketStore-%v", UUID)
dockertestPool *dockertest.Pool
dockertestResource *dockertest.Resource
UUID = uuid.New()
metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID)
shadowMetadataTableName = fmt.Sprintf("test-BlobMetadata-Shadow-%v", UUID)
bucketTableName = fmt.Sprintf("test-BucketStore-%v", UUID)

deployLocalStack bool
localStackPort = "4568"
Expand Down Expand Up @@ -585,7 +586,7 @@ func setup() {

}

err = deploy.DeployResources(dockertestPool, localStackPort, metadataTableName, bucketTableName)
err = deploy.DeployResources(dockertestPool, localStackPort, metadataTableName, shadowMetadataTableName, bucketTableName)
if err != nil {
teardown()
panic("failed to deploy AWS resources")
Expand Down Expand Up @@ -631,7 +632,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
if err != nil {
panic("failed to create dynamoDB client")
}
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, time.Hour)
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, shadowMetadataTableName, time.Hour)

globalParams := common.GlobalRateParams{
CountFailed: false,
Expand Down
6 changes: 4 additions & 2 deletions disperser/cmd/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Config struct {
RateConfig apiserver.RateConfig
EnableRatelimiter bool
BucketTableName string
ShadowTableName string
BucketStoreSize int
EthClientConfig geth.EthClientConfig

Expand Down Expand Up @@ -53,8 +54,9 @@ func NewConfig(ctx *cli.Context) (Config, error) {
GrpcTimeout: ctx.GlobalDuration(flags.GrpcTimeoutFlag.Name),
},
BlobstoreConfig: blobstore.Config{
BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name),
TableName: ctx.GlobalString(flags.DynamoDBTableNameFlag.Name),
BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name),
TableName: ctx.GlobalString(flags.DynamoDBTableNameFlag.Name),
ShadowTableName: ctx.GlobalString(flags.ShadowTableNameFlag.Name),
},
LoggerConfig: *loggerConfig,
MetricsConfig: disperser.MetricsConfig{
Expand Down
8 changes: 8 additions & 0 deletions disperser/cmd/apiserver/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMODB_TABLE_NAME"),
}
ShadowTableNameFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "shadow-table-name"),
Usage: "Name of the dynamodb table to shadow write blob metadata",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "SHADOW_TABLE_NAME"),
Value: "",
}
GrpcPortFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "grpc-port"),
Usage: "Port at which disperser listens for grpc calls",
Expand Down Expand Up @@ -104,6 +111,7 @@ var optionalFlags = []cli.Flag{
EnableRatelimiter,
BucketStoreSize,
GrpcTimeoutFlag,
ShadowTableNameFlag,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
2 changes: 1 addition & 1 deletion disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func RunDisperserServer(ctx *cli.Context) error {

bucketName := config.BlobstoreConfig.BucketName
logger.Info("Creating blob store", "bucket", bucketName)
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, config.BlobstoreConfig.ShadowTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
blobStore := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger)

reg := prometheus.NewRegistry()
Expand Down
2 changes: 1 addition & 1 deletion disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func RunBatcher(ctx *cli.Context) error {
if err != nil || storeDurationBlocks == 0 {
return fmt.Errorf("failed to get STORE_DURATION_BLOCKS: %w", err)
}
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, config.BlobstoreConfig.ShadowTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
queue := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger)

cs := coreeth.NewChainState(tx, client)
Expand Down
2 changes: 1 addition & 1 deletion disperser/cmd/dataapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func RunDataApi(ctx *cli.Context) error {

var (
promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster)
blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0)
blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, config.BlobstoreConfig.ShadowTableName, 0)
sharedStorage = blobstore.NewSharedStorage(config.BlobstoreConfig.BucketName, s3Client, blobMetadataStore, logger)
subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr)
subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger)
Expand Down
30 changes: 21 additions & 9 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,24 @@ const (
// - StatusIndex: (Partition Key: Status, Sort Key: RequestedAt) -> Metadata
// - BatchIndex: (Partition Key: BatchHeaderHash, Sort Key: BlobIndex) -> Metadata
type BlobMetadataStore struct {
dynamoDBClient *commondynamodb.Client
logger logging.Logger
tableName string
ttl time.Duration
dynamoDBClient *commondynamodb.Client
logger logging.Logger
tableName string
shadowTableName string
ttl time.Duration
}

func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore {
func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, shadowTableName string, ttl time.Duration) *BlobMetadataStore {
logger.Debugf("creating blob metadata store with table %s with TTL: %s", tableName, ttl)
if shadowTableName != "" {
logger.Debugf("shadow blob metadata will be written to table %s with TTL: %s", shadowTableName, ttl)
}
return &BlobMetadataStore{
dynamoDBClient: dynamoDBClient,
logger: logger.With("component", "BlobMetadataStore"),
tableName: tableName,
ttl: ttl,
dynamoDBClient: dynamoDBClient,
logger: logger.With("component", "BlobMetadataStore"),
tableName: tableName,
shadowTableName: shadowTableName,
ttl: ttl,
}
}

Expand All @@ -49,6 +54,13 @@ func (s *BlobMetadataStore) QueueNewBlobMetadata(ctx context.Context, blobMetada
return err
}

if s.shadowTableName != "" && s.shadowTableName != s.tableName {
err = s.dynamoDBClient.PutItem(ctx, s.shadowTableName, item)
if err != nil {
s.logger.Error("failed to put item into shadow table %s : %v", s.shadowTableName, err)
}
}

return s.dynamoDBClient.PutItem(ctx, s.tableName, item)
}

Expand Down
53 changes: 53 additions & 0 deletions disperser/common/blobstore/blob_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/consensys/gnark-crypto/ecc/bn254/fp"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -197,6 +198,58 @@ func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) {
assert.Nil(t, lastEvaluatedKey)
}

func TestShadowWriteBlobMetadata(t *testing.T) {
ctx := context.Background()

blobKey := disperser.BlobKey{
BlobHash: "shadowblob",
MetadataHash: "shadowhash",
}
metadata := &disperser.BlobMetadata{
MetadataHash: blobKey.MetadataHash,
BlobHash: blobKey.BlobHash,
BlobStatus: disperser.Processing,
Expiry: 0,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
BlobSize: blobSize,
RequestedAt: 123,
},
ConfirmationInfo: &disperser.ConfirmationInfo{},
}

err := shadowBlobMetadataStore.QueueNewBlobMetadata(ctx, metadata)
assert.NoError(t, err)
assert.NoError(t, err)
err = blobMetadataStore.SetBlobStatus(context.Background(), blobKey, disperser.Dispersing)
assert.NoError(t, err)
primaryMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, disperser.Dispersing, primaryMetadata.BlobStatus)

// Check that the shadow metadata exists but status has NOT been updated
shadowMetadataItem, err := dynamoClient.GetItem(ctx, shadowMetadataTableName, map[string]types.AttributeValue{
"MetadataHash": &types.AttributeValueMemberS{
Value: blobKey.MetadataHash,
},
"BlobHash": &types.AttributeValueMemberS{
Value: blobKey.BlobHash,
},
})
assert.NoError(t, err)
shadowMetadata := disperser.BlobMetadata{}
err = attributevalue.UnmarshalMap(shadowMetadataItem, &shadowMetadata)
assert.NoError(t, err)
assert.Equal(t, disperser.Processing, shadowMetadata.BlobStatus)
deleteItems(t, []commondynamodb.Key{
{
"MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash},
"BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash},
},
})
}

func deleteItems(t *testing.T, keys []commondynamodb.Key) {
_, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys)
assert.NoError(t, err)
Expand Down
25 changes: 18 additions & 7 deletions disperser/common/blobstore/blobstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ var (
deployLocalStack bool
localStackPort = "4569"

dynamoClient *dynamodb.Client
blobMetadataStore *blobstore.BlobMetadataStore
sharedStorage *blobstore.SharedBlobStore

UUID = uuid.New()
metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID)
dynamoClient *dynamodb.Client
blobMetadataStore *blobstore.BlobMetadataStore
shadowBlobMetadataStore *blobstore.BlobMetadataStore
sharedStorage *blobstore.SharedBlobStore

UUID = uuid.New()
metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID)
shadowMetadataTableName = fmt.Sprintf("test-BlobMetadata-Shadow-%v", UUID)
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -90,13 +92,22 @@ func setup(m *testing.M) {
panic("failed to create dynamodb table: " + err.Error())
}

if shadowMetadataTableName != "" {
_, err = test_utils.CreateTable(context.Background(), cfg, shadowMetadataTableName, blobstore.GenerateTableSchema(shadowMetadataTableName, 10, 10))
if err != nil {
teardown()
panic("failed to create shadow dynamodb table: " + err.Error())
}
}

dynamoClient, err = dynamodb.NewClient(cfg, logger)
if err != nil {
teardown()
panic("failed to create dynamodb client: " + err.Error())
}

blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, time.Hour)
blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, metadataTableName, time.Hour)
shadowBlobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, shadowMetadataTableName, time.Hour)
sharedStorage = blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger)
}

Expand Down
5 changes: 3 additions & 2 deletions disperser/common/blobstore/shared_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type SharedBlobStore struct {
}

type Config struct {
BucketName string
TableName string
BucketName string
TableName string
ShadowTableName string
}

// This represents the s3 fetch result for a blob.
Expand Down
7 changes: 4 additions & 3 deletions inabox/deploy/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ var (
localstackFlagName = "localstack-port"
deployResourcesFlagName = "deploy-resources"

metadataTableName = "test-BlobMetadata"
bucketTableName = "test-BucketStore"
metadataTableName = "test-BlobMetadata"
shadowMetadataTableName = "" // not used
bucketTableName = "test-BucketStore"

chainCmdName = "chain"
localstackCmdName = "localstack"
Expand Down Expand Up @@ -137,7 +138,7 @@ func localstack(ctx *cli.Context) error {
}

if ctx.Bool(deployResourcesFlagName) {
return deploy.DeployResources(pool, ctx.String(localstackFlagName), metadataTableName, bucketTableName)
return deploy.DeployResources(pool, ctx.String(localstackFlagName), metadataTableName, shadowMetadataTableName, bucketTableName)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion inabox/deploy/localstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func StartDockertestWithLocalstackContainer(localStackPort string) (*dockertest.
return pool, resource, nil
}

func DeployResources(pool *dockertest.Pool, localStackPort, metadataTableName, bucketTableName string) error {
func DeployResources(pool *dockertest.Pool, localStackPort, metadataTableName, shadowTableName, bucketTableName string) error {

if pool == nil {
var err error
Expand Down
21 changes: 11 additions & 10 deletions inabox/tests/integration_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@ var (
dockertestResource *dockertest.Resource
localStackPort string

metadataTableName = "test-BlobMetadata"
bucketTableName = "test-BucketStore"
logger logging.Logger
ethClient common.EthClient
rpcClient common.RPCEthClient
mockRollup *rollupbindings.ContractMockRollup
retrievalClient clients.RetrievalClient
numConfirmations int = 3
numRetries = 0
metadataTableName = "test-BlobMetadata"
shadowMetadataTableName = ""
bucketTableName = "test-BucketStore"
logger logging.Logger
ethClient common.EthClient
rpcClient common.RPCEthClient
mockRollup *rollupbindings.ContractMockRollup
retrievalClient clients.RetrievalClient
numConfirmations int = 3
numRetries = 0

cancel context.CancelFunc
)
Expand Down Expand Up @@ -91,7 +92,7 @@ var _ = BeforeSuite(func() {
dockertestPool = pool
dockertestResource = resource

err = deploy.DeployResources(pool, localStackPort, metadataTableName, bucketTableName)
err = deploy.DeployResources(pool, localStackPort, metadataTableName, shadowMetadataTableName, bucketTableName)
Expect(err).To(BeNil())

} else {
Expand Down

0 comments on commit 7fdcf5a

Please sign in to comment.