diff --git a/core/eth/tx.go b/core/eth/tx.go index 7c4b414b51..e02117adec 100644 --- a/core/eth/tx.go +++ b/core/eth/tx.go @@ -364,7 +364,7 @@ func (t *Transactor) UpdateOperatorSocket(ctx context.Context, socket string) er return nil } -func (t *Transactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Transaction, error) { +func (t *Transactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Receipt, error) { byteIdsByQuorum := make([][][32]byte, len(operatorsByQuorum)) for i, ids := range operatorsByQuorum { for _, id := range ids { @@ -376,7 +376,17 @@ func (t *Transactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]c t.Logger.Error("Failed to generate transact opts", "err", err) return nil, err } - return t.Bindings.EjectionManager.EjectOperators(opts, byteIdsByQuorum) + txn, err := t.Bindings.EjectionManager.EjectOperators(opts, byteIdsByQuorum) + if err != nil { + t.Logger.Error("Failed to create transaction", "err", err) + return nil, err + } + receipt, err := t.EthClient.EstimateGasPriceAndLimitAndSendTx(context.Background(), txn, "EjectOperators", nil) + if err != nil { + t.Logger.Error("Failed to eject operators", "err", err) + return nil, err + } + return receipt, nil } // GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId diff --git a/core/mock/tx.go b/core/mock/tx.go index 782c423853..8cab75f9ed 100644 --- a/core/mock/tx.go +++ b/core/mock/tx.go @@ -70,10 +70,10 @@ func (t *MockTransactor) UpdateOperatorSocket(ctx context.Context, socket string return args.Error(0) } -func (t *MockTransactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Transaction, error) { +func (t *MockTransactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Receipt, error) { args := t.Called() result := args.Get(0) - return result.(*types.Transaction), args.Error(0) + return result.(*types.Receipt), args.Error(0) } func (t *MockTransactor) GetOperatorStakes(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) (core.OperatorStakes, []core.QuorumID, error) { diff --git a/core/tx.go b/core/tx.go index a3f65fe96f..5d2ef0781b 100644 --- a/core/tx.go +++ b/core/tx.go @@ -73,7 +73,7 @@ type Transactor interface { // EjectOperators ejects operators from AVS registryCoordinator. // The operatorsByQuorum provides a list of operators for each quorum. Within a quorum, // the operators are ordered; in case of rate limiting, the first operators will be ejected. - EjectOperators(ctx context.Context, operatorsByQuorum [][]OperatorID) (*types.Transaction, error) + EjectOperators(ctx context.Context, operatorsByQuorum [][]OperatorID) (*types.Receipt, error) // GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId // is registered with. The returned stakes are for the block number supplied. The indices of the operators within each quorum diff --git a/disperser/dataapi/ejector.go b/disperser/dataapi/ejector.go index 023b9cad29..6dee349da1 100644 --- a/disperser/dataapi/ejector.go +++ b/disperser/dataapi/ejector.go @@ -37,9 +37,9 @@ func computePerfScore(metric *OperatorNonsigningPercentageMetrics) float64 { } type Ejector struct { - Logger logging.Logger - Transactor core.Transactor - Metrics *Metrics + logger logging.Logger + transactor core.Transactor + metrics *Metrics // For serializing the ejection requests. mu sync.Mutex @@ -47,9 +47,9 @@ type Ejector struct { func NewEjector(logger logging.Logger, tx core.Transactor, metrics *Metrics) *Ejector { return &Ejector{ - Logger: logger.With("component", "Ejector"), - Transactor: tx, - Metrics: metrics, + logger: logger.With("component", "Ejector"), + transactor: tx, + metrics: metrics, } } @@ -70,6 +70,9 @@ func (e *Ejector) eject(ctx context.Context, nonsigningRate *OperatorsNonsigning // rate limiting. sort.Slice(nonsigners, func(i, j int) bool { if nonsigners[i].QuorumId == nonsigners[j].QuorumId { + if computePerfScore(nonsigners[i]) == computePerfScore(nonsigners[j]) { + return float64(nonsigners[i].TotalUnsignedBatches)*nonsigners[i].StakePercentage > float64(nonsigners[j].TotalUnsignedBatches)*nonsigners[j].StakePercentage + } return computePerfScore(nonsigners[i]) < computePerfScore(nonsigners[j]) } return nonsigners[i].QuorumId < nonsigners[j].QuorumId @@ -80,11 +83,12 @@ func (e *Ejector) eject(ctx context.Context, nonsigningRate *OperatorsNonsigning return err } - if _, err = e.Transactor.EjectOperators(ctx, operatorsByQuorum); err != nil { - e.Logger.Error("Ejection transaction failed", "err", err) + receipt, err := e.transactor.EjectOperators(ctx, operatorsByQuorum) + if err != nil { + e.logger.Error("Ejection transaction failed", "err", err) return err } - e.Logger.Info("Ejection transaction succeeded") + e.logger.Info("Ejection transaction succeeded", "receipt", receipt) // TODO: get the txn response and update the metrics. @@ -113,7 +117,7 @@ func (e *Ejector) convertOperators(nonsigners []*OperatorNonsigningPercentageMet stakeShareByQuorum[metric.QuorumId] += metric.StakePercentage } - e.Metrics.UpdateRequestedOperatorMetric(numOperatorByQuorum, stakeShareByQuorum) + e.metrics.UpdateRequestedOperatorMetric(numOperatorByQuorum, stakeShareByQuorum) return result, nil } diff --git a/disperser/dataapi/metrics.go b/disperser/dataapi/metrics.go index ed3ead9114..9ecb0f4c46 100644 --- a/disperser/dataapi/metrics.go +++ b/disperser/dataapi/metrics.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" + "google.golang.org/grpc/codes" ) type MetricsConfig struct { @@ -110,16 +111,9 @@ func (g *Metrics) IncrementFailedRequestNum(method string) { }).Inc() } -func (g *Metrics) IncrementSuccessfulEjection(mode string) { +func (g *Metrics) IncrementEjectionRequest(mode string, status codes.Code) { g.EjectionRequests.With(prometheus.Labels{ - "status": "success", - "mode": mode, - }).Inc() -} - -func (g *Metrics) IncrementFailedEjection(mode string) { - g.EjectionRequests.With(prometheus.Labels{ - "status": "failed", + "status": status.String(), "mode": mode, }).Inc() } diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 55dd1b6b98..79dba1fc0b 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -15,6 +15,7 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" + "google.golang.org/grpc/codes" "google.golang.org/grpc/health/grpc_health_v1" "github.com/Layr-Labs/eigenda/disperser" @@ -312,11 +313,18 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) { })) defer timer.ObserveDuration() + mode := "periodic" + if c.Query("mode") != "" { + mode = c.Query("mode") + } + endTime := time.Now() if c.Query("end") != "" { var err error endTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("end")) if err != nil { + s.metrics.IncrementFailedRequestNum("EjectOperators") + s.metrics.IncrementEjectionRequest(mode, codes.InvalidArgument) errorResponse(c, err) return } @@ -327,21 +335,18 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) { interval = 86400 } - mode := "periodic" - if c.Query("mode") != "" { - mode = c.Query("mode") - } - nonSigningRate, err := s.getOperatorNonsigningRate(c.Request.Context(), endTime.Unix()-interval, endTime.Unix()) if err == nil { err = s.ejector.eject(c.Request.Context(), nonSigningRate, mode) } if err != nil { s.metrics.IncrementFailedRequestNum("EjectOperators") + s.metrics.IncrementEjectionRequest(mode, codes.Internal) errorResponse(c, err) return } s.metrics.IncrementSuccessfulRequestNum("EjectOperators") + s.metrics.IncrementEjectionRequest(mode, codes.OK) c.Status(http.StatusOK) }