Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix committed Apr 18, 2024
1 parent f52d82e commit 4cef860
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 29 deletions.
14 changes: 12 additions & 2 deletions core/eth/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (t *Transactor) UpdateOperatorSocket(ctx context.Context, socket string) er
return nil
}

func (t *Transactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Transaction, error) {
func (t *Transactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Receipt, error) {
byteIdsByQuorum := make([][][32]byte, len(operatorsByQuorum))
for i, ids := range operatorsByQuorum {
for _, id := range ids {
Expand All @@ -376,7 +376,17 @@ func (t *Transactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]c
t.Logger.Error("Failed to generate transact opts", "err", err)
return nil, err
}
return t.Bindings.EjectionManager.EjectOperators(opts, byteIdsByQuorum)
txn, err := t.Bindings.EjectionManager.EjectOperators(opts, byteIdsByQuorum)
if err != nil {
t.Logger.Error("Failed to create transaction", "err", err)
return nil, err
}
receipt, err := t.EthClient.EstimateGasPriceAndLimitAndSendTx(context.Background(), txn, "EjectOperators", nil)
if err != nil {
t.Logger.Error("Failed to eject operators", "err", err)
return nil, err
}
return receipt, nil
}

// GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId
Expand Down
4 changes: 2 additions & 2 deletions core/mock/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ func (t *MockTransactor) UpdateOperatorSocket(ctx context.Context, socket string
return args.Error(0)
}

func (t *MockTransactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Transaction, error) {
func (t *MockTransactor) EjectOperators(ctx context.Context, operatorsByQuorum [][]core.OperatorID) (*types.Receipt, error) {
args := t.Called()
result := args.Get(0)
return result.(*types.Transaction), args.Error(0)
return result.(*types.Receipt), args.Error(0)
}

func (t *MockTransactor) GetOperatorStakes(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) (core.OperatorStakes, []core.QuorumID, error) {
Expand Down
2 changes: 1 addition & 1 deletion core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Transactor interface {
// EjectOperators ejects operators from AVS registryCoordinator.
// The operatorsByQuorum provides a list of operators for each quorum. Within a quorum,
// the operators are ordered; in case of rate limiting, the first operators will be ejected.
EjectOperators(ctx context.Context, operatorsByQuorum [][]OperatorID) (*types.Transaction, error)
EjectOperators(ctx context.Context, operatorsByQuorum [][]OperatorID) (*types.Receipt, error)

// GetOperatorStakes returns the stakes of all operators within the quorums that the operator represented by operatorId
// is registered with. The returned stakes are for the block number supplied. The indices of the operators within each quorum
Expand Down
24 changes: 14 additions & 10 deletions disperser/dataapi/ejector.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,19 @@ func computePerfScore(metric *OperatorNonsigningPercentageMetrics) float64 {
}

type Ejector struct {
Logger logging.Logger
Transactor core.Transactor
Metrics *Metrics
logger logging.Logger
transactor core.Transactor
metrics *Metrics

// For serializing the ejection requests.
mu sync.Mutex
}

func NewEjector(logger logging.Logger, tx core.Transactor, metrics *Metrics) *Ejector {
return &Ejector{
Logger: logger.With("component", "Ejector"),
Transactor: tx,
Metrics: metrics,
logger: logger.With("component", "Ejector"),
transactor: tx,
metrics: metrics,
}
}

Expand All @@ -70,6 +70,9 @@ func (e *Ejector) eject(ctx context.Context, nonsigningRate *OperatorsNonsigning
// rate limiting.
sort.Slice(nonsigners, func(i, j int) bool {
if nonsigners[i].QuorumId == nonsigners[j].QuorumId {
if computePerfScore(nonsigners[i]) == computePerfScore(nonsigners[j]) {
return float64(nonsigners[i].TotalUnsignedBatches)*nonsigners[i].StakePercentage > float64(nonsigners[j].TotalUnsignedBatches)*nonsigners[j].StakePercentage
}
return computePerfScore(nonsigners[i]) < computePerfScore(nonsigners[j])
}
return nonsigners[i].QuorumId < nonsigners[j].QuorumId
Expand All @@ -80,11 +83,12 @@ func (e *Ejector) eject(ctx context.Context, nonsigningRate *OperatorsNonsigning
return err
}

if _, err = e.Transactor.EjectOperators(ctx, operatorsByQuorum); err != nil {
e.Logger.Error("Ejection transaction failed", "err", err)
receipt, err := e.transactor.EjectOperators(ctx, operatorsByQuorum)
if err != nil {
e.logger.Error("Ejection transaction failed", "err", err)
return err
}
e.Logger.Info("Ejection transaction succeeded")
e.logger.Info("Ejection transaction succeeded", "receipt", receipt)

// TODO: get the txn response and update the metrics.

Expand Down Expand Up @@ -113,7 +117,7 @@ func (e *Ejector) convertOperators(nonsigners []*OperatorNonsigningPercentageMet
stakeShareByQuorum[metric.QuorumId] += metric.StakePercentage
}

e.Metrics.UpdateRequestedOperatorMetric(numOperatorByQuorum, stakeShareByQuorum)
e.metrics.UpdateRequestedOperatorMetric(numOperatorByQuorum, stakeShareByQuorum)

return result, nil
}
12 changes: 3 additions & 9 deletions disperser/dataapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc/codes"
)

type MetricsConfig struct {
Expand Down Expand Up @@ -110,16 +111,9 @@ func (g *Metrics) IncrementFailedRequestNum(method string) {
}).Inc()
}

func (g *Metrics) IncrementSuccessfulEjection(mode string) {
func (g *Metrics) IncrementEjectionRequest(mode string, status codes.Code) {
g.EjectionRequests.With(prometheus.Labels{
"status": "success",
"mode": mode,
}).Inc()
}

func (g *Metrics) IncrementFailedEjection(mode string) {
g.EjectionRequests.With(prometheus.Labels{
"status": "failed",
"status": status.String(),
"mode": mode,
}).Inc()
}
Expand Down
15 changes: 10 additions & 5 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/Layr-Labs/eigenda/disperser"
Expand Down Expand Up @@ -312,11 +313,18 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) {
}))
defer timer.ObserveDuration()

mode := "periodic"
if c.Query("mode") != "" {
mode = c.Query("mode")
}

endTime := time.Now()
if c.Query("end") != "" {
var err error
endTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("end"))
if err != nil {
s.metrics.IncrementFailedRequestNum("EjectOperators")
s.metrics.IncrementEjectionRequest(mode, codes.InvalidArgument)
errorResponse(c, err)
return
}
Expand All @@ -327,21 +335,18 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) {
interval = 86400
}

mode := "periodic"
if c.Query("mode") != "" {
mode = c.Query("mode")
}

nonSigningRate, err := s.getOperatorNonsigningRate(c.Request.Context(), endTime.Unix()-interval, endTime.Unix())
if err == nil {
err = s.ejector.eject(c.Request.Context(), nonSigningRate, mode)
}
if err != nil {
s.metrics.IncrementFailedRequestNum("EjectOperators")
s.metrics.IncrementEjectionRequest(mode, codes.Internal)
errorResponse(c, err)
return
}
s.metrics.IncrementSuccessfulRequestNum("EjectOperators")
s.metrics.IncrementEjectionRequest(mode, codes.OK)
c.Status(http.StatusOK)
}

Expand Down

0 comments on commit 4cef860

Please sign in to comment.