From 752d11a17d99843168500ad3f2bf262bf4d8b898 Mon Sep 17 00:00:00 2001 From: gpsanant Date: Mon, 28 Oct 2024 18:56:13 -0700 Subject: [PATCH] feat: handle slashing events and previous tests work --- Makefile | 11 + internal/config/config.go | 7 +- .../eigenState/stakerShares/stakerShares.go | 468 +++++++++++------- .../stakerShares/stakerShares_test.go | 154 +++--- 4 files changed, 392 insertions(+), 248 deletions(-) diff --git a/Makefile b/Makefile index c120ea5c..4e75056c 100644 --- a/Makefile +++ b/Makefile @@ -71,6 +71,17 @@ lint: test: ./scripts/goTest.sh -v -p 1 -parallel 1 ./... +# Run tests in a specific test file +# Usage: make test-file FILE=path/to/your_test_file.go +.PHONY: test-file +test-file: + @if [ -z "$(FILE)" ]; then \ + echo "Error: FILE variable is not set."; \ + echo "Usage: make test-file FILE=path/to/your_test_file.go"; \ + exit 1; \ + fi + ./scripts/goTest.sh -v -p 1 -parallel 1 $(FILE) + .PHONY: staticcheck staticcheck: staticcheck ./... diff --git a/internal/config/config.go b/internal/config/config.go index 7f177279..6c9f7450 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,8 +3,9 @@ package config import ( "errors" "fmt" - "github.com/spf13/viper" "strings" + + "github.com/spf13/viper" ) type EnvScope string @@ -138,6 +139,7 @@ type ContractAddresses struct { StrategyManager string DelegationManager string AvsDirectory string + AllocationManager string } func (c *Config) GetContractsMapForChain() *ContractAddresses { @@ -148,6 +150,7 @@ func (c *Config) GetContractsMapForChain() *ContractAddresses { StrategyManager: "0xf9fbf2e35d8803273e214c99bf15174139f4e67a", DelegationManager: "0x75dfe5b44c2e530568001400d3f704bc8ae350cc", AvsDirectory: "0x141d6995556135d4997b2ff72eb443be300353bc", + AllocationManager: "0x16D3F63d18549f035Bc69b553006684B69cE8148", } } else if c.Chain == Chain_Holesky { return &ContractAddresses{ @@ -156,6 +159,7 @@ func (c *Config) GetContractsMapForChain() *ContractAddresses { StrategyManager: "0xdfb5f6ce42aaa7830e94ecfccad411bef4d4d5b6", DelegationManager: "0xa44151489861fe9e3055d95adc98fbd462b948e7", AvsDirectory: "0x055733000064333caddbc92763c58bf0192ffebf", + AllocationManager: "0x16D3F63d18549f035Bc69b553006684B69cE8148", } } else if c.Chain == Chain_Mainnet { return &ContractAddresses{ @@ -164,6 +168,7 @@ func (c *Config) GetContractsMapForChain() *ContractAddresses { StrategyManager: "0x858646372cc42e1a627fce94aa7a7033e7cf075a", DelegationManager: "0x39053d51b77dc0d36036fc1fcc8cb819df8ef37a", AvsDirectory: "0x135dda560e946695d6f155dacafc6f1f25c1f5af", + AllocationManager: "0x16D3F63d18549f035Bc69b553006684B69cE8148", } } else { return nil diff --git a/internal/eigenState/stakerShares/stakerShares.go b/internal/eigenState/stakerShares/stakerShares.go index 204f3668..a218c665 100644 --- a/internal/eigenState/stakerShares/stakerShares.go +++ b/internal/eigenState/stakerShares/stakerShares.go @@ -11,6 +11,8 @@ import ( "strings" "time" + pkgUtils "github.com/Layr-Labs/go-sidecar/pkg/utils" + "github.com/Layr-Labs/go-sidecar/internal/config" "github.com/Layr-Labs/go-sidecar/internal/eigenState/base" "github.com/Layr-Labs/go-sidecar/internal/eigenState/stateManager" @@ -18,49 +20,77 @@ import ( "github.com/Layr-Labs/go-sidecar/internal/storage" "github.com/Layr-Labs/go-sidecar/internal/types/numbers" "github.com/Layr-Labs/go-sidecar/internal/utils" - pkgUtils "github.com/Layr-Labs/go-sidecar/pkg/utils" "go.uber.org/zap" "golang.org/x/xerrors" "gorm.io/gorm" "gorm.io/gorm/clause" ) -type StakerShares struct { +var WAD = big.NewInt(1e18) + +type StateDiffEvent interface { + SlotID() types.SlotID +} + +type ShareDiff struct { Staker string Strategy string - Shares string + Shares *big.Int BlockNumber uint64 - CreatedAt time.Time } -type AccumulatedStateChange struct { - Staker string +func (d *ShareDiff) SlotID() types.SlotID { + return NewShareDiffSlotID(d.Staker, d.Strategy) +} + +func NewShareDiffSlotID(staker string, strategy string) types.SlotID { + return types.SlotID(fmt.Sprintf("%s_%s", staker, strategy)) +} + +type SlashDiff struct { + Operator string Strategy string - Shares *big.Int + WadsSlashed *big.Int BlockNumber uint64 + LogIndex uint64 } -type StakerSharesDiff struct { +func (u *SlashDiff) SlotID() types.SlotID { + return types.SlotID(fmt.Sprintf("MAX_MAGNITUDE_%s_%s", u.Operator, u.Strategy)) +} + +type StateDiff struct { + Event StateDiffEvent +} + +type StakerShares struct { Staker string Strategy string - Shares *big.Int + Shares string BlockNumber uint64 - IsNew bool + CreatedAt time.Time } -func NewSlotID(staker string, strategy string) types.SlotID { - return types.SlotID(fmt.Sprintf("%s_%s", staker, strategy)) +func (ss *StakerShares) ToShareDiff() (*ShareDiff, error) { + shares, success := numbers.NewBig257().SetString(ss.Shares, 10) + if !success { + return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", ss.Shares) + } + return &ShareDiff{ + Staker: ss.Staker, + Strategy: ss.Strategy, + Shares: shares, + BlockNumber: ss.BlockNumber, + }, nil } type StakerSharesModel struct { base.BaseEigenState - StateTransitions types.StateTransitions[AccumulatedStateChange] - DB *gorm.DB - logger *zap.Logger - globalConfig *config.Config + DB *gorm.DB + logger *zap.Logger + globalConfig *config.Config - // Accumulates state changes for SlotIds, grouped by block number - stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange + deltaAccumulator map[uint64][]*StateDiff } func NewStakerSharesModel( @@ -74,7 +104,7 @@ func NewStakerSharesModel( DB: grm, logger: logger, globalConfig: globalConfig, - stateAccumulator: make(map[uint64]map[types.SlotID]*AccumulatedStateChange), + deltaAccumulator: make(map[uint64][]*StateDiff), } esm.RegisterState(model, 3) @@ -111,7 +141,7 @@ func parseLogOutputForDepositEvent(outputDataStr string) (*depositOutputData, er return outputData, err } -func (ss *StakerSharesModel) handleStakerDepositEvent(log *storage.TransactionLog) (*AccumulatedStateChange, error) { +func (ss *StakerSharesModel) handleStakerDepositEvent(log *storage.TransactionLog) (*StateDiff, error) { outputData, err := parseLogOutputForDepositEvent(log.OutputData) if err != nil { return nil, err @@ -134,11 +164,13 @@ func (ss *StakerSharesModel) handleStakerDepositEvent(log *storage.TransactionLo return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", outputData.Shares) } - return &AccumulatedStateChange{ - Staker: stakerAddress, - Strategy: outputData.Strategy, - Shares: shares, - BlockNumber: log.BlockNumber, + return &StateDiff{ + Event: &ShareDiff{ + Staker: stakerAddress, + Strategy: outputData.Strategy, + Shares: shares, + BlockNumber: log.BlockNumber, + }, }, nil } @@ -158,7 +190,7 @@ func parseLogOutputForPodSharesUpdatedEvent(outputDataStr string) (*podSharesUpd return outputData, err } -func (ss *StakerSharesModel) handlePodSharesUpdatedEvent(log *storage.TransactionLog) (*AccumulatedStateChange, error) { +func (ss *StakerSharesModel) handlePodSharesUpdatedEvent(log *storage.TransactionLog) (*StateDiff, error) { arguments, err := ss.ParseLogArguments(log) if err != nil { return nil, err @@ -177,15 +209,17 @@ func (ss *StakerSharesModel) handlePodSharesUpdatedEvent(log *storage.Transactio return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", sharesDelta) } - return &AccumulatedStateChange{ - Staker: staker, - Strategy: "0xbeac0eeeeeeeeeeeeeeeeeeeeeeeeeeeeeebeac0", - Shares: sharesDelta, - BlockNumber: log.BlockNumber, + return &StateDiff{ + Event: &ShareDiff{ + Staker: staker, + Strategy: "0xbeac0eeeeeeeeeeeeeeeeeeeeeeeeeeeeeebeac0", + Shares: sharesDelta, + BlockNumber: log.BlockNumber, + }, }, nil } -func (ss *StakerSharesModel) handleM1StakerWithdrawals(log *storage.TransactionLog) (*AccumulatedStateChange, error) { +func (ss *StakerSharesModel) handleM1StakerWithdrawals(log *storage.TransactionLog) (*StateDiff, error) { outputData, err := parseLogOutputForDepositEvent(log.OutputData) if err != nil { return nil, err @@ -208,11 +242,13 @@ func (ss *StakerSharesModel) handleM1StakerWithdrawals(log *storage.TransactionL return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", outputData.Shares) } - return &AccumulatedStateChange{ - Staker: stakerAddress, - Strategy: outputData.Strategy, - Shares: shares.Mul(shares, big.NewInt(-1)), - BlockNumber: log.BlockNumber, + return &StateDiff{ + Event: &ShareDiff{ + Staker: stakerAddress, + Strategy: outputData.Strategy, + Shares: shares.Mul(shares, big.NewInt(-1)), + BlockNumber: log.BlockNumber, + }, }, nil } @@ -239,7 +275,7 @@ func parseLogOutputForM2MigrationEvent(outputDataStr string) (*m2MigrationOutput // Since we have already counted M1 withdrawals due to processing events block-by-block, we need to handle not double subtracting. // Assuming that M2 WithdrawalQueued events always result in a subtraction, if we encounter a migration event, we need // to add the amount back to the shares to get the correct final state. -func (ss *StakerSharesModel) handleMigratedM2StakerWithdrawals(log *storage.TransactionLog) ([]*AccumulatedStateChange, error) { +func (ss *StakerSharesModel) handleMigratedM2StakerWithdrawals(log *storage.TransactionLog) ([]*StateDiff, error) { outputData, err := parseLogOutputForM2MigrationEvent(log.OutputData) if err != nil { return nil, err @@ -287,7 +323,7 @@ func (ss *StakerSharesModel) handleMigratedM2StakerWithdrawals(log *storage.Tran return nil, res.Error } - changes := make([]*AccumulatedStateChange, 0) + changes := make([]*StateDiff, 0) for _, l := range logs { c, err := ss.handleStakerDepositEvent(&l) if err != nil { @@ -326,24 +362,26 @@ func parseLogOutputForM2WithdrawalEvent(outputDataStr string) (*m2WithdrawalOutp } // handleM2QueuedWithdrawal handles the WithdrawalQueued event from the DelegationManager contract for M2. -func (ss *StakerSharesModel) handleM2QueuedWithdrawal(log *storage.TransactionLog) ([]*AccumulatedStateChange, error) { +func (ss *StakerSharesModel) handleM2QueuedWithdrawal(log *storage.TransactionLog) ([]*StateDiff, error) { outputData, err := parseLogOutputForM2WithdrawalEvent(log.OutputData) if err != nil { return nil, err } - records := make([]*AccumulatedStateChange, 0) + records := make([]*StateDiff, 0) for i, strategy := range outputData.Withdrawal.Strategies { shares, success := numbers.NewBig257().SetString(outputData.Withdrawal.Shares[i].String(), 10) if !success { return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", outputData.Withdrawal.Shares[i]) } - r := &AccumulatedStateChange{ - Staker: outputData.Withdrawal.Staker, - Strategy: strategy, - Shares: shares.Mul(shares, big.NewInt(-1)), - BlockNumber: log.BlockNumber, + r := &StateDiff{ + Event: &ShareDiff{ + Staker: outputData.Withdrawal.Staker, + Strategy: strategy, + Shares: shares.Mul(shares, big.NewInt(-1)), + BlockNumber: log.BlockNumber, + }, } records = append(records, r) } @@ -378,39 +416,88 @@ func parseLogOutputForSlashingWithdrawalQueuedEvent(outputDataStr string) (*slas } // handleM2QueuedWithdrawal handles the WithdrawalQueued event from the DelegationManager contract for M2. -func (ss *StakerSharesModel) handleSlashingQueuedWithdrawal(log *storage.TransactionLog) ([]*AccumulatedStateChange, error) { +func (ss *StakerSharesModel) handleSlashingQueuedWithdrawal(log *storage.TransactionLog) ([]*StateDiff, error) { outputData, err := parseLogOutputForSlashingWithdrawalQueuedEvent(log.OutputData) if err != nil { return nil, err } - records := make([]*AccumulatedStateChange, 0) + records := make([]*StateDiff, 0) for i, strategy := range outputData.Withdrawal.Strategies { shares, success := numbers.NewBig257().SetString(outputData.SharesToWithdraw[i].String(), 10) if !success { return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", outputData.SharesToWithdraw[i]) } - r := &AccumulatedStateChange{ - Staker: outputData.Withdrawal.Staker, - Strategy: strategy, - Shares: shares.Mul(shares, big.NewInt(-1)), - BlockNumber: log.BlockNumber, + r := &StateDiff{ + Event: &ShareDiff{ + Staker: outputData.Withdrawal.Staker, + Strategy: strategy, + Shares: shares.Mul(shares, big.NewInt(-1)), + BlockNumber: log.BlockNumber, + }, } records = append(records, r) } return records, nil } -type AccumulatedStateChanges struct { - Changes []*AccumulatedStateChange +type operatorSlashedOutputData struct { + Operator string `json:"operator"` + Strategies []string `json:"strategies"` + WadsSlashed []json.Number `json:"wadsSlashed"` +} + +func parseLogOutputForOperatorSlashedEvent(outputDataStr string) (*operatorSlashedOutputData, error) { + outputData := &operatorSlashedOutputData{} + decoder := json.NewDecoder(strings.NewReader(outputDataStr)) + decoder.UseNumber() + + err := decoder.Decode(&outputData) + if err != nil { + return nil, err + } + + return outputData, err +} + +func (ss *StakerSharesModel) handleOperatorSlashedEvent(log *storage.TransactionLog) ([]*StateDiff, error) { + outputData, err := parseLogOutputForOperatorSlashedEvent(log.OutputData) + if err != nil { + return nil, err + } + + stateDiffs := make([]*StateDiff, 0) + + for i, strategy := range outputData.Strategies { + wadsSlashed, success := numbers.NewBig257().SetString(outputData.WadsSlashed[i].String(), 10) + if !success { + return nil, xerrors.Errorf("Failed to convert wadsSlashed to big.Int: %s", outputData.WadsSlashed[i]) + } + stateDiffs = append(stateDiffs, &StateDiff{ + Event: &SlashDiff{ + Operator: outputData.Operator, + Strategy: strategy, + WadsSlashed: wadsSlashed, + BlockNumber: log.BlockNumber, + LogIndex: log.LogIndex, + }, + }) + } + + return stateDiffs, nil +} + +type AccumulatedStateDiffs struct { + StateDiffs []*StateDiff } -func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[AccumulatedStateChanges], []uint64) { - stateChanges := make(types.StateTransitions[AccumulatedStateChanges]) +func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[AccumulatedStateDiffs], []uint64) { + stateChanges := make(types.StateTransitions[AccumulatedStateDiffs]) - stateChanges[0] = func(log *storage.TransactionLog) (*AccumulatedStateChanges, error) { - var parsedRecords []*AccumulatedStateChange + stateChanges[0] = func(log *storage.TransactionLog) (*AccumulatedStateDiffs, error) { + var stateDiff *StateDiff + var stateDiffs []*StateDiff var err error contractAddresses := ss.globalConfig.GetContractsMapForChain() @@ -418,35 +505,22 @@ func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[Accum // Staker shares is a bit more complex and has 4 possible contract/event combinations // that we need to handle if log.Address == contractAddresses.StrategyManager && log.EventName == "Deposit" { - record, err := ss.handleStakerDepositEvent(log) - if err == nil { - parsedRecords = append(parsedRecords, record) - } + stateDiff, err = ss.handleStakerDepositEvent(log) + stateDiffs = []*StateDiff{stateDiff} } else if log.Address == contractAddresses.EigenpodManager && log.EventName == "PodSharesUpdated" { - record, err := ss.handlePodSharesUpdatedEvent(log) - if err == nil { - parsedRecords = append(parsedRecords, record) - } + stateDiff, err = ss.handlePodSharesUpdatedEvent(log) + stateDiffs = []*StateDiff{stateDiff} } else if log.Address == contractAddresses.StrategyManager && log.EventName == "ShareWithdrawalQueued" && log.TransactionHash != "0x62eb0d0865b2636c74ed146e2d161e39e42b09bac7f86b8905fc7a830935dc1e" { - record, err := ss.handleM1StakerWithdrawals(log) - if err == nil { - parsedRecords = append(parsedRecords, record) - } + stateDiff, err = ss.handleM1StakerWithdrawals(log) + stateDiffs = []*StateDiff{stateDiff} } else if log.Address == contractAddresses.DelegationManager && log.EventName == "WithdrawalQueued" { - records, err := ss.handleM2QueuedWithdrawal(log) - if err == nil && records != nil { - parsedRecords = append(parsedRecords, records...) - } + stateDiffs, err = ss.handleM2QueuedWithdrawal(log) } else if log.Address == contractAddresses.DelegationManager && log.EventName == "WithdrawalMigrated" { - records, err := ss.handleMigratedM2StakerWithdrawals(log) - if err == nil { - parsedRecords = append(parsedRecords, records...) - } + stateDiffs, err = ss.handleMigratedM2StakerWithdrawals(log) } else if log.Address == contractAddresses.DelegationManager && log.EventName == "SlashingWithdrawalQueued" { - records, err := ss.handleSlashingQueuedWithdrawal(log) - if err == nil { - parsedRecords = append(parsedRecords, records...) - } + stateDiffs, err = ss.handleSlashingQueuedWithdrawal(log) + } else if log.Address == contractAddresses.AllocationManager && log.EventName == "OperatorSlashed" { + stateDiffs, err = ss.handleOperatorSlashedEvent(log) } else { ss.logger.Sugar().Debugw("Got stakerShares event that we don't handle", zap.String("eventName", log.EventName), @@ -456,30 +530,15 @@ func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[Accum if err != nil { return nil, err } - if parsedRecords == nil { - return nil, nil - } - // Sanity check to make sure we've got an initialized accumulator map for the block - if _, ok := ss.stateAccumulator[log.BlockNumber]; !ok { - return nil, xerrors.Errorf("No state accumulator found for block %d", log.BlockNumber) + // Sanity check to make sure we've got an initialized accumulator maps for the block + if _, ok := ss.deltaAccumulator[log.BlockNumber]; !ok { + return nil, xerrors.Errorf("no state accumulator found for block %d", log.BlockNumber) } - for _, parsedRecord := range parsedRecords { - if parsedRecord == nil { - continue - } - slotId := NewSlotID(parsedRecord.Staker, parsedRecord.Strategy) - record, ok := ss.stateAccumulator[log.BlockNumber][slotId] - if !ok { - record = parsedRecord - ss.stateAccumulator[log.BlockNumber][slotId] = record - } else { - record.Shares = record.Shares.Add(record.Shares, parsedRecord.Shares) - } - } + ss.deltaAccumulator[log.BlockNumber] = append(ss.deltaAccumulator[log.BlockNumber], stateDiffs...) - return &AccumulatedStateChanges{Changes: parsedRecords}, nil + return &AccumulatedStateDiffs{StateDiffs: stateDiffs}, nil } // Create an ordered list of block numbers @@ -518,23 +577,23 @@ func (ss *StakerSharesModel) IsInterestingLog(log *storage.TransactionLog) bool } func (ss *StakerSharesModel) SetupStateForBlock(blockNumber uint64) error { - ss.stateAccumulator[blockNumber] = make(map[types.SlotID]*AccumulatedStateChange) + ss.deltaAccumulator[blockNumber] = make([]*StateDiff, 0) return nil } func (ss *StakerSharesModel) CleanupProcessedStateForBlock(blockNumber uint64) error { - delete(ss.stateAccumulator, blockNumber) + delete(ss.deltaAccumulator, blockNumber) return nil } func (ss *StakerSharesModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) { - stateChanges, sortedBlockNumbers := ss.GetStateTransitions() + StateDiffs, sortedBlockNumbers := ss.GetStateTransitions() for _, blockNumber := range sortedBlockNumbers { if log.BlockNumber >= blockNumber { ss.logger.Sugar().Debugw("Handling state change", zap.Uint64("blockNumber", blockNumber)) - change, err := stateChanges[blockNumber](log) + change, err := StateDiffs[blockNumber](log) if err != nil { return nil, err } @@ -548,87 +607,136 @@ func (ss *StakerSharesModel) HandleStateChange(log *storage.TransactionLog) (int } // prepareState prepares the state for commit by adding the new state to the existing state. -func (ss *StakerSharesModel) prepareState(blockNumber uint64) ([]*StakerSharesDiff, error) { - preparedState := make([]*StakerSharesDiff, 0) +func (ss *StakerSharesModel) prepareState(blockNumber uint64) ([]*ShareDiff, error) { + // keep track of the updated staker shares + updatedStakerShares := make(map[types.SlotID]*ShareDiff) - accumulatedState, ok := ss.stateAccumulator[blockNumber] - if !ok { - err := xerrors.Errorf("No accumulated state found for block %d", blockNumber) - ss.logger.Sugar().Errorw(err.Error(), zap.Error(err), zap.Uint64("blockNumber", blockNumber)) - return nil, err - } + for _, diff := range ss.deltaAccumulator[blockNumber] { + switch diff.Event.(type) { + case *ShareDiff: + shareDiff := diff.Event.(*ShareDiff) + slotId := NewShareDiffSlotID(shareDiff.Staker, shareDiff.Strategy) - slotIds := make([]types.SlotID, 0) - for slotId := range accumulatedState { - slotIds = append(slotIds, slotId) - } + currStakerShares, ok := updatedStakerShares[slotId] + if !ok { + stakerShares := &StakerShares{} + // get the record from the database with the given staker and strategy with the latest blockNumber + res := ss.DB.Model(&StakerShares{}).Where("staker = ? AND strategy = ?", shareDiff.Staker, shareDiff.Strategy).Order("block_number desc").First(&stakerShares) + if res.Error != nil && res.Error != gorm.ErrRecordNotFound { + ss.logger.Sugar().Errorw("Failed to fetch staker_shares", zap.Error(res.Error)) + return nil, res.Error + } + + // if the record is not found, create a new record with shares as 0 + if res.Error == gorm.ErrRecordNotFound { + stakerShares = &StakerShares{ + Staker: shareDiff.Staker, + Strategy: shareDiff.Strategy, + Shares: "0", + BlockNumber: blockNumber, + } + } + + var err error + currStakerShares, err = stakerShares.ToShareDiff() + if err != nil { + return nil, err + } + } - // Find only the records from the previous block, that are modified in this block - query := ` - with ranked_rows as ( - select - staker, - strategy, - shares, - block_number, - ROW_NUMBER() OVER (PARTITION BY staker, strategy ORDER BY block_number desc) as rn - from staker_shares - where - concat(staker, '_', strategy) in @slotIds - ) - select - rr.staker, - rr.strategy, - rr.shares, - rr.block_number - from ranked_rows as rr - where rn = 1 - ` - existingRecords := make([]StakerShares, 0) - res := ss.DB.Model(&StakerShares{}). - Raw(query, - sql.Named("slotIds", slotIds), - ). - Scan(&existingRecords) + // add the shares to the existing shares + currStakerShares.Shares = currStakerShares.Shares.Add(currStakerShares.Shares, shareDiff.Shares) + + // update the local cache + updatedStakerShares[slotId] = currStakerShares + case *SlashDiff: + slashDiff := diff.Event.(*SlashDiff) + query := ` + with ranked_staker_delegations as ( + select + staker, + operator, + delegated, + ROW_NUMBER() OVER (PARTITION BY staker ORDER BY block_number desc, log_index desc) as rn + from staker_delegation_changes + where + block_number <= @blockNumber + log_index <= @logIndex + ), + delegated_stakers as ( + select + lsd.staker + from latest_staker_delegations as lsd + where + lsd.operator = @operator + and lsd.rn = 1 + ), + ranked_staker_shares as ( + select + ss.staker, + ss.strategy, + ss.shares, + ss.block_number, + ROW_NUMBER() OVER (PARTITION BY ss.staker, ss.strategy ORDER BY ss.block_number desc) as rn + from staker_shares as ss + join delegated_stakers as ds + on ss.staker = ds.staker + ), + latest_staker_shares as ( + select + ls.staker, + ls.strategy, + ls.shares, + ls.block_number + from latest_staker_shares as ls + where + ls.rn = 1 + ) + select * from latest_staker_shares + + ` + stakerShares := make([]StakerShares, 0) + + // get the staker shares for the stakers who were delegated to the operator + // and update the shares with the new max magnitude + res := ss.DB.Model(&StakerShares{}). + Raw(query, + sql.Named("blockNumber", slashDiff.BlockNumber), + sql.Named("logIndex", slashDiff.LogIndex), + sql.Named("operator", slashDiff.Operator), + ).Scan(&updatedStakerShares) + if res.Error != nil { + ss.logger.Sugar().Errorw("Failed to fetch staker_shares", zap.Error(res.Error)) + return nil, res.Error + } - if res.Error != nil { - ss.logger.Sugar().Errorw("Failed to fetch staker_shares", zap.Error(res.Error)) - return nil, res.Error - } + wadsLeft := new(big.Int).Sub(WAD, slashDiff.WadsSlashed) - // Map the existing records to a map for easier lookup - mappedRecords := make(map[types.SlotID]StakerShares) - for _, record := range existingRecords { - slotId := NewSlotID(record.Staker, record.Strategy) - mappedRecords[slotId] = record - } + for _, stakerSharesRecord := range stakerShares { + slotID := NewShareDiffSlotID(stakerSharesRecord.Staker, stakerSharesRecord.Strategy) + var err error + currStakerShares, ok := updatedStakerShares[slotID] + if !ok { + currStakerShares, err = stakerSharesRecord.ToShareDiff() + if err != nil { + return nil, err + } + } - // Loop over our new state changes. - // If the record exists in the previous block, add the shares to the existing shares - for slotId, newState := range accumulatedState { - prepared := &StakerSharesDiff{ - Staker: newState.Staker, - Strategy: newState.Strategy, - Shares: newState.Shares, - BlockNumber: blockNumber, - } + // update the shares with wadsLeft + currStakerShares.Shares = new(big.Int).Div(new(big.Int).Mul(currStakerShares.Shares, wadsLeft), WAD) - if existingRecord, ok := mappedRecords[slotId]; ok { - existingShares, success := numbers.NewBig257().SetString(existingRecord.Shares, 10) - if !success { - ss.logger.Sugar().Errorw("Failed to convert existing shares to big.Int", - zap.String("shares", existingRecord.Shares), - zap.String("staker", existingRecord.Staker), - zap.String("strategy", existingRecord.Strategy), - zap.Uint64("blockNumber", blockNumber), - ) - continue } - prepared.Shares = existingShares.Add(existingShares, newState.Shares) + default: } + } - preparedState = append(preparedState, prepared) + // return the values as a slice + preparedState := make([]*ShareDiff, 0, len(updatedStakerShares)) + for _, v := range updatedStakerShares { + preparedState = append(preparedState, v) } + return preparedState, nil } @@ -638,7 +746,7 @@ func (ss *StakerSharesModel) CommitFinalState(blockNumber uint64) error { return err } - recordsToInsert := pkgUtils.Map(records, func(r *StakerSharesDiff, i uint64) *StakerShares { + recordsToInsert := pkgUtils.Map(records, func(r *ShareDiff, i uint64) *StakerShares { return &StakerShares{ Staker: r.Staker, Strategy: r.Strategy, @@ -673,11 +781,11 @@ func (ss *StakerSharesModel) GenerateStateRoot(blockNumber uint64) (types.StateR return types.StateRoot(utils.ConvertBytesToString(fullTree.Root())), nil } -func (ss *StakerSharesModel) sortValuesForMerkleTree(diffs []*StakerSharesDiff) []*base.MerkleTreeInput { +func (ss *StakerSharesModel) sortValuesForMerkleTree(diffs []*ShareDiff) []*base.MerkleTreeInput { inputs := make([]*base.MerkleTreeInput, 0) for _, diff := range diffs { inputs = append(inputs, &base.MerkleTreeInput{ - SlotID: NewSlotID(diff.Staker, diff.Strategy), + SlotID: NewShareDiffSlotID(diff.Staker, diff.Strategy), Value: diff.Shares.Bytes(), }) } diff --git a/internal/eigenState/stakerShares/stakerShares_test.go b/internal/eigenState/stakerShares/stakerShares_test.go index a878e661..5d2bceb6 100644 --- a/internal/eigenState/stakerShares/stakerShares_test.go +++ b/internal/eigenState/stakerShares/stakerShares_test.go @@ -92,14 +92,15 @@ func Test_StakerSharesState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, change) - typedChange := change.(*AccumulatedStateChanges) + typedChange := change.(*AccumulatedStateDiffs) + assert.Equal(t, 1, len(typedChange.StateDiffs)) - assert.Equal(t, 1, len(typedChange.Changes)) + shareDiff := typedChange.StateDiffs[0].Event.(*ShareDiff) expectedShares, _ := numbers.NewBig257().SetString("159925690037480381", 10) - assert.Equal(t, expectedShares, typedChange.Changes[0].Shares) - assert.Equal(t, "0xaf6fb48ac4a60c61a64124ce9dc28f508dc8de8d", typedChange.Changes[0].Staker) - assert.Equal(t, "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", typedChange.Changes[0].Strategy) + assert.Equal(t, expectedShares, shareDiff.Shares) + assert.Equal(t, "0xaf6fb48ac4a60c61a64124ce9dc28f508dc8de8d", shareDiff.Staker) + assert.Equal(t, "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", shareDiff.Strategy) teardown(model) }) @@ -130,13 +131,15 @@ func Test_StakerSharesState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, change) - typedChange := change.(*AccumulatedStateChanges) - assert.Equal(t, 1, len(typedChange.Changes)) + typedChange := change.(*AccumulatedStateDiffs) + assert.Equal(t, 1, len(typedChange.StateDiffs)) + + shareDiff := typedChange.StateDiffs[0].Event.(*ShareDiff) expectedShares, _ := numbers.NewBig257().SetString("-246393621132195985", 10) - assert.Equal(t, expectedShares, typedChange.Changes[0].Shares) - assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", typedChange.Changes[0].Staker) - assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", typedChange.Changes[0].Strategy) + assert.Equal(t, expectedShares, shareDiff.Shares) + assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", shareDiff.Staker) + assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", shareDiff.Strategy) teardown(model) }) @@ -167,13 +170,15 @@ func Test_StakerSharesState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, change) - typedChange := change.(*AccumulatedStateChanges) - assert.Equal(t, 1, len(typedChange.Changes)) + typedChange := change.(*AccumulatedStateDiffs) + assert.Equal(t, 1, len(typedChange.StateDiffs)) + + shareDiff := typedChange.StateDiffs[0].Event.(*ShareDiff) expectedShares, _ := numbers.NewBig257().SetString("32000000000000000000", 10) - assert.Equal(t, expectedShares, typedChange.Changes[0].Shares) - assert.Equal(t, strings.ToLower("0x0808D4689B347D499a96f139A5fC5B5101258406"), typedChange.Changes[0].Staker) - assert.Equal(t, "0xbeac0eeeeeeeeeeeeeeeeeeeeeeeeeeeeeebeac0", typedChange.Changes[0].Strategy) + assert.Equal(t, expectedShares, shareDiff.Shares) + assert.Equal(t, strings.ToLower("0x0808D4689B347D499a96f139A5fC5B5101258406"), shareDiff.Staker) + assert.Equal(t, "0xbeac0eeeeeeeeeeeeeeeeeeeeeeeeeeeeeebeac0", shareDiff.Strategy) teardown(model) }) @@ -204,13 +209,15 @@ func Test_StakerSharesState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, change) - typedChange := change.(*AccumulatedStateChanges) - assert.Equal(t, 1, len(typedChange.Changes)) + typedChange := change.(*AccumulatedStateDiffs) + assert.Equal(t, 1, len(typedChange.StateDiffs)) + + shareDiff := typedChange.StateDiffs[0].Event.(*ShareDiff) expectedShares, _ := numbers.NewBig257().SetString("-1000000000000000000", 10) - assert.Equal(t, expectedShares, typedChange.Changes[0].Shares) - assert.Equal(t, strings.ToLower("0x3c42cd72639e3e8d11ab8d0072cc13bd5d8aa83c"), typedChange.Changes[0].Staker) - assert.Equal(t, "0xd523267698c81a372191136e477fdebfa33d9fb4", typedChange.Changes[0].Strategy) + assert.Equal(t, expectedShares, shareDiff.Shares) + assert.Equal(t, strings.ToLower("0x3c42cd72639e3e8d11ab8d0072cc13bd5d8aa83c"), shareDiff.Staker) + assert.Equal(t, "0xd523267698c81a372191136e477fdebfa33d9fb4", shareDiff.Strategy) teardown(model) }) @@ -302,11 +309,13 @@ func Test_StakerSharesState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, change) - typedChange := change.(*AccumulatedStateChanges) - assert.Equal(t, 1, len(typedChange.Changes)) - assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", typedChange.Changes[0].Staker) - assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", typedChange.Changes[0].Strategy) - assert.Equal(t, "246393621132195985", typedChange.Changes[0].Shares.String()) + typedChange := change.(*AccumulatedStateDiffs) + assert.Equal(t, 1, len(typedChange.StateDiffs)) + + shareDiff := typedChange.StateDiffs[0].Event.(*ShareDiff) + assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", shareDiff.Staker) + assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", shareDiff.Strategy) + assert.Equal(t, "246393621132195985", shareDiff.Shares.String()) preparedChange, err := model.prepareState(blockNumber) assert.Nil(t, err) @@ -380,21 +389,24 @@ func Test_StakerSharesState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, change) - typedChange := change.(*AccumulatedStateChanges) + typedChange := change.(*AccumulatedStateDiffs) - assert.Equal(t, 1, len(typedChange.Changes)) - assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", typedChange.Changes[0].Staker) - assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", typedChange.Changes[0].Strategy) - assert.Equal(t, "-246393621132195985", typedChange.Changes[0].Shares.String()) + assert.Equal(t, 1, len(typedChange.StateDiffs)) - slotId := NewSlotID(typedChange.Changes[0].Staker, typedChange.Changes[0].Strategy) + shareDiff := typedChange.StateDiffs[0].Event.(*ShareDiff) + assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", shareDiff.Staker) + assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", shareDiff.Strategy) + assert.Equal(t, "-246393621132195985", shareDiff.Shares.String()) - accumulatedState, ok := model.stateAccumulator[originBlockNumber][slotId] + deltas, ok := model.deltaAccumulator[originBlockNumber] assert.True(t, ok) - assert.NotNil(t, accumulatedState) - assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", accumulatedState.Staker) - assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", accumulatedState.Strategy) - assert.Equal(t, "-246393621132195985", accumulatedState.Shares.String()) + assert.NotNil(t, deltas) + assert.Equal(t, 1, len(deltas)) + + delta := deltas[0].Event.(*ShareDiff) + assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", delta.Staker) + assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", delta.Strategy) + assert.Equal(t, "-246393621132195985", delta.Shares.String()) // Insert the other half of the M1 event that captures the withdrawalRoot associated with the M1 withdrawal // No need to process this event, we just need it to be present in the DB @@ -418,7 +430,7 @@ func Test_StakerSharesState(t *testing.T) { change, err = model.HandleStateChange(&withdrawalQueued) assert.Nil(t, err) - assert.Nil(t, change) // should be nil since the handler doesnt care about this event + assert.Nil(t, change.(*AccumulatedStateDiffs).StateDiffs) // should be nil since the handler doesnt care about this event err = model.CommitFinalState(originBlockNumber) assert.Nil(t, err) @@ -458,11 +470,13 @@ func Test_StakerSharesState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, change) - typedChange = change.(*AccumulatedStateChanges) - assert.Equal(t, 1, len(typedChange.Changes)) - assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", typedChange.Changes[0].Staker) - assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", typedChange.Changes[0].Strategy) - assert.Equal(t, "-246393621132195985", typedChange.Changes[0].Shares.String()) + typedChange = change.(*AccumulatedStateDiffs) + assert.Equal(t, 1, len(typedChange.StateDiffs)) + + shareDiff = typedChange.StateDiffs[0].Event.(*ShareDiff) + assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", shareDiff.Staker) + assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", shareDiff.Strategy) + assert.Equal(t, "-246393621132195985", shareDiff.Shares.String()) // M2 WithdrawalMigrated event. Typically occurs in the same block as the M2 WithdrawalQueued event withdrawalMigratedLog := storage.TransactionLog{ @@ -483,20 +497,22 @@ func Test_StakerSharesState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, change) - typedChange = change.(*AccumulatedStateChanges) - assert.Equal(t, 1, len(typedChange.Changes)) - assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", typedChange.Changes[0].Staker) - assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", typedChange.Changes[0].Strategy) - assert.Equal(t, "246393621132195985", typedChange.Changes[0].Shares.String()) + typedChange = change.(*AccumulatedStateDiffs) + assert.Equal(t, 1, len(typedChange.StateDiffs)) - slotId = NewSlotID(typedChange.Changes[0].Staker, typedChange.Changes[0].Strategy) + shareDiff = typedChange.StateDiffs[0].Event.(*ShareDiff) + assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", shareDiff.Staker) + assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", shareDiff.Strategy) + assert.Equal(t, "246393621132195985", shareDiff.Shares.String()) - accumulatedState, ok = model.stateAccumulator[blockNumber][slotId] - assert.True(t, ok) - assert.NotNil(t, accumulatedState) - assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", accumulatedState.Staker) - assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", accumulatedState.Strategy) - assert.Equal(t, "0", accumulatedState.Shares.String()) + deltas = model.deltaAccumulator[originBlockNumber] + assert.NotNil(t, deltas) + assert.Equal(t, 1, len(deltas)) + + delta = deltas[0].Event.(*ShareDiff) + assert.Equal(t, "0x9c01148c464cf06d135ad35d3d633ab4b46b9b78", delta.Staker) + assert.Equal(t, "0x298afb19a105d59e74658c4c334ff360bade6dd2", delta.Strategy) + assert.Equal(t, "-246393621132195985", delta.Shares.String()) err = model.CommitFinalState(blockNumber) assert.Nil(t, err) @@ -546,13 +562,14 @@ func Test_StakerSharesState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, change) - typedChange := change.(*AccumulatedStateChanges) - assert.Equal(t, 1, len(typedChange.Changes)) + typedChange := change.(*AccumulatedStateDiffs) + assert.Equal(t, 1, len(typedChange.StateDiffs)) + shareDiff := typedChange.StateDiffs[0].Event.(*ShareDiff) expectedShares, _ := numbers.NewBig257().SetString("-50000000000000", 10) - assert.Equal(t, expectedShares, typedChange.Changes[0].Shares) - assert.Equal(t, strings.ToLower("0x3c42cd72639e3e8d11ab8d0072cc13bd5d8aa83c"), typedChange.Changes[0].Staker) - assert.Equal(t, "0xd523267698c81a372191136e477fdebfa33d9fb4", typedChange.Changes[0].Strategy) + assert.Equal(t, expectedShares, shareDiff.Shares) + assert.Equal(t, strings.ToLower("0x3c42cd72639e3e8d11ab8d0072cc13bd5d8aa83c"), shareDiff.Staker) + assert.Equal(t, "0xd523267698c81a372191136e477fdebfa33d9fb4", shareDiff.Strategy) teardown(model) }) @@ -584,19 +601,22 @@ func Test_StakerSharesState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, change) - typedChange := change.(*AccumulatedStateChanges) - assert.Equal(t, 2, len(typedChange.Changes)) + typedChange := change.(*AccumulatedStateDiffs) + assert.Equal(t, 2, len(typedChange.StateDiffs)) + shareDiff := typedChange.StateDiffs[0].Event.(*ShareDiff) expectedShares, _ := numbers.NewBig257().SetString("-50000000000000", 10) - assert.Equal(t, expectedShares, typedChange.Changes[0].Shares) - assert.Equal(t, strings.ToLower("0x3c42cd72639e3e8d11ab8d0072cc13bd5d8aa83c"), typedChange.Changes[0].Staker) - assert.Equal(t, "0xd523267698c81a372191136e477fdebfa33d9fb4", typedChange.Changes[0].Strategy) + assert.Equal(t, expectedShares, shareDiff.Shares) + assert.Equal(t, strings.ToLower("0x3c42cd72639e3e8d11ab8d0072cc13bd5d8aa83c"), shareDiff.Staker) + assert.Equal(t, "0xd523267698c81a372191136e477fdebfa33d9fb4", shareDiff.Strategy) + shareDiff = typedChange.StateDiffs[1].Event.(*ShareDiff) expectedShares, _ = numbers.NewBig257().SetString("-100000000000000", 10) - assert.Equal(t, expectedShares, typedChange.Changes[1].Shares) - assert.Equal(t, strings.ToLower("0x3c42cd72639e3e8d11ab8d0072cc13bd5d8aa83c"), typedChange.Changes[1].Staker) - assert.Equal(t, "0xe523267698c81a372191136e477fdebfa33d9fb5", typedChange.Changes[1].Strategy) + assert.Equal(t, expectedShares, shareDiff.Shares) + assert.Equal(t, strings.ToLower("0x3c42cd72639e3e8d11ab8d0072cc13bd5d8aa83c"), shareDiff.Staker) + assert.Equal(t, "0xe523267698c81a372191136e477fdebfa33d9fb5", shareDiff.Strategy) teardown(model) }) + }