Skip to content

Commit

Permalink
wip adds v2 metadatastore flags to dataapi
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork committed Nov 20, 2024
1 parent afd5894 commit 6a2ba50
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 286 deletions.
21 changes: 14 additions & 7 deletions disperser/cmd/dataapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
)

type Config struct {
AwsClientConfig aws.ClientConfig
BlobstoreConfig blobstore.Config
EthClientConfig geth.EthClientConfig
LoggerConfig common.LoggerConfig
PrometheusConfig prometheus.Config
MetricsConfig dataapi.MetricsConfig
ChainStateConfig thegraph.Config
AwsClientConfig aws.ClientConfig
BlobstoreConfig blobstore.Config
BlobstoreConfigV2 blobstore.Config
EthClientConfig geth.EthClientConfig
LoggerConfig common.LoggerConfig
PrometheusConfig prometheus.Config
MetricsConfig dataapi.MetricsConfig
ChainStateConfig thegraph.Config

SocketAddr string
PrometheusApiAddr string
Expand All @@ -34,6 +35,7 @@ type Config struct {
DisperserHostname string
ChurnerHostname string
BatcherHealthEndpt string
MetadataStoreV2 string
}

func NewConfig(ctx *cli.Context) (Config, error) {
Expand All @@ -47,6 +49,10 @@ func NewConfig(ctx *cli.Context) (Config, error) {
BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name),
TableName: ctx.GlobalString(flags.DynamoTableNameFlag.Name),
},
BlobstoreConfigV2: blobstore.Config{
BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name),
TableName: ctx.GlobalString(flags.DynamoMetadataStoreV2Flag.Name),
},
AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix),
EthClientConfig: ethClientConfig,
LoggerConfig: *loggerConfig,
Expand All @@ -72,6 +78,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
ChurnerHostname: ctx.GlobalString(flags.ChurnerHostnameFlag.Name),
BatcherHealthEndpt: ctx.GlobalString(flags.BatcherHealthEndptFlag.Name),
ChainStateConfig: thegraph.ReadCLIConfig(ctx),
MetadataStoreV2: ctx.GlobalString(flags.DynamoMetadataStoreV2Flag.Name),
}
return config, nil
}
7 changes: 7 additions & 0 deletions disperser/cmd/dataapi/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMO_TABLE_NAME"),
}
DynamoMetadataStoreV2Flag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "dynamo-metadata-store-v2"),
Usage: "Name of the dynamo table to store v2 blob metadata",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMO_METADATA_STORE_V2"),
}
S3BucketNameFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "s3-bucket-name"),
Usage: "Name of the bucket to store blobs",
Expand Down Expand Up @@ -136,6 +142,7 @@ var (

var requiredFlags = []cli.Flag{
DynamoTableNameFlag,
DynamoMetadataStoreV2Flag,
SocketAddrFlag,
S3BucketNameFlag,
SubgraphApiBatchMetadataAddrFlag,
Expand Down
22 changes: 15 additions & 7 deletions disperser/cmd/dataapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/disperser/cmd/dataapi/flags"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
v2blobstore "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/dataapi"
"github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus"
"github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph"
Expand Down Expand Up @@ -91,13 +92,19 @@ func RunDataApi(ctx *cli.Context) error {
var (
promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster)
blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0)
sharedStorage = blobstore.NewSharedStorage(config.BlobstoreConfig.BucketName, s3Client, blobMetadataStore, logger)
subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr)
subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger)
chainState = coreeth.NewChainState(tx, client)
indexedChainState = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger)
metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger)
server = dataapi.NewServer(
sharedStorage = blobstore.NewSharedStorage(
config.BlobstoreConfig.BucketName,
s3Client,
blobMetadataStore,
logger,
)
blobMetadataStoreV2 = v2blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfigV2.TableName)
subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr)
subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger)
chainState = coreeth.NewChainState(tx, client)
indexedChainState = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger)
metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger)
server = dataapi.NewServer(
dataapi.Config{
ServerMode: config.ServerMode,
SocketAddr: config.SocketAddr,
Expand All @@ -107,6 +114,7 @@ func RunDataApi(ctx *cli.Context) error {
BatcherHealthEndpt: config.BatcherHealthEndpt,
},
sharedStorage,
blobMetadataStoreV2,
promClient,
subgraphClient,
tx,
Expand Down
10 changes: 10 additions & 0 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func NewBlobMetadataStore(dynamoDBClient commondynamodb.Client, logger logging.L
}
}

func NewBlobMetadataStoreV2(dynamoDBClient commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore {
logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl)
return &BlobMetadataStore{
dynamoDBClient: dynamoDBClient,
logger: logger.With("component", "BlobMetadataStoreV2"),
tableName: tableName,
ttl: ttl,
}
}

func (s *BlobMetadataStore) QueueNewBlobMetadata(ctx context.Context, blobMetadata *disperser.BlobMetadata) error {
item, err := MarshalBlobMetadata(blobMetadata)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (

"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common/semver"
v2blobstore "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/dataapi/docs"

"github.com/gin-contrib/cors"
"github.com/gin-contrib/logger"
"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -196,6 +198,7 @@ type (
allowOrigins []string
logger logging.Logger
blobstore disperser.BlobStore
blobstoreV2 *v2blobstore.BlobMetadataStore
promClient PrometheusClient
subgraphClient SubgraphClient
transactor core.Reader
Expand All @@ -214,6 +217,7 @@ type (
func NewServer(
config Config,
blobstore disperser.BlobStore,
blobstoreV2 *v2blobstore.BlobMetadataStore,
promClient PrometheusClient,
subgraphClient SubgraphClient,
transactor core.Reader,
Expand Down Expand Up @@ -245,6 +249,7 @@ func NewServer(
socketAddr: config.SocketAddr,
allowOrigins: config.AllowOrigins,
blobstore: blobstore,
blobstoreV2: blobstoreV2,
promClient: promClient,
subgraphClient: subgraphClient,
transactor: transactor,
Expand Down
Loading

0 comments on commit 6a2ba50

Please sign in to comment.