diff --git a/internal/eigenState/stakerShares/stakerShares.go b/internal/eigenState/stakerShares/stakerShares.go index 70f96444..8021ee41 100644 --- a/internal/eigenState/stakerShares/stakerShares.go +++ b/internal/eigenState/stakerShares/stakerShares.go @@ -4,6 +4,7 @@ import ( "database/sql" "encoding/hex" "encoding/json" + "errors" "fmt" "math/big" "slices" @@ -48,6 +49,19 @@ type StakerSharesDiff struct { IsNew bool } +// Table staker_share_deltas +type StakerShareDeltas struct { + Staker string + Strategy string + Shares string + StrategyIndex uint64 + TransactionHash string + LogIndex uint64 + BlockTime time.Time + BlockDate string + BlockNumber uint64 +} + func NewSlotID(staker string, strategy string) types.SlotID { return types.SlotID(fmt.Sprintf("%s_%s", staker, strategy)) } @@ -61,6 +75,8 @@ type StakerSharesModel struct { // Accumulates state changes for SlotIds, grouped by block number stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange + + deltaAccumulator map[uint64][]*StakerShareDeltas } func NewStakerSharesModel( @@ -75,6 +91,7 @@ func NewStakerSharesModel( logger: logger, globalConfig: globalConfig, stateAccumulator: make(map[uint64]map[types.SlotID]*AccumulatedStateChange), + deltaAccumulator: make(map[uint64][]*StakerShareDeltas), } esm.RegisterState(model, 3) @@ -111,7 +128,7 @@ func parseLogOutputForDepositEvent(outputDataStr string) (*depositOutputData, er return outputData, err } -func (ss *StakerSharesModel) handleStakerDepositEvent(log *storage.TransactionLog) (*AccumulatedStateChange, error) { +func (ss *StakerSharesModel) handleStakerDepositEvent(log *storage.TransactionLog) (*StakerShareDeltas, error) { outputData, err := parseLogOutputForDepositEvent(log.OutputData) if err != nil { return nil, err @@ -134,11 +151,14 @@ func (ss *StakerSharesModel) handleStakerDepositEvent(log *storage.TransactionLo return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", outputData.Shares) } - return &AccumulatedStateChange{ - Staker: stakerAddress, - Strategy: outputData.Strategy, - Shares: shares, - BlockNumber: log.BlockNumber, + return &StakerShareDeltas{ + Staker: stakerAddress, + Strategy: outputData.Strategy, + Shares: shares.String(), + StrategyIndex: uint64(0), + LogIndex: log.LogIndex, + TransactionHash: log.TransactionHash, + BlockNumber: log.BlockNumber, }, nil } @@ -158,7 +178,7 @@ func parseLogOutputForPodSharesUpdatedEvent(outputDataStr string) (*podSharesUpd return outputData, err } -func (ss *StakerSharesModel) handlePodSharesUpdatedEvent(log *storage.TransactionLog) (*AccumulatedStateChange, error) { +func (ss *StakerSharesModel) handlePodSharesUpdatedEvent(log *storage.TransactionLog) (*StakerShareDeltas, error) { arguments, err := ss.ParseLogArguments(log) if err != nil { return nil, err @@ -177,15 +197,18 @@ func (ss *StakerSharesModel) handlePodSharesUpdatedEvent(log *storage.Transactio return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", sharesDelta) } - return &AccumulatedStateChange{ - Staker: staker, - Strategy: "0xbeac0eeeeeeeeeeeeeeeeeeeeeeeeeeeeeebeac0", - Shares: sharesDelta, - BlockNumber: log.BlockNumber, + return &StakerShareDeltas{ + Staker: staker, + Strategy: "0xbeac0eeeeeeeeeeeeeeeeeeeeeeeeeeeeeebeac0", + Shares: sharesDelta.String(), + StrategyIndex: uint64(0), + LogIndex: log.LogIndex, + TransactionHash: log.TransactionHash, + BlockNumber: log.BlockNumber, }, nil } -func (ss *StakerSharesModel) handleM1StakerWithdrawals(log *storage.TransactionLog) (*AccumulatedStateChange, error) { +func (ss *StakerSharesModel) handleM1StakerWithdrawals(log *storage.TransactionLog) (*StakerShareDeltas, error) { outputData, err := parseLogOutputForDepositEvent(log.OutputData) if err != nil { return nil, err @@ -208,11 +231,14 @@ func (ss *StakerSharesModel) handleM1StakerWithdrawals(log *storage.TransactionL return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", outputData.Shares) } - return &AccumulatedStateChange{ - Staker: stakerAddress, - Strategy: outputData.Strategy, - Shares: shares.Mul(shares, big.NewInt(-1)), - BlockNumber: log.BlockNumber, + return &StakerShareDeltas{ + Staker: stakerAddress, + Strategy: outputData.Strategy, + Shares: shares.Mul(shares, big.NewInt(-1)).String(), + StrategyIndex: uint64(0), + LogIndex: log.LogIndex, + TransactionHash: log.TransactionHash, + BlockNumber: log.BlockNumber, }, nil } @@ -239,7 +265,7 @@ func parseLogOutputForM2MigrationEvent(outputDataStr string) (*m2MigrationOutput // Since we have already counted M1 withdrawals due to processing events block-by-block, we need to handle not double subtracting. // Assuming that M2 WithdrawalQueued events always result in a subtraction, if we encounter a migration event, we need // to add the amount back to the shares to get the correct final state. -func (ss *StakerSharesModel) handleMigratedM2StakerWithdrawals(log *storage.TransactionLog) ([]*AccumulatedStateChange, error) { +func (ss *StakerSharesModel) handleMigratedM2StakerWithdrawals(log *storage.TransactionLog) ([]*StakerShareDeltas, error) { outputData, err := parseLogOutputForM2MigrationEvent(log.OutputData) if err != nil { return nil, err @@ -290,7 +316,7 @@ func (ss *StakerSharesModel) handleMigratedM2StakerWithdrawals(log *storage.Tran return nil, res.Error } - changes := make([]*AccumulatedStateChange, 0) + changes := make([]*StakerShareDeltas, 0) for _, l := range logs { c, err := ss.handleStakerDepositEvent(&l) if err != nil { @@ -329,24 +355,27 @@ func parseLogOutputForM2WithdrawalEvent(outputDataStr string) (*m2WithdrawalOutp } // handleM2QueuedWithdrawal handles the WithdrawalQueued event from the DelegationManager contract for M2. -func (ss *StakerSharesModel) handleM2QueuedWithdrawal(log *storage.TransactionLog) ([]*AccumulatedStateChange, error) { +func (ss *StakerSharesModel) handleM2QueuedWithdrawal(log *storage.TransactionLog) ([]*StakerShareDeltas, error) { outputData, err := parseLogOutputForM2WithdrawalEvent(log.OutputData) if err != nil { return nil, err } - records := make([]*AccumulatedStateChange, 0) + records := make([]*StakerShareDeltas, 0) for i, strategy := range outputData.Withdrawal.Strategies { shares, success := numbers.NewBig257().SetString(outputData.Withdrawal.Shares[i].String(), 10) if !success { return nil, xerrors.Errorf("Failed to convert shares to big.Int: %s", outputData.Withdrawal.Shares[i]) } - r := &AccumulatedStateChange{ - Staker: outputData.Withdrawal.Staker, - Strategy: strategy, - Shares: shares.Mul(shares, big.NewInt(-1)), - BlockNumber: log.BlockNumber, + r := &StakerShareDeltas{ + Staker: outputData.Withdrawal.Staker, + Strategy: strategy, + Shares: shares.Mul(shares, big.NewInt(-1)).String(), + StrategyIndex: uint64(i), + LogIndex: log.LogIndex, + TransactionHash: log.TransactionHash, + BlockNumber: log.BlockNumber, } records = append(records, r) } @@ -357,10 +386,21 @@ type AccumulatedStateChanges struct { Changes []*AccumulatedStateChange } +func shareDeltaToAccumulatedStateChange(deltaRecord *StakerShareDeltas) *AccumulatedStateChange { + shares, _ := numbers.NewBig257().SetString(deltaRecord.Shares, 10) + return &AccumulatedStateChange{ + Staker: deltaRecord.Staker, + Strategy: deltaRecord.Strategy, + Shares: shares, + BlockNumber: deltaRecord.BlockNumber, + } +} + func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[AccumulatedStateChanges], []uint64) { stateChanges := make(types.StateTransitions[AccumulatedStateChanges]) stateChanges[0] = func(log *storage.TransactionLog) (*AccumulatedStateChanges, error) { + deltaRecords := make([]*StakerShareDeltas, 0) var parsedRecords []*AccumulatedStateChange var err error @@ -371,27 +411,37 @@ func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[Accum if log.Address == contractAddresses.StrategyManager && log.EventName == "Deposit" { record, err := ss.handleStakerDepositEvent(log) if err == nil { - parsedRecords = append(parsedRecords, record) + deltaRecords = append(deltaRecords, record) + parsedRecords = append(parsedRecords, shareDeltaToAccumulatedStateChange(record)) } } else if log.Address == contractAddresses.EigenpodManager && log.EventName == "PodSharesUpdated" { record, err := ss.handlePodSharesUpdatedEvent(log) if err == nil { - parsedRecords = append(parsedRecords, record) + deltaRecords = append(deltaRecords, record) + parsedRecords = append(parsedRecords, shareDeltaToAccumulatedStateChange(record)) } } else if log.Address == contractAddresses.StrategyManager && log.EventName == "ShareWithdrawalQueued" && log.TransactionHash != "0x62eb0d0865b2636c74ed146e2d161e39e42b09bac7f86b8905fc7a830935dc1e" { record, err := ss.handleM1StakerWithdrawals(log) if err == nil { - parsedRecords = append(parsedRecords, record) + deltaRecords = append(deltaRecords, record) + parsedRecords = append(parsedRecords, shareDeltaToAccumulatedStateChange(record)) } } else if log.Address == contractAddresses.DelegationManager && log.EventName == "WithdrawalQueued" { records, err := ss.handleM2QueuedWithdrawal(log) if err == nil && records != nil { - parsedRecords = append(parsedRecords, records...) + deltaRecords = append(deltaRecords, records...) + for _, record := range records { + parsedRecords = append(parsedRecords, shareDeltaToAccumulatedStateChange(record)) + } } } else if log.Address == contractAddresses.DelegationManager && log.EventName == "WithdrawalMigrated" { records, err := ss.handleMigratedM2StakerWithdrawals(log) if err == nil { - parsedRecords = append(parsedRecords, records...) + // NOTE: we DONT add these to the delta table because they've already been handled + + for _, record := range records { + parsedRecords = append(parsedRecords, shareDeltaToAccumulatedStateChange(record)) + } } } else { ss.logger.Sugar().Debugw("Got stakerShares event that we don't handle", @@ -402,7 +452,7 @@ func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[Accum if err != nil { return nil, err } - if parsedRecords == nil { + if deltaRecords == nil { return nil, nil } @@ -424,6 +474,7 @@ func (ss *StakerSharesModel) GetStateTransitions() (types.StateTransitions[Accum record.Shares = record.Shares.Add(record.Shares, parsedRecord.Shares) } } + ss.deltaAccumulator[log.BlockNumber] = append(ss.deltaAccumulator[log.BlockNumber], deltaRecords...) return &AccumulatedStateChanges{Changes: parsedRecords}, nil } @@ -465,11 +516,13 @@ func (ss *StakerSharesModel) IsInterestingLog(log *storage.TransactionLog) bool func (ss *StakerSharesModel) SetupStateForBlock(blockNumber uint64) error { ss.stateAccumulator[blockNumber] = make(map[types.SlotID]*AccumulatedStateChange) + ss.deltaAccumulator[blockNumber] = make([]*StakerShareDeltas, 0) return nil } func (ss *StakerSharesModel) CleanupProcessedStateForBlock(blockNumber uint64) error { delete(ss.stateAccumulator, blockNumber) + delete(ss.deltaAccumulator, blockNumber) return nil } @@ -578,6 +631,24 @@ func (ss *StakerSharesModel) prepareState(blockNumber uint64) ([]*StakerSharesDi return preparedState, nil } +func (ss *StakerSharesModel) writeDeltaRecordsToDeltaTable(blockNumber uint64) error { + records, ok := ss.deltaAccumulator[blockNumber] + if !ok { + msg := "delta accumulator was not initialized" + ss.logger.Sugar().Errorw(msg, zap.Uint64("blockNumber", blockNumber)) + return errors.New(msg) + } + + if len(records) > 0 { + res := ss.DB.Model(&StakerShareDeltas{}).Clauses(clause.Returning{}).Create(&records) + if res.Error != nil { + ss.logger.Sugar().Errorw("Failed to insert delta records", zap.Error(res.Error)) + return res.Error + } + } + return nil +} + func (ss *StakerSharesModel) CommitFinalState(blockNumber uint64) error { records, err := ss.prepareState(blockNumber) if err != nil { @@ -601,6 +672,10 @@ func (ss *StakerSharesModel) CommitFinalState(blockNumber uint64) error { } } + if err = ss.writeDeltaRecordsToDeltaTable(blockNumber); err != nil { + return err + } + return nil } diff --git a/internal/eigenState/stakerShares/stakerShares_test.go b/internal/eigenState/stakerShares/stakerShares_test.go index 1aecd889..a8791251 100644 --- a/internal/eigenState/stakerShares/stakerShares_test.go +++ b/internal/eigenState/stakerShares/stakerShares_test.go @@ -426,7 +426,7 @@ func Test_StakerSharesState(t *testing.T) { change, err = model.HandleStateChange(&withdrawalQueued) assert.Nil(t, err) - assert.Nil(t, change) // should be nil since the handler doesnt care about this event + assert.NotNil(t, change) err = model.CommitFinalState(originBlockNumber) assert.Nil(t, err) diff --git a/pkg/rewards/tables.go b/pkg/rewards/tables.go index 2b11e113..cd7434e8 100644 --- a/pkg/rewards/tables.go +++ b/pkg/rewards/tables.go @@ -53,15 +53,3 @@ type StakerShareSnapshot struct { Snapshot time.Time Shares string } - -type StakerShareDeltas struct { - Staker string - Strategy string - Shares string - StrategyIndex string - TransactionHash string - LogIndex string - BlockTime time.Time - BlockDate string - BlockNumber uint64 -}