Skip to content

Commit

Permalink
Ejector (#493)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Apr 26, 2024
1 parent e38ffb9 commit 8f24e8a
Show file tree
Hide file tree
Showing 11 changed files with 395 additions and 2 deletions.
39 changes: 39 additions & 0 deletions core/eth/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
blsapkreg "github.com/Layr-Labs/eigenda/contracts/bindings/BLSApkRegistry"
delegationmgr "github.com/Layr-Labs/eigenda/contracts/bindings/DelegationManager"
eigendasrvmg "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager"
ejectionmg "github.com/Layr-Labs/eigenda/contracts/bindings/EjectionManager"
indexreg "github.com/Layr-Labs/eigenda/contracts/bindings/IIndexRegistry"
opstateretriever "github.com/Layr-Labs/eigenda/contracts/bindings/OperatorStateRetriever"
regcoordinator "github.com/Layr-Labs/eigenda/contracts/bindings/RegistryCoordinator"
Expand Down Expand Up @@ -54,6 +55,7 @@ type ContractBindings struct {
RegistryCoordinator *regcoordinator.ContractRegistryCoordinator
StakeRegistry *stakereg.ContractStakeRegistry
EigenDAServiceManager *eigendasrvmg.ContractEigenDAServiceManager
EjectionManager *ejectionmg.ContractEjectionManager
AVSDirectory *avsdir.ContractAVSDirectory
}

Expand Down Expand Up @@ -362,6 +364,31 @@ func (t *Transactor) UpdateOperatorSocket(ctx context.Context, socket string) er
return nil
}

func (t *Transactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Receipt, error) {
byteIdsByQuorum := make([][][32]byte, len(operatorsByQuorum))
for i, ids := range operatorsByQuorum {
for _, id := range ids {
byteIdsByQuorum[i] = append(byteIdsByQuorum[i], [32]byte(id))
}
}
opts, err := t.EthClient.GetNoSendTransactOpts()
if err != nil {
t.Logger.Error("Failed to generate transact opts", "err", err)
return nil, err
}
txn, err := t.Bindings.EjectionManager.EjectOperators(opts, byteIdsByQuorum)
if err != nil {
t.Logger.Error("Failed to create transaction", "err", err)
return nil, err
}
receipt, err := t.EthClient.EstimateGasPriceAndLimitAndSendTx(context.Background(), txn, "EjectOperators", nil)
if err != nil {
t.Logger.Error("Failed to eject operators", "err", err)
return nil, err
}
return receipt, nil
}

// GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId
// is registered with. The returned stakes are for the block number supplied. The indices of the operators within each quorum
// are also returned.
Expand Down Expand Up @@ -808,6 +835,17 @@ func (t *Transactor) updateContractBindings(blsOperatorStateRetrieverAddr, eigen
return err
}

contractEjectionManagerAddr, err := contractIRegistryCoordinator.Ejector(&bind.CallOpts{})
if err != nil {
t.Logger.Error("Failed to fetch EjectionManager address", "err", err)
return err
}
contractEjectionManager, err := ejectionmg.NewContractEjectionManager(contractEjectionManagerAddr, t.EthClient)
if err != nil {
t.Logger.Error("Failed to fetch EjectionManager contract", "err", err)
return err
}

contractBLSOpStateRetr, err := opstateretriever.NewContractOperatorStateRetriever(blsOperatorStateRetrieverAddr, t.EthClient)
if err != nil {
t.Logger.Error("Failed to fetch BLSOperatorStateRetriever contract", "err", err)
Expand Down Expand Up @@ -860,6 +898,7 @@ func (t *Transactor) updateContractBindings(blsOperatorStateRetrieverAddr, eigen
BLSApkRegistry: contractBLSPubkeyReg,
IndexRegistry: contractIIndexReg,
RegistryCoordinator: contractIRegistryCoordinator,
EjectionManager: contractEjectionManager,
StakeRegistry: contractStakeRegistry,
EigenDAServiceManager: contractEigenDAServiceManager,
DelegationManager: contractDelegationManager,
Expand Down
6 changes: 6 additions & 0 deletions core/mock/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ func (t *MockTransactor) UpdateOperatorSocket(ctx context.Context, socket string
return args.Error(0)
}

func (t *MockTransactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Receipt, error) {
args := t.Called()
result := args.Get(0)
return result.(*types.Receipt), args.Error(1)
}

func (t *MockTransactor) GetOperatorStakes(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) (core.OperatorStakes, []core.QuorumID, error) {
args := t.Called()
result0 := args.Get(0)
Expand Down
5 changes: 5 additions & 0 deletions core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ type Transactor interface {
// UpdateOperatorSocket updates the socket of the operator in all the quorums that it is registered with.
UpdateOperatorSocket(ctx context.Context, socket string) error

// EjectOperators ejects operators from AVS registryCoordinator.
// The operatorsByQuorum provides a list of operators for each quorum. Within a quorum,
// the operators are ordered; in case of rate limiting, the first operators will be ejected.
EjectOperators(ctx context.Context, operatorsByQuorum [][]OperatorID) (*types.Receipt, error)

// GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId
// is registered with. The returned stakes are for the block number supplied. The indices of the operators within each quorum
// are also returned.
Expand Down
10 changes: 9 additions & 1 deletion disperser/cmd/dataapi/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"errors"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/geth"
Expand All @@ -25,6 +27,7 @@ type Config struct {
SubgraphApiOperatorStateAddr string
ServerMode string
AllowOrigins []string
EjectionToken string

BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string
Expand All @@ -39,6 +42,10 @@ func NewConfig(ctx *cli.Context) (Config, error) {
if err != nil {
return Config{}, err
}
ejectionToken := ctx.GlobalString(flags.EjectionTokenFlag.Name)
if len(ejectionToken) < 20 {
return Config{}, errors.New("the ejection token length must be at least 20")
}
config := Config{
BlobstoreConfig: blobstore.Config{
BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name),
Expand All @@ -59,7 +66,8 @@ func NewConfig(ctx *cli.Context) (Config, error) {
Secret: ctx.GlobalString(flags.PrometheusServerSecretFlag.Name),
Cluster: ctx.GlobalString(flags.PrometheusMetricsClusterLabelFlag.Name),
},
AllowOrigins: ctx.GlobalStringSlice(flags.AllowOriginsFlag.Name),
AllowOrigins: ctx.GlobalStringSlice(flags.AllowOriginsFlag.Name),
EjectionToken: ejectionToken,
MetricsConfig: dataapi.MetricsConfig{
HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name),
Expand Down
7 changes: 7 additions & 0 deletions disperser/cmd/dataapi/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "ALLOW_ORIGINS"),
Required: true,
}
EjectionTokenFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "ejection-token"),
Usage: "The token used for authorizing the ejection requests",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "EJECTION_TOKEN"),
}
EnableMetricsFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "enable-metrics"),
Usage: "start metrics server",
Expand Down Expand Up @@ -146,6 +152,7 @@ var requiredFlags = []cli.Flag{
PrometheusServerSecretFlag,
PrometheusMetricsClusterLabelFlag,
AllowOriginsFlag,
EjectionTokenFlag,
EnableMetricsFlag,
DisperserHostnameFlag,
ChurnerHostnameFlag,
Expand Down
1 change: 1 addition & 0 deletions disperser/cmd/dataapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func RunDataApi(ctx *cli.Context) error {
ServerMode: config.ServerMode,
SocketAddr: config.SocketAddr,
AllowOrigins: config.AllowOrigins,
EjectionToken: config.EjectionToken,
DisperserHostname: config.DisperserHostname,
ChurnerHostname: config.ChurnerHostname,
BatcherHealthEndpt: config.BatcherHealthEndpt,
Expand Down
1 change: 1 addition & 0 deletions disperser/dataapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ type Config struct {
DisperserHostname string
ChurnerHostname string
BatcherHealthEndpt string
EjectionToken string
}
127 changes: 127 additions & 0 deletions disperser/dataapi/ejector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package dataapi

import (
"context"
"sort"
"sync"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigensdk-go/logging"
)

// stakeShareToSLA returns the SLA for a given stake share in a quorum.
// The caller should ensure "stakeShare" is in range (0, 1].
func stakeShareToSLA(stakeShare float64) float64 {
switch {
case stakeShare > 0.15:
return 0.995
case stakeShare > 0.1:
return 0.98
case stakeShare > 0.05:
return 0.95
default:
return 0.9
}
}

// operatorPerfScore scores an operator based on its stake share and nonsigning rate. The
// performance score will be in range [0, 1], with higher score indicating better performance.
func operatorPerfScore(stakeShare float64, nonsigningRate float64) float64 {
if nonsigningRate == 0 {
return 1.0
}
sla := stakeShareToSLA(stakeShare / 100.0)
perf := (1 - sla) / nonsigningRate
return perf / (1.0 + perf)
}

func computePerfScore(metric *OperatorNonsigningPercentageMetrics) float64 {
return operatorPerfScore(metric.StakePercentage, metric.Percentage)
}

type ejector struct {
logger logging.Logger
transactor core.Transactor
metrics *Metrics

// For serializing the ejection requests.
mu sync.Mutex
}

func newEjector(logger logging.Logger, tx core.Transactor, metrics *Metrics) *ejector {
return &ejector{
logger: logger.With("component", "Ejector"),
transactor: tx,
metrics: metrics,
}
}

func (e *ejector) eject(ctx context.Context, nonsigningRate *OperatorsNonsigningPercentage) error {
e.mu.Lock()
defer e.mu.Unlock()

nonsigners := make([]*OperatorNonsigningPercentageMetrics, 0)
for _, metric := range nonsigningRate.Data {
// Collect only the nonsigners who violate the SLA.
if metric.Percentage/100.0 > 1-stakeShareToSLA(metric.StakePercentage/100.0) {
nonsigners = append(nonsigners, metric)
}
}

// Rank the operators for each quorum by the operator performance score.
// The operators with lower perf score will get ejected with priority in case of
// rate limiting.
sort.Slice(nonsigners, func(i, j int) bool {
if nonsigners[i].QuorumId == nonsigners[j].QuorumId {
if computePerfScore(nonsigners[i]) == computePerfScore(nonsigners[j]) {
return float64(nonsigners[i].TotalUnsignedBatches)*nonsigners[i].StakePercentage > float64(nonsigners[j].TotalUnsignedBatches)*nonsigners[j].StakePercentage
}
return computePerfScore(nonsigners[i]) < computePerfScore(nonsigners[j])
}
return nonsigners[i].QuorumId < nonsigners[j].QuorumId
})

operatorsByQuorum, err := e.convertOperators(nonsigners)
if err != nil {
return err
}

receipt, err := e.transactor.EjectOperators(ctx, operatorsByQuorum)
if err != nil {
e.logger.Error("Ejection transaction failed", "err", err)
return err
}
e.logger.Info("Ejection transaction succeeded", "receipt", receipt)

e.metrics.UpdateEjectionGasUsed(receipt.GasUsed)

// TODO: get the txn response and update the metrics.

return nil
}

func (e *ejector) convertOperators(nonsigners []*OperatorNonsigningPercentageMetrics) ([][]core.OperatorID, error) {
var maxQuorumId uint8
for _, metric := range nonsigners {
if metric.QuorumId > maxQuorumId {
maxQuorumId = metric.QuorumId
}
}

numOperatorByQuorum := make(map[uint8]int)
stakeShareByQuorum := make(map[uint8]float64)

result := make([][]core.OperatorID, maxQuorumId+1)
for _, metric := range nonsigners {
id, err := core.OperatorIDFromHex(metric.OperatorId)
if err != nil {
return nil, err
}
result[metric.QuorumId] = append(result[metric.QuorumId], id)
numOperatorByQuorum[metric.QuorumId]++
stakeShareByQuorum[metric.QuorumId] += metric.StakePercentage
}
e.metrics.UpdateRequestedOperatorMetric(numOperatorByQuorum, stakeShareByQuorum)

return result, nil
}
Loading

0 comments on commit 8f24e8a

Please sign in to comment.