Skip to content

Commit

Permalink
Add state change table for AVS operators to make windowing easier
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcgary committed Sep 16, 2024
1 parent a0d0294 commit f5f6035
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 4 deletions.
45 changes: 45 additions & 0 deletions internal/eigenState/avsOperators/avsOperators.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package avsOperators

import (
"database/sql"
"errors"
"fmt"
"slices"
"sort"
Expand Down Expand Up @@ -44,6 +45,14 @@ type RegisteredAvsOperatorDiff struct {
Registered bool
}

type AvsOperatorStateChange struct {
Avs string
Operator string
Registered bool
LogIndex uint64
BlockNumber uint64
}

func NewSlotID(avs string, operator string) types.SlotID {
return types.SlotID(fmt.Sprintf("%s_%s", avs, operator))
}
Expand All @@ -58,6 +67,9 @@ type AvsOperatorsModel struct {

// Accumulates state changes for SlotIds, grouped by block number
stateAccumulator map[uint64]map[types.SlotID]*AccumulatedStateChange

// Keep track of each distinct change, rather than accumulated change, to add to the delta table
deltaAccumulator map[uint64][]*AvsOperatorStateChange
}

// NewAvsOperators creates a new AvsOperatorsModel.
Expand All @@ -76,6 +88,8 @@ func NewAvsOperators(
globalConfig: globalConfig,

stateAccumulator: make(map[uint64]map[types.SlotID]*AccumulatedStateChange),

deltaAccumulator: make(map[uint64][]*AvsOperatorStateChange),
}
esm.RegisterState(s, 0)
return s, nil
Expand Down Expand Up @@ -120,6 +134,15 @@ func (a *AvsOperatorsModel) GetStateTransitions() (types.StateTransitions[Accumu
registered = uint64(val.(float64)) == 1
}

// Store the change in the delta accumulator
a.deltaAccumulator[log.BlockNumber] = append(a.deltaAccumulator[log.BlockNumber], &AvsOperatorStateChange{
Avs: avs,
Operator: operator,
Registered: registered,
LogIndex: log.LogIndex,
BlockNumber: log.BlockNumber,
})

slotID := NewSlotID(avs, operator)
record, ok := a.stateAccumulator[log.BlockNumber][slotID]
if !ok {
Expand Down Expand Up @@ -173,6 +196,7 @@ func (a *AvsOperatorsModel) IsInterestingLog(log *storage.TransactionLog) bool {

func (a *AvsOperatorsModel) InitBlockProcessing(blockNumber uint64) error {
a.stateAccumulator[blockNumber] = make(map[types.SlotID]*AccumulatedStateChange)
a.deltaAccumulator[blockNumber] = make([]*AvsOperatorStateChange, 0)
return nil
}

Expand Down Expand Up @@ -249,6 +273,22 @@ func (a *AvsOperatorsModel) prepareState(blockNumber uint64) ([]RegisteredAvsOpe
return inserts, deletes, nil
}

func (a *AvsOperatorsModel) writeDeltaRecordsToDeltaTable(blockNumber uint64) error {
records, ok := a.deltaAccumulator[blockNumber]
if !ok {
msg := "Delta accumulator was not initialized"
a.logger.Sugar().Errorw(msg, zap.Uint64("blockNumber", blockNumber))
return errors.New(msg)
}

res := a.DB.Model(&AvsOperatorStateChange{}).Clauses(clause.Returning{}).Create(&records)
if res.Error != nil {
a.logger.Sugar().Errorw("Failed to insert delta records", zap.Error(res.Error))
return res.Error
}
return nil
}

// CommitFinalState commits the final state for the given block number.
func (a *AvsOperatorsModel) CommitFinalState(blockNumber uint64) error {
err := a.clonePreviousBlocksToNewBlock(blockNumber)
Expand Down Expand Up @@ -280,6 +320,11 @@ func (a *AvsOperatorsModel) CommitFinalState(blockNumber uint64) error {
return res.Error
}
}

if err = a.writeDeltaRecordsToDeltaTable(blockNumber); err != nil {
return err
}

return nil
}

Expand Down
44 changes: 41 additions & 3 deletions internal/eigenState/avsOperators/avsOperators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ func setup() (
func teardown(model *AvsOperatorsModel) {
model.DB.Exec("delete from avs_operator_changes")
model.DB.Exec("delete from registered_avs_operators")
model.DB.Exec("delete from avs_operator_state_changes")
}

func getInsertedDeltaRecordsForBlock(blockNumber uint64, model *AvsOperatorsModel) ([]*AvsOperatorStateChange, error) {
results := []*AvsOperatorStateChange{}

res := model.DB.Model(&AvsOperatorStateChange{}).Where("block_number = ?", blockNumber).Find(&results)
return results, res.Error
}

func getInsertedDeltaRecords(model *AvsOperatorsModel) ([]*AvsOperatorStateChange, error) {
results := []*AvsOperatorStateChange{}

res := model.DB.Model(&AvsOperatorStateChange{}).Order("block_number asc").Find(&results)
return results, res.Error
}

func Test_AvsOperatorState(t *testing.T) {
Expand Down Expand Up @@ -84,7 +99,22 @@ func Test_AvsOperatorState(t *testing.T) {
assert.Nil(t, err)
assert.NotNil(t, res)

teardown(avsOperatorState)
err = avsOperatorState.CommitFinalState(blockNumber)
assert.Nil(t, err)

inserted, err := getInsertedDeltaRecordsForBlock(blockNumber, avsOperatorState)
assert.Nil(t, err)
assert.Equal(t, 1, len(inserted))

assert.Equal(t, "0xdf25bdcdcdd9a3dd8c9069306c4dba8d90dd8e8e", inserted[0].Avs)
assert.Equal(t, "0x870679e138bcdf293b7ff14dd44b70fc97e12fc0", inserted[0].Operator)
assert.Equal(t, true, inserted[0].Registered)
assert.Equal(t, blockNumber, inserted[0].BlockNumber)
assert.Equal(t, uint64(400), inserted[0].LogIndex)

t.Cleanup(func() {
teardown(avsOperatorState)
})
})
t.Run("Should register AvsOperatorState and generate the table for the block", func(t *testing.T) {
esm := stateManager.NewEigenStateManager(l, grm)
Expand Down Expand Up @@ -134,7 +164,9 @@ func Test_AvsOperatorState(t *testing.T) {
assert.Nil(t, err)
assert.True(t, len(stateRoot) > 0)

teardown(avsOperatorState)
t.Cleanup(func() {
teardown(avsOperatorState)
})
})
t.Run("Should correctly generate state across multiple blocks", func(t *testing.T) {
esm := stateManager.NewEigenStateManager(l, grm)
Expand Down Expand Up @@ -217,6 +249,12 @@ func Test_AvsOperatorState(t *testing.T) {
assert.True(t, len(stateRoot) > 0)
}

teardown(avsOperatorState)
inserted, err := getInsertedDeltaRecords(avsOperatorState)
assert.Nil(t, err)
assert.Equal(t, len(logs), len(inserted))

t.Cleanup(func() {
teardown(avsOperatorState)
})
})
}
1 change: 0 additions & 1 deletion internal/eigenState/operatorShares/operatorShares.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ func (osm *OperatorSharesModel) prepareState(blockNumber uint64) ([]OperatorShar
// Map the existing records to a map for easier lookup
mappedRecords := make(map[types.SlotID]OperatorShares)
for _, record := range existingRecords {
fmt.Printf("Existing OperatorShares %+v\n", record)
slotID := NewSlotID(record.Operator, record.Strategy)
mappedRecords[slotID] = record
}
Expand Down
36 changes: 36 additions & 0 deletions internal/sqlite/migrations/202409161057_avsOperatorDeltas/up.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package _202409161057_avsOperatorDeltas

import (
"fmt"
"gorm.io/gorm"
)

type SqliteMigration struct {
}

func (m *SqliteMigration) Up(grm *gorm.DB) error {
queries := []string{
`create table if not exists avs_operator_state_changes (
operator TEXT NOT NULL,
avs TEXT NOT NULL,
block_number INTEGER NOT NULL,
log_index INTEGER NOT NULL,
created_at DATETIME default current_timestamp,
registered integer not null,
unique(operator, avs, block_number, log_index)
);
`,
}

for _, query := range queries {
if res := grm.Exec(query); res.Error != nil {
fmt.Printf("Failed to execute query: %s\n", query)
return res.Error
}
}
return nil
}

func (m *SqliteMigration) GetName() string {
return "202409161057_avsOperatorDeltas"
}
2 changes: 2 additions & 0 deletions internal/sqlite/migrations/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package migrations
import (
"database/sql"
"fmt"
_202409161057_avsOperatorDeltas "github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations/202409161057_avsOperatorDeltas"
"time"

_202409061249_bootstrapDb "github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations/202409061249_bootstrapDb"
Expand Down Expand Up @@ -52,6 +53,7 @@ func (m *SqliteMigrator) MigrateAll() error {
&_202409101144_submittedDistributionRoot.SqliteMigration{},
&_202409101540_rewardSubmissions.SqliteMigration{},
&_202409111509_removeOperatorRestakedStrategiesBlockConstraint.SqliteMigration{},
&_202409161057_avsOperatorDeltas.SqliteMigration{},
}

m.Logger.Sugar().Info("Running migrations")
Expand Down
9 changes: 9 additions & 0 deletions pkg/rewards/operatorAvsRegistrationWindows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package rewards

const query = `
`

func (r *RewardsCalculator) GenerateOperatorAvsRegistrationWindows(snapshotDate string) {

}
5 changes: 5 additions & 0 deletions pkg/rewards/operatorAvsStrategyWindows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package rewards

func (r *RewardsCalculator) GenerateOperatorAvsStrategyWindows(snapshotDate string) {

}

0 comments on commit f5f6035

Please sign in to comment.