Skip to content

Commit

Permalink
operator register quorums
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Mar 21, 2024
1 parent 43b5464 commit 275874d
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 86 deletions.
4 changes: 2 additions & 2 deletions core/mock/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (t *MockTransactor) RegisterOperator(
operatorToAvsRegistrationSigSalt [32]byte,
operatorToAvsRegistrationSigExpiry *big.Int,
) error {
args := t.Called()
args := t.Called(ctx, keypair, socket, quorumIds, operatorEcdsaPrivateKey, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry)
return args.Error(0)
}

Expand All @@ -56,7 +56,7 @@ func (t *MockTransactor) RegisterOperatorWithChurn(
operatorToAvsRegistrationSigSalt [32]byte,
operatorToAvsRegistrationSigExpiry *big.Int,
churnReply *churner.ChurnReply) error {
args := t.Called()
args := t.Called(ctx, keypair, socket, quorumIds, operatorEcdsaPrivateKey, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry, churnReply)
return args.Error(0)
}

Expand Down
100 changes: 100 additions & 0 deletions node/churner_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package node

import (
"context"
"crypto/tls"
"errors"
"time"

churnerpb "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/operators/churner"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

type ChurnerClient interface {
// Churn sends a churn request to the churner service
// The quorumIDs cannot be empty, but may contain quorums that the operator is already registered in.
// If the operator is already registered in a quorum, the churner will ignore it and continue with the other quorums.
Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error)
}

type churnerClient struct {
churnerURL string
useSecureGrpc bool
timeout time.Duration
logger logging.Logger
}

func NewChurnerClient(churnerURL string, useSecureGrpc bool, timeout time.Duration, logger logging.Logger) ChurnerClient {
return &churnerClient{
churnerURL: churnerURL,
useSecureGrpc: useSecureGrpc,
timeout: timeout,
logger: logger,
}
}

func (c *churnerClient) Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) {
if len(quorumIDs) == 0 {
return nil, errors.New("quorumIDs cannot be empty")
}
// generate salt
privateKeyBytes := []byte(keyPair.PrivKey.String())
salt := crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), quorumIDs[:], privateKeyBytes)

churnRequest := &churner.ChurnRequest{
OperatorAddress: gethcommon.HexToAddress(operatorAddress),
OperatorToRegisterPubkeyG1: keyPair.PubKey,
OperatorToRegisterPubkeyG2: keyPair.GetPubKeyG2(),
OperatorRequestSignature: &core.Signature{},
QuorumIDs: quorumIDs,
}

copy(churnRequest.Salt[:], salt)

// sign the request
churnRequest.OperatorRequestSignature = keyPair.SignMessage(churner.CalculateRequestHash(churnRequest))

// convert to protobuf
churnRequestPb := &churnerpb.ChurnRequest{
OperatorToRegisterPubkeyG1: churnRequest.OperatorToRegisterPubkeyG1.Serialize(),
OperatorToRegisterPubkeyG2: churnRequest.OperatorToRegisterPubkeyG2.Serialize(),
OperatorRequestSignature: churnRequest.OperatorRequestSignature.Serialize(),
Salt: salt[:],
OperatorAddress: operatorAddress,
}

churnRequestPb.QuorumIds = make([]uint32, len(quorumIDs))
for i, quorumID := range quorumIDs {
churnRequestPb.QuorumIds[i] = uint32(quorumID)
}
credential := insecure.NewCredentials()
if c.useSecureGrpc {
config := &tls.Config{}
credential = credentials.NewTLS(config)
}

conn, err := grpc.Dial(
c.churnerURL,
grpc.WithTransportCredentials(credential),
)
if err != nil {
c.logger.Error("Node cannot connect to churner", "err", err)
return nil, err
}
defer conn.Close()

gc := churnerpb.NewChurnerClient(conn)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300)

return gc.Churn(ctx, churnRequestPb, opt)
}
3 changes: 3 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
}
ids = append(ids, core.QuorumID(val))
}
if len(ids) == 0 {
return nil, errors.New("no quorum ids provided")
}

expirationPollIntervalSec := ctx.GlobalUint64(flags.ExpirationPollIntervalSecFlag.Name)
if expirationPollIntervalSec <= minExpirationPollIntervalSec {
Expand Down
2 changes: 1 addition & 1 deletion node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var (
}
QuorumIDListFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "quorum-id-list"),
Usage: "Comma separated list of quorum IDs that the node will participate in",
Usage: "Comma separated list of quorum IDs that the node will participate in. There should be at least one quorum ID. This list can contain quorums node is already registered with. If the node opts in to quorums already registered with, it will be a no-op.",
Required: true,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "QUORUM_ID_LIST"),
}
Expand Down
30 changes: 30 additions & 0 deletions node/mock/churner_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package mock

import (
"context"

churnerpb "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/node"
"github.com/stretchr/testify/mock"
)

type ChurnerClient struct {
mock.Mock
}

var _ node.ChurnerClient = (*ChurnerClient)(nil)

func (c *ChurnerClient) Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) {
args := c.Called()
var reply *churnerpb.ChurnReply
if args.Get(0) != nil {
reply = (args.Get(0)).(*churnerpb.ChurnReply)
}

var err error
if args.Get(1) != nil {
err = (args.Get(1)).(error)
}
return reply, err
}
3 changes: 2 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ func (n *Node) Start(ctx context.Context) error {
OperatorId: n.Config.ID,
QuorumIDs: n.Config.QuorumIDList,
}
err = RegisterOperator(ctx, operator, n.Transactor, n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Logger)
churnerClient := NewChurnerClient(n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Config.Timeout, n.Logger)
err = RegisterOperator(ctx, operator, n.Transactor, churnerClient, n.Logger)
if err != nil {
return fmt.Errorf("failed to register the operator: %w", err)
}
Expand Down
103 changes: 23 additions & 80 deletions node/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,14 @@ package node
import (
"context"
"crypto/ecdsa"
"crypto/tls"
"errors"
"fmt"
"math/big"
"slices"
"time"

grpcchurner "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/operators/churner"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

type Operator struct {
Expand All @@ -31,27 +24,21 @@ type Operator struct {
}

// RegisterOperator operator registers the operator with the given public key for the given quorum IDs.
func RegisterOperator(ctx context.Context, operator *Operator, transactor core.Transactor, churnerUrl string, useSecureGrpc bool, logger logging.Logger) error {
registeredQuorumIds, err := transactor.GetRegisteredQuorumIdsForOperator(ctx, operator.OperatorId)
func RegisterOperator(ctx context.Context, operator *Operator, transactor core.Transactor, churnerClient ChurnerClient, logger logging.Logger) error {
quorumsToRegister, err := operator.getQuorumIdsToRegister(ctx, transactor)
if err != nil {
return fmt.Errorf("failed to get registered quorum ids for an operator: %w", err)
return fmt.Errorf("failed to get quorum ids to register: %w", err)
}

logger.Debug("Registered quorum ids", "registeredQuorumIds", registeredQuorumIds)
if len(registeredQuorumIds) != 0 {
if len(quorumsToRegister) == 0 {
return nil
}

logger.Info("Quorums to register for", "quorums", operator.QuorumIDs)

if len(operator.QuorumIDs) == 0 {
return errors.New("an operator should be in at least one quorum to be useful")
}
logger.Info("Quorums to register for", "quorums", quorumsToRegister)

// register for quorums
shouldCallChurner := false
// check if one of the quorums to register for is full
for _, quorumID := range operator.QuorumIDs {
for _, quorumID := range quorumsToRegister {
operatorSetParams, err := transactor.GetOperatorSetParams(ctx, quorumID)
if err != nil {
return err
Expand All @@ -75,22 +62,22 @@ func RegisterOperator(ctx context.Context, operator *Operator, transactor core.T

privateKeyBytes := []byte(operator.KeyPair.PrivKey.String())
salt := [32]byte{}
copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), operator.QuorumIDs[:], privateKeyBytes))
copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), quorumsToRegister, privateKeyBytes))

// Get the current block number
expiry := big.NewInt((time.Now().Add(10 * time.Minute)).Unix())

// if we should call the churner, call it
if shouldCallChurner {
churnReply, err := requestChurnApproval(ctx, operator, churnerUrl, useSecureGrpc, logger)
churnReply, err := churnerClient.Churn(ctx, operator.Address, operator.KeyPair, quorumsToRegister)
if err != nil {
return fmt.Errorf("failed to request churn approval: %w", err)
}

return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry, churnReply)
return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, quorumsToRegister, operator.PrivKey, salt, expiry, churnReply)
} else {
// other wise just register normally
return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry)
return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, quorumsToRegister, operator.PrivKey, salt, expiry)
}
}

Expand All @@ -108,67 +95,23 @@ func UpdateOperatorSocket(ctx context.Context, transactor core.Transactor, socke
return transactor.UpdateOperatorSocket(ctx, socket)
}

func requestChurnApproval(ctx context.Context, operator *Operator, churnerUrl string, useSecureGrpc bool, logger logging.Logger) (*grpcchurner.ChurnReply, error) {
logger.Info("churner url", "url", churnerUrl)

credential := insecure.NewCredentials()
if useSecureGrpc {
config := &tls.Config{}
credential = credentials.NewTLS(config)
// getQuorumIdsToRegister returns the quorum ids that the operator is not registered in.
func (c *Operator) getQuorumIdsToRegister(ctx context.Context, transactor core.Transactor) ([]core.QuorumID, error) {
if len(c.QuorumIDs) == 0 {
return nil, fmt.Errorf("an operator should be in at least one quorum to be useful")
}

conn, err := grpc.Dial(
churnerUrl,
grpc.WithTransportCredentials(credential),
)
registeredQuorumIds, err := transactor.GetRegisteredQuorumIdsForOperator(ctx, c.OperatorId)
if err != nil {
logger.Error("Node cannot connect to churner", "err", err)
return nil, err
return nil, fmt.Errorf("failed to get registered quorum ids for an operator: %w", err)
}
defer conn.Close()

gc := grpcchurner.NewChurnerClient(conn)
ctx, cancel := context.WithTimeout(ctx, operator.Timeout)
defer cancel()

request := newChurnRequest(operator.Address, operator.KeyPair, operator.QuorumIDs)
opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300)

return gc.Churn(ctx, request, opt)
}

func newChurnRequest(address string, KeyPair *core.KeyPair, QuorumIDs []core.QuorumID) *grpcchurner.ChurnRequest {

// generate salt
privateKeyBytes := []byte(KeyPair.PrivKey.String())
salt := crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), QuorumIDs[:], privateKeyBytes)

churnRequest := &churner.ChurnRequest{
OperatorAddress: gethcommon.HexToAddress(address),
OperatorToRegisterPubkeyG1: KeyPair.PubKey,
OperatorToRegisterPubkeyG2: KeyPair.GetPubKeyG2(),
OperatorRequestSignature: &core.Signature{},
QuorumIDs: QuorumIDs,
}

copy(churnRequest.Salt[:], salt)

// sign the request
churnRequest.OperatorRequestSignature = KeyPair.SignMessage(churner.CalculateRequestHash(churnRequest))

// convert to protobuf
churnRequestPb := &grpcchurner.ChurnRequest{
OperatorToRegisterPubkeyG1: churnRequest.OperatorToRegisterPubkeyG1.Serialize(),
OperatorToRegisterPubkeyG2: churnRequest.OperatorToRegisterPubkeyG2.Serialize(),
OperatorRequestSignature: churnRequest.OperatorRequestSignature.Serialize(),
Salt: salt[:],
OperatorAddress: address,
}

churnRequestPb.QuorumIds = make([]uint32, len(QuorumIDs))
for i, quorumID := range QuorumIDs {
churnRequestPb.QuorumIds[i] = uint32(quorumID)
quorumIdsToRegister := make([]core.QuorumID, 0, len(c.QuorumIDs))
for _, quorumID := range c.QuorumIDs {
if !slices.Contains(registeredQuorumIds, quorumID) {
quorumIdsToRegister = append(quorumIdsToRegister, quorumID)
}
}

return churnRequestPb
return quorumIdsToRegister, nil
}
Loading

0 comments on commit 275874d

Please sign in to comment.