Skip to content

Commit

Permalink
controller main
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Nov 8, 2024
1 parent 469b084 commit e6a2d4f
Show file tree
Hide file tree
Showing 5 changed files with 434 additions and 0 deletions.
3 changes: 3 additions & 0 deletions disperser/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ build_encoder:
build_dataapi:
go build -o ./bin/dataapi ./cmd/dataapi

build_controller:
go build -o ./bin/controller ./cmd/controller

run_batcher: build_batcher
./bin/batcher \
--batcher.pull-interval 10s \
Expand Down
91 changes: 91 additions & 0 deletions disperser/cmd/controller/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package main

import (
"fmt"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core/thegraph"
v2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/disperser/cmd/controller/flags"
"github.com/Layr-Labs/eigenda/disperser/controller"
"github.com/Layr-Labs/eigenda/indexer"
"github.com/urfave/cli"
)

type Config struct {
EncodingManagerConfig controller.EncodingManagerConfig
DispatcherConfig controller.DispatcherConfig
NumConcurrentEncodingRequests int
NumConcurrentDispersalRequests int
NodeClientCacheSize int

DynamoDBTableName string

EthClientConfig geth.EthClientConfig
AwsClientConfig aws.ClientConfig
LoggerConfig common.LoggerConfig
IndexerConfig indexer.Config
ChainStateConfig thegraph.Config
UseGraph bool
IndexerDataDir string

BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string
}

func NewConfig(ctx *cli.Context) (Config, error) {
loggerConfig, err := common.ReadLoggerCLIConfig(ctx, flags.FlagPrefix)
if err != nil {
return Config{}, err
}
ethClientConfig := geth.ReadEthClientConfig(ctx)
numRelayAssignments := ctx.GlobalInt(flags.NumRelayAssignmentFlag.Name)
if numRelayAssignments < 1 || numRelayAssignments > 65_535 {
return Config{}, fmt.Errorf("invalid number of relay assignments: %d", numRelayAssignments)
}
availableRelays := ctx.GlobalIntSlice(flags.AvailableRelaysFlag.Name)
if len(availableRelays) == 0 {
return Config{}, fmt.Errorf("no available relays specified")
}
relays := make([]v2.RelayKey, len(availableRelays))
for i, relay := range availableRelays {
if relay < 1 || relay > 65_535 {
return Config{}, fmt.Errorf("invalid relay: %d", relay)
}
relays[i] = v2.RelayKey(relay)
}
config := Config{
DynamoDBTableName: ctx.GlobalString(flags.DynamoDBTableNameFlag.Name),
EthClientConfig: ethClientConfig,
AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix),
LoggerConfig: *loggerConfig,
EncodingManagerConfig: controller.EncodingManagerConfig{
PullInterval: ctx.GlobalDuration(flags.EncodingPullIntervalFlag.Name),
EncodingRequestTimeout: ctx.GlobalDuration(flags.EncodingRequestTimeoutFlag.Name),
StoreTimeout: ctx.GlobalDuration(flags.EncodingStoreTimeoutFlag.Name),
NumEncodingRetries: ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name),
NumRelayAssignment: uint16(numRelayAssignments),
AvailableRelays: relays,
},
DispatcherConfig: controller.DispatcherConfig{
PullInterval: ctx.GlobalDuration(flags.DispatcherPullIntervalFlag.Name),
FinalizationBlockDelay: ctx.GlobalUint64(flags.FinalizationBlockDelayFlag.Name),
NodeRequestTimeout: ctx.GlobalDuration(flags.NodeRequestTimeoutFlag.Name),
NumRequestRetries: ctx.GlobalInt(flags.NumRequestRetriesFlag.Name),
NumConnectionsToNodes: ctx.GlobalInt(flags.NumConnectionsToNodesFlag.Name),
},
NumConcurrentEncodingRequests: ctx.GlobalInt(flags.NumConcurrentEncodingRequestsFlag.Name),
NumConcurrentDispersalRequests: ctx.GlobalInt(flags.NumConcurrentDispersalRequestsFlag.Name),
NodeClientCacheSize: ctx.GlobalInt(flags.NodeClientCacheSizeFlag.Name),
IndexerConfig: indexer.ReadIndexerConfig(ctx),
ChainStateConfig: thegraph.ReadCLIConfig(ctx),
UseGraph: ctx.GlobalBool(flags.UseGraphFlag.Name),
IndexerDataDir: ctx.GlobalString(flags.IndexerDataDirFlag.Name),

BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
}
return config, nil
}
182 changes: 182 additions & 0 deletions disperser/cmd/controller/flags/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package flags

import (
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/indexer"
"github.com/urfave/cli"
)

const (
FlagPrefix = "controller"
envVarPrefix = "CONTROLLER"
)

var (
DynamoDBTableNameFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "dynamodb-table-name"),
Usage: "Name of the dynamodb table to store blob metadata",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMODB_TABLE_NAME"),
}
BlsOperatorStateRetrieverFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "bls-operator-state-retriever"),
Usage: "Address of the BLS Operator State Retriever",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "BLS_OPERATOR_STATE_RETRIVER"),
}
EigenDAServiceManagerFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "eigenda-service-manager"),
Usage: "Address of the EigenDA Service Manager",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "EIGENDA_SERVICE_MANAGER"),
}
UseGraphFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "use-graph"),
Usage: "Whether to use the graph node",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "USE_GRAPH"),
}
IndexerDataDirFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "indexer-data-dir"),
Usage: "the data directory for the indexer",
EnvVar: common.PrefixEnvVar(envVarPrefix, "INDEXER_DATA_DIR"),
Value: "./data/",
}
// EncodingManager Flags
EncodingPullIntervalFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "encoding-pull-interval"),
Usage: "Interval at which to pull from the queue",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ENCODING_PULL_INTERVAL"),
}
AvailableRelaysFlag = cli.IntSliceFlag{
Name: common.PrefixFlag(FlagPrefix, "available-relays"),
Usage: "List of available relays",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "AVAILABLE_RELAYS"),
}
EncodingRequestTimeoutFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "encoding-request-timeout"),
Usage: "Timeout for encoding requests",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ENCODING_REQUEST_TIMEOUT"),
Value: 5 * time.Minute,
}
EncodingStoreTimeoutFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "encoding-store-timeout"),
Usage: "Timeout for interacting with blob store",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ENCODING_STORE_TIMEOUT"),
Value: 15 * time.Second,
}
NumEncodingRetriesFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "num-encoding-retries"),
Usage: "Number of retries for encoding requests",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_ENCODING_RETRIES"),
Value: 3,
}
NumRelayAssignmentFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "num-relay-assignment"),
Usage: "Number of relays to assign to each encoding request",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_RELAY_ASSIGNMENT"),
Value: 2,
}
NumConcurrentEncodingRequestsFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "num-concurrent-encoding-requests"),
Usage: "Number of concurrent encoding requests",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_CONCURRENT_ENCODING_REQUESTS"),
Value: 250,
}

// Dispatcher Flags
DispatcherPullIntervalFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "dispatcher-pull-interval"),
Usage: "Interval at which to pull from the queue",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DISPATCHER_PULL_INTERVAL"),
}
NodeRequestTimeoutFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "node-request-timeout"),
Usage: "Timeout for node requests",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NODE_REQUEST_TIMEOUT"),
}
NumConnectionsToNodesFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "num-connections-to-nodes"),
Usage: "Max number of connections to nodes",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_CONNECTIONS_TO_NODES"),
}
FinalizationBlockDelayFlag = cli.Uint64Flag{
Name: common.PrefixFlag(FlagPrefix, "finalization-block-delay"),
Usage: "Number of blocks to wait before finalizing",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZATION_BLOCK_DELAY"),
Value: 75,
}
NumRequestRetriesFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "num-request-retries"),
Usage: "Number of retries for node requests",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_REQUEST_RETRIES"),
Value: 3,
}
NumConcurrentDispersalRequestsFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "num-concurrent-dispersal-requests"),
Usage: "Number of concurrent dispersal requests",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_CONCURRENT_DISPERSAL_REQUESTS"),
Value: 600,
}
NodeClientCacheSizeFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "node-client-cache-size"),
Usage: "Size of the node client cache",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "NODE_CLIENT_CACHE_SIZE"),
Value: 400,
}
)

var requiredFlags = []cli.Flag{
DynamoDBTableNameFlag,
BlsOperatorStateRetrieverFlag,
EigenDAServiceManagerFlag,
UseGraphFlag,
EncodingPullIntervalFlag,
AvailableRelaysFlag,
DispatcherPullIntervalFlag,
NodeRequestTimeoutFlag,
NumConnectionsToNodesFlag,
}

var optionalFlags = []cli.Flag{
IndexerDataDirFlag,
EncodingRequestTimeoutFlag,
EncodingStoreTimeoutFlag,
NumEncodingRetriesFlag,
NumRelayAssignmentFlag,
NumConcurrentEncodingRequestsFlag,
FinalizationBlockDelayFlag,
NumRequestRetriesFlag,
NumConcurrentDispersalRequestsFlag,
NodeClientCacheSizeFlag,
}

var Flags []cli.Flag

func init() {
Flags = append(requiredFlags, optionalFlags...)
Flags = append(Flags, geth.EthClientFlags(envVarPrefix)...)
Flags = append(Flags, common.LoggerCLIFlags(envVarPrefix, FlagPrefix)...)
Flags = append(Flags, indexer.CLIFlags(envVarPrefix)...)
Flags = append(Flags, aws.ClientFlags(envVarPrefix, FlagPrefix)...)
Flags = append(Flags, thegraph.CLIFlags(envVarPrefix)...)
}
Loading

0 comments on commit e6a2d4f

Please sign in to comment.