Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set up v2 EncodingClient in EncodingManager #897

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions disperser/cmd/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
if err != nil {
return Config{}, err
}
ethClientConfig := geth.ReadEthClientConfig(ctx)
ethClientConfig := geth.ReadEthClientConfigRPCOnly(ctx)
numRelayAssignments := ctx.GlobalInt(flags.NumRelayAssignmentFlag.Name)
if numRelayAssignments < 1 || numRelayAssignments > int(MaxUint16) {
return Config{}, fmt.Errorf("invalid number of relay assignments: %d", numRelayAssignments)
Expand All @@ -53,7 +53,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
}
relays := make([]corev2.RelayKey, len(availableRelays))
for i, relay := range availableRelays {
if relay < 1 || relay > 65_535 {
if relay < 0 || relay > 65_535 {
return Config{}, fmt.Errorf("invalid relay: %d", relay)
}
relays[i] = corev2.RelayKey(relay)
Expand All @@ -70,6 +70,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
NumEncodingRetries: ctx.GlobalInt(flags.NumEncodingRetriesFlag.Name),
NumRelayAssignment: uint16(numRelayAssignments),
AvailableRelays: relays,
EncoderAddress: ctx.GlobalString(flags.EncoderAddressFlag.Name),
},
DispatcherConfig: controller.DispatcherConfig{
PullInterval: ctx.GlobalDuration(flags.DispatcherPullIntervalFlag.Name),
Expand Down
7 changes: 7 additions & 0 deletions disperser/cmd/controller/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "AVAILABLE_RELAYS"),
}
EncoderAddressFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "encoder-address"),
Usage: "the http ip:port which the distributed encoder server is listening",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ENCODER_ADDRESS"),
}
EncodingRequestTimeoutFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "encoding-request-timeout"),
Usage: "Timeout for encoding requests",
Expand Down Expand Up @@ -153,6 +159,7 @@ var requiredFlags = []cli.Flag{
UseGraphFlag,
EncodingPullIntervalFlag,
AvailableRelaysFlag,
EncoderAddressFlag,
DispatcherPullIntervalFlag,
NodeRequestTimeoutFlag,
NumConnectionsToNodesFlag,
Expand Down
7 changes: 6 additions & 1 deletion disperser/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/cmd/controller/flags"
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/controller"
"github.com/Layr-Labs/eigenda/disperser/encoder"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gammazero/workerpool"
Expand Down Expand Up @@ -75,12 +76,16 @@ func RunController(ctx *cli.Context) error {
config.DynamoDBTableName,
)

encoderClient, err := encoder.NewEncoderClientV2(config.EncodingManagerConfig.EncoderAddress)
if err != nil {
return fmt.Errorf("failed to create encoder client: %v", err)
}
encodingPool := workerpool.New(config.NumConcurrentEncodingRequests)
encodingManager, err := controller.NewEncodingManager(
config.EncodingManagerConfig,
blobMetadataStore,
encodingPool,
nil, // TODO(ian-shim): configure encodingClient
encoderClient,
chainReader,
logger,
)
Expand Down
4 changes: 3 additions & 1 deletion disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
var errNoBlobsToEncode = errors.New("no blobs to encode")

type EncodingManagerConfig struct {
PullInterval time.Duration
PullInterval time.Duration

EncodingRequestTimeout time.Duration
StoreTimeout time.Duration
Expand All @@ -32,6 +32,8 @@ type EncodingManagerConfig struct {
NumRelayAssignment uint16
// AvailableRelays is a list of available relays
AvailableRelays []corev2.RelayKey
// EncoderAddress is the address of the encoder
EncoderAddress string
}

// EncodingManager is responsible for pulling queued blobs from the blob
Expand Down
16 changes: 3 additions & 13 deletions disperser/encoder/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package encoder
import (
"context"
"fmt"
"time"

corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/disperser"
Expand All @@ -14,14 +13,12 @@ import (
)

type clientV2 struct {
addr string
timeout time.Duration
addr string
}

func NewEncoderClientV2(addr string, timeout time.Duration) (disperser.EncoderClientV2, error) {
func NewEncoderClientV2(addr string) (disperser.EncoderClientV2, error) {
return &clientV2{
addr: addr,
timeout: timeout,
addr: addr,
}, nil
}

Expand All @@ -48,13 +45,6 @@ func (c *clientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encod
},
}

// Add timeout if specified
if c.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, c.timeout)
defer cancel()
}

// Make the RPC call
reply, err := client.EncodeBlob(ctx, req)
if err != nil {
Expand Down
Loading