diff --git a/disperser/cmd/controller/config.go b/disperser/cmd/controller/config.go index 49f736f552..1a5ded7edd 100644 --- a/disperser/cmd/controller/config.go +++ b/disperser/cmd/controller/config.go @@ -42,7 +42,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { if err != nil { return Config{}, err } - ethClientConfig := geth.ReadEthClientConfig(ctx) + ethClientConfig := geth.ReadEthClientConfigRPCOnly(ctx) numRelayAssignments := ctx.GlobalInt(flags.NumRelayAssignmentFlag.Name) if numRelayAssignments < 1 || numRelayAssignments > int(MaxUint16) { return Config{}, fmt.Errorf("invalid number of relay assignments: %d", numRelayAssignments) @@ -53,7 +53,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { } relays := make([]corev2.RelayKey, len(availableRelays)) for i, relay := range availableRelays { - if relay < 1 || relay > 65_535 { + if relay < 0 || relay > 65_535 { return Config{}, fmt.Errorf("invalid relay: %d", relay) } relays[i] = corev2.RelayKey(relay) @@ -70,6 +70,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { NumEncodingRetries: ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name), NumRelayAssignment: uint16(numRelayAssignments), AvailableRelays: relays, + EncoderAddress: ctx.GlobalString(flags.EncoderAddressFlag.Name), }, DispatcherConfig: controller.DispatcherConfig{ PullInterval: ctx.GlobalDuration(flags.DispatcherPullIntervalFlag.Name), diff --git a/disperser/cmd/controller/flags/flags.go b/disperser/cmd/controller/flags/flags.go index 8092e26bdb..4b6c73173d 100644 --- a/disperser/cmd/controller/flags/flags.go +++ b/disperser/cmd/controller/flags/flags.go @@ -61,6 +61,12 @@ var ( Required: true, EnvVar: common.PrefixEnvVar(envVarPrefix, "AVAILABLE_RELAYS"), } + EncoderAddressFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "encoder-address"), + Usage: "the http ip:port which the distributed encoder server is listening", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ENCODER_ADDRESS"), + } EncodingRequestTimeoutFlag = cli.DurationFlag{ Name: common.PrefixFlag(FlagPrefix, "encoding-request-timeout"), Usage: "Timeout for encoding requests", @@ -153,6 +159,7 @@ var requiredFlags = []cli.Flag{ UseGraphFlag, EncodingPullIntervalFlag, AvailableRelaysFlag, + EncoderAddressFlag, DispatcherPullIntervalFlag, NodeRequestTimeoutFlag, NumConnectionsToNodesFlag, diff --git a/disperser/cmd/controller/main.go b/disperser/cmd/controller/main.go index 14c343adb4..f3d21bb340 100644 --- a/disperser/cmd/controller/main.go +++ b/disperser/cmd/controller/main.go @@ -16,6 +16,7 @@ import ( "github.com/Layr-Labs/eigenda/disperser/cmd/controller/flags" "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/disperser/controller" + "github.com/Layr-Labs/eigenda/disperser/encoder" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" "github.com/gammazero/workerpool" @@ -75,12 +76,16 @@ func RunController(ctx *cli.Context) error { config.DynamoDBTableName, ) + encoderClient, err := encoder.NewEncoderClientV2(config.EncodingManagerConfig.EncoderAddress) + if err != nil { + return fmt.Errorf("failed to create encoder client: %v", err) + } encodingPool := workerpool.New(config.NumConcurrentEncodingRequests) encodingManager, err := controller.NewEncodingManager( config.EncodingManagerConfig, blobMetadataStore, encodingPool, - nil, // TODO(ian-shim): configure encodingClient + encoderClient, chainReader, logger, ) diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go index 28ce060b7d..43a49531e1 100644 --- a/disperser/controller/encoding_manager.go +++ b/disperser/controller/encoding_manager.go @@ -22,7 +22,7 @@ import ( var errNoBlobsToEncode = errors.New("no blobs to encode") type EncodingManagerConfig struct { - PullInterval time.Duration + PullInterval time.Duration EncodingRequestTimeout time.Duration StoreTimeout time.Duration @@ -32,6 +32,8 @@ type EncodingManagerConfig struct { NumRelayAssignment uint16 // AvailableRelays is a list of available relays AvailableRelays []corev2.RelayKey + // EncoderAddress is the address of the encoder + EncoderAddress string } // EncodingManager is responsible for pulling queued blobs from the blob diff --git a/disperser/encoder/client_v2.go b/disperser/encoder/client_v2.go index 6096087ea8..d2328fcffd 100644 --- a/disperser/encoder/client_v2.go +++ b/disperser/encoder/client_v2.go @@ -3,7 +3,6 @@ package encoder import ( "context" "fmt" - "time" corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser" @@ -14,14 +13,12 @@ import ( ) type clientV2 struct { - addr string - timeout time.Duration + addr string } -func NewEncoderClientV2(addr string, timeout time.Duration) (disperser.EncoderClientV2, error) { +func NewEncoderClientV2(addr string) (disperser.EncoderClientV2, error) { return &clientV2{ - addr: addr, - timeout: timeout, + addr: addr, }, nil } @@ -48,13 +45,6 @@ func (c *clientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encod }, } - // Add timeout if specified - if c.timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, c.timeout) - defer cancel() - } - // Make the RPC call reply, err := client.EncodeBlob(ctx, req) if err != nil {