Skip to content

Commit

Permalink
Add delta table for delegated stakers
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcgary committed Sep 18, 2024
1 parent 1733e8f commit dc4564f
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 0 deletions.
1 change: 1 addition & 0 deletions internal/eigenState/avsOperators/avsOperators.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func (a *AvsOperatorsModel) CommitFinalState(blockNumber uint64) error {

func (a *AvsOperatorsModel) ClearAccumulatedState(blockNumber uint64) error {
delete(a.stateAccumulator, blockNumber)
delete(a.deltaAccumulator, blockNumber)
return nil
}

Expand Down
46 changes: 46 additions & 0 deletions internal/eigenState/stakerDelegations/stakerDelegations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stakerDelegations

import (
"database/sql"
"errors"
"fmt"
"slices"
"sort"
Expand Down Expand Up @@ -36,6 +37,14 @@ type AccumulatedStateChange struct {
Delegated bool
}

type StakerDelegationChange struct {
Staker string
Operator string
BlockNumber uint64
Delegated bool
LogIndex uint64
}

func NewSlotID(staker string, operator string) types.SlotID {
return types.SlotID(fmt.Sprintf("%s_%s", staker, operator))
}
Expand All @@ -49,6 +58,8 @@ type StakerDelegationsModel struct {

// Accumulates state changes for SlotIds, grouped by block number
stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange

deltaAccumulator map[uint64][]*StakerDelegationChange
}

type DelegatedStakersDiff struct {
Expand All @@ -72,6 +83,8 @@ func NewStakerDelegationsModel(
logger: logger,
globalConfig: globalConfig,
stateAccumulator: make(map[uint64]map[types.SlotID]*AccumulatedStateChange),

deltaAccumulator: make(map[uint64][]*StakerDelegationChange),
}

esm.RegisterState(model, 2)
Expand Down Expand Up @@ -123,6 +136,15 @@ func (s *StakerDelegationsModel) GetStateTransitions() (types.StateTransitions[A
record.Delegated = true
}

// Store the change in the delta accumulator
s.deltaAccumulator[log.BlockNumber] = append(s.deltaAccumulator[log.BlockNumber], &StakerDelegationChange{
Staker: staker,
Operator: operator,
BlockNumber: log.BlockNumber,
Delegated: record.Delegated,
LogIndex: log.LogIndex,
})

return record, nil
}

Expand Down Expand Up @@ -157,6 +179,7 @@ func (s *StakerDelegationsModel) IsInterestingLog(log *storage.TransactionLog) b
// InitBlockProcessing initialize state accumulator for the block.
func (s *StakerDelegationsModel) InitBlockProcessing(blockNumber uint64) error {
s.stateAccumulator[blockNumber] = make(map[types.SlotID]*AccumulatedStateChange)
s.deltaAccumulator[blockNumber] = make([]*StakerDelegationChange, 0)
return nil
}

Expand Down Expand Up @@ -229,6 +252,24 @@ func (s *StakerDelegationsModel) prepareState(blockNumber uint64) ([]DelegatedSt
return inserts, deletes, nil
}

func (s *StakerDelegationsModel) writeDeltaRecordsToDeltaTable(blockNumber uint64) error {
records, ok := s.deltaAccumulator[blockNumber]
if !ok {
msg := "Delta accumulator was not initialized"
s.logger.Sugar().Errorw(msg, zap.Uint64("blockNumber", blockNumber))
return errors.New(msg)
}

if len(records) > 0 {
res := s.DB.Model(&StakerDelegationChange{}).Clauses(clause.Returning{}).Create(&records)
if res.Error != nil {
s.logger.Sugar().Errorw("Failed to insert delta records", zap.Error(res.Error))
return res.Error
}
}
return nil
}

func (s *StakerDelegationsModel) CommitFinalState(blockNumber uint64) error {
// Clone the previous block state to give us a reference point.
//
Expand Down Expand Up @@ -264,12 +305,17 @@ func (s *StakerDelegationsModel) CommitFinalState(blockNumber uint64) error {
return res.Error
}
}

if err = s.writeDeltaRecordsToDeltaTable(blockNumber); err != nil {
return err
}
return nil
}

// ClearAccumulatedState clears the accumulated state for the given block number to free up memory.
func (s *StakerDelegationsModel) ClearAccumulatedState(blockNumber uint64) error {
delete(s.stateAccumulator, blockNumber)
delete(s.deltaAccumulator, blockNumber)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func setup() (
func teardown(model *StakerDelegationsModel) {
model.DB.Exec("delete from staker_delegation_changes")
model.DB.Exec("delete from delegated_stakers")
model.DB.Exec("delete from staker_delegation_changes")
}

func Test_DelegatedStakersState(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package _202409181340_stakerDelegationDelta

import (
"gorm.io/gorm"
)

type SqliteMigration struct {
}

func (m *SqliteMigration) Up(grm *gorm.DB) error {
query := `
create table if not exists staker_delegation_changes (
staker TEXT NOT NULL,
operator TEXT NOT NULL,
delegated INTEGER NOT NULL,
block_number INTEGER NOT NULL,
log_index INTEGER NOT NULL,
unique(staker, operator, delegated, log_index)
)
`
res := grm.Exec(query)
if res.Error != nil {
return res.Error
}
return nil
}

func (m *SqliteMigration) GetName() string {
return "202409181340_stakerDelegationDelta"
}
2 changes: 2 additions & 0 deletions internal/sqlite/migrations/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"fmt"
_202409161057_avsOperatorDeltas "github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations/202409161057_avsOperatorDeltas"
_202409181340_stakerDelegationDelta "github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations/202409181340_stakerDelegationDelta"
"time"

_202409061249_bootstrapDb "github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations/202409061249_bootstrapDb"
Expand Down Expand Up @@ -54,6 +55,7 @@ func (m *SqliteMigrator) MigrateAll() error {
&_202409101540_rewardSubmissions.SqliteMigration{},
&_202409111509_removeOperatorRestakedStrategiesBlockConstraint.SqliteMigration{},
&_202409161057_avsOperatorDeltas.SqliteMigration{},
&_202409181340_stakerDelegationDelta.SqliteMigration{},
}

m.Logger.Sugar().Info("Running migrations")
Expand Down

0 comments on commit dc4564f

Please sign in to comment.