Skip to content

Commit

Permalink
Set up v2 EncodingClient in EncodingManager (#897)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Nov 15, 2024
1 parent d0dfd2c commit 23c9718
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 17 deletions.
5 changes: 3 additions & 2 deletions disperser/cmd/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
if err != nil {
return Config{}, err
}
ethClientConfig := geth.ReadEthClientConfig(ctx)
ethClientConfig := geth.ReadEthClientConfigRPCOnly(ctx)
numRelayAssignments := ctx.GlobalInt(flags.NumRelayAssignmentFlag.Name)
if numRelayAssignments < 1 || numRelayAssignments > int(MaxUint16) {
return Config{}, fmt.Errorf("invalid number of relay assignments: %d", numRelayAssignments)
Expand All @@ -53,7 +53,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
}
relays := make([]corev2.RelayKey, len(availableRelays))
for i, relay := range availableRelays {
if relay < 1 || relay > 65_535 {
if relay < 0 || relay > 65_535 {
return Config{}, fmt.Errorf("invalid relay: %d", relay)
}
relays[i] = corev2.RelayKey(relay)
Expand All @@ -70,6 +70,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
NumEncodingRetries: ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name),
NumRelayAssignment: uint16(numRelayAssignments),
AvailableRelays: relays,
EncoderAddress: ctx.GlobalString(flags.EncoderAddressFlag.Name),
},
DispatcherConfig: controller.DispatcherConfig{
PullInterval: ctx.GlobalDuration(flags.DispatcherPullIntervalFlag.Name),
Expand Down
7 changes: 7 additions & 0 deletions disperser/cmd/controller/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "AVAILABLE_RELAYS"),
}
EncoderAddressFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "encoder-address"),
Usage: "the http ip:port which the distributed encoder server is listening",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ENCODER_ADDRESS"),
}
EncodingRequestTimeoutFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "encoding-request-timeout"),
Usage: "Timeout for encoding requests",
Expand Down Expand Up @@ -153,6 +159,7 @@ var requiredFlags = []cli.Flag{
UseGraphFlag,
EncodingPullIntervalFlag,
AvailableRelaysFlag,
EncoderAddressFlag,
DispatcherPullIntervalFlag,
NodeRequestTimeoutFlag,
NumConnectionsToNodesFlag,
Expand Down
7 changes: 6 additions & 1 deletion disperser/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/cmd/controller/flags"
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/controller"
"github.com/Layr-Labs/eigenda/disperser/encoder"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gammazero/workerpool"
Expand Down Expand Up @@ -75,12 +76,16 @@ func RunController(ctx *cli.Context) error {
config.DynamoDBTableName,
)

encoderClient, err := encoder.NewEncoderClientV2(config.EncodingManagerConfig.EncoderAddress)
if err != nil {
return fmt.Errorf("failed to create encoder client: %v", err)
}
encodingPool := workerpool.New(config.NumConcurrentEncodingRequests)
encodingManager, err := controller.NewEncodingManager(
config.EncodingManagerConfig,
blobMetadataStore,
encodingPool,
nil, // TODO(ian-shim): configure encodingClient
encoderClient,
chainReader,
logger,
)
Expand Down
4 changes: 3 additions & 1 deletion disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
var errNoBlobsToEncode = errors.New("no blobs to encode")

type EncodingManagerConfig struct {
PullInterval time.Duration
PullInterval time.Duration

EncodingRequestTimeout time.Duration
StoreTimeout time.Duration
Expand All @@ -32,6 +32,8 @@ type EncodingManagerConfig struct {
NumRelayAssignment uint16
// AvailableRelays is a list of available relays
AvailableRelays []corev2.RelayKey
// EncoderAddress is the address of the encoder
EncoderAddress string
}

// EncodingManager is responsible for pulling queued blobs from the blob
Expand Down
16 changes: 3 additions & 13 deletions disperser/encoder/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package encoder
import (
"context"
"fmt"
"time"

corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/disperser"
Expand All @@ -14,14 +13,12 @@ import (
)

type clientV2 struct {
addr string
timeout time.Duration
addr string
}

func NewEncoderClientV2(addr string, timeout time.Duration) (disperser.EncoderClientV2, error) {
func NewEncoderClientV2(addr string) (disperser.EncoderClientV2, error) {
return &clientV2{
addr: addr,
timeout: timeout,
addr: addr,
}, nil
}

Expand All @@ -48,13 +45,6 @@ func (c *clientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encod
},
}

// Add timeout if specified
if c.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, c.timeout)
defer cancel()
}

// Make the RPC call
reply, err := client.EncodeBlob(ctx, req)
if err != nil {
Expand Down

0 comments on commit 23c9718

Please sign in to comment.