Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Controller Entrypoint #873

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions disperser/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ build_encoder:
build_dataapi:
go build -o ./bin/dataapi ./cmd/dataapi

build_controller:
go build -o ./bin/controller ./cmd/controller

run_batcher: build_batcher
./bin/batcher \
--batcher.pull-interval 10s \
Expand Down
92 changes: 92 additions & 0 deletions disperser/cmd/controller/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package main

import (
"fmt"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core/thegraph"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/disperser/cmd/controller/flags"
"github.com/Layr-Labs/eigenda/disperser/controller"
"github.com/Layr-Labs/eigenda/indexer"
"github.com/urfave/cli"
)

const MaxUint16 = ^uint16(0)

type Config struct {
EncodingManagerConfig controller.EncodingManagerConfig
DispatcherConfig controller.DispatcherConfig
NumConcurrentEncodingRequests int
NumConcurrentDispersalRequests int
NodeClientCacheSize int

DynamoDBTableName string

EthClientConfig geth.EthClientConfig
AwsClientConfig aws.ClientConfig
LoggerConfig common.LoggerConfig
IndexerConfig indexer.Config
ChainStateConfig thegraph.Config
UseGraph bool
IndexerDataDir string

BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string
}

func NewConfig(ctx *cli.Context) (Config, error) {
loggerConfig, err := common.ReadLoggerCLIConfig(ctx, flags.FlagPrefix)
if err != nil {
return Config{}, err
}
ethClientConfig := geth.ReadEthClientConfig(ctx)
numRelayAssignments := ctx.GlobalInt(flags.NumRelayAssignmentFlag.Name)
if numRelayAssignments < 1 || numRelayAssignments > int(MaxUint16) {
return Config{}, fmt.Errorf("invalid number of relay assignments: %d", numRelayAssignments)
}
availableRelays := ctx.GlobalIntSlice(flags.AvailableRelaysFlag.Name)
if len(availableRelays) == 0 {
return Config{}, fmt.Errorf("no available relays specified")
}
relays := make([]corev2.RelayKey, len(availableRelays))
for i, relay := range availableRelays {
if relay < 1 || relay > 65_535 {
return Config{}, fmt.Errorf("invalid relay: %d", relay)
}
relays[i] = corev2.RelayKey(relay)
}
config := Config{
DynamoDBTableName: ctx.GlobalString(flags.DynamoDBTableNameFlag.Name),
EthClientConfig: ethClientConfig,
AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix),
LoggerConfig: *loggerConfig,
EncodingManagerConfig: controller.EncodingManagerConfig{
PullInterval: ctx.GlobalDuration(flags.EncodingPullIntervalFlag.Name),
EncodingRequestTimeout: ctx.GlobalDuration(flags.EncodingRequestTimeoutFlag.Name),
StoreTimeout: ctx.GlobalDuration(flags.EncodingStoreTimeoutFlag.Name),
NumEncodingRetries: ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name),
NumRelayAssignment: uint16(numRelayAssignments),
AvailableRelays: relays,
},
DispatcherConfig: controller.DispatcherConfig{
PullInterval: ctx.GlobalDuration(flags.DispatcherPullIntervalFlag.Name),
FinalizationBlockDelay: ctx.GlobalUint64(flags.FinalizationBlockDelayFlag.Name),
NodeRequestTimeout: ctx.GlobalDuration(flags.NodeRequestTimeoutFlag.Name),
NumRequestRetries: ctx.GlobalInt(flags.NumRequestRetriesFlag.Name),
},
NumConcurrentEncodingRequests: ctx.GlobalInt(flags.NumConcurrentEncodingRequestsFlag.Name),
NumConcurrentDispersalRequests: ctx.GlobalInt(flags.NumConcurrentDispersalRequestsFlag.Name),
NodeClientCacheSize: ctx.GlobalInt(flags.NodeClientCacheNumEntriesFlag.Name),
IndexerConfig: indexer.ReadIndexerConfig(ctx),
ChainStateConfig: thegraph.ReadCLIConfig(ctx),
UseGraph: ctx.GlobalBool(flags.UseGraphFlag.Name),
IndexerDataDir: ctx.GlobalString(flags.IndexerDataDirFlag.Name),

BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
}
return config, nil
}
183 changes: 183 additions & 0 deletions disperser/cmd/controller/flags/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package flags

import (
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/indexer"
"github.com/urfave/cli"
)

const (
FlagPrefix = "controller"
envVarPrefix = "CONTROLLER"
)

var (
DynamoDBTableNameFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "dynamodb-table-name"),
Usage: "Name of the dynamodb table to store blob metadata",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMODB_TABLE_NAME"),
}
BlsOperatorStateRetrieverFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "bls-operator-state-retriever"),
Usage: "Address of the BLS Operator State Retriever",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "BLS_OPERATOR_STATE_RETRIVER"),
}
EigenDAServiceManagerFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "eigenda-service-manager"),
Usage: "Address of the EigenDA Service Manager",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "EIGENDA_SERVICE_MANAGER"),
}
UseGraphFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "use-graph"),
Usage: "Whether to use the graph node",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "USE_GRAPH"),
}
IndexerDataDirFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "indexer-data-dir"),
Usage: "the data directory for the indexer",
EnvVar: common.PrefixEnvVar(envVarPrefix, "INDEXER_DATA_DIR"),
Required: false,
Value: "./data/",
}
// EncodingManager Flags
EncodingPullIntervalFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "encoding-pull-interval"),
Usage: "Interval at which to pull from the queue",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ENCODING_PULL_INTERVAL"),
}
AvailableRelaysFlag = cli.IntSliceFlag{
Name: common.PrefixFlag(FlagPrefix, "available-relays"),
Usage: "List of available relays",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "AVAILABLE_RELAYS"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this work? Each relay has a integer assigned to it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this will be a list of integer representing the disperser's relay keys

}
EncodingRequestTimeoutFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "encoding-request-timeout"),
Usage: "Timeout for encoding requests",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ENCODING_REQUEST_TIMEOUT"),
Value: 5 * time.Minute,
}
EncodingStoreTimeoutFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "encoding-store-timeout"),
Usage: "Timeout for interacting with blob store",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ENCODING_STORE_TIMEOUT"),
Value: 15 * time.Second,
}
NumEncodingRetriesFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "num-encoding-retries"),
Usage: "Number of retries for encoding requests",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_ENCODING_RETRIES"),
Value: 3,
}
NumRelayAssignmentFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "num-relay-assignment"),
Usage: "Number of relays to assign to each encoding request",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_RELAY_ASSIGNMENT"),
Value: 2,
}
NumConcurrentEncodingRequestsFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "num-concurrent-encoding-requests"),
Usage: "Number of concurrent encoding requests",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_CONCURRENT_ENCODING_REQUESTS"),
Value: 250,
}

// Dispatcher Flags
DispatcherPullIntervalFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "dispatcher-pull-interval"),
Usage: "Interval at which to pull from the queue",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DISPATCHER_PULL_INTERVAL"),
}
NodeRequestTimeoutFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "node-request-timeout"),
Usage: "Timeout for node requests",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NODE_REQUEST_TIMEOUT"),
}
NumConnectionsToNodesFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "num-connections-to-nodes"),
Usage: "Max number of connections to nodes",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_CONNECTIONS_TO_NODES"),
}
FinalizationBlockDelayFlag = cli.Uint64Flag{
Name: common.PrefixFlag(FlagPrefix, "finalization-block-delay"),
Usage: "Number of blocks to wait before finalizing",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZATION_BLOCK_DELAY"),
Value: 75,
}
NumRequestRetriesFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "num-request-retries"),
Usage: "Number of retries for node requests",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_REQUEST_RETRIES"),
Value: 3,
}
NumConcurrentDispersalRequestsFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "num-concurrent-dispersal-requests"),
Usage: "Number of concurrent dispersal requests",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_CONCURRENT_DISPERSAL_REQUESTS"),
Value: 600,
}
NodeClientCacheNumEntriesFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "node-client-cache-num-entries"),
Usage: "Size (number of entries) of the node client cache",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NODE_CLIENT_CACHE_NUM_ENTRIES"),
Value: 400,
}
)

var requiredFlags = []cli.Flag{
DynamoDBTableNameFlag,
BlsOperatorStateRetrieverFlag,
EigenDAServiceManagerFlag,
UseGraphFlag,
EncodingPullIntervalFlag,
AvailableRelaysFlag,
DispatcherPullIntervalFlag,
NodeRequestTimeoutFlag,
NumConnectionsToNodesFlag,
}

var optionalFlags = []cli.Flag{
IndexerDataDirFlag,
EncodingRequestTimeoutFlag,
EncodingStoreTimeoutFlag,
NumEncodingRetriesFlag,
NumRelayAssignmentFlag,
NumConcurrentEncodingRequestsFlag,
FinalizationBlockDelayFlag,
NumRequestRetriesFlag,
NumConcurrentDispersalRequestsFlag,
NodeClientCacheNumEntriesFlag,
}

var Flags []cli.Flag

func init() {
Flags = append(requiredFlags, optionalFlags...)
Flags = append(Flags, geth.EthClientFlags(envVarPrefix)...)
Flags = append(Flags, common.LoggerCLIFlags(envVarPrefix, FlagPrefix)...)
Flags = append(Flags, indexer.CLIFlags(envVarPrefix)...)
Flags = append(Flags, aws.ClientFlags(envVarPrefix, FlagPrefix)...)
Flags = append(Flags, thegraph.CLIFlags(envVarPrefix)...)
}
Loading
Loading