From 799a4574969749bd679d7e3a4c515ac71936b928 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 24 Oct 2024 14:55:53 -0500 Subject: [PATCH] Migrate stakerDelegationSnapshots --- .../up.go | 31 +++++ internal/postgres/migrations/migrator.go | 2 + .../operatorAvsRegistrationSnaphots_test.go | 4 +- pkg/rewards/rewards.go | 3 +- pkg/rewards/stakerDelegationSnapshots.go | 112 ++++++------------ pkg/rewards/stakerDelegationSnapshots_test.go | 55 +++++---- pkg/rewards/tables.go | 8 +- 7 files changed, 114 insertions(+), 101 deletions(-) create mode 100644 internal/postgres/migrations/202410241450_stakerDelegationSnapshots/up.go diff --git a/internal/postgres/migrations/202410241450_stakerDelegationSnapshots/up.go b/internal/postgres/migrations/202410241450_stakerDelegationSnapshots/up.go new file mode 100644 index 00000000..d86f65f0 --- /dev/null +++ b/internal/postgres/migrations/202410241450_stakerDelegationSnapshots/up.go @@ -0,0 +1,31 @@ +package _202410241450_stakerDelegationSnapshots + +import ( + "database/sql" + "gorm.io/gorm" +) + +type Migration struct { +} + +func (m *Migration) Up(db *sql.DB, grm *gorm.DB) error { + queries := []string{ + `CREATE TABLE IF NOT EXISTS staker_delegation_snapshots ( + staker varchar not null, + operator varchar not null, + snapshot date not null + ) + `, + `create index idx_staker_delegation_snapshots_operator_snapshot on staker_delegation_snapshots (operator, snapshot)`, + } + for _, query := range queries { + if _, err := db.Exec(query); err != nil { + return err + } + } + return nil +} + +func (m *Migration) GetName() string { + return "202410241450_stakerDelegationSnapshots" +} diff --git a/internal/postgres/migrations/migrator.go b/internal/postgres/migrations/migrator.go index 574100ba..294eaa43 100644 --- a/internal/postgres/migrations/migrator.go +++ b/internal/postgres/migrations/migrator.go @@ -17,6 +17,7 @@ import ( _202410241313_operatorAvsRegistrationSnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241313_operatorAvsRegistrationSnapshots" _202410241417_operatorAvsStrategySnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241417_operatorAvsStrategySnapshots" _202410241431_operatorShareSnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241431_operatorShareSnapshots" + _202410241450_stakerDelegationSnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241450_stakerDelegationSnapshots" "go.uber.org/zap" "gorm.io/gorm" "time" @@ -59,6 +60,7 @@ func (m *Migrator) MigrateAll() error { &_202410241313_operatorAvsRegistrationSnapshots.Migration{}, &_202410241417_operatorAvsStrategySnapshots.Migration{}, &_202410241431_operatorShareSnapshots.Migration{}, + &_202410241450_stakerDelegationSnapshots.Migration{}, } for _, migration := range migrations { diff --git a/pkg/rewards/operatorAvsRegistrationSnaphots_test.go b/pkg/rewards/operatorAvsRegistrationSnaphots_test.go index 24ee6593..1022de14 100644 --- a/pkg/rewards/operatorAvsRegistrationSnaphots_test.go +++ b/pkg/rewards/operatorAvsRegistrationSnaphots_test.go @@ -11,6 +11,7 @@ import ( "gorm.io/gorm" "slices" "testing" + "time" ) func setupOperatorAvsRegistrationSnapshot() ( @@ -161,7 +162,8 @@ func Test_OperatorAvsRegistrationSnapshots(t *testing.T) { t.Logf("Operator/AVS not found in results: %+v\n", window) lacksExpectedResult = append(lacksExpectedResult, window) } else { - if !slices.Contains(found, window.Snapshot) { + snapshotStr := window.Snapshot.Format(time.DateOnly) + if !slices.Contains(found, snapshotStr) { t.Logf("Found operator/AVS, but no snapshot: %+v - %+v\n", window, found) lacksExpectedResult = append(lacksExpectedResult, window) } diff --git a/pkg/rewards/rewards.go b/pkg/rewards/rewards.go index 89b1dec1..74ea27e1 100644 --- a/pkg/rewards/rewards.go +++ b/pkg/rewards/rewards.go @@ -101,7 +101,6 @@ func (rc *RewardsCalculator) getMostRecentDistributionRoot() (*submittedDistribu func (rc *RewardsCalculator) initializeRewardsSchema() error { funcs := []func() error{ rc.CreateStakerShareSnapshotsTable, - rc.CreateStakerDelegationSnapshotsTable, // Gold tables rc.CreateGold1ActiveRewardsTable, @@ -155,7 +154,7 @@ func (rc *RewardsCalculator) generateSnapshotData(startDate string, snapshotDate } rc.logger.Sugar().Debugw("Generated staker share snapshots") - if err = rc.GenerateAndInsertStakerDelegationSnapshots(snapshotDate); err != nil { + if err = rc.GenerateAndInsertStakerDelegationSnapshots(startDate, snapshotDate); err != nil { rc.logger.Sugar().Errorw("Failed to generate staker delegation snapshots", "error", err) return err } diff --git a/pkg/rewards/stakerDelegationSnapshots.go b/pkg/rewards/stakerDelegationSnapshots.go index f61c30c9..e239de49 100644 --- a/pkg/rewards/stakerDelegationSnapshots.go +++ b/pkg/rewards/stakerDelegationSnapshots.go @@ -14,75 +14,59 @@ with staker_delegations_with_block_info as ( from staker_delegation_changes as sdc left join blocks as b on (b.number = sdc.block_number) ), -ranked_staker_records as ( -SELECT *, - ROW_NUMBER() OVER (PARTITION BY staker, block_date ORDER BY block_time DESC, log_index desc) AS rn -FROM staker_delegations_with_block_info +ranked_delegations as ( + SELECT *, + ROW_NUMBER() OVER (PARTITION BY staker, cast(block_time AS DATE) ORDER BY block_time DESC, log_index DESC) AS rn + FROM staker_delegations_with_block_info ), -- Get the latest record for each day & round up to the snapshot day snapshotted_records as ( - SELECT - staker, - operator, - block_time, - DATE(block_date, '+1 day') as snapshot_time - from ranked_staker_records - where rn = 1 + SELECT + staker, + operator, + block_time, + date_trunc('day', block_time) + INTERVAL '1' day AS snapshot_time + from ranked_delegations + where rn = 1 ), --- Get the range for each operator, strategy pairing -staker_share_windows as ( - SELECT - staker, operator, snapshot_time as start_time, - CASE - -- If the range does not have the end, use the current timestamp truncated to 0 UTC - WHEN LEAD(snapshot_time) OVER (PARTITION BY staker ORDER BY snapshot_time) is null THEN DATE(@cutoffDate) - ELSE LEAD(snapshot_time) OVER (PARTITION BY staker ORDER BY snapshot_time) - END AS end_time - FROM snapshotted_records +-- Get the range for each staker +staker_delegation_windows as ( + SELECT + staker, operator, snapshot_time as start_time, + CASE + -- If the range does not have the end, use the cutoff date truncated to 0 UTC + WHEN LEAD(snapshot_time) OVER (PARTITION BY staker ORDER BY snapshot_time) is null THEN date_trunc('day', DATE(@cutoffDate)) + ELSE LEAD(snapshot_time) OVER (PARTITION BY staker ORDER BY snapshot_time) + END AS end_time + FROM snapshotted_records ), cleaned_records as ( - SELECT * FROM staker_share_windows + SELECT * FROM staker_delegation_windows WHERE start_time < end_time ), -date_bounds as ( - select - min(start_time) as min_start, - max(end_time) as max_end - from cleaned_records -), -day_series AS ( - with RECURSIVE day_series_inner AS ( - SELECT DATE(min_start) AS day - FROM date_bounds - UNION ALL - SELECT DATE(day, '+1 day') - FROM day_series_inner - WHERE day < (SELECT max_end FROM date_bounds) - ) - select * from day_series_inner -), final_results as ( SELECT staker, operator, - day as snapshot - FROM cleaned_records - cross join day_series - where DATE(day) between DATE(start_time) and DATE(end_time, '-1 day') + cast(day AS DATE) AS snapshot + FROM + cleaned_records + CROSS JOIN + generate_series(DATE(start_time), DATE(end_time) - interval '1' day, interval '1' day) AS day ) select * from final_results +where + snapshot >= @startDate + and snapshot < @cutoffDate ` -type StakerDelegationSnapshot struct { - Staker string - Operator string - Snapshot string -} - -func (r *RewardsCalculator) GenerateStakerDelegationSnapshots(snapshotDate string) ([]*StakerDelegationSnapshot, error) { +func (r *RewardsCalculator) GenerateStakerDelegationSnapshots(startDate string, snapshotDate string) ([]*StakerDelegationSnapshot, error) { results := make([]*StakerDelegationSnapshot, 0) - res := r.grm.Raw(stakerDelegationSnapshotsQuery, sql.Named("cutoffDate", snapshotDate)).Scan(&results) + res := r.grm.Raw(stakerDelegationSnapshotsQuery, + sql.Named("startDate", startDate), + sql.Named("cutoffDate", snapshotDate), + ).Scan(&results) if res.Error != nil { r.logger.Sugar().Errorw("Failed to generate staker delegation snapshots", "error", res.Error) @@ -91,8 +75,8 @@ func (r *RewardsCalculator) GenerateStakerDelegationSnapshots(snapshotDate strin return results, nil } -func (r *RewardsCalculator) GenerateAndInsertStakerDelegationSnapshots(snapshotDate string) error { - snapshots, err := r.GenerateStakerDelegationSnapshots(snapshotDate) +func (r *RewardsCalculator) GenerateAndInsertStakerDelegationSnapshots(startDate string, snapshotDate string) error { + snapshots, err := r.GenerateStakerDelegationSnapshots(startDate, snapshotDate) if err != nil { r.logger.Sugar().Errorw("Failed to generate staker delegation snapshots", "error", err) return err @@ -107,25 +91,3 @@ func (r *RewardsCalculator) GenerateAndInsertStakerDelegationSnapshots(snapshotD return nil } - -func (r *RewardsCalculator) CreateStakerDelegationSnapshotsTable() error { - queries := []string{ - ` - CREATE TABLE IF NOT EXISTS staker_delegation_snapshots ( - staker TEXT, - operator TEXT, - snapshot TEXT - ) - `, - `create index idx_staker_delegation_snapshots_operator_snapshot on staker_delegation_snapshots (operator, snapshot)`, - } - - for _, query := range queries { - res := r.grm.Exec(query) - if res.Error != nil { - r.logger.Sugar().Errorw("Failed to create staker delegation snapshots table", "error", res.Error) - return res.Error - } - } - return nil -} diff --git a/pkg/rewards/stakerDelegationSnapshots_test.go b/pkg/rewards/stakerDelegationSnapshots_test.go index 8fca9c88..ca017357 100644 --- a/pkg/rewards/stakerDelegationSnapshots_test.go +++ b/pkg/rewards/stakerDelegationSnapshots_test.go @@ -4,14 +4,14 @@ import ( "fmt" "github.com/Layr-Labs/go-sidecar/internal/config" "github.com/Layr-Labs/go-sidecar/internal/logger" - "github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations" + "github.com/Layr-Labs/go-sidecar/internal/postgres" "github.com/Layr-Labs/go-sidecar/internal/tests" - "github.com/Layr-Labs/go-sidecar/internal/tests/sqlite" "github.com/stretchr/testify/assert" "go.uber.org/zap" "gorm.io/gorm" "slices" "testing" + "time" ) func setupStakerDelegationSnapshot() ( @@ -21,30 +21,39 @@ func setupStakerDelegationSnapshot() ( *zap.Logger, error, ) { + testContext := getRewardsTestContext() cfg := tests.GetConfig() + switch testContext { + case "testnet": + cfg.Chain = config.Chain_Holesky + case "testnet-reduced": + cfg.Chain = config.Chain_Holesky + case "mainnet-reduced": + cfg.Chain = config.Chain_Mainnet + default: + return "", nil, nil, nil, fmt.Errorf("Unknown test context") + } + + cfg.DatabaseConfig = *tests.GetDbConfigFromEnv() + l, _ := logger.NewLogger(&logger.LoggerConfig{Debug: cfg.Debug}) - dbFileName, db, err := sqlite.GetFileBasedSqliteDatabaseConnection(l) + dbname, _, grm, err := postgres.GetTestPostgresDatabase(cfg.DatabaseConfig, l) if err != nil { - panic(err) - } - sqliteMigrator := migrations.NewSqliteMigrator(db, l) - if err := sqliteMigrator.MigrateAll(); err != nil { - l.Sugar().Fatalw("Failed to migrate", "error", err) + return dbname, nil, nil, nil, err } - return dbFileName, cfg, db, l, err + return dbname, cfg, grm, l, nil } -func teardownStakerDelegationSnapshot(grm *gorm.DB) { - queries := []string{ - `delete from staker_delegation_changes`, - `delete from blocks`, - } - for _, query := range queries { - if res := grm.Exec(query); res.Error != nil { - fmt.Printf("Failed to run query: %v\n", res.Error) - } +func teardownStakerDelegationSnapshot(dbname string, cfg *config.Config, db *gorm.DB, l *zap.Logger) { + rawDb, _ := db.DB() + _ = rawDb.Close() + + pgConfig := postgres.PostgresConfigFromDbConfig(&cfg.DatabaseConfig) + + if err := postgres.DeleteTestDatabase(pgConfig, dbname); err != nil { + l.Sugar().Errorw("Failed to delete test database", "error", err) } } @@ -82,6 +91,8 @@ func Test_StakerDelegationSnapshots(t *testing.T) { t.Fatal(err) } + startDate := "1970-01-01" + t.Run("Should hydrate dependency tables", func(t *testing.T) { if _, err := hydrateAllBlocksTable(grm, l); err != nil { t.Error(err) @@ -94,7 +105,7 @@ func Test_StakerDelegationSnapshots(t *testing.T) { rewards, _ := NewRewardsCalculator(l, grm, cfg) t.Log("Generating staker delegation snapshots") - snapshots, err := rewards.GenerateStakerDelegationSnapshots(snapshotDate) + snapshots, err := rewards.GenerateStakerDelegationSnapshots(startDate, snapshotDate) assert.Nil(t, err) t.Log("Getting expected results") @@ -125,7 +136,8 @@ func Test_StakerDelegationSnapshots(t *testing.T) { t.Logf("Staker/operator not found in results: %+v\n", snapshot) lacksExpectedResult = append(lacksExpectedResult, snapshot) } else { - if !slices.Contains(found, snapshot.Snapshot) { + snapshotStr := snapshot.Snapshot.Format(time.DateOnly) + if !slices.Contains(found, snapshotStr) { t.Logf("Found staker operator, but no snapshot: %+v - %+v\n", snapshot, found) lacksExpectedResult = append(lacksExpectedResult, snapshot) } @@ -141,7 +153,6 @@ func Test_StakerDelegationSnapshots(t *testing.T) { } }) t.Cleanup(func() { - teardownStakerDelegationSnapshot(grm) - tests.DeleteTestSqliteDB(dbFileName) + teardownStakerDelegationSnapshot(dbFileName, cfg, grm, l) }) } diff --git a/pkg/rewards/tables.go b/pkg/rewards/tables.go index 0fe60d04..7f3eb0a5 100644 --- a/pkg/rewards/tables.go +++ b/pkg/rewards/tables.go @@ -22,7 +22,7 @@ type CombinedRewards struct { type OperatorAvsRegistrationSnapshots struct { Avs string Operator string - Snapshot string + Snapshot time.Time } type OperatorAvsStrategySnapshot struct { @@ -38,3 +38,9 @@ type OperatorShareSnapshots struct { Shares string Snapshot time.Time } + +type StakerDelegationSnapshot struct { + Staker string + Operator string + Snapshot time.Time +}