Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ejector #493

Merged
merged 13 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions core/eth/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
blsapkreg "github.com/Layr-Labs/eigenda/contracts/bindings/BLSApkRegistry"
delegationmgr "github.com/Layr-Labs/eigenda/contracts/bindings/DelegationManager"
eigendasrvmg "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager"
ejectionmg "github.com/Layr-Labs/eigenda/contracts/bindings/EjectionManager"
indexreg "github.com/Layr-Labs/eigenda/contracts/bindings/IIndexRegistry"
opstateretriever "github.com/Layr-Labs/eigenda/contracts/bindings/OperatorStateRetriever"
regcoordinator "github.com/Layr-Labs/eigenda/contracts/bindings/RegistryCoordinator"
Expand Down Expand Up @@ -54,6 +55,7 @@ type ContractBindings struct {
RegistryCoordinator *regcoordinator.ContractRegistryCoordinator
StakeRegistry *stakereg.ContractStakeRegistry
EigenDAServiceManager *eigendasrvmg.ContractEigenDAServiceManager
EjectionManager *ejectionmg.ContractEjectionManager
AVSDirectory *avsdir.ContractAVSDirectory
}

Expand Down Expand Up @@ -362,6 +364,31 @@ func (t *Transactor) UpdateOperatorSocket(ctx context.Context, socket string) er
return nil
}

func (t *Transactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Receipt, error) {
byteIdsByQuorum := make([][][32]byte, len(operatorsByQuorum))
for i, ids := range operatorsByQuorum {
for _, id := range ids {
byteIdsByQuorum[i] = append(byteIdsByQuorum[i], [32]byte(id))
}
}
opts, err := t.EthClient.GetNoSendTransactOpts()
if err != nil {
t.Logger.Error("Failed to generate transact opts", "err", err)
return nil, err
}
txn, err := t.Bindings.EjectionManager.EjectOperators(opts, byteIdsByQuorum)
if err != nil {
t.Logger.Error("Failed to create transaction", "err", err)
return nil, err
}
receipt, err := t.EthClient.EstimateGasPriceAndLimitAndSendTx(context.Background(), txn, "EjectOperators", nil)
if err != nil {
t.Logger.Error("Failed to eject operators", "err", err)
return nil, err
}
return receipt, nil
}

// GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId
// is registered with. The returned stakes are for the block number supplied. The indices of the operators within each quorum
// are also returned.
Expand Down Expand Up @@ -802,6 +829,17 @@ func (t *Transactor) updateContractBindings(blsOperatorStateRetrieverAddr, eigen
return err
}

contractEjectionManagerAddr, err := contractIRegistryCoordinator.Ejector(&bind.CallOpts{})
if err != nil {
t.Logger.Error("Failed to fetch EjectionManager address", "err", err)
return err
}
contractEjectionManager, err := ejectionmg.NewContractEjectionManager(contractEjectionManagerAddr, t.EthClient)
if err != nil {
t.Logger.Error("Failed to fetch EjectionManager contract", "err", err)
return err
}

contractBLSOpStateRetr, err := opstateretriever.NewContractOperatorStateRetriever(blsOperatorStateRetrieverAddr, t.EthClient)
if err != nil {
t.Logger.Error("Failed to fetch BLSOperatorStateRetriever contract", "err", err)
Expand Down Expand Up @@ -854,6 +892,7 @@ func (t *Transactor) updateContractBindings(blsOperatorStateRetrieverAddr, eigen
BLSApkRegistry: contractBLSPubkeyReg,
IndexRegistry: contractIIndexReg,
RegistryCoordinator: contractIRegistryCoordinator,
EjectionManager: contractEjectionManager,
StakeRegistry: contractStakeRegistry,
EigenDAServiceManager: contractEigenDAServiceManager,
DelegationManager: contractDelegationManager,
Expand Down
6 changes: 6 additions & 0 deletions core/mock/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ func (t *MockTransactor) UpdateOperatorSocket(ctx context.Context, socket string
return args.Error(0)
}

func (t *MockTransactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Receipt, error) {
args := t.Called()
result := args.Get(0)
return result.(*types.Receipt), args.Error(0)
}

func (t *MockTransactor) GetOperatorStakes(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) (core.OperatorStakes, []core.QuorumID, error) {
args := t.Called()
result0 := args.Get(0)
Expand Down
5 changes: 5 additions & 0 deletions core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ type Transactor interface {
// UpdateOperatorSocket updates the socket of the operator in all the quorums that it is registered with.
UpdateOperatorSocket(ctx context.Context, socket string) error

// EjectOperators ejects operators from AVS registryCoordinator.
// The operatorsByQuorum provides a list of operators for each quorum. Within a quorum,
// the operators are ordered; in case of rate limiting, the first operators will be ejected.
EjectOperators(ctx context.Context, operatorsByQuorum [][]OperatorID) (*types.Receipt, error)

// GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId
// is registered with. The returned stakes are for the block number supplied. The indices of the operators within each quorum
// are also returned.
Expand Down
10 changes: 9 additions & 1 deletion disperser/cmd/dataapi/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"errors"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/geth"
Expand All @@ -25,6 +27,7 @@ type Config struct {
SubgraphApiOperatorStateAddr string
ServerMode string
AllowOrigins []string
EjectionToken string

BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string
Expand All @@ -39,6 +42,10 @@ func NewConfig(ctx *cli.Context) (Config, error) {
if err != nil {
return Config{}, err
}
ejectionToken := ctx.GlobalString(flags.EjectionTokenFlag.Name)
if len(ejectionToken) < 20 {
return Config{}, errors.New("the ejection token length must be at least 20")
}
config := Config{
BlobstoreConfig: blobstore.Config{
BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name),
Expand All @@ -59,7 +66,8 @@ func NewConfig(ctx *cli.Context) (Config, error) {
Secret: ctx.GlobalString(flags.PrometheusServerSecretFlag.Name),
Cluster: ctx.GlobalString(flags.PrometheusMetricsClusterLabelFlag.Name),
},
AllowOrigins: ctx.GlobalStringSlice(flags.AllowOriginsFlag.Name),
AllowOrigins: ctx.GlobalStringSlice(flags.AllowOriginsFlag.Name),
EjectionToken: ejectionToken,
MetricsConfig: dataapi.MetricsConfig{
HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name),
Expand Down
7 changes: 7 additions & 0 deletions disperser/cmd/dataapi/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "ALLOW_ORIGINS"),
Required: true,
}
EjectionTokenFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "ejection-token"),
Usage: "The token used for authorizing the ejection requests",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "EJECTION_TOKEN"),
}
EnableMetricsFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "enable-metrics"),
Usage: "start metrics server",
Expand Down Expand Up @@ -146,6 +152,7 @@ var requiredFlags = []cli.Flag{
PrometheusServerSecretFlag,
PrometheusMetricsClusterLabelFlag,
AllowOriginsFlag,
EjectionTokenFlag,
EnableMetricsFlag,
DisperserHostnameFlag,
ChurnerHostnameFlag,
Expand Down
1 change: 1 addition & 0 deletions disperser/cmd/dataapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func RunDataApi(ctx *cli.Context) error {
ServerMode: config.ServerMode,
SocketAddr: config.SocketAddr,
AllowOrigins: config.AllowOrigins,
EjectionToken: config.EjectionToken,
DisperserHostname: config.DisperserHostname,
ChurnerHostname: config.ChurnerHostname,
BatcherHealthEndpt: config.BatcherHealthEndpt,
Expand Down
1 change: 1 addition & 0 deletions disperser/dataapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ type Config struct {
DisperserHostname string
ChurnerHostname string
BatcherHealthEndpt string
EjectionToken string
}
124 changes: 124 additions & 0 deletions disperser/dataapi/ejector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package dataapi
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a test for ejector


import (
"context"
"sort"
"sync"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigensdk-go/logging"
)

// The caller should ensure "stakeShare" is in range (0, 1].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

stakeShareToSLA converts stake share to the enforced SLA metric. 
`stakeShare` must be in range (0, 1].

so that it gets registered by godoc

func stakeShareToSLA(stakeShare float64) float64 {
switch {
case stakeShare > 0.1:
return 0.975
case stakeShare > 0.05:
return 0.95
default:
return 0.9
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update to new SLA

}
}

// operatorPerfScore scores an operator based on its stake share and nonsigning rate. The
// performance score will be in range [0, 1], with higher score indicating better performance.
func operatorPerfScore(stakeShare float64, nonsigningRate float64) float64 {
if nonsigningRate == 0 {
return 1.0
}
sla := stakeShareToSLA(stakeShare)
perf := (1 - sla) / nonsigningRate
return perf / (1.0 + perf)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit uncertain about whether this scoring will give us the right ordering across SLA classes. Interested to know if there was a rationale for this particular scoring. I'll put some further thought into it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The breakdown will be:

  • 1 - sla is the error budget,
  • nonsingingRate / (1 - sla) is how badly it has burnt the error budget
  • the inverse is indicating perf
  • then normalize it

}

func computePerfScore(metric *OperatorNonsigningPercentageMetrics) float64 {
return operatorPerfScore(metric.StakePercentage, metric.Percentage/100.0)
}

type ejector struct {
logger logging.Logger
transactor core.Transactor
metrics *Metrics

// For serializing the ejection requests.
mu sync.Mutex
}

func newEjector(logger logging.Logger, tx core.Transactor, metrics *Metrics) *ejector {
return &ejector{
logger: logger.With("component", "Ejector"),
transactor: tx,
metrics: metrics,
}
}

func (e *ejector) eject(ctx context.Context, nonsigningRate *OperatorsNonsigningPercentage, mode string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mode isn't used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'll be used after fixing the TODO to update metrics

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this isn't needed even after fixing TODO -- removed it!

e.mu.Lock()
defer e.mu.Unlock()

nonsigners := make([]*OperatorNonsigningPercentageMetrics, 0)
for _, metric := range nonsigningRate.Data {
// Collect only the nonsigners who violate the SLA.
if metric.Percentage/100.0 > 1-stakeShareToSLA(metric.StakePercentage) {
nonsigners = append(nonsigners, metric)
}
}

// Rank the operators for each quorum by the operator performance score.
// The operators with lower perf score will get ejected with priority in case of
// rate limiting.
sort.Slice(nonsigners, func(i, j int) bool {
if nonsigners[i].QuorumId == nonsigners[j].QuorumId {
if computePerfScore(nonsigners[i]) == computePerfScore(nonsigners[j]) {
return float64(nonsigners[i].TotalUnsignedBatches)*nonsigners[i].StakePercentage > float64(nonsigners[j].TotalUnsignedBatches)*nonsigners[j].StakePercentage
}
return computePerfScore(nonsigners[i]) < computePerfScore(nonsigners[j])
}
return nonsigners[i].QuorumId < nonsigners[j].QuorumId
})

operatorsByQuorum, err := e.convertOperators(nonsigners)
if err != nil {
return err
}

receipt, err := e.transactor.EjectOperators(ctx, operatorsByQuorum)
if err != nil {
e.logger.Error("Ejection transaction failed", "err", err)
return err
}
e.logger.Info("Ejection transaction succeeded", "receipt", receipt)

e.metrics.UpdateEjectionGasUsed(receipt.GasUsed)

// TODO: get the txn response and update the metrics.

return nil
}

func (e *ejector) convertOperators(nonsigners []*OperatorNonsigningPercentageMetrics) ([][]core.OperatorID, error) {
var maxQuorumId uint8
for _, metric := range nonsigners {
if metric.QuorumId > maxQuorumId {
maxQuorumId = metric.QuorumId
}
}

numOperatorByQuorum := make(map[uint8]int)
stakeShareByQuorum := make(map[uint8]float64)

result := make([][]core.OperatorID, maxQuorumId+1)
for _, metric := range nonsigners {
id, err := core.OperatorIDFromHex(metric.OperatorId)
if err != nil {
return nil, err
}
result[metric.QuorumId] = append(result[metric.QuorumId], id)
numOperatorByQuorum[metric.QuorumId]++
stakeShareByQuorum[metric.QuorumId] += metric.StakePercentage
}
e.metrics.UpdateRequestedOperatorMetric(numOperatorByQuorum, stakeShareByQuorum)

return result, nil
}
68 changes: 68 additions & 0 deletions disperser/dataapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc/codes"
)

type MetricsConfig struct {
Expand All @@ -25,6 +26,10 @@ type Metrics struct {
NumRequests *prometheus.CounterVec
Latency *prometheus.SummaryVec

EjectionRequests *prometheus.CounterVec
Operators *prometheus.CounterVec
EjectionGasUsed prometheus.Gauge

httpPort string
logger logging.Logger
}
Expand Down Expand Up @@ -53,6 +58,39 @@ func NewMetrics(blobMetadataStore *blobstore.BlobMetadataStore, httpPort string,
},
[]string{"method"},
),
// EjectionRequests is a more detailed metric than NumRequests, specifically for tracking
// the ejection calls.
// The "mode" could be:
// - "periodic": periodically initiated ejection; or
// - "urgent": urgently invoked ejection in case of bad network health condition.
// The "status" indicates the final processing result of the ejection request.
EjectionRequests: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "ejection_requests_total",
Help: "the total number of ejection requests",
},
[]string{"status", "mode"},
),
// The "state" could be:
// - "requested": operator is requested for ejection; or
// - "ejected": operator is actually ejected
// The "type" could be "number" or "stake", for the number of operators as well as the
// total stake share they represent.
Operators: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "operators_total",
Help: "the total number of operators to be ejected or actually ejected",
}, []string{"quorum", "state", "type"},
),
EjectionGasUsed: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "ejection_gas_used",
Help: "Gas used for operator ejection",
},
),
registry: reg,
httpPort: httpPort,
logger: logger.With("component", "DataAPIMetrics"),
Expand Down Expand Up @@ -81,6 +119,36 @@ func (g *Metrics) IncrementFailedRequestNum(method string) {
}).Inc()
}

func (g *Metrics) IncrementEjectionRequest(mode string, status codes.Code) {
g.EjectionRequests.With(prometheus.Labels{
"status": status.String(),
"mode": mode,
}).Inc()
}

func (g *Metrics) UpdateRequestedOperatorMetric(numOperatorsByQuorum map[uint8]int, stakeShareByQuorum map[uint8]float64) {
for q, count := range numOperatorsByQuorum {
for i := 0; i < count; i++ {
g.Operators.With(prometheus.Labels{
"quorum": fmt.Sprintf("%d", q),
"state": "requested",
"type": "number",
}).Inc()
}
}
for q, stakeShare := range stakeShareByQuorum {
g.Operators.With(prometheus.Labels{
"quorum": fmt.Sprintf("%d", q),
"state": "requested",
"type": "stake",
}).Add(stakeShare)
}
}

func (g *Metrics) UpdateEjectionGasUsed(gasUsed uint64) {
g.EjectionGasUsed.Set(float64(gasUsed))
}

// Start starts the metrics server
func (g *Metrics) Start(ctx context.Context) {
g.logger.Info("Starting metrics server at ", "port", g.httpPort)
Expand Down
Loading
Loading