From dc4564fc415f371b9080271b84d1e852cea93725 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Wed, 18 Sep 2024 13:53:44 -0500 Subject: [PATCH] Add delta table for delegated stakers --- .../eigenState/avsOperators/avsOperators.go | 1 + .../stakerDelegations/stakerDelegations.go | 46 +++++++++++++++++++ .../stakerDelegations_test.go | 1 + .../202409181340_stakerDelegationDelta/up.go | 30 ++++++++++++ internal/sqlite/migrations/migrator.go | 2 + 5 files changed, 80 insertions(+) create mode 100644 internal/sqlite/migrations/202409181340_stakerDelegationDelta/up.go diff --git a/internal/eigenState/avsOperators/avsOperators.go b/internal/eigenState/avsOperators/avsOperators.go index 5a28c52e..b963f817 100644 --- a/internal/eigenState/avsOperators/avsOperators.go +++ b/internal/eigenState/avsOperators/avsOperators.go @@ -332,6 +332,7 @@ func (a *AvsOperatorsModel) CommitFinalState(blockNumber uint64) error { func (a *AvsOperatorsModel) ClearAccumulatedState(blockNumber uint64) error { delete(a.stateAccumulator, blockNumber) + delete(a.deltaAccumulator, blockNumber) return nil } diff --git a/internal/eigenState/stakerDelegations/stakerDelegations.go b/internal/eigenState/stakerDelegations/stakerDelegations.go index 38476f43..5132d08b 100644 --- a/internal/eigenState/stakerDelegations/stakerDelegations.go +++ b/internal/eigenState/stakerDelegations/stakerDelegations.go @@ -2,6 +2,7 @@ package stakerDelegations import ( "database/sql" + "errors" "fmt" "slices" "sort" @@ -36,6 +37,14 @@ type AccumulatedStateChange struct { Delegated bool } +type StakerDelegationChange struct { + Staker string + Operator string + BlockNumber uint64 + Delegated bool + LogIndex uint64 +} + func NewSlotID(staker string, operator string) types.SlotID { return types.SlotID(fmt.Sprintf("%s_%s", staker, operator)) } @@ -49,6 +58,8 @@ type StakerDelegationsModel struct { // Accumulates state changes for SlotIds, grouped by block number stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange + + deltaAccumulator map[uint64][]*StakerDelegationChange } type DelegatedStakersDiff struct { @@ -72,6 +83,8 @@ func NewStakerDelegationsModel( logger: logger, globalConfig: globalConfig, stateAccumulator: make(map[uint64]map[types.SlotID]*AccumulatedStateChange), + + deltaAccumulator: make(map[uint64][]*StakerDelegationChange), } esm.RegisterState(model, 2) @@ -123,6 +136,15 @@ func (s *StakerDelegationsModel) GetStateTransitions() (types.StateTransitions[A record.Delegated = true } + // Store the change in the delta accumulator + s.deltaAccumulator[log.BlockNumber] = append(s.deltaAccumulator[log.BlockNumber], &StakerDelegationChange{ + Staker: staker, + Operator: operator, + BlockNumber: log.BlockNumber, + Delegated: record.Delegated, + LogIndex: log.LogIndex, + }) + return record, nil } @@ -157,6 +179,7 @@ func (s *StakerDelegationsModel) IsInterestingLog(log *storage.TransactionLog) b // InitBlockProcessing initialize state accumulator for the block. func (s *StakerDelegationsModel) InitBlockProcessing(blockNumber uint64) error { s.stateAccumulator[blockNumber] = make(map[types.SlotID]*AccumulatedStateChange) + s.deltaAccumulator[blockNumber] = make([]*StakerDelegationChange, 0) return nil } @@ -229,6 +252,24 @@ func (s *StakerDelegationsModel) prepareState(blockNumber uint64) ([]DelegatedSt return inserts, deletes, nil } +func (s *StakerDelegationsModel) writeDeltaRecordsToDeltaTable(blockNumber uint64) error { + records, ok := s.deltaAccumulator[blockNumber] + if !ok { + msg := "Delta accumulator was not initialized" + s.logger.Sugar().Errorw(msg, zap.Uint64("blockNumber", blockNumber)) + return errors.New(msg) + } + + if len(records) > 0 { + res := s.DB.Model(&StakerDelegationChange{}).Clauses(clause.Returning{}).Create(&records) + if res.Error != nil { + s.logger.Sugar().Errorw("Failed to insert delta records", zap.Error(res.Error)) + return res.Error + } + } + return nil +} + func (s *StakerDelegationsModel) CommitFinalState(blockNumber uint64) error { // Clone the previous block state to give us a reference point. // @@ -264,12 +305,17 @@ func (s *StakerDelegationsModel) CommitFinalState(blockNumber uint64) error { return res.Error } } + + if err = s.writeDeltaRecordsToDeltaTable(blockNumber); err != nil { + return err + } return nil } // ClearAccumulatedState clears the accumulated state for the given block number to free up memory. func (s *StakerDelegationsModel) ClearAccumulatedState(blockNumber uint64) error { delete(s.stateAccumulator, blockNumber) + delete(s.deltaAccumulator, blockNumber) return nil } diff --git a/internal/eigenState/stakerDelegations/stakerDelegations_test.go b/internal/eigenState/stakerDelegations/stakerDelegations_test.go index 4e427df6..7fe40304 100644 --- a/internal/eigenState/stakerDelegations/stakerDelegations_test.go +++ b/internal/eigenState/stakerDelegations/stakerDelegations_test.go @@ -39,6 +39,7 @@ func setup() ( func teardown(model *StakerDelegationsModel) { model.DB.Exec("delete from staker_delegation_changes") model.DB.Exec("delete from delegated_stakers") + model.DB.Exec("delete from staker_delegation_changes") } func Test_DelegatedStakersState(t *testing.T) { diff --git a/internal/sqlite/migrations/202409181340_stakerDelegationDelta/up.go b/internal/sqlite/migrations/202409181340_stakerDelegationDelta/up.go new file mode 100644 index 00000000..9e794a2c --- /dev/null +++ b/internal/sqlite/migrations/202409181340_stakerDelegationDelta/up.go @@ -0,0 +1,30 @@ +package _202409181340_stakerDelegationDelta + +import ( + "gorm.io/gorm" +) + +type SqliteMigration struct { +} + +func (m *SqliteMigration) Up(grm *gorm.DB) error { + query := ` + create table if not exists staker_delegation_changes ( + staker TEXT NOT NULL, + operator TEXT NOT NULL, + delegated INTEGER NOT NULL, + block_number INTEGER NOT NULL, + log_index INTEGER NOT NULL, + unique(staker, operator, delegated, log_index) + ) + ` + res := grm.Exec(query) + if res.Error != nil { + return res.Error + } + return nil +} + +func (m *SqliteMigration) GetName() string { + return "202409181340_stakerDelegationDelta" +} diff --git a/internal/sqlite/migrations/migrator.go b/internal/sqlite/migrations/migrator.go index bfd81651..686e25ee 100644 --- a/internal/sqlite/migrations/migrator.go +++ b/internal/sqlite/migrations/migrator.go @@ -4,6 +4,7 @@ import ( "database/sql" "fmt" _202409161057_avsOperatorDeltas "github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations/202409161057_avsOperatorDeltas" + _202409181340_stakerDelegationDelta "github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations/202409181340_stakerDelegationDelta" "time" _202409061249_bootstrapDb "github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations/202409061249_bootstrapDb" @@ -54,6 +55,7 @@ func (m *SqliteMigrator) MigrateAll() error { &_202409101540_rewardSubmissions.SqliteMigration{}, &_202409111509_removeOperatorRestakedStrategiesBlockConstraint.SqliteMigration{}, &_202409161057_avsOperatorDeltas.SqliteMigration{}, + &_202409181340_stakerDelegationDelta.SqliteMigration{}, } m.Logger.Sugar().Info("Running migrations")