Skip to content

Commit

Permalink
Add delta records for staker shares
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcgary committed Oct 30, 2024
1 parent 83213bc commit 4ebd2e6
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 46 deletions.
141 changes: 108 additions & 33 deletions internal/eigenState/stakerShares/stakerShares.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math/big"
"slices"
Expand Down Expand Up @@ -48,6 +49,19 @@ type StakerSharesDiff struct {
IsNew bool
}

// Table staker_share_deltas
type StakerShareDeltas struct {
Staker string
Strategy string
Shares string
StrategyIndex uint64
TransactionHash string
LogIndex uint64
BlockTime time.Time
BlockDate string
BlockNumber uint64
}

func NewSlotID(staker string, strategy string) types.SlotID {
return types.SlotID(fmt.Sprintf("%s_%s", staker, strategy))
}
Expand All @@ -61,6 +75,8 @@ type StakerSharesModel struct {

// Accumulates state changes for SlotIds, grouped by block number
stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange

deltaAccumulator map[uint64][]*StakerShareDeltas
}

func NewStakerSharesModel(
Expand All @@ -75,6 +91,7 @@ func NewStakerSharesModel(
logger: logger,
globalConfig: globalConfig,
stateAccumulator: make(map[uint64]map[types.SlotID]*AccumulatedStateChange),
deltaAccumulator: make(map[uint64][]*StakerShareDeltas),
}

esm.RegisterState(model, 3)
Expand Down Expand Up @@ -111,7 +128,7 @@ func parseLogOutputForDepositEvent(outputDataStr string) (*depositOutputData, er
return outputData, err
}

func (ss *StakerSharesModel) handleStakerDepositEvent(log *storage.TransactionLog) (*AccumulatedStateChange, error) {
func (ss *StakerSharesModel) handleStakerDepositEvent(log *storage.TransactionLog) (*StakerShareDeltas, error) {
outputData, err := parseLogOutputForDepositEvent(log.OutputData)
if err != nil {
return nil, err
Expand All @@ -134,11 +151,14 @@ func (ss *StakerSharesModel) handleStakerDepositEvent(log *storage.TransactionLo
return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", outputData.Shares)
}

return &AccumulatedStateChange{
Staker: stakerAddress,
Strategy: outputData.Strategy,
Shares: shares,
BlockNumber: log.BlockNumber,
return &StakerShareDeltas{
Staker: stakerAddress,
Strategy: outputData.Strategy,
Shares: shares.String(),
StrategyIndex: uint64(0),
LogIndex: log.LogIndex,
TransactionHash: log.TransactionHash,
BlockNumber: log.BlockNumber,
}, nil
}

Expand All @@ -158,7 +178,7 @@ func parseLogOutputForPodSharesUpdatedEvent(outputDataStr string) (*podSharesUpd
return outputData, err
}

func (ss *StakerSharesModel) handlePodSharesUpdatedEvent(log *storage.TransactionLog) (*AccumulatedStateChange, error) {
func (ss *StakerSharesModel) handlePodSharesUpdatedEvent(log *storage.TransactionLog) (*StakerShareDeltas, error) {
arguments, err := ss.ParseLogArguments(log)
if err != nil {
return nil, err
Expand All @@ -177,15 +197,18 @@ func (ss *StakerSharesModel) handlePodSharesUpdatedEvent(log *storage.Transactio
return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", sharesDelta)
}

return &AccumulatedStateChange{
Staker: staker,
Strategy: "0xbeac0eeeeeeeeeeeeeeeeeeeeeeeeeeeeeebeac0",
Shares: sharesDelta,
BlockNumber: log.BlockNumber,
return &StakerShareDeltas{
Staker: staker,
Strategy: "0xbeac0eeeeeeeeeeeeeeeeeeeeeeeeeeeeeebeac0",
Shares: sharesDelta.String(),
StrategyIndex: uint64(0),
LogIndex: log.LogIndex,
TransactionHash: log.TransactionHash,
BlockNumber: log.BlockNumber,
}, nil
}

func (ss *StakerSharesModel) handleM1StakerWithdrawals(log *storage.TransactionLog) (*AccumulatedStateChange, error) {
func (ss *StakerSharesModel) handleM1StakerWithdrawals(log *storage.TransactionLog) (*StakerShareDeltas, error) {
outputData, err := parseLogOutputForDepositEvent(log.OutputData)
if err != nil {
return nil, err
Expand All @@ -208,11 +231,14 @@ func (ss *StakerSharesModel) handleM1StakerWithdrawals(log *storage.TransactionL
return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", outputData.Shares)
}

return &AccumulatedStateChange{
Staker: stakerAddress,
Strategy: outputData.Strategy,
Shares: shares.Mul(shares, big.NewInt(-1)),
BlockNumber: log.BlockNumber,
return &StakerShareDeltas{
Staker: stakerAddress,
Strategy: outputData.Strategy,
Shares: shares.Mul(shares, big.NewInt(-1)).String(),
StrategyIndex: uint64(0),
LogIndex: log.LogIndex,
TransactionHash: log.TransactionHash,
BlockNumber: log.BlockNumber,
}, nil
}

Expand All @@ -239,7 +265,7 @@ func parseLogOutputForM2MigrationEvent(outputDataStr string) (*m2MigrationOutput
// Since we have already counted M1 withdrawals due to processing events block-by-block, we need to handle not double subtracting.
// Assuming that M2 WithdrawalQueued events always result in a subtraction, if we encounter a migration event, we need
// to add the amount back to the shares to get the correct final state.
func (ss *StakerSharesModel) handleMigratedM2StakerWithdrawals(log *storage.TransactionLog) ([]*AccumulatedStateChange, error) {
func (ss *StakerSharesModel) handleMigratedM2StakerWithdrawals(log *storage.TransactionLog) ([]*StakerShareDeltas, error) {
outputData, err := parseLogOutputForM2MigrationEvent(log.OutputData)
if err != nil {
return nil, err
Expand Down Expand Up @@ -290,7 +316,7 @@ func (ss *StakerSharesModel) handleMigratedM2StakerWithdrawals(log *storage.Tran
return nil, res.Error
}

changes := make([]*AccumulatedStateChange, 0)
changes := make([]*StakerShareDeltas, 0)
for _, l := range logs {
c, err := ss.handleStakerDepositEvent(&l)
if err != nil {
Expand Down Expand Up @@ -329,24 +355,27 @@ func parseLogOutputForM2WithdrawalEvent(outputDataStr string) (*m2WithdrawalOutp
}

// handleM2QueuedWithdrawal handles the WithdrawalQueued event from the DelegationManager contract for M2.
func (ss *StakerSharesModel) handleM2QueuedWithdrawal(log *storage.TransactionLog) ([]*AccumulatedStateChange, error) {
func (ss *StakerSharesModel) handleM2QueuedWithdrawal(log *storage.TransactionLog) ([]*StakerShareDeltas, error) {
outputData, err := parseLogOutputForM2WithdrawalEvent(log.OutputData)
if err != nil {
return nil, err
}

records := make([]*AccumulatedStateChange, 0)
records := make([]*StakerShareDeltas, 0)

for i, strategy := range outputData.Withdrawal.Strategies {
shares, success := numbers.NewBig257().SetString(outputData.Withdrawal.Shares[i].String(), 10)
if !success {
return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", outputData.Withdrawal.Shares[i])
}
r := &AccumulatedStateChange{
Staker: outputData.Withdrawal.Staker,
Strategy: strategy,
Shares: shares.Mul(shares, big.NewInt(-1)),
BlockNumber: log.BlockNumber,
r := &StakerShareDeltas{
Staker: outputData.Withdrawal.Staker,
Strategy: strategy,
Shares: shares.Mul(shares, big.NewInt(-1)).String(),
StrategyIndex: uint64(i),
LogIndex: log.LogIndex,
TransactionHash: log.TransactionHash,
BlockNumber: log.BlockNumber,
}
records = append(records, r)
}
Expand All @@ -357,10 +386,21 @@ type AccumulatedStateChanges struct {
Changes []*AccumulatedStateChange
}

func shareDeltaToAccumulatedStateChange(deltaRecord *StakerShareDeltas) *AccumulatedStateChange {
shares, _ := numbers.NewBig257().SetString(deltaRecord.Shares, 10)
return &AccumulatedStateChange{
Staker: deltaRecord.Staker,
Strategy: deltaRecord.Strategy,
Shares: shares,
BlockNumber: deltaRecord.BlockNumber,
}
}

func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[AccumulatedStateChanges], []uint64) {
stateChanges := make(types.StateTransitions[AccumulatedStateChanges])

stateChanges[0] = func(log *storage.TransactionLog) (*AccumulatedStateChanges, error) {
deltaRecords := make([]*StakerShareDeltas, 0)
var parsedRecords []*AccumulatedStateChange
var err error

Expand All @@ -371,27 +411,37 @@ func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[Accum
if log.Address == contractAddresses.StrategyManager && log.EventName == "Deposit" {
record, err := ss.handleStakerDepositEvent(log)
if err == nil {
parsedRecords = append(parsedRecords, record)
deltaRecords = append(deltaRecords, record)
parsedRecords = append(parsedRecords, shareDeltaToAccumulatedStateChange(record))
}
} else if log.Address == contractAddresses.EigenpodManager && log.EventName == "PodSharesUpdated" {
record, err := ss.handlePodSharesUpdatedEvent(log)
if err == nil {
parsedRecords = append(parsedRecords, record)
deltaRecords = append(deltaRecords, record)
parsedRecords = append(parsedRecords, shareDeltaToAccumulatedStateChange(record))
}
} else if log.Address == contractAddresses.StrategyManager && log.EventName == "ShareWithdrawalQueued" && log.TransactionHash != "0x62eb0d0865b2636c74ed146e2d161e39e42b09bac7f86b8905fc7a830935dc1e" {
record, err := ss.handleM1StakerWithdrawals(log)
if err == nil {
parsedRecords = append(parsedRecords, record)
deltaRecords = append(deltaRecords, record)
parsedRecords = append(parsedRecords, shareDeltaToAccumulatedStateChange(record))
}
} else if log.Address == contractAddresses.DelegationManager && log.EventName == "WithdrawalQueued" {
records, err := ss.handleM2QueuedWithdrawal(log)
if err == nil && records != nil {
parsedRecords = append(parsedRecords, records...)
deltaRecords = append(deltaRecords, records...)
for _, record := range records {
parsedRecords = append(parsedRecords, shareDeltaToAccumulatedStateChange(record))
}
}
} else if log.Address == contractAddresses.DelegationManager && log.EventName == "WithdrawalMigrated" {
records, err := ss.handleMigratedM2StakerWithdrawals(log)
if err == nil {
parsedRecords = append(parsedRecords, records...)
// NOTE: we DONT add these to the delta table because they've already been handled

for _, record := range records {
parsedRecords = append(parsedRecords, shareDeltaToAccumulatedStateChange(record))
}
}
} else {
ss.logger.Sugar().Debugw("Got stakerShares event that we don't handle",
Expand All @@ -402,7 +452,7 @@ func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[Accum
if err != nil {
return nil, err
}
if parsedRecords == nil {
if deltaRecords == nil {
return nil, nil
}

Expand All @@ -424,6 +474,7 @@ func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[Accum
record.Shares = record.Shares.Add(record.Shares, parsedRecord.Shares)
}
}
ss.deltaAccumulator[log.BlockNumber] = append(ss.deltaAccumulator[log.BlockNumber], deltaRecords...)

return &AccumulatedStateChanges{Changes: parsedRecords}, nil
}
Expand Down Expand Up @@ -465,11 +516,13 @@ func (ss *StakerSharesModel) IsInterestingLog(log *storage.TransactionLog) bool

func (ss *StakerSharesModel) SetupStateForBlock(blockNumber uint64) error {
ss.stateAccumulator[blockNumber] = make(map[types.SlotID]*AccumulatedStateChange)
ss.deltaAccumulator[blockNumber] = make([]*StakerShareDeltas, 0)
return nil
}

func (ss *StakerSharesModel) CleanupProcessedStateForBlock(blockNumber uint64) error {
delete(ss.stateAccumulator, blockNumber)
delete(ss.deltaAccumulator, blockNumber)
return nil
}

Expand Down Expand Up @@ -578,6 +631,24 @@ func (ss *StakerSharesModel) prepareState(blockNumber uint64) ([]*StakerSharesDi
return preparedState, nil
}

func (ss *StakerSharesModel) writeDeltaRecordsToDeltaTable(blockNumber uint64) error {
records, ok := ss.deltaAccumulator[blockNumber]
if !ok {
msg := "delta accumulator was not initialized"
ss.logger.Sugar().Errorw(msg, zap.Uint64("blockNumber", blockNumber))
return errors.New(msg)
}

if len(records) > 0 {
res := ss.DB.Model(&StakerShareDeltas{}).Clauses(clause.Returning{}).Create(&records)
if res.Error != nil {
ss.logger.Sugar().Errorw("Failed to insert delta records", zap.Error(res.Error))
return res.Error
}
}
return nil
}

func (ss *StakerSharesModel) CommitFinalState(blockNumber uint64) error {
records, err := ss.prepareState(blockNumber)
if err != nil {
Expand All @@ -601,6 +672,10 @@ func (ss *StakerSharesModel) CommitFinalState(blockNumber uint64) error {
}
}

if err = ss.writeDeltaRecordsToDeltaTable(blockNumber); err != nil {
return err
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/eigenState/stakerShares/stakerShares_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func Test_StakerSharesState(t *testing.T) {

change, err = model.HandleStateChange(&withdrawalQueued)
assert.Nil(t, err)
assert.Nil(t, change) // should be nil since the handler doesnt care about this event
assert.NotNil(t, change)

err = model.CommitFinalState(originBlockNumber)
assert.Nil(t, err)
Expand Down
12 changes: 0 additions & 12 deletions pkg/rewards/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,3 @@ type StakerShareSnapshot struct {
Snapshot time.Time
Shares string
}

type StakerShareDeltas struct {
Staker string
Strategy string
Shares string
StrategyIndex string
TransactionHash string
LogIndex string
BlockTime time.Time
BlockDate string
BlockNumber uint64
}

0 comments on commit 4ebd2e6

Please sign in to comment.