diff --git a/core/mock/tx.go b/core/mock/tx.go index ca76c01514..7b335aee2e 100644 --- a/core/mock/tx.go +++ b/core/mock/tx.go @@ -43,7 +43,7 @@ func (t *MockTransactor) RegisterOperator( operatorToAvsRegistrationSigSalt [32]byte, operatorToAvsRegistrationSigExpiry *big.Int, ) error { - args := t.Called() + args := t.Called(ctx, keypair, socket, quorumIds, operatorEcdsaPrivateKey, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry) return args.Error(0) } @@ -56,7 +56,7 @@ func (t *MockTransactor) RegisterOperatorWithChurn( operatorToAvsRegistrationSigSalt [32]byte, operatorToAvsRegistrationSigExpiry *big.Int, churnReply *churner.ChurnReply) error { - args := t.Called() + args := t.Called(ctx, keypair, socket, quorumIds, operatorEcdsaPrivateKey, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry, churnReply) return args.Error(0) } diff --git a/node/churner_client.go b/node/churner_client.go new file mode 100644 index 0000000000..36db85be40 --- /dev/null +++ b/node/churner_client.go @@ -0,0 +1,100 @@ +package node + +import ( + "context" + "crypto/tls" + "errors" + "time" + + churnerpb "github.com/Layr-Labs/eigenda/api/grpc/churner" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/operators/churner" + "github.com/Layr-Labs/eigensdk-go/logging" + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +type ChurnerClient interface { + // Churn sends a churn request to the churner service + // The quorumIDs cannot be empty, but may contain quorums that the operator is already registered in. + // If the operator is already registered in a quorum, the churner will ignore it and continue with the other quorums. + Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) +} + +type churnerClient struct { + churnerURL string + useSecureGrpc bool + timeout time.Duration + logger logging.Logger +} + +func NewChurnerClient(churnerURL string, useSecureGrpc bool, timeout time.Duration, logger logging.Logger) ChurnerClient { + return &churnerClient{ + churnerURL: churnerURL, + useSecureGrpc: useSecureGrpc, + timeout: timeout, + logger: logger, + } +} + +func (c *churnerClient) Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) { + if len(quorumIDs) == 0 { + return nil, errors.New("quorumIDs cannot be empty") + } + // generate salt + privateKeyBytes := []byte(keyPair.PrivKey.String()) + salt := crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), quorumIDs[:], privateKeyBytes) + + churnRequest := &churner.ChurnRequest{ + OperatorAddress: gethcommon.HexToAddress(operatorAddress), + OperatorToRegisterPubkeyG1: keyPair.PubKey, + OperatorToRegisterPubkeyG2: keyPair.GetPubKeyG2(), + OperatorRequestSignature: &core.Signature{}, + QuorumIDs: quorumIDs, + } + + copy(churnRequest.Salt[:], salt) + + // sign the request + churnRequest.OperatorRequestSignature = keyPair.SignMessage(churner.CalculateRequestHash(churnRequest)) + + // convert to protobuf + churnRequestPb := &churnerpb.ChurnRequest{ + OperatorToRegisterPubkeyG1: churnRequest.OperatorToRegisterPubkeyG1.Serialize(), + OperatorToRegisterPubkeyG2: churnRequest.OperatorToRegisterPubkeyG2.Serialize(), + OperatorRequestSignature: churnRequest.OperatorRequestSignature.Serialize(), + Salt: salt[:], + OperatorAddress: operatorAddress, + } + + churnRequestPb.QuorumIds = make([]uint32, len(quorumIDs)) + for i, quorumID := range quorumIDs { + churnRequestPb.QuorumIds[i] = uint32(quorumID) + } + credential := insecure.NewCredentials() + if c.useSecureGrpc { + config := &tls.Config{} + credential = credentials.NewTLS(config) + } + + conn, err := grpc.Dial( + c.churnerURL, + grpc.WithTransportCredentials(credential), + ) + if err != nil { + c.logger.Error("Node cannot connect to churner", "err", err) + return nil, err + } + defer conn.Close() + + gc := churnerpb.NewChurnerClient(conn) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300) + + return gc.Churn(ctx, churnRequestPb, opt) +} diff --git a/node/config.go b/node/config.go index 1a8fdd40e2..2d5c4c6e44 100644 --- a/node/config.go +++ b/node/config.go @@ -90,6 +90,9 @@ func NewConfig(ctx *cli.Context) (*Config, error) { } ids = append(ids, core.QuorumID(val)) } + if len(ids) == 0 { + return nil, errors.New("no quorum ids provided") + } expirationPollIntervalSec := ctx.GlobalUint64(flags.ExpirationPollIntervalSecFlag.Name) if expirationPollIntervalSec <= minExpirationPollIntervalSec { diff --git a/node/flags/flags.go b/node/flags/flags.go index 300d5a7adb..b2a4cac593 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -81,7 +81,7 @@ var ( } QuorumIDListFlag = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "quorum-id-list"), - Usage: "Comma separated list of quorum IDs that the node will participate in", + Usage: "Comma separated list of quorum IDs that the node will participate in. There should be at least one quorum ID. This list can contain quorums node is already registered with. If the node opts in to quorums already registered with, it will be a no-op.", Required: true, EnvVar: common.PrefixEnvVar(EnvVarPrefix, "QUORUM_ID_LIST"), } diff --git a/node/mock/churner_client.go b/node/mock/churner_client.go new file mode 100644 index 0000000000..a69d4f4eb7 --- /dev/null +++ b/node/mock/churner_client.go @@ -0,0 +1,30 @@ +package mock + +import ( + "context" + + churnerpb "github.com/Layr-Labs/eigenda/api/grpc/churner" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/node" + "github.com/stretchr/testify/mock" +) + +type ChurnerClient struct { + mock.Mock +} + +var _ node.ChurnerClient = (*ChurnerClient)(nil) + +func (c *ChurnerClient) Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) { + args := c.Called() + var reply *churnerpb.ChurnReply + if args.Get(0) != nil { + reply = (args.Get(0)).(*churnerpb.ChurnReply) + } + + var err error + if args.Get(1) != nil { + err = (args.Get(1)).(error) + } + return reply, err +} diff --git a/node/node.go b/node/node.go index 9da6a326af..87d4581668 100644 --- a/node/node.go +++ b/node/node.go @@ -200,7 +200,8 @@ func (n *Node) Start(ctx context.Context) error { OperatorId: n.Config.ID, QuorumIDs: n.Config.QuorumIDList, } - err = RegisterOperator(ctx, operator, n.Transactor, n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Logger) + churnerClient := NewChurnerClient(n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Config.Timeout, n.Logger) + err = RegisterOperator(ctx, operator, n.Transactor, churnerClient, n.Logger) if err != nil { return fmt.Errorf("failed to register the operator: %w", err) } diff --git a/node/operator.go b/node/operator.go index 23947750c5..ac5b3ccb5b 100644 --- a/node/operator.go +++ b/node/operator.go @@ -3,21 +3,14 @@ package node import ( "context" "crypto/ecdsa" - "crypto/tls" - "errors" "fmt" "math/big" + "slices" "time" - grpcchurner "github.com/Layr-Labs/eigenda/api/grpc/churner" "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/operators/churner" "github.com/Layr-Labs/eigensdk-go/logging" - gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" ) type Operator struct { @@ -31,27 +24,21 @@ type Operator struct { } // RegisterOperator operator registers the operator with the given public key for the given quorum IDs. -func RegisterOperator(ctx context.Context, operator *Operator, transactor core.Transactor, churnerUrl string, useSecureGrpc bool, logger logging.Logger) error { - registeredQuorumIds, err := transactor.GetRegisteredQuorumIdsForOperator(ctx, operator.OperatorId) +func RegisterOperator(ctx context.Context, operator *Operator, transactor core.Transactor, churnerClient ChurnerClient, logger logging.Logger) error { + quorumsToRegister, err := operator.getQuorumIdsToRegister(ctx, transactor) if err != nil { - return fmt.Errorf("failed to get registered quorum ids for an operator: %w", err) + return fmt.Errorf("failed to get quorum ids to register: %w", err) } - - logger.Debug("Registered quorum ids", "registeredQuorumIds", registeredQuorumIds) - if len(registeredQuorumIds) != 0 { + if len(quorumsToRegister) == 0 { return nil } - logger.Info("Quorums to register for", "quorums", operator.QuorumIDs) - - if len(operator.QuorumIDs) == 0 { - return errors.New("an operator should be in at least one quorum to be useful") - } + logger.Info("Quorums to register for", "quorums", quorumsToRegister) // register for quorums shouldCallChurner := false // check if one of the quorums to register for is full - for _, quorumID := range operator.QuorumIDs { + for _, quorumID := range quorumsToRegister { operatorSetParams, err := transactor.GetOperatorSetParams(ctx, quorumID) if err != nil { return err @@ -75,22 +62,22 @@ func RegisterOperator(ctx context.Context, operator *Operator, transactor core.T privateKeyBytes := []byte(operator.KeyPair.PrivKey.String()) salt := [32]byte{} - copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), operator.QuorumIDs[:], privateKeyBytes)) + copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), quorumsToRegister, privateKeyBytes)) // Get the current block number expiry := big.NewInt((time.Now().Add(10 * time.Minute)).Unix()) // if we should call the churner, call it if shouldCallChurner { - churnReply, err := requestChurnApproval(ctx, operator, churnerUrl, useSecureGrpc, logger) + churnReply, err := churnerClient.Churn(ctx, operator.Address, operator.KeyPair, quorumsToRegister) if err != nil { return fmt.Errorf("failed to request churn approval: %w", err) } - return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry, churnReply) + return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, quorumsToRegister, operator.PrivKey, salt, expiry, churnReply) } else { // other wise just register normally - return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry) + return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, quorumsToRegister, operator.PrivKey, salt, expiry) } } @@ -108,67 +95,23 @@ func UpdateOperatorSocket(ctx context.Context, transactor core.Transactor, socke return transactor.UpdateOperatorSocket(ctx, socket) } -func requestChurnApproval(ctx context.Context, operator *Operator, churnerUrl string, useSecureGrpc bool, logger logging.Logger) (*grpcchurner.ChurnReply, error) { - logger.Info("churner url", "url", churnerUrl) - - credential := insecure.NewCredentials() - if useSecureGrpc { - config := &tls.Config{} - credential = credentials.NewTLS(config) +// getQuorumIdsToRegister returns the quorum ids that the operator is not registered in. +func (c *Operator) getQuorumIdsToRegister(ctx context.Context, transactor core.Transactor) ([]core.QuorumID, error) { + if len(c.QuorumIDs) == 0 { + return nil, fmt.Errorf("an operator should be in at least one quorum to be useful") } - conn, err := grpc.Dial( - churnerUrl, - grpc.WithTransportCredentials(credential), - ) + registeredQuorumIds, err := transactor.GetRegisteredQuorumIdsForOperator(ctx, c.OperatorId) if err != nil { - logger.Error("Node cannot connect to churner", "err", err) - return nil, err + return nil, fmt.Errorf("failed to get registered quorum ids for an operator: %w", err) } - defer conn.Close() - - gc := grpcchurner.NewChurnerClient(conn) - ctx, cancel := context.WithTimeout(ctx, operator.Timeout) - defer cancel() - - request := newChurnRequest(operator.Address, operator.KeyPair, operator.QuorumIDs) - opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300) - - return gc.Churn(ctx, request, opt) -} - -func newChurnRequest(address string, KeyPair *core.KeyPair, QuorumIDs []core.QuorumID) *grpcchurner.ChurnRequest { - // generate salt - privateKeyBytes := []byte(KeyPair.PrivKey.String()) - salt := crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), QuorumIDs[:], privateKeyBytes) - - churnRequest := &churner.ChurnRequest{ - OperatorAddress: gethcommon.HexToAddress(address), - OperatorToRegisterPubkeyG1: KeyPair.PubKey, - OperatorToRegisterPubkeyG2: KeyPair.GetPubKeyG2(), - OperatorRequestSignature: &core.Signature{}, - QuorumIDs: QuorumIDs, - } - - copy(churnRequest.Salt[:], salt) - - // sign the request - churnRequest.OperatorRequestSignature = KeyPair.SignMessage(churner.CalculateRequestHash(churnRequest)) - - // convert to protobuf - churnRequestPb := &grpcchurner.ChurnRequest{ - OperatorToRegisterPubkeyG1: churnRequest.OperatorToRegisterPubkeyG1.Serialize(), - OperatorToRegisterPubkeyG2: churnRequest.OperatorToRegisterPubkeyG2.Serialize(), - OperatorRequestSignature: churnRequest.OperatorRequestSignature.Serialize(), - Salt: salt[:], - OperatorAddress: address, - } - - churnRequestPb.QuorumIds = make([]uint32, len(QuorumIDs)) - for i, quorumID := range QuorumIDs { - churnRequestPb.QuorumIds[i] = uint32(quorumID) + quorumIdsToRegister := make([]core.QuorumID, 0, len(c.QuorumIDs)) + for _, quorumID := range c.QuorumIDs { + if !slices.Contains(registeredQuorumIds, quorumID) { + quorumIdsToRegister = append(quorumIdsToRegister, quorumID) + } } - return churnRequestPb + return quorumIdsToRegister, nil } diff --git a/node/operator_test.go b/node/operator_test.go new file mode 100644 index 0000000000..02e9e1364b --- /dev/null +++ b/node/operator_test.go @@ -0,0 +1,78 @@ +package node_test + +import ( + "context" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/core" + coremock "github.com/Layr-Labs/eigenda/core/mock" + "github.com/Layr-Labs/eigenda/node" + nodemock "github.com/Layr-Labs/eigenda/node/mock" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestRegisterOperator(t *testing.T) { + logger := logging.NewNoopLogger() + operatorID := [32]byte(hexutil.MustDecode("0x3fbfefcdc76462d2cdb7d0cea75f27223829481b8b4aa6881c94cb2126a316ad")) + keyPair, err := core.GenRandomBlsKeys() + assert.NoError(t, err) + // Create a new operator + operator := &node.Operator{ + Address: "0xB7Ad27737D88B07De48CDc2f379917109E993Be4", + Socket: "localhost:50051", + Timeout: 10 * time.Second, + PrivKey: nil, + KeyPair: keyPair, + OperatorId: operatorID, + QuorumIDs: []core.QuorumID{0, 1}, + } + tx := &coremock.MockTransactor{} + tx.On("GetRegisteredQuorumIdsForOperator").Return([]uint8{0}, nil) + tx.On("GetOperatorSetParams", mock.Anything, mock.Anything).Return(&core.OperatorSetParam{ + MaxOperatorCount: 1, + ChurnBIPsOfOperatorStake: 20, + ChurnBIPsOfTotalStake: 20000, + }, nil) + tx.On("GetNumberOfRegisteredOperatorForQuorum").Return(uint32(0), nil) + tx.On("RegisterOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + churnerClient := &nodemock.ChurnerClient{} + churnerClient.On("Churn").Return(nil, nil) + err = node.RegisterOperator(context.Background(), operator, tx, churnerClient, logger) + assert.NoError(t, err) + tx.AssertCalled(t, "RegisterOperator", mock.Anything, mock.Anything, mock.Anything, []core.QuorumID{1}, mock.Anything, mock.Anything, mock.Anything) +} + +func TestRegisterOperatorWithChurn(t *testing.T) { + logger := logging.NewNoopLogger() + operatorID := [32]byte(hexutil.MustDecode("0x3fbfefcdc76462d2cdb7d0cea75f27223829481b8b4aa6881c94cb2126a316ad")) + keyPair, err := core.GenRandomBlsKeys() + assert.NoError(t, err) + // Create a new operator + operator := &node.Operator{ + Address: "0xB7Ad27737D88B07De48CDc2f379917109E993Be4", + Socket: "localhost:50051", + Timeout: 10 * time.Second, + PrivKey: nil, + KeyPair: keyPair, + OperatorId: operatorID, + QuorumIDs: []core.QuorumID{0, 1}, + } + tx := &coremock.MockTransactor{} + tx.On("GetRegisteredQuorumIdsForOperator").Return([]uint8{0}, nil) + tx.On("GetOperatorSetParams", mock.Anything, mock.Anything).Return(&core.OperatorSetParam{ + MaxOperatorCount: 1, + ChurnBIPsOfOperatorStake: 20, + ChurnBIPsOfTotalStake: 20000, + }, nil) + tx.On("GetNumberOfRegisteredOperatorForQuorum").Return(uint32(1), nil) + tx.On("RegisterOperatorWithChurn", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + churnerClient := &nodemock.ChurnerClient{} + churnerClient.On("Churn").Return(nil, nil) + err = node.RegisterOperator(context.Background(), operator, tx, churnerClient, logger) + assert.NoError(t, err) + tx.AssertCalled(t, "RegisterOperatorWithChurn", mock.Anything, mock.Anything, mock.Anything, []core.QuorumID{1}, mock.Anything, mock.Anything, mock.Anything, mock.Anything) +} diff --git a/node/plugin/cmd/main.go b/node/plugin/cmd/main.go index 6476318946..dd9cf86f61 100644 --- a/node/plugin/cmd/main.go +++ b/node/plugin/cmd/main.go @@ -125,9 +125,10 @@ func pluginOps(ctx *cli.Context) { OperatorId: keyPair.GetPubKeyG1().GetOperatorID(), QuorumIDs: config.QuorumIDList, } + churnerClient := node.NewChurnerClient(config.ChurnerUrl, true, operator.Timeout, logger) if config.Operation == "opt-in" { log.Printf("Info: Operator with Operator Address: %x is opting in to EigenDA", sk.Address) - err = node.RegisterOperator(context.Background(), operator, tx, config.ChurnerUrl, true, logger) + err = node.RegisterOperator(context.Background(), operator, tx, churnerClient, logger) if err != nil { log.Printf("Error: failed to opt-in EigenDA Node Network for operator ID: %x, operator address: %x, error: %v", operatorID, sk.Address, err) return diff --git a/node/plugin/config.go b/node/plugin/config.go index 1800ff4e63..dc17b8c051 100644 --- a/node/plugin/config.go +++ b/node/plugin/config.go @@ -66,7 +66,7 @@ var ( } QuorumIDListFlag = cli.StringFlag{ Name: "quorum-id-list", - Usage: "Comma separated list of quorum IDs that the node will participate in", + Usage: "Comma separated list of quorum IDs that the node will participate in. There should be at least one quorum ID. This list can contain quorums node is already registered with. If the node opts in to quorums already registered with, it will be a no-op.", Required: true, EnvVar: common.PrefixEnvVar(flags.EnvVarPrefix, "QUORUM_ID_LIST"), } @@ -131,6 +131,9 @@ func NewConfig(ctx *cli.Context) (*Config, error) { } ids = append(ids, core.QuorumID(val)) } + if len(ids) == 0 { + return nil, errors.New("no quorum ids provided") + } op := ctx.GlobalString(OperationFlag.Name) if op != "opt-in" && op != "opt-out" {