Skip to content

Commit

Permalink
feat: remove HandleStateChange
Browse files Browse the repository at this point in the history
  • Loading branch information
gpsanant committed Oct 20, 2024
1 parent 9e7ec29 commit 64bdb56
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 145 deletions.
13 changes: 6 additions & 7 deletions internal/eigenState/avsOperators/avsOperators.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ func NewSlotID(avs string, operator string) types.SlotID {

// EigenState model for AVS operators that implements IEigenStateModel.
type AvsOperatorsBaseModel struct {
StateTransitions types.StateTransitions[AccumulatedStateChange]
db *gorm.DB
logger *zap.Logger
globalConfig *config.Config
db *gorm.DB
logger *zap.Logger
globalConfig *config.Config

// Accumulates state changes for SlotIds, grouped by block number
stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange
Expand Down Expand Up @@ -120,11 +119,11 @@ func (a *AvsOperatorsBaseModel) Base() interface{} {
//
// Returns the map and a reverse sorted list of block numbers that can be traversed when
// processing a log to determine which state change to apply.
func (a *AvsOperatorsBaseModel) GetStateTransitions() (types.StateTransitions[AccumulatedStateChange], []uint64) {
stateChanges := make(types.StateTransitions[AccumulatedStateChange])
func (a *AvsOperatorsBaseModel) GetStateTransitions() (types.StateTransitions, []uint64) {
stateChanges := make(types.StateTransitions)

// TODO(seanmcgary): make this not a closure so this function doesnt get big an messy...
stateChanges[0] = func(log *storage.TransactionLog) (*AccumulatedStateChange, error) {
stateChanges[0] = func(log *storage.TransactionLog) (interface{}, error) {
arguments, err := utils.ParseLogArguments(a.logger, log)
if err != nil {
return nil, err
Expand Down
20 changes: 20 additions & 0 deletions internal/eigenState/eigenStateModel/eigenStateModel.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ func (m *EigenStateModel) IsInterestingLog(log *storage.TransactionLog) bool {
return false
}

func (m *EigenStateModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) {
stateChanges, sortedBlockNumbers := m.GetStateTransitions()

for _, blockNumber := range sortedBlockNumbers {
if log.BlockNumber >= blockNumber {
m.Logger().Sugar().Debugw("Handling state change", zap.Uint64("blockNumber", blockNumber))

change, err := stateChanges[blockNumber](log)
if err != nil {
return nil, err
}
if change == nil {
return nil, fmt.Errorf("No state change found for block %d", blockNumber)
}
return change, nil
}
}
return nil, nil //nolint:nilnil
}

func (m *EigenStateModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
tableName := m.TableName()
if endBlockNumber != 0 && endBlockNumber < startBlockNumber {
Expand Down
33 changes: 6 additions & 27 deletions internal/eigenState/operatorShares/operatorShares.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ func NewSlotID(operator string, strategy string) types.SlotID {

// Implements IEigenStateModel.
type OperatorSharesBaseModel struct {
StateTransitions types.StateTransitions[AccumulatedStateChange]
db *gorm.DB
logger *zap.Logger
globalConfig *config.Config
db *gorm.DB
logger *zap.Logger
globalConfig *config.Config

// Accumulates state changes for SlotIds, grouped by block number
stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange
Expand Down Expand Up @@ -122,10 +121,10 @@ func parseLogOutputForOperatorShares(outputDataStr string) (*operatorSharesOutpu
return outputData, err
}

func (osm *OperatorSharesBaseModel) GetStateTransitions() (types.StateTransitions[AccumulatedStateChange], []uint64) {
stateChanges := make(types.StateTransitions[AccumulatedStateChange])
func (osm *OperatorSharesBaseModel) GetStateTransitions() (types.StateTransitions, []uint64) {
stateChanges := make(types.StateTransitions)

stateChanges[0] = func(log *storage.TransactionLog) (*AccumulatedStateChange, error) {
stateChanges[0] = func(log *storage.TransactionLog) (interface{}, error) {
arguments, err := utils.ParseLogArguments(osm.logger, log)
if err != nil {
return nil, err
Expand Down Expand Up @@ -208,26 +207,6 @@ func (osm *OperatorSharesBaseModel) CleanupBlock(blockNumber uint64) error {
return nil
}

func (osm *OperatorSharesBaseModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) {
stateChanges, sortedBlockNumbers := osm.GetStateTransitions()

for _, blockNumber := range sortedBlockNumbers {
if log.BlockNumber >= blockNumber {
osm.logger.Sugar().Debugw("Handling state change", zap.Uint64("blockNumber", blockNumber))

change, err := stateChanges[blockNumber](log)
if err != nil {
return nil, err
}
if change == nil {
return nil, xerrors.Errorf("No state change found for block %d", blockNumber)
}
return change, nil
}
}
return nil, nil //nolint:nilnil
}

// prepareState prepares the state for commit by adding the new state to the existing state.
func (osm *OperatorSharesBaseModel) prepareState(blockNumber uint64) ([]*OperatorSharesDiff, error) {
preparedState := make([]*OperatorSharesDiff, 0)
Expand Down
37 changes: 8 additions & 29 deletions internal/eigenState/rewardSubmissions/rewardSubmissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ func NewSlotID(rewardHash string, strategy string) types.SlotID {
}

type RewardSubmissionsBaseModel struct {
StateTransitions types.StateTransitions[RewardSubmission]
db *gorm.DB
Network config.Network
Environment config.Environment
logger *zap.Logger
globalConfig *config.Config
db *gorm.DB
Network config.Network
Environment config.Environment
logger *zap.Logger
globalConfig *config.Config

// Accumulates state changes for SlotIds, grouped by block number
stateAccumulator map[uint64]map[types.SlotID]*RewardSubmission
Expand Down Expand Up @@ -194,10 +193,10 @@ func (rs *RewardSubmissionsBaseModel) handleRewardSubmissionCreatedEvent(log *st
return &RewardSubmissions{Submissions: rewardSubmissions}, nil
}

func (rs *RewardSubmissionsBaseModel) GetStateTransitions() (types.StateTransitions[RewardSubmissions], []uint64) {
stateChanges := make(types.StateTransitions[RewardSubmissions])
func (rs *RewardSubmissionsBaseModel) GetStateTransitions() (types.StateTransitions, []uint64) {
stateChanges := make(types.StateTransitions)

stateChanges[0] = func(log *storage.TransactionLog) (*RewardSubmissions, error) {
stateChanges[0] = func(log *storage.TransactionLog) (interface{}, error) {
rewardSubmissions, err := rs.handleRewardSubmissionCreatedEvent(log)
if err != nil {
return nil, err
Expand Down Expand Up @@ -255,26 +254,6 @@ func (rs *RewardSubmissionsBaseModel) CleanupBlock(blockNumber uint64) error {
return nil
}

func (rs *RewardSubmissionsBaseModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) {
stateChanges, sortedBlockNumbers := rs.GetStateTransitions()

for _, blockNumber := range sortedBlockNumbers {
if log.BlockNumber >= blockNumber {
rs.logger.Sugar().Debugw("Handling state change", zap.Uint64("blockNumber", blockNumber))

change, err := stateChanges[blockNumber](log)
if err != nil {
return nil, err
}
if change == nil {
return nil, nil
}
return change, nil
}
}
return nil, nil
}

func (rs *RewardSubmissionsBaseModel) clonePreviousBlocksToNewBlock(blockNumber uint64) error {
query := `
insert into reward_submissions(avs, reward_hash, token, amount, strategy, strategy_index, multiplier, start_timestamp, end_timestamp, duration, reward_type, block_number)
Expand Down
35 changes: 7 additions & 28 deletions internal/eigenState/stakerDelegations/stakerDelegations.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ func NewSlotID(staker string, operator string) types.SlotID {
}

type StakerDelegationsBaseModel struct {
StateTransitions types.StateTransitions[AccumulatedStateChange]
db *gorm.DB
logger *zap.Logger
globalConfig *config.Config
db *gorm.DB
logger *zap.Logger
globalConfig *config.Config

// Accumulates state changes for SlotIds, grouped by block number
stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange
Expand Down Expand Up @@ -108,10 +107,10 @@ func (s *StakerDelegationsBaseModel) Base() interface{} {
return s
}

func (s *StakerDelegationsBaseModel) GetStateTransitions() (types.StateTransitions[AccumulatedStateChange], []uint64) {
stateChanges := make(types.StateTransitions[AccumulatedStateChange])
func (s *StakerDelegationsBaseModel) GetStateTransitions() (types.StateTransitions, []uint64) {
stateChanges := make(types.StateTransitions)

stateChanges[0] = func(log *storage.TransactionLog) (*AccumulatedStateChange, error) {
stateChanges[0] = func(log *storage.TransactionLog) (interface{}, error) {
arguments, err := utils.ParseLogArguments(s.logger, log)
if err != nil {
return nil, err
Expand Down Expand Up @@ -198,26 +197,6 @@ func (s *StakerDelegationsBaseModel) CleanupBlock(blockNumber uint64) error {
return nil
}

func (s *StakerDelegationsBaseModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) {
stateChanges, sortedBlockNumbers := s.GetStateTransitions()

for _, blockNumber := range sortedBlockNumbers {
if log.BlockNumber >= blockNumber {
s.logger.Sugar().Debugw("Handling state change", zap.Uint64("blockNumber", blockNumber))

change, err := stateChanges[blockNumber](log)
if err != nil {
return nil, err
}
if change == nil {
return nil, xerrors.Errorf("No state change found for block %d", blockNumber)
}
return change, nil
}
}
return nil, nil //nolint:nilnil
}

func (s *StakerDelegationsBaseModel) clonePreviousBlocksToNewBlock(blockNumber uint64) error {
query := `
insert into delegated_stakers (staker, operator, block_number)
Expand Down Expand Up @@ -270,7 +249,7 @@ func (s *StakerDelegationsBaseModel) prepareState(blockNumber uint64) ([]Delegat
func (s *StakerDelegationsBaseModel) writeDeltaRecordsToDeltaTable(blockNumber uint64) error {
records, ok := s.deltaAccumulator[blockNumber]
if !ok {
msg := "Delta accumulator was not initialized"
msg := "delta accumulator was not initialized"
s.logger.Sugar().Errorw(msg, zap.Uint64("blockNumber", blockNumber))
return errors.New(msg)
}
Expand Down
28 changes: 4 additions & 24 deletions internal/eigenState/stakerShares/stakerShares.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewSlotID(staker string, strategy string) types.SlotID {
}

type StakerSharesBaseModel struct {
StateTransitions types.StateTransitions[AccumulatedStateChange]
StateTransitions types.StateTransitions
db *gorm.DB
logger *zap.Logger
globalConfig *config.Config
Expand Down Expand Up @@ -371,10 +371,10 @@ type AccumulatedStateChanges struct {
Changes []*AccumulatedStateChange
}

func (ss *StakerSharesBaseModel) GetStateTransitions() (types.StateTransitions[AccumulatedStateChanges], []uint64) {
stateChanges := make(types.StateTransitions[AccumulatedStateChanges])
func (ss *StakerSharesBaseModel) GetStateTransitions() (types.StateTransitions, []uint64) {
stateChanges := make(types.StateTransitions)

stateChanges[0] = func(log *storage.TransactionLog) (*AccumulatedStateChanges, error) {
stateChanges[0] = func(log *storage.TransactionLog) (interface{}, error) {
var parsedRecords []*AccumulatedStateChange
var err error

Expand Down Expand Up @@ -482,26 +482,6 @@ func (ss *StakerSharesBaseModel) CleanupBlock(blockNumber uint64) error {
return nil
}

func (ss *StakerSharesBaseModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) {
stateChanges, sortedBlockNumbers := ss.GetStateTransitions()

for _, blockNumber := range sortedBlockNumbers {
if log.BlockNumber >= blockNumber {
ss.logger.Sugar().Debugw("Handling state change", zap.Uint64("blockNumber", blockNumber))

change, err := stateChanges[blockNumber](log)
if err != nil {
return nil, err
}
if change == nil {
return nil, nil
}
return change, nil
}
}
return nil, nil
}

// prepareState prepares the state for commit by adding the new state to the existing state.
func (ss *StakerSharesBaseModel) prepareState(blockNumber uint64) ([]*StakerSharesDiff, error) {
preparedState := make([]*StakerSharesDiff, 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewSlotID(rootIndex uint64, root string) types.SlotID {
}

type SubmittedDistributionRootsBaseModel struct {
StateTransitions types.StateTransitions[SubmittedDistributionRoots]
StateTransitions types.StateTransitions
db *gorm.DB
logger *zap.Logger
globalConfig *config.Config
Expand Down Expand Up @@ -100,10 +100,10 @@ func parseLogOutputForDistributionRootSubmitted(outputDataStr string) (*distribu
return outputData, err
}

func (sdr *SubmittedDistributionRootsBaseModel) GetStateTransitions() (types.StateTransitions[SubmittedDistributionRoots], []uint64) {
stateChanges := make(types.StateTransitions[SubmittedDistributionRoots])
func (sdr *SubmittedDistributionRootsBaseModel) GetStateTransitions() (types.StateTransitions, []uint64) {
stateChanges := make(types.StateTransitions)

stateChanges[0] = func(log *storage.TransactionLog) (*SubmittedDistributionRoots, error) {
stateChanges[0] = func(log *storage.TransactionLog) (interface{}, error) {
arguments, err := utils.ParseLogArguments(sdr.logger, log)
if err != nil {
return nil, err
Expand Down Expand Up @@ -213,26 +213,6 @@ func (sdr *SubmittedDistributionRootsBaseModel) CleanupBlock(blockNumber uint64)
return nil
}

func (sdr *SubmittedDistributionRootsBaseModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) {
stateChanges, sortedBlockNumbers := sdr.GetStateTransitions()

for _, blockNumber := range sortedBlockNumbers {
if log.BlockNumber >= blockNumber {
sdr.logger.Sugar().Debugw("Handling state change", zap.Uint64("blockNumber", blockNumber))

change, err := stateChanges[blockNumber](log)
if err != nil {
return nil, err
}
if change == nil {
return nil, xerrors.Errorf("No state change found for block %d", blockNumber)
}
return change, nil
}
}
return nil, nil
}

func (sdr *SubmittedDistributionRootsBaseModel) clonePreviousBlocksToNewBlock(blockNumber uint64) error {
query := `
insert into submitted_distribution_roots (root, root_index, rewards_calculation_end, rewards_calculation_end_unit, activated_at, activated_at_unit, created_at_block_number, block_number)
Expand Down
16 changes: 10 additions & 6 deletions internal/eigenState/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ type IBaseEigenStateModel interface {
// Cleanup any state changes for the block
CleanupBlock(blockNumber uint64) error

// HandleStateChange
// Allow the state model to handle the state change
//
// Returns the saved value. Listed as an interface because go generics suck
HandleStateChange(log *storage.TransactionLog) (interface{}, error)
// GetStateTransitions
// Get the state transitions for the model
GetStateTransitions() (StateTransitions, []uint64)

// CommitFinalState
// Once all state changes are processed, commit the final state to the database
Expand All @@ -64,6 +62,12 @@ type IEigenStateModel interface {
// Determine if the log is interesting to the state model
IsInterestingLog(log *storage.TransactionLog) bool

// HandleStateChange
// Allow the state model to handle the state change
//
// Returns the saved value. Listed as an interface because go generics suck
HandleStateChange(log *storage.TransactionLog) (interface{}, error)

// GenerateStateRoot
// Generate the state root for the model
GenerateStateRoot(blockNumber uint64) (StateRoot, error)
Expand All @@ -78,7 +82,7 @@ type IEigenStateModel interface {

// StateTransitions
// Map of block number to function that will transition the state to the next block.
type StateTransitions[T interface{}] map[uint64]func(log *storage.TransactionLog) (*T, error)
type StateTransitions map[uint64]func(log *storage.TransactionLog) (interface{}, error)

type SlotID string

Expand Down

0 comments on commit 64bdb56

Please sign in to comment.