diff --git a/internal/eigenState/avsOperators/avsOperators.go b/internal/eigenState/avsOperators/avsOperators.go index edd8361a..24bb9e12 100644 --- a/internal/eigenState/avsOperators/avsOperators.go +++ b/internal/eigenState/avsOperators/avsOperators.go @@ -155,6 +155,10 @@ func (a *AvsOperators) IsInterestingLog(log *storage.TransactionLog) bool { return a.BaseEigenState.IsInterestingLog(addresses, log) } +func (a *AvsOperators) StartBlockProcessing(blockNumber uint64) error { + return nil +} + // Handle the state change for the given log // // Takes a log and iterates over the state transitions to determine which state change to apply based on block number. @@ -201,7 +205,7 @@ func (a *AvsOperators) writeStateChange(change *AvsOperatorChange) (*AvsOperator // 4. Determine which rows from the previous block should be carried over and which shouldnt (i.e. deregistrations) // 5. Geneate the final state by unioning the carryover and the new registrations // 6. Insert the final state into the registered_avs_operators table -func (a *AvsOperators) WriteFinalState(blockNumber uint64) error { +func (a *AvsOperators) CommitFinalState(blockNumber uint64) error { query := ` with new_changes as ( select @@ -315,6 +319,10 @@ func (a *AvsOperators) getDifferenceInStates(blockNumber uint64) ([]RegisteredAv return results, nil } +func (a *AvsOperators) ClearAccumulatedState(blockNumber uint64) error { + panic("implement me") +} + // Generates a state root for the given block number. // // 1. Select all registered_avs_operators for the given block number ordered by avs and operator asc diff --git a/internal/eigenState/avsOperators/avsOperators_test.go b/internal/eigenState/avsOperators/avsOperators_test.go index 14233343..beff1aaa 100644 --- a/internal/eigenState/avsOperators/avsOperators_test.go +++ b/internal/eigenState/avsOperators/avsOperators_test.go @@ -102,7 +102,7 @@ func Test_AvsOperatorState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, stateChange) - err = avsOperatorState.WriteFinalState(blockNumber) + err = avsOperatorState.CommitFinalState(blockNumber) assert.Nil(t, err) states := []RegisteredAvsOperators{} @@ -170,7 +170,7 @@ func Test_AvsOperatorState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, stateChange) - err = avsOperatorState.WriteFinalState(log.BlockNumber) + err = avsOperatorState.CommitFinalState(log.BlockNumber) assert.Nil(t, err) states := []RegisteredAvsOperators{} diff --git a/internal/eigenState/operatorShares/operatorShares.go b/internal/eigenState/operatorShares/operatorShares.go index 49b6ba16..fd85b3f7 100644 --- a/internal/eigenState/operatorShares/operatorShares.go +++ b/internal/eigenState/operatorShares/operatorShares.go @@ -155,6 +155,10 @@ func (osm *OperatorSharesModel) IsInterestingLog(log *storage.TransactionLog) bo return osm.BaseEigenState.IsInterestingLog(addresses, log) } +func (osm *OperatorSharesModel) StartBlockProcessing(blockNumber uint64) error { + return nil +} + func (osm *OperatorSharesModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) { stateChanges, sortedBlockNumbers := osm.GetStateTransitions() @@ -189,7 +193,7 @@ func (osm *OperatorSharesModel) writeStateChange(change *OperatorShareChange) (i return change, nil } -func (osm *OperatorSharesModel) WriteFinalState(blockNumber uint64) error { +func (osm *OperatorSharesModel) CommitFinalState(blockNumber uint64) error { query := ` with new_sum as ( select @@ -281,6 +285,10 @@ func (osm *OperatorSharesModel) getDifferencesInStates(currentBlock uint64) ([]O return diffs, nil } +func (osm *OperatorSharesModel) ClearAccumulatedState(blockNumber uint64) error { + panic("implement me") +} + func (osm *OperatorSharesModel) GenerateStateRoot(blockNumber uint64) (types.StateRoot, error) { diffs, err := osm.getDifferencesInStates(blockNumber) if err != nil { diff --git a/internal/eigenState/operatorShares/operatorShares_test.go b/internal/eigenState/operatorShares/operatorShares_test.go index 12ec97b4..37eca7ca 100644 --- a/internal/eigenState/operatorShares/operatorShares_test.go +++ b/internal/eigenState/operatorShares/operatorShares_test.go @@ -99,7 +99,7 @@ func Test_OperatorSharesState(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, stateChange) - err = model.WriteFinalState(blockNumber) + err = model.CommitFinalState(blockNumber) assert.Nil(t, err) states := []OperatorShares{} diff --git a/internal/eigenState/stakerDelegations/stakerDelegations.go b/internal/eigenState/stakerDelegations/stakerDelegations.go index 141a7f24..e03ec047 100644 --- a/internal/eigenState/stakerDelegations/stakerDelegations.go +++ b/internal/eigenState/stakerDelegations/stakerDelegations.go @@ -13,6 +13,7 @@ import ( "github.com/wealdtech/go-merkletree/v2/keccak256" orderedmap "github.com/wk8/go-ordered-map/v2" "go.uber.org/zap" + "golang.org/x/xerrors" "gorm.io/gorm" "gorm.io/gorm/clause" "slices" @@ -39,6 +40,19 @@ type StakerDelegationChange struct { CreatedAt time.Time } +type AccumulatedStateChange struct { + Staker string + Operator string + BlockNumber uint64 + Delegated bool +} + +type SlotId string + +func NewSlotId(staker string, operator string) SlotId { + return SlotId(fmt.Sprintf("%s_%s", staker, operator)) +} + type StakerDelegationsModel struct { base.BaseEigenState StateTransitions types.StateTransitions[StakerDelegationChange] @@ -47,6 +61,9 @@ type StakerDelegationsModel struct { Environment config.Environment logger *zap.Logger globalConfig *config.Config + + // Accumulates state changes for SlotIds, grouped by block number + stateAccumulator map[uint64]map[SlotId]*AccumulatedStateChange } type DelegatedStakersDiff struct { @@ -68,11 +85,12 @@ func NewStakerDelegationsModel( BaseEigenState: base.BaseEigenState{ Logger: logger, }, - Db: grm, - Network: Network, - Environment: Environment, - logger: logger, - globalConfig: globalConfig, + Db: grm, + Network: Network, + Environment: Environment, + logger: logger, + globalConfig: globalConfig, + stateAccumulator: make(map[uint64]map[SlotId]*AccumulatedStateChange), } esm.RegisterState(model, 2) @@ -83,31 +101,48 @@ func (s *StakerDelegationsModel) GetModelName() string { return "StakerDelegationsModel" } -func (s *StakerDelegationsModel) GetStateTransitions() (types.StateTransitions[StakerDelegationChange], []uint64) { - stateChanges := make(types.StateTransitions[StakerDelegationChange]) +func (s *StakerDelegationsModel) GetStateTransitions() (types.StateTransitions[AccumulatedStateChange], []uint64) { + stateChanges := make(types.StateTransitions[AccumulatedStateChange]) - stateChanges[0] = func(log *storage.TransactionLog) (*StakerDelegationChange, error) { + stateChanges[0] = func(log *storage.TransactionLog) (*AccumulatedStateChange, error) { arguments, err := s.ParseLogArguments(log) if err != nil { return nil, err } - delegated := true - if log.EventName == "StakerUndelegated" { - delegated = false + // Sanity check to make sure we've got an initialized accumulator map for the block + if _, ok := s.stateAccumulator[log.BlockNumber]; !ok { + return nil, xerrors.Errorf("No state accumulator found for block %d", log.BlockNumber) } - change := &StakerDelegationChange{ - Staker: arguments[0].Value.(string), - Operator: arguments[1].Value.(string), - Delegated: delegated, - TransactionHash: log.TransactionHash, - TransactionIndex: log.TransactionIndex, - LogIndex: log.LogIndex, - BlockNumber: log.BlockNumber, - CreatedAt: log.CreatedAt, + staker := arguments[0].Value.(string) + operator := arguments[1].Value.(string) + + slotId := NewSlotId(staker, operator) + record, ok := s.stateAccumulator[log.BlockNumber][slotId] + if !ok { + // if the record doesn't exist, create a new one + record = &AccumulatedStateChange{ + Staker: staker, + Operator: operator, + BlockNumber: log.BlockNumber, + } + s.stateAccumulator[log.BlockNumber][slotId] = record } - return change, nil + if log.EventName == "StakerUndelegated" { + if ok { + // In this situation, we've encountered a delegate and undelegate in the same block + // which functionally results in no state change at all so we want to remove the record + // from the accumulated state. + delete(s.stateAccumulator[log.BlockNumber], slotId) + return nil, nil + } + record.Delegated = false + } else if log.EventName == "StakerDelegated" { + record.Delegated = true + } + + return record, nil } // Create an ordered list of block numbers @@ -138,6 +173,12 @@ func (s *StakerDelegationsModel) IsInterestingLog(log *storage.TransactionLog) b return s.BaseEigenState.IsInterestingLog(addresses, log) } +// StartBlockProcessing Initialize state accumulator for the block +func (s *StakerDelegationsModel) StartBlockProcessing(blockNumber uint64) error { + s.stateAccumulator[blockNumber] = make(map[SlotId]*AccumulatedStateChange) + return nil +} + func (s *StakerDelegationsModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) { stateChanges, sortedBlockNumbers := s.GetStateTransitions() @@ -149,157 +190,143 @@ func (s *StakerDelegationsModel) HandleStateChange(log *storage.TransactionLog) if err != nil { return nil, err } - - if change != nil { - wroteChange, err := s.writeStateChange(change) - if err != nil { - return wroteChange, err - } - return wroteChange, nil + if change == nil { + return nil, xerrors.Errorf("No state change found for block %d", blockNumber) } + return change, nil } } return nil, nil } -func (s *StakerDelegationsModel) writeStateChange(change *StakerDelegationChange) (interface{}, error) { - s.logger.Sugar().Debugw("Writing state change", zap.Any("change", change)) - res := s.Db.Model(&StakerDelegationChange{}).Clauses(clause.Returning{}).Create(change) - if res.Error != nil { - s.logger.Error("Failed to insert into avs_operator_changes", zap.Error(res.Error)) - return change, res.Error - } - return change, nil -} - -func (s *StakerDelegationsModel) WriteFinalState(blockNumber uint64) error { +func (s *StakerDelegationsModel) clonePreviousBlocksToNewBlock(blockNumber uint64) error { query := ` - with new_changes as ( + insert into delegated_stakers (staker, operator, block_number) select staker, operator, - block_number, - max(transaction_index) as transaction_index, - max(log_index) as log_index - from staker_delegation_changes - where block_number = @currentBlock - group by 1, 2, 3 - ), - unique_delegations as ( - select - nc.staker, - nc.operator, - sdc.log_index, - sdc.delegated, - nc.block_number - from new_changes as nc - left join staker_delegation_changes as sdc on ( - sdc.staker = nc.staker - and sdc.operator = nc.operator - and sdc.log_index = nc.log_index - and sdc.transaction_index = nc.transaction_index - and sdc.block_number = nc.block_number - ) - ), - undelegations as ( - select - concat(staker, '_', operator) as staker_operator - from unique_delegations - where delegated = false - ), - carryover as ( - select - rao.staker, - rao.operator, @currentBlock as block_number - from delegated_stakers as rao - where - rao.block_number = @previousBlock - and concat(rao.staker, '_', rao.operator) not in (select staker_operator from undelegations) - ), - final_state as ( - (select staker, operator, block_number::bigint from carryover) - union all - (select staker, operator, block_number::bigint from unique_delegations where delegated = true) - ) - insert into delegated_stakers (staker, operator, block_number) - select staker, operator, block_number from final_state + from delegated_stakers + where block_number = @previousBlock ` - res := s.Db.Exec(query, sql.Named("currentBlock", blockNumber), sql.Named("previousBlock", blockNumber-1), ) + if res.Error != nil { - s.logger.Sugar().Errorw("Failed to insert into operator_shares", zap.Error(res.Error)) + s.logger.Sugar().Errorw("Failed to clone previous block state to new block", zap.Error(res.Error)) return res.Error } return nil } -func (s *StakerDelegationsModel) getDifferenceInStates(blockNumber uint64) ([]DelegatedStakersDiff, error) { - query := ` - with new_states as ( - select - staker, - operator, - block_number, - true as delegated - from delegated_stakers - where block_number = @currentBlock - ), - previous_states as ( - select - staker, - operator, - block_number, - true as delegated - from delegated_stakers - where block_number = @previousBlock - ), - undelegated as ( - (select staker, operator, delegated from previous_states) - except - (select staker, operator, delegated from new_states) - ), - new_delegated as ( - (select staker, operator, delegated from new_states) - except - (select staker, operator, delegated from previous_states) - ) - select staker, operator, false as delegated from undelegated - union all - select staker, operator, true as delegated from new_delegated; - ` - results := make([]DelegatedStakersDiff, 0) - res := s.Db.Model(&DelegatedStakersDiff{}). - Raw(query, - sql.Named("currentBlock", blockNumber), - sql.Named("previousBlock", blockNumber-1), - ). - Scan(&results) - if res.Error != nil { - s.logger.Sugar().Errorw("Failed to fetch delegated_stakers", zap.Error(res.Error)) - return nil, res.Error +// prepareState prepares the state for the current block by comparing the accumulated state changes. +// It separates out the changes into inserts and deletes +func (s *StakerDelegationsModel) prepareState(blockNumber uint64) ([]DelegatedStakers, []DelegatedStakers, error) { + accumulatedState, ok := s.stateAccumulator[blockNumber] + if !ok { + err := xerrors.Errorf("No accumulated state found for block %d", blockNumber) + s.logger.Sugar().Errorw(err.Error(), zap.Error(err), zap.Uint64("blockNumber", blockNumber)) + return nil, nil, err + } + + inserts := make([]DelegatedStakers, 0) + deletes := make([]DelegatedStakers, 0) + for _, stateChange := range accumulatedState { + record := DelegatedStakers{ + Staker: stateChange.Staker, + Operator: stateChange.Operator, + BlockNumber: blockNumber, + } + if stateChange.Delegated { + inserts = append(inserts, record) + } else { + deletes = append(deletes, record) + } } - return results, nil + return inserts, deletes, nil } +func (s *StakerDelegationsModel) CommitFinalState(blockNumber uint64) error { + // Clone the previous block state to give us a reference point. + // + // By doing this, existing staker delegations will be carried over to the new block. + // We'll then remove any stakers that were undelegated and add any new stakers that were delegated. + err := s.clonePreviousBlocksToNewBlock(blockNumber) + if err != nil { + return err + } + + recordsToInsert, recordsToDelete, err := s.prepareState(blockNumber) + + // TODO(seanmcgary): should probably wrap the operations of this function in a db transaction + for _, record := range recordsToDelete { + res := s.Db.Delete(&DelegatedStakers{}, "staker = ? and operator = ? and block_number = ?", record.Staker, record.Operator, blockNumber) + if res.Error != nil { + s.logger.Sugar().Errorw("Failed to delete staker delegation", + zap.Error(res.Error), + zap.String("staker", record.Staker), + zap.String("operator", record.Operator), + zap.Uint64("blockNumber", blockNumber), + ) + return res.Error + } + } + if len(recordsToInsert) > 0 { + res := s.Db.Model(&DelegatedStakers{}).Clauses(clause.Returning{}).Create(&recordsToInsert) + if res.Error != nil { + s.logger.Sugar().Errorw("Failed to insert staker delegations", zap.Error(res.Error)) + return res.Error + } + } + return nil +} + +// ClearAccumulatedState clears the accumulated state for the given block number to free up memory +func (s *StakerDelegationsModel) ClearAccumulatedState(blockNumber uint64) error { + delete(s.stateAccumulator, blockNumber) + return nil +} + +// GenerateStateRoot generates the state root for the given block number by storing +// the state changes in a merkle tree. func (s *StakerDelegationsModel) GenerateStateRoot(blockNumber uint64) (types.StateRoot, error) { - results, err := s.getDifferenceInStates(blockNumber) + inserts, deletes, err := s.prepareState(blockNumber) if err != nil { return "", err } - fullTree, err := s.merkelizeState(blockNumber, results) + // Take all of the inserts and deletes and combine them into a single list + combinedResults := make([]DelegatedStakersDiff, 0) + for _, record := range inserts { + combinedResults = append(combinedResults, DelegatedStakersDiff{ + Staker: record.Staker, + Operator: record.Operator, + Delegated: true, + BlockNumber: blockNumber, + }) + } + for _, record := range deletes { + combinedResults = append(combinedResults, DelegatedStakersDiff{ + Staker: record.Staker, + Operator: record.Operator, + Delegated: false, + BlockNumber: blockNumber, + }) + } + + fullTree, err := s.merkelizeState(blockNumber, combinedResults) if err != nil { return "", err } return types.StateRoot(utils.ConvertBytesToString(fullTree.Root())), nil } +// merkelizeState generates a merkle tree for the given block number and delegated stakers. +// Changes are stored in the following format: +// Operator -> staker:delegated func (s *StakerDelegationsModel) merkelizeState(blockNumber uint64, delegatedStakers []DelegatedStakersDiff) (*merkletree.MerkleTree, error) { - // Operator -> staker:delegated om := orderedmap.New[string, *orderedmap.OrderedMap[string, bool]]() for _, result := range delegatedStakers { diff --git a/internal/eigenState/stakerDelegations/stakerDelegations_test.go b/internal/eigenState/stakerDelegations/stakerDelegations_test.go index 134e7f50..a7ae67de 100644 --- a/internal/eigenState/stakerDelegations/stakerDelegations_test.go +++ b/internal/eigenState/stakerDelegations/stakerDelegations_test.go @@ -68,10 +68,17 @@ func Test_DelegatedStakersState(t *testing.T) { assert.Equal(t, true, model.IsInterestingLog(&log)) + err = model.StartBlockProcessing(blockNumber) + assert.Nil(t, err) + res, err := model.HandleStateChange(&log) assert.Nil(t, err) assert.NotNil(t, res) + typedChange := res.(*AccumulatedStateChange) + assert.Equal(t, "0x5fc1b61816ddeb33b65a02a42b29059ecd3a20e9", typedChange.Staker) + assert.Equal(t, "0x5accc90436492f24e6af278569691e2c942a676d", typedChange.Operator) + teardown(model) }) t.Run("Should register StakerDelegationsModel and generate the table for the block", func(t *testing.T) { @@ -98,11 +105,18 @@ func Test_DelegatedStakersState(t *testing.T) { assert.Equal(t, true, model.IsInterestingLog(&log)) + err = model.StartBlockProcessing(blockNumber) + assert.Nil(t, err) + stateChange, err := model.HandleStateChange(&log) assert.Nil(t, err) assert.NotNil(t, stateChange) - err = model.WriteFinalState(blockNumber) + typedChange := stateChange.(*AccumulatedStateChange) + assert.Equal(t, "0x5fc1b61816ddeb33b65a02a42b29059ecd3a20e9", typedChange.Staker) + assert.Equal(t, "0x5accc90436492f24e6af278569691e2c942a676d", typedChange.Operator) + + err = model.CommitFinalState(blockNumber) assert.Nil(t, err) states := []DelegatedStakers{} @@ -166,11 +180,14 @@ func Test_DelegatedStakersState(t *testing.T) { for _, log := range logs { assert.True(t, model.IsInterestingLog(log)) + err = model.StartBlockProcessing(log.BlockNumber) + assert.Nil(t, err) + stateChange, err := model.HandleStateChange(log) assert.Nil(t, err) assert.NotNil(t, stateChange) - err = model.WriteFinalState(log.BlockNumber) + err = model.CommitFinalState(log.BlockNumber) assert.Nil(t, err) states := []DelegatedStakers{} @@ -185,16 +202,16 @@ func Test_DelegatedStakersState(t *testing.T) { if log.BlockNumber == blocks[0] { assert.Equal(t, 1, len(states)) - diffs, err := model.getDifferenceInStates(log.BlockNumber) + inserts, deletes, err := model.prepareState(log.BlockNumber) assert.Nil(t, err) - assert.Equal(t, 1, len(diffs)) - assert.Equal(t, true, diffs[0].Delegated) + assert.Equal(t, 1, len(inserts)) + assert.Equal(t, 0, len(deletes)) } else if log.BlockNumber == blocks[1] { assert.Equal(t, 0, len(states)) - diffs, err := model.getDifferenceInStates(log.BlockNumber) + inserts, deletes, err := model.prepareState(log.BlockNumber) assert.Nil(t, err) - assert.Equal(t, 1, len(diffs)) - assert.Equal(t, false, diffs[0].Delegated) + assert.Equal(t, 0, len(inserts)) + assert.Equal(t, 1, len(deletes)) } stateRoot, err := model.GenerateStateRoot(log.BlockNumber) diff --git a/internal/eigenState/stateManager/stateManager.go b/internal/eigenState/stateManager/stateManager.go index 61c978c9..b7d1583a 100644 --- a/internal/eigenState/stateManager/stateManager.go +++ b/internal/eigenState/stateManager/stateManager.go @@ -49,7 +49,7 @@ func (e *EigenStateManager) HandleLogStateChange(log *storage.TransactionLog) er func (e *EigenStateManager) CommitFinalState(blockNumber uint64) error { for _, index := range e.GetSortedModelIndexes() { state := e.StateModels[index] - err := state.WriteFinalState(blockNumber) + err := state.CommitFinalState(blockNumber) if err != nil { return err } diff --git a/internal/eigenState/types/types.go b/internal/eigenState/types/types.go index 82a657ed..6a6901a2 100644 --- a/internal/eigenState/types/types.go +++ b/internal/eigenState/types/types.go @@ -15,19 +15,27 @@ type IEigenStateModel interface { //Determine if the log is interesting to the state model IsInterestingLog(log *storage.TransactionLog) bool + // StartBlockProcessing + // Perform any necessary setup for processing a block + StartBlockProcessing(blockNumber uint64) error + // HandleStateChange // Allow the state model to handle the state change // // Returns the saved value. Listed as an interface because go generics suck HandleStateChange(log *storage.TransactionLog) (interface{}, error) - // WriteFinalState - // Once all state changes are processed, calculate and write final state - WriteFinalState(blockNumber uint64) error + // CommitFinalState + // Once all state changes are processed, commit the final state to the database + CommitFinalState(blockNumber uint64) error // GenerateStateRoot // Generate the state root for the model GenerateStateRoot(blockNumber uint64) (StateRoot, error) + + // ClearAccumulatedState + // Clear the accumulated state for the model to free up memory + ClearAccumulatedState(blockNumber uint64) error } // StateTransitions