diff --git a/disperser/Makefile b/disperser/Makefile index e8c10036de..66e13fc119 100644 --- a/disperser/Makefile +++ b/disperser/Makefile @@ -15,6 +15,9 @@ build_encoder: build_dataapi: go build -o ./bin/dataapi ./cmd/dataapi +build_controller: + go build -o ./bin/controller ./cmd/controller + run_batcher: build_batcher ./bin/batcher \ --batcher.pull-interval 10s \ diff --git a/disperser/cmd/controller/config.go b/disperser/cmd/controller/config.go new file mode 100644 index 0000000000..7685b76222 --- /dev/null +++ b/disperser/cmd/controller/config.go @@ -0,0 +1,93 @@ +package main + +import ( + "fmt" + + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/common/aws" + "github.com/Layr-Labs/eigenda/common/geth" + "github.com/Layr-Labs/eigenda/core/thegraph" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/disperser/cmd/controller/flags" + "github.com/Layr-Labs/eigenda/disperser/controller" + "github.com/Layr-Labs/eigenda/indexer" + "github.com/urfave/cli" +) + +const MaxUint16 = ^uint16(0) + +type Config struct { + EncodingManagerConfig controller.EncodingManagerConfig + DispatcherConfig controller.DispatcherConfig + NumConcurrentEncodingRequests int + NumConcurrentDispersalRequests int + NodeClientCacheSize int + + DynamoDBTableName string + + EthClientConfig geth.EthClientConfig + AwsClientConfig aws.ClientConfig + LoggerConfig common.LoggerConfig + IndexerConfig indexer.Config + ChainStateConfig thegraph.Config + UseGraph bool + IndexerDataDir string + + BLSOperatorStateRetrieverAddr string + EigenDAServiceManagerAddr string +} + +func NewConfig(ctx *cli.Context) (Config, error) { + loggerConfig, err := common.ReadLoggerCLIConfig(ctx, flags.FlagPrefix) + if err != nil { + return Config{}, err + } + ethClientConfig := geth.ReadEthClientConfig(ctx) + numRelayAssignments := ctx.GlobalInt(flags.NumRelayAssignmentFlag.Name) + if numRelayAssignments < 1 || numRelayAssignments > int(MaxUint16) { + return Config{}, fmt.Errorf("invalid number of relay assignments: %d", numRelayAssignments) + } + availableRelays := ctx.GlobalIntSlice(flags.AvailableRelaysFlag.Name) + if len(availableRelays) == 0 { + return Config{}, fmt.Errorf("no available relays specified") + } + relays := make([]corev2.RelayKey, len(availableRelays)) + for i, relay := range availableRelays { + if relay < 1 || relay > 65_535 { + return Config{}, fmt.Errorf("invalid relay: %d", relay) + } + relays[i] = corev2.RelayKey(relay) + } + config := Config{ + DynamoDBTableName: ctx.GlobalString(flags.DynamoDBTableNameFlag.Name), + EthClientConfig: ethClientConfig, + AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix), + LoggerConfig: *loggerConfig, + EncodingManagerConfig: controller.EncodingManagerConfig{ + PullInterval: ctx.GlobalDuration(flags.EncodingPullIntervalFlag.Name), + EncodingRequestTimeout: ctx.GlobalDuration(flags.EncodingRequestTimeoutFlag.Name), + StoreTimeout: ctx.GlobalDuration(flags.EncodingStoreTimeoutFlag.Name), + NumEncodingRetries: ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name), + NumRelayAssignment: uint16(numRelayAssignments), + AvailableRelays: relays, + }, + DispatcherConfig: controller.DispatcherConfig{ + PullInterval: ctx.GlobalDuration(flags.DispatcherPullIntervalFlag.Name), + FinalizationBlockDelay: ctx.GlobalUint64(flags.FinalizationBlockDelayFlag.Name), + NodeRequestTimeout: ctx.GlobalDuration(flags.NodeRequestTimeoutFlag.Name), + NumRequestRetries: ctx.GlobalInt(flags.NumRequestRetriesFlag.Name), + NumConnectionsToNodes: ctx.GlobalInt(flags.NumConnectionsToNodesFlag.Name), + }, + NumConcurrentEncodingRequests: ctx.GlobalInt(flags.NumConcurrentEncodingRequestsFlag.Name), + NumConcurrentDispersalRequests: ctx.GlobalInt(flags.NumConcurrentDispersalRequestsFlag.Name), + NodeClientCacheSize: ctx.GlobalInt(flags.NodeClientCacheNumEntriesFlag.Name), + IndexerConfig: indexer.ReadIndexerConfig(ctx), + ChainStateConfig: thegraph.ReadCLIConfig(ctx), + UseGraph: ctx.GlobalBool(flags.UseGraphFlag.Name), + IndexerDataDir: ctx.GlobalString(flags.IndexerDataDirFlag.Name), + + BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name), + EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name), + } + return config, nil +} diff --git a/disperser/cmd/controller/flags/flags.go b/disperser/cmd/controller/flags/flags.go new file mode 100644 index 0000000000..8092e26bdb --- /dev/null +++ b/disperser/cmd/controller/flags/flags.go @@ -0,0 +1,183 @@ +package flags + +import ( + "time" + + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/common/aws" + "github.com/Layr-Labs/eigenda/common/geth" + "github.com/Layr-Labs/eigenda/core/thegraph" + "github.com/Layr-Labs/eigenda/indexer" + "github.com/urfave/cli" +) + +const ( + FlagPrefix = "controller" + envVarPrefix = "CONTROLLER" +) + +var ( + DynamoDBTableNameFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "dynamodb-table-name"), + Usage: "Name of the dynamodb table to store blob metadata", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMODB_TABLE_NAME"), + } + BlsOperatorStateRetrieverFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "bls-operator-state-retriever"), + Usage: "Address of the BLS Operator State Retriever", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "BLS_OPERATOR_STATE_RETRIVER"), + } + EigenDAServiceManagerFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "eigenda-service-manager"), + Usage: "Address of the EigenDA Service Manager", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "EIGENDA_SERVICE_MANAGER"), + } + UseGraphFlag = cli.BoolFlag{ + Name: common.PrefixFlag(FlagPrefix, "use-graph"), + Usage: "Whether to use the graph node", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "USE_GRAPH"), + } + IndexerDataDirFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "indexer-data-dir"), + Usage: "the data directory for the indexer", + EnvVar: common.PrefixEnvVar(envVarPrefix, "INDEXER_DATA_DIR"), + Required: false, + Value: "./data/", + } + // EncodingManager Flags + EncodingPullIntervalFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "encoding-pull-interval"), + Usage: "Interval at which to pull from the queue", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ENCODING_PULL_INTERVAL"), + } + AvailableRelaysFlag = cli.IntSliceFlag{ + Name: common.PrefixFlag(FlagPrefix, "available-relays"), + Usage: "List of available relays", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "AVAILABLE_RELAYS"), + } + EncodingRequestTimeoutFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "encoding-request-timeout"), + Usage: "Timeout for encoding requests", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ENCODING_REQUEST_TIMEOUT"), + Value: 5 * time.Minute, + } + EncodingStoreTimeoutFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "encoding-store-timeout"), + Usage: "Timeout for interacting with blob store", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ENCODING_STORE_TIMEOUT"), + Value: 15 * time.Second, + } + NumEncodingRetriesFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "num-encoding-retries"), + Usage: "Number of retries for encoding requests", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_ENCODING_RETRIES"), + Value: 3, + } + NumRelayAssignmentFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "num-relay-assignment"), + Usage: "Number of relays to assign to each encoding request", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_RELAY_ASSIGNMENT"), + Value: 2, + } + NumConcurrentEncodingRequestsFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "num-concurrent-encoding-requests"), + Usage: "Number of concurrent encoding requests", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_CONCURRENT_ENCODING_REQUESTS"), + Value: 250, + } + + // Dispatcher Flags + DispatcherPullIntervalFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "dispatcher-pull-interval"), + Usage: "Interval at which to pull from the queue", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "DISPATCHER_PULL_INTERVAL"), + } + NodeRequestTimeoutFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "node-request-timeout"), + Usage: "Timeout for node requests", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "NODE_REQUEST_TIMEOUT"), + } + NumConnectionsToNodesFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "num-connections-to-nodes"), + Usage: "Max number of connections to nodes", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_CONNECTIONS_TO_NODES"), + } + FinalizationBlockDelayFlag = cli.Uint64Flag{ + Name: common.PrefixFlag(FlagPrefix, "finalization-block-delay"), + Usage: "Number of blocks to wait before finalizing", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZATION_BLOCK_DELAY"), + Value: 75, + } + NumRequestRetriesFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "num-request-retries"), + Usage: "Number of retries for node requests", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_REQUEST_RETRIES"), + Value: 3, + } + NumConcurrentDispersalRequestsFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "num-concurrent-dispersal-requests"), + Usage: "Number of concurrent dispersal requests", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "NUM_CONCURRENT_DISPERSAL_REQUESTS"), + Value: 600, + } + NodeClientCacheNumEntriesFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "node-client-cache-num-entries"), + Usage: "Size (number of entries) of the node client cache", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "NODE_CLIENT_CACHE_NUM_ENTRIES"), + Value: 400, + } +) + +var requiredFlags = []cli.Flag{ + DynamoDBTableNameFlag, + BlsOperatorStateRetrieverFlag, + EigenDAServiceManagerFlag, + UseGraphFlag, + EncodingPullIntervalFlag, + AvailableRelaysFlag, + DispatcherPullIntervalFlag, + NodeRequestTimeoutFlag, + NumConnectionsToNodesFlag, +} + +var optionalFlags = []cli.Flag{ + IndexerDataDirFlag, + EncodingRequestTimeoutFlag, + EncodingStoreTimeoutFlag, + NumEncodingRetriesFlag, + NumRelayAssignmentFlag, + NumConcurrentEncodingRequestsFlag, + FinalizationBlockDelayFlag, + NumRequestRetriesFlag, + NumConcurrentDispersalRequestsFlag, + NodeClientCacheNumEntriesFlag, +} + +var Flags []cli.Flag + +func init() { + Flags = append(requiredFlags, optionalFlags...) + Flags = append(Flags, geth.EthClientFlags(envVarPrefix)...) + Flags = append(Flags, common.LoggerCLIFlags(envVarPrefix, FlagPrefix)...) + Flags = append(Flags, indexer.CLIFlags(envVarPrefix)...) + Flags = append(Flags, aws.ClientFlags(envVarPrefix, FlagPrefix)...) + Flags = append(Flags, thegraph.CLIFlags(envVarPrefix)...) +} diff --git a/disperser/cmd/controller/main.go b/disperser/cmd/controller/main.go new file mode 100644 index 0000000000..14c343adb4 --- /dev/null +++ b/disperser/cmd/controller/main.go @@ -0,0 +1,153 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/common/geth" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/eth" + "github.com/Layr-Labs/eigenda/core/indexer" + "github.com/Layr-Labs/eigenda/core/thegraph" + "github.com/Layr-Labs/eigenda/disperser/cmd/controller/flags" + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigenda/disperser/controller" + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" + "github.com/gammazero/workerpool" + "github.com/urfave/cli" +) + +var ( + version string + gitCommit string + gitDate string +) + +func main() { + app := cli.NewApp() + app.Flags = flags.Flags + app.Version = fmt.Sprintf("%s-%s-%s", version, gitCommit, gitDate) + app.Name = "controller" + app.Usage = "EigenDA Controller" + app.Description = "EigenDA control plane for encoding and dispatching blobs" + + app.Action = RunController + err := app.Run(os.Args) + if err != nil { + log.Fatalf("application failed: %v", err) + } + select {} +} + +func RunController(ctx *cli.Context) error { + config, err := NewConfig(ctx) + if err != nil { + return err + } + + logger, err := common.NewLogger(config.LoggerConfig) + if err != nil { + return err + } + + dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger) + if err != nil { + return err + } + gethClient, err := geth.NewMultiHomingClient(config.EthClientConfig, gethcommon.Address{}, logger) + if err != nil { + logger.Error("Cannot create chain.Client", "err", err) + return err + } + chainReader, err := eth.NewReader(logger, gethClient, config.BLSOperatorStateRetrieverAddr, config.EigenDAServiceManagerAddr) + if err != nil { + return err + } + + blobMetadataStore := blobstore.NewBlobMetadataStore( + dynamoClient, + logger, + config.DynamoDBTableName, + ) + + encodingPool := workerpool.New(config.NumConcurrentEncodingRequests) + encodingManager, err := controller.NewEncodingManager( + config.EncodingManagerConfig, + blobMetadataStore, + encodingPool, + nil, // TODO(ian-shim): configure encodingClient + chainReader, + logger, + ) + if err != nil { + return fmt.Errorf("failed to create encoding manager: %v", err) + } + + sigAgg, err := core.NewStdSignatureAggregator(logger, chainReader) + if err != nil { + return fmt.Errorf("failed to create signature aggregator: %v", err) + } + dispatcherPool := workerpool.New(config.NumConcurrentDispersalRequests) + chainState := eth.NewChainState(chainReader, gethClient) + var ics core.IndexedChainState + if config.UseGraph { + logger.Info("Using graph node") + + logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) + ics = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger) + } else { + logger.Info("Using built-in indexer") + rpcClient, err := rpc.Dial(config.EthClientConfig.RPCURLs[0]) + if err != nil { + return err + } + idx, err := indexer.CreateNewIndexer( + &config.IndexerConfig, + gethClient, + rpcClient, + config.EigenDAServiceManagerAddr, + logger, + ) + if err != nil { + return err + } + ics, err = indexer.NewIndexedChainState(chainState, idx) + if err != nil { + return err + } + } + nodeClientManager, err := controller.NewNodeClientManager(config.NodeClientCacheSize, logger) + if err != nil { + return fmt.Errorf("failed to create node client manager: %v", err) + } + dispatcher, err := controller.NewDispatcher( + config.DispatcherConfig, + blobMetadataStore, + dispatcherPool, + ics, + sigAgg, + nodeClientManager, + logger, + ) + if err != nil { + return fmt.Errorf("failed to create dispatcher: %v", err) + } + + c := context.Background() + err = encodingManager.Start(c) + if err != nil { + return fmt.Errorf("failed to start encoding manager: %v", err) + } + + err = dispatcher.Start(c) + if err != nil { + return fmt.Errorf("failed to start dispatcher: %v", err) + } + + return nil +} diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go index 6a9ad4219a..c7c39ad283 100644 --- a/disperser/controller/dispatcher.go +++ b/disperser/controller/dispatcher.go @@ -74,6 +74,11 @@ func NewDispatcher( } func (d *Dispatcher) Start(ctx context.Context) error { + err := d.chainState.Start(ctx) + if err != nil { + return fmt.Errorf("failed to start chain state: %w", err) + } + go func() { ticker := time.NewTicker(d.PullInterval) defer ticker.Stop()