diff --git a/internal/eigenState/stakerShares/stakerShares.go b/internal/eigenState/stakerShares/stakerShares.go index a218c665..abb380d5 100644 --- a/internal/eigenState/stakerShares/stakerShares.go +++ b/internal/eigenState/stakerShares/stakerShares.go @@ -443,9 +443,14 @@ func (ss *StakerSharesModel) handleSlashingQueuedWithdrawal(log *storage.Transac } type operatorSlashedOutputData struct { - Operator string `json:"operator"` + Operator string `json:"operator"` + // OperatorSet struct { + // Avs string `json:"avs"` + // OperatorSetId uint32 `json:"operatorSetId"` + // } Strategies []string `json:"strategies"` WadsSlashed []json.Number `json:"wadsSlashed"` + // Description string `json:"description"` } func parseLogOutputForOperatorSlashedEvent(outputDataStr string) (*operatorSlashedOutputData, error) { @@ -615,9 +620,9 @@ func (ss *StakerSharesModel) prepareState(blockNumber uint64) ([]*ShareDiff, err switch diff.Event.(type) { case *ShareDiff: shareDiff := diff.Event.(*ShareDiff) - slotId := NewShareDiffSlotID(shareDiff.Staker, shareDiff.Strategy) + slotID := NewShareDiffSlotID(shareDiff.Staker, shareDiff.Strategy) - currStakerShares, ok := updatedStakerShares[slotId] + currStakerShares, ok := updatedStakerShares[slotID] if !ok { stakerShares := &StakerShares{} // get the record from the database with the given staker and strategy with the latest blockNumber @@ -648,7 +653,10 @@ func (ss *StakerSharesModel) prepareState(blockNumber uint64) ([]*ShareDiff, err currStakerShares.Shares = currStakerShares.Shares.Add(currStakerShares.Shares, shareDiff.Shares) // update the local cache - updatedStakerShares[slotId] = currStakerShares + updatedStakerShares[slotID] = currStakerShares + + ss.logger.Sugar().Infow("Updating staker shares with share diff", zap.String("staker", currStakerShares.Staker), zap.String("strategy", currStakerShares.Strategy), zap.String("shares", currStakerShares.Shares.String()), zap.Uint64("blockNumber", blockNumber)) + case *SlashDiff: slashDiff := diff.Event.(*SlashDiff) query := ` @@ -660,27 +668,33 @@ func (ss *StakerSharesModel) prepareState(blockNumber uint64) ([]*ShareDiff, err ROW_NUMBER() OVER (PARTITION BY staker ORDER BY block_number desc, log_index desc) as rn from staker_delegation_changes where - block_number <= @blockNumber + block_number <= @blockNumber and log_index <= @logIndex ), delegated_stakers as ( select lsd.staker - from latest_staker_delegations as lsd + from ranked_staker_delegations as lsd where lsd.operator = @operator and lsd.rn = 1 ), ranked_staker_shares as ( select - ss.staker, + ds.staker, ss.strategy, - ss.shares, - ss.block_number, - ROW_NUMBER() OVER (PARTITION BY ss.staker, ss.strategy ORDER BY ss.block_number desc) as rn - from staker_shares as ss - join delegated_stakers as ds - on ss.staker = ds.staker + COALESCE(ss.shares, 0) as shares, + COALESCE(ss.block_number, 0) as block_number, -- Assign a default block number if needed + ROW_NUMBER() OVER ( + PARTITION BY ds.staker, ss.strategy + ORDER BY ss.block_number desc + ) AS rn + from + delegated_stakers as ds + left join + staker_shares as ss + on ss.staker = ds.staker + and ss.strategy = @strategy ), latest_staker_shares as ( select @@ -688,44 +702,53 @@ func (ss *StakerSharesModel) prepareState(blockNumber uint64) ([]*ShareDiff, err ls.strategy, ls.shares, ls.block_number - from latest_staker_shares as ls + from ranked_staker_shares as ls where ls.rn = 1 ) select * from latest_staker_shares - ` stakerShares := make([]StakerShares, 0) // get the staker shares for the stakers who were delegated to the operator // and update the shares with the new max magnitude - res := ss.DB.Model(&StakerShares{}). - Raw(query, - sql.Named("blockNumber", slashDiff.BlockNumber), - sql.Named("logIndex", slashDiff.LogIndex), - sql.Named("operator", slashDiff.Operator), - ).Scan(&updatedStakerShares) - if res.Error != nil { + res := ss.DB.Raw(query, + sql.Named("blockNumber", slashDiff.BlockNumber), + sql.Named("logIndex", slashDiff.LogIndex), + sql.Named("operator", slashDiff.Operator), + sql.Named("strategy", slashDiff.Strategy), + ).Scan(&stakerShares) + if res.Error != nil && res.Error != gorm.ErrRecordNotFound { ss.logger.Sugar().Errorw("Failed to fetch staker_shares", zap.Error(res.Error)) return nil, res.Error } wadsLeft := new(big.Int).Sub(WAD, slashDiff.WadsSlashed) + ss.logger.Info("Updating staker shares with slash diff", zap.String("len", fmt.Sprint(len(stakerShares)))) + for _, stakerSharesRecord := range stakerShares { - slotID := NewShareDiffSlotID(stakerSharesRecord.Staker, stakerSharesRecord.Strategy) + slotID := NewShareDiffSlotID(stakerSharesRecord.Staker, slashDiff.Strategy) var err error currStakerShares, ok := updatedStakerShares[slotID] - if !ok { + // if we have not cached but the staker exists in the database for the strategy + if !ok && stakerSharesRecord.Strategy == slashDiff.Strategy { currStakerShares, err = stakerSharesRecord.ToShareDiff() if err != nil { return nil, err } + } else if !ok { + // otherwise the staker is for a different strategy, so we skip + continue } - // update the shares with wadsLeft currStakerShares.Shares = new(big.Int).Div(new(big.Int).Mul(currStakerShares.Shares, wadsLeft), WAD) + currStakerShares.BlockNumber = blockNumber + + // update the local cache + updatedStakerShares[slotID] = currStakerShares + ss.logger.Sugar().Infow("Updating staker shares with slash diff", zap.String("staker", currStakerShares.Staker), zap.String("strategy", currStakerShares.Strategy), zap.String("shares", currStakerShares.Shares.String()), zap.Uint64("blockNumber", stakerSharesRecord.BlockNumber)) } default: } diff --git a/internal/eigenState/stakerShares/stakerShares_test.go b/internal/eigenState/stakerShares/stakerShares_test.go index 5d2bceb6..a5038f93 100644 --- a/internal/eigenState/stakerShares/stakerShares_test.go +++ b/internal/eigenState/stakerShares/stakerShares_test.go @@ -1,12 +1,15 @@ package stakerShares import ( + "encoding/json" + "fmt" "math/big" "strings" "testing" "time" "github.com/Layr-Labs/go-sidecar/internal/config" + "github.com/Layr-Labs/go-sidecar/internal/eigenState/stakerDelegations" "github.com/Layr-Labs/go-sidecar/internal/eigenState/stateManager" "github.com/Layr-Labs/go-sidecar/internal/logger" "github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations" @@ -42,13 +45,15 @@ func setup() ( func teardown(model *StakerSharesModel) { queries := []string{ + `delete from delegated_stakers`, + `delete from staker_delegation_changes`, `delete from staker_shares`, `delete from blocks`, `delete from transactions`, `delete from transaction_logs`, } for _, query := range queries { - model.DB.Raw(query) + model.DB.Exec(query) } } @@ -619,4 +624,265 @@ func Test_StakerSharesState(t *testing.T) { teardown(model) }) + t.Run("Should capture delegate, deposit, slash in same block", func(t *testing.T) { + esm := stateManager.NewEigenStateManager(l, grm) + blockNumber := uint64(200) + + delegationModel, err := stakerDelegations.NewStakerDelegationsModel(esm, grm, l, cfg) + assert.Nil(t, err) + + err = delegationModel.SetupStateForBlock(blockNumber) + assert.Nil(t, err) + + _, err = processDelegation(delegationModel, cfg.GetContractsMapForChain().DelegationManager, blockNumber, 300, "0xaf6fb48ac4a60c61a64124ce9dc28f508dc8de8d", "0xbde83df53bc7d159700e966ad5d21e8b7c619459") + assert.Nil(t, err) + + err = delegationModel.CommitFinalState(blockNumber) + assert.Nil(t, err) + + model, err := NewStakerSharesModel(esm, grm, l, cfg) + assert.Nil(t, err) + + err = model.SetupStateForBlock(blockNumber) + assert.Nil(t, err) + + _, err = processDeposit(model, cfg.GetContractsMapForChain().StrategyManager, blockNumber, 400, "0xaf6fb48ac4a60c61a64124ce9dc28f508dc8de8d", "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", big.NewInt(1e18)) + assert.Nil(t, err) + + change, err := processSlashing(model, cfg.GetContractsMapForChain().AllocationManager, blockNumber, 500, "0xbde83df53bc7d159700e966ad5d21e8b7c619459", "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", big.NewInt(1e17)) + assert.Nil(t, err) + + typedChange := change.(*AccumulatedStateDiffs) + assert.Equal(t, 1, len(typedChange.StateDiffs)) + + slashDiff := typedChange.StateDiffs[0].Event.(*SlashDiff) + assert.Equal(t, "0xbde83df53bc7d159700e966ad5d21e8b7c619459", slashDiff.Operator) + assert.Equal(t, "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", slashDiff.Strategy) + assert.Equal(t, "100000000000000000", slashDiff.WadsSlashed.String()) + + err = model.CommitFinalState(blockNumber) + assert.Nil(t, err) + + query := ` + select * from staker_shares + where block_number = ? + ` + results := []*StakerShares{} + res := model.DB.Raw(query, blockNumber).Scan(&results) + assert.Nil(t, res.Error) + + assert.Equal(t, 1, len(results)) + assert.Equal(t, "0xaf6fb48ac4a60c61a64124ce9dc28f508dc8de8d", results[0].Staker) + assert.Equal(t, "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", results[0].Strategy) + assert.Equal(t, "900000000000000000", results[0].Shares) + assert.Equal(t, blockNumber, results[0].BlockNumber) + + teardown(model) + }) + + t.Run("Should capture many deposits and slash in same block", func(t *testing.T) { + esm := stateManager.NewEigenStateManager(l, grm) + blockNumber := uint64(200) + + delegationModel, err := stakerDelegations.NewStakerDelegationsModel(esm, grm, l, cfg) + assert.Nil(t, err) + + err = delegationModel.SetupStateForBlock(blockNumber) + assert.Nil(t, err) + + _, err = processDelegation(delegationModel, cfg.GetContractsMapForChain().DelegationManager, blockNumber, 300, "0xaf6fb48ac4a60c61a64124ce9dc28f508dc8de8d", "0xbde83df53bc7d159700e966ad5d21e8b7c619459") + assert.Nil(t, err) + _, err = processDelegation(delegationModel, cfg.GetContractsMapForChain().DelegationManager, blockNumber, 301, "0xbde83df53bc7d159700e966ad5d21e8b7c619459", "0xbde83df53bc7d159700e966ad5d21e8b7c619459") + assert.Nil(t, err) + + err = delegationModel.CommitFinalState(blockNumber) + assert.Nil(t, err) + + model, err := NewStakerSharesModel(esm, grm, l, cfg) + assert.Nil(t, err) + + err = model.SetupStateForBlock(blockNumber) + assert.Nil(t, err) + + _, err = processDeposit(model, cfg.GetContractsMapForChain().StrategyManager, blockNumber, 400, "0xaf6fb48ac4a60c61a64124ce9dc28f508dc8de8d", "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", big.NewInt(1e18)) + assert.Nil(t, err) + _, err = processDeposit(model, cfg.GetContractsMapForChain().StrategyManager, blockNumber, 401, "0xbde83df53bc7d159700e966ad5d21e8b7c619459", "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", big.NewInt(2e18)) + assert.Nil(t, err) + + change, err := processSlashing(model, cfg.GetContractsMapForChain().AllocationManager, blockNumber, 500, "0xbde83df53bc7d159700e966ad5d21e8b7c619459", "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", big.NewInt(1e17)) + assert.Nil(t, err) + + typedChange := change.(*AccumulatedStateDiffs) + assert.Equal(t, 1, len(typedChange.StateDiffs)) + + slashDiff := typedChange.StateDiffs[0].Event.(*SlashDiff) + assert.Equal(t, "0xbde83df53bc7d159700e966ad5d21e8b7c619459", slashDiff.Operator) + assert.Equal(t, "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", slashDiff.Strategy) + assert.Equal(t, "100000000000000000", slashDiff.WadsSlashed.String()) + + err = model.CommitFinalState(blockNumber) + assert.Nil(t, err) + + query := ` + select * from staker_shares + where block_number = ? + ` + results := []*StakerShares{} + res := model.DB.Raw(query, blockNumber).Scan(&results) + assert.Nil(t, res.Error) + + assert.Equal(t, 2, len(results)) + assert.Equal(t, "0xaf6fb48ac4a60c61a64124ce9dc28f508dc8de8d", results[0].Staker) + assert.Equal(t, "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", results[0].Strategy) + assert.Equal(t, "900000000000000000", results[0].Shares) + assert.Equal(t, blockNumber, results[0].BlockNumber) + + assert.Equal(t, "0xbde83df53bc7d159700e966ad5d21e8b7c619459", results[1].Staker) + assert.Equal(t, "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", results[1].Strategy) + assert.Equal(t, "1800000000000000000", results[1].Shares) + assert.Equal(t, blockNumber, results[1].BlockNumber) + + teardown(model) + }) + + t.Run("Should capture many deposits and slash in a different block", func(t *testing.T) { + esm := stateManager.NewEigenStateManager(l, grm) + blockNumber := uint64(200) + + delegationModel, err := stakerDelegations.NewStakerDelegationsModel(esm, grm, l, cfg) + assert.Nil(t, err) + + err = delegationModel.SetupStateForBlock(blockNumber) + assert.Nil(t, err) + + _, err = processDelegation(delegationModel, cfg.GetContractsMapForChain().DelegationManager, blockNumber, 300, "0xaf6fb48ac4a60c61a64124ce9dc28f508dc8de8d", "0xbde83df53bc7d159700e966ad5d21e8b7c619459") + assert.Nil(t, err) + _, err = processDelegation(delegationModel, cfg.GetContractsMapForChain().DelegationManager, blockNumber, 301, "0xbde83df53bc7d159700e966ad5d21e8b7c619459", "0xbde83df53bc7d159700e966ad5d21e8b7c619459") + assert.Nil(t, err) + + err = delegationModel.CommitFinalState(blockNumber) + assert.Nil(t, err) + + model, err := NewStakerSharesModel(esm, grm, l, cfg) + assert.Nil(t, err) + + err = model.SetupStateForBlock(blockNumber) + assert.Nil(t, err) + + _, err = processDeposit(model, cfg.GetContractsMapForChain().StrategyManager, blockNumber, 400, "0xaf6fb48ac4a60c61a64124ce9dc28f508dc8de8d", "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", big.NewInt(1e18)) + assert.Nil(t, err) + _, err = processDeposit(model, cfg.GetContractsMapForChain().StrategyManager, blockNumber, 401, "0xbde83df53bc7d159700e966ad5d21e8b7c619459", "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", big.NewInt(2e18)) + assert.Nil(t, err) + + err = model.CommitFinalState(blockNumber) + assert.Nil(t, err) + + blockNumber = blockNumber + 1 + err = model.SetupStateForBlock(blockNumber) + assert.Nil(t, err) + + change, err := processSlashing(model, cfg.GetContractsMapForChain().AllocationManager, blockNumber, 500, "0xbde83df53bc7d159700e966ad5d21e8b7c619459", "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", big.NewInt(1e17)) + assert.Nil(t, err) + + typedChange := change.(*AccumulatedStateDiffs) + assert.Equal(t, 1, len(typedChange.StateDiffs)) + + slashDiff := typedChange.StateDiffs[0].Event.(*SlashDiff) + assert.Equal(t, "0xbde83df53bc7d159700e966ad5d21e8b7c619459", slashDiff.Operator) + assert.Equal(t, "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", slashDiff.Strategy) + assert.Equal(t, "100000000000000000", slashDiff.WadsSlashed.String()) + + err = model.CommitFinalState(blockNumber) + assert.Nil(t, err) + + query := ` + select * from staker_shares + where block_number = ? + ` + results := []*StakerShares{} + res := model.DB.Raw(query, blockNumber).Scan(&results) + assert.Nil(t, res.Error) + + assert.Equal(t, 2, len(results)) + assert.Equal(t, "0xaf6fb48ac4a60c61a64124ce9dc28f508dc8de8d", results[0].Staker) + assert.Equal(t, "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", results[0].Strategy) + assert.Equal(t, "900000000000000000", results[0].Shares) + assert.Equal(t, blockNumber, results[0].BlockNumber) + + assert.Equal(t, "0xbde83df53bc7d159700e966ad5d21e8b7c619459", results[1].Staker) + assert.Equal(t, "0x7d704507b76571a51d9cae8addabbfd0ba0e63d3", results[1].Strategy) + assert.Equal(t, "1800000000000000000", results[1].Shares) + assert.Equal(t, blockNumber, results[1].BlockNumber) + + teardown(model) + }) + +} + +func processDelegation(delegationModel *stakerDelegations.StakerDelegationsModel, delegationManager string, blockNumber, logIndex uint64, staker, operator string) (interface{}, error) { + delegateLog := storage.TransactionLog{ + TransactionHash: "some hash", + TransactionIndex: 100, + BlockNumber: blockNumber, + Address: delegationManager, + Arguments: fmt.Sprintf(`[{"Name":"staker","Type":"address","Value":"%s","Indexed":true},{"Name":"operator","Type":"address","Value":"%s","Indexed":true}]`, staker, operator), + EventName: "StakerDelegated", + LogIndex: logIndex, + OutputData: `{}`, + CreatedAt: time.Time{}, + UpdatedAt: time.Time{}, + DeletedAt: time.Time{}, + } + + return delegationModel.HandleStateChange(&delegateLog) +} + +func processDeposit(stakerSharesModel *StakerSharesModel, strategyManager string, blockNumber, logIndex uint64, staker, strategy string, shares *big.Int) (interface{}, error) { + depositLog := storage.TransactionLog{ + TransactionHash: "some hash", + TransactionIndex: 100, + BlockNumber: blockNumber, + Address: strategyManager, + Arguments: `[{"Name": "staker", "Type": "address", "Value": ""}, {"Name": "token", "Type": "address", "Value": ""}, {"Name": "strategy", "Type": "address", "Value": ""}, {"Name": "shares", "Type": "uint256", "Value": ""}]`, + EventName: "Deposit", + LogIndex: logIndex, + OutputData: fmt.Sprintf(`{"token": "%s", "shares": %s, "staker": "%s", "strategy": "%s"}`, strategy, shares.String(), staker, strategy), + CreatedAt: time.Time{}, + UpdatedAt: time.Time{}, + DeletedAt: time.Time{}, + } + + return stakerSharesModel.HandleStateChange(&depositLog) +} + +func processSlashing(stakerSharesModel *StakerSharesModel, allocationManager string, blockNumber, logIndex uint64, operator, strategy string, wadsSlashed *big.Int) (interface{}, error) { + operatorSlashedEvent := operatorSlashedOutputData{ + Operator: operator, + Strategies: []string{ + strategy, + }, + WadsSlashed: []json.Number{ + json.Number(wadsSlashed.String()), + }, + } + operatorJson, err := json.Marshal(operatorSlashedEvent) + if err != nil { + return nil, err + } + + slashingLog := storage.TransactionLog{ + TransactionHash: "some hash", + TransactionIndex: 100, + BlockNumber: blockNumber, + Address: allocationManager, + Arguments: ``, + EventName: "OperatorSlashed", + LogIndex: logIndex, + OutputData: string(operatorJson), + CreatedAt: time.Time{}, + UpdatedAt: time.Time{}, + DeletedAt: time.Time{}, + } + + return stakerSharesModel.HandleStateChange(&slashingLog) }