Skip to content

Commit

Permalink
wip adds v2 metadatastore flags to dataapi
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork committed Nov 20, 2024
1 parent afd5894 commit b4b5cc8
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 288 deletions.
16 changes: 9 additions & 7 deletions disperser/cmd/dataapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ type Config struct {
BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string

DisperserHostname string
ChurnerHostname string
BatcherHealthEndpt string
DisperserHostname string
ChurnerHostname string
BatcherHealthEndpt string
BlobMetadataStoreV2 string
}

func NewConfig(ctx *cli.Context) (Config, error) {
Expand Down Expand Up @@ -68,10 +69,11 @@ func NewConfig(ctx *cli.Context) (Config, error) {
HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name),
},
DisperserHostname: ctx.GlobalString(flags.DisperserHostnameFlag.Name),
ChurnerHostname: ctx.GlobalString(flags.ChurnerHostnameFlag.Name),
BatcherHealthEndpt: ctx.GlobalString(flags.BatcherHealthEndptFlag.Name),
ChainStateConfig: thegraph.ReadCLIConfig(ctx),
DisperserHostname: ctx.GlobalString(flags.DisperserHostnameFlag.Name),
ChurnerHostname: ctx.GlobalString(flags.ChurnerHostnameFlag.Name),
BatcherHealthEndpt: ctx.GlobalString(flags.BatcherHealthEndptFlag.Name),
ChainStateConfig: thegraph.ReadCLIConfig(ctx),
BlobMetadataStoreV2: ctx.GlobalString(flags.DynamoMetadataStoreV2Flag.Name),
}
return config, nil
}
7 changes: 7 additions & 0 deletions disperser/cmd/dataapi/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMO_TABLE_NAME"),
}
DynamoMetadataStoreV2Flag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "dynamo-metadata-store-v2"),
Usage: "Name of the dynamo table to store v2 blob metadata",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMO_METADATA_STORE_V2"),
}
S3BucketNameFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "s3-bucket-name"),
Usage: "Name of the bucket to store blobs",
Expand Down Expand Up @@ -136,6 +142,7 @@ var (

var requiredFlags = []cli.Flag{
DynamoTableNameFlag,
DynamoMetadataStoreV2Flag,
SocketAddrFlag,
S3BucketNameFlag,
SubgraphApiBatchMetadataAddrFlag,
Expand Down
21 changes: 12 additions & 9 deletions disperser/cmd/dataapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/disperser/cmd/dataapi/flags"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
v2blobstore "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/dataapi"
"github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus"
"github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph"
Expand Down Expand Up @@ -89,15 +90,16 @@ func RunDataApi(ctx *cli.Context) error {
}

var (
promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster)
blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0)
sharedStorage = blobstore.NewSharedStorage(config.BlobstoreConfig.BucketName, s3Client, blobMetadataStore, logger)
subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr)
subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger)
chainState = coreeth.NewChainState(tx, client)
indexedChainState = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger)
metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger)
server = dataapi.NewServer(
promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster)
blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0)
sharedStorage = blobstore.NewSharedStorage(config.BlobstoreConfig.BucketName, s3Client, blobMetadataStore, logger)
blobMetadataStoreV2 = v2blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobMetadataStoreV2)
subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr)
subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger)
chainState = coreeth.NewChainState(tx, client)
indexedChainState = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger)
metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger)
server = dataapi.NewServer(
dataapi.Config{
ServerMode: config.ServerMode,
SocketAddr: config.SocketAddr,
Expand All @@ -107,6 +109,7 @@ func RunDataApi(ctx *cli.Context) error {
BatcherHealthEndpt: config.BatcherHealthEndpt,
},
sharedStorage,
blobMetadataStoreV2,
promClient,
subgraphClient,
tx,
Expand Down
10 changes: 10 additions & 0 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func NewBlobMetadataStore(dynamoDBClient commondynamodb.Client, logger logging.L
}
}

func NewBlobMetadataStoreV2(dynamoDBClient commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore {
logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl)
return &BlobMetadataStore{
dynamoDBClient: dynamoDBClient,
logger: logger.With("component", "BlobMetadataStoreV2"),
tableName: tableName,
ttl: ttl,
}
}

func (s *BlobMetadataStore) QueueNewBlobMetadata(ctx context.Context, blobMetadata *disperser.BlobMetadata) error {
item, err := MarshalBlobMetadata(blobMetadata)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common/semver"
v2blobstore "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/dataapi/docs"
"github.com/gin-contrib/cors"
"github.com/gin-contrib/logger"
Expand Down Expand Up @@ -196,6 +197,7 @@ type (
allowOrigins []string
logger logging.Logger
blobstore disperser.BlobStore
blobstoreV2 *v2blobstore.BlobMetadataStore
promClient PrometheusClient
subgraphClient SubgraphClient
transactor core.Reader
Expand All @@ -214,6 +216,7 @@ type (
func NewServer(
config Config,
blobstore disperser.BlobStore,
blobstoreV2 *v2blobstore.BlobMetadataStore,
promClient PrometheusClient,
subgraphClient SubgraphClient,
transactor core.Reader,
Expand Down Expand Up @@ -245,6 +248,7 @@ func NewServer(
socketAddr: config.SocketAddr,
allowOrigins: config.AllowOrigins,
blobstore: blobstore,
blobstoreV2: blobstoreV2,
promClient: promClient,
subgraphClient: subgraphClient,
transactor: transactor,
Expand Down
Loading

0 comments on commit b4b5cc8

Please sign in to comment.