From 12368223355c1fe98cb963c2ccbdc1df633a1d42 Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Thu, 24 Oct 2024 14:46:53 -0500 Subject: [PATCH] Migrate operatorShareSnapshots --- .../202410241431_operatorShareSnapshots/up.go | 30 +++++ internal/postgres/migrations/migrator.go | 2 + pkg/rewards/operatorShareSnapshots.go | 103 ++++++------------ pkg/rewards/operatorShareSnapshots_test.go | 55 ++++++---- pkg/rewards/rewards.go | 3 +- pkg/rewards/tables.go | 7 ++ 6 files changed, 107 insertions(+), 93 deletions(-) create mode 100644 internal/postgres/migrations/202410241431_operatorShareSnapshots/up.go diff --git a/internal/postgres/migrations/202410241431_operatorShareSnapshots/up.go b/internal/postgres/migrations/202410241431_operatorShareSnapshots/up.go new file mode 100644 index 00000000..0c12cd0d --- /dev/null +++ b/internal/postgres/migrations/202410241431_operatorShareSnapshots/up.go @@ -0,0 +1,30 @@ +package _202410241431_operatorShareSnapshots + +import ( + "database/sql" + "gorm.io/gorm" +) + +type Migration struct { +} + +func (m *Migration) Up(db *sql.DB, grm *gorm.DB) error { + queries := []string{ + `CREATE TABLE IF NOT EXISTS operator_share_snapshots ( + operator varchar not null, + strategy varchar not null, + shares numeric not null, + snapshot date not null + )`, + } + for _, query := range queries { + if _, err := db.Exec(query); err != nil { + return err + } + } + return nil +} + +func (m *Migration) GetName() string { + return "202410241431_operatorShareSnapshots" +} diff --git a/internal/postgres/migrations/migrator.go b/internal/postgres/migrations/migrator.go index 8a2306b5..574100ba 100644 --- a/internal/postgres/migrations/migrator.go +++ b/internal/postgres/migrations/migrator.go @@ -16,6 +16,7 @@ import ( _202410241239_combinedRewards "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241239_combinedRewards" _202410241313_operatorAvsRegistrationSnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241313_operatorAvsRegistrationSnapshots" _202410241417_operatorAvsStrategySnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241417_operatorAvsStrategySnapshots" + _202410241431_operatorShareSnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241431_operatorShareSnapshots" "go.uber.org/zap" "gorm.io/gorm" "time" @@ -57,6 +58,7 @@ func (m *Migrator) MigrateAll() error { &_202410241239_combinedRewards.Migration{}, &_202410241313_operatorAvsRegistrationSnapshots.Migration{}, &_202410241417_operatorAvsStrategySnapshots.Migration{}, + &_202410241431_operatorShareSnapshots.Migration{}, } for _, migration := range migrations { diff --git a/pkg/rewards/operatorShareSnapshots.go b/pkg/rewards/operatorShareSnapshots.go index ecc18525..f495c99f 100644 --- a/pkg/rewards/operatorShareSnapshots.go +++ b/pkg/rewards/operatorShareSnapshots.go @@ -2,7 +2,6 @@ package rewards import ( "database/sql" - "go.uber.org/zap" ) const operatorShareSnapshotsQuery = ` @@ -18,78 +17,60 @@ with operator_shares_with_block_info as ( left join blocks as b on (b.number = os.block_number) ), ranked_operator_records as ( - select - *, - ROW_NUMBER() OVER (PARTITION BY operator, strategy, block_date ORDER BY block_time DESC) as rn - from operator_shares_with_block_info as os + SELECT *, + ROW_NUMBER() OVER (PARTITION BY operator, strategy, cast(block_time AS DATE) ORDER BY block_time DESC) AS rn + FROM operator_shares_with_block_info ), -- Get the latest record for each day & round up to the snapshot day snapshotted_records as ( - SELECT - operator, - strategy, - shares, - block_time, - DATE(block_date, '+1 day') as snapshot_time - from ranked_operator_records - where rn = 1 + SELECT + operator, + strategy, + shares, + block_time, + date_trunc('day', block_time) + INTERVAL '1' day as snapshot_time + from ranked_operator_records + where rn = 1 ), -- Get the range for each operator, strategy pairing operator_share_windows as ( - SELECT - operator, strategy, shares, snapshot_time as start_time, - CASE - -- If the range does not have the end, use the current timestamp truncated to 0 UTC - WHEN LEAD(snapshot_time) OVER (PARTITION BY operator, strategy ORDER BY snapshot_time) is null THEN DATE(@cutoffDate) - ELSE LEAD(snapshot_time) OVER (PARTITION BY operator, strategy ORDER BY snapshot_time) - END AS end_time - FROM snapshotted_records + SELECT + operator, strategy, shares, snapshot_time as start_time, + CASE + -- If the range does not have the end, use the current timestamp truncated to 0 UTC + WHEN LEAD(snapshot_time) OVER (PARTITION BY operator, strategy ORDER BY snapshot_time) is null THEN date_trunc('day', DATE(@cutoffDate)) + ELSE LEAD(snapshot_time) OVER (PARTITION BY operator, strategy ORDER BY snapshot_time) + END AS end_time + FROM snapshotted_records ), cleaned_records as ( SELECT * FROM operator_share_windows WHERE start_time < end_time ), -date_bounds as ( - select - min(start_time) as min_start, - max(end_time) as max_end - from cleaned_records -), -day_series AS ( - with RECURSIVE day_series_inner AS ( - SELECT DATE(min_start) AS day - FROM date_bounds - UNION ALL - SELECT DATE(day, '+1 day') - FROM day_series_inner - WHERE day < (SELECT max_end FROM date_bounds) - ) - select * from day_series_inner -), final_results as ( SELECT operator, strategy, shares, - day as snapshot - FROM cleaned_records - cross join day_series - where DATE(day) between DATE(start_time) and DATE(end_time, '-1 day') + cast(day AS DATE) AS snapshot + FROM + cleaned_records + CROSS JOIN + generate_series(DATE(start_time), DATE(end_time) - interval '1' day, interval '1' day) AS day ) select * from final_results +where + snapshot >= @startDate + and snapshot < @cutoffDate ` -type OperatorShareSnapshots struct { - Operator string - Strategy string - Shares string - Snapshot string -} - -func (r *RewardsCalculator) GenerateOperatorShareSnapshots(snapshotDate string) ([]*OperatorShareSnapshots, error) { +func (r *RewardsCalculator) GenerateOperatorShareSnapshots(startDate string, snapshotDate string) ([]*OperatorShareSnapshots, error) { results := make([]*OperatorShareSnapshots, 0) - res := r.grm.Raw(operatorShareSnapshotsQuery, sql.Named("cutoffDate", snapshotDate)).Scan(&results) + res := r.grm.Raw(operatorShareSnapshotsQuery, + sql.Named("startDate", startDate), + sql.Named("cutoffDate", snapshotDate), + ).Scan(&results) if res.Error != nil { r.logger.Sugar().Errorw("Failed to generate operator share snapshots", "error", res.Error) @@ -98,8 +79,8 @@ func (r *RewardsCalculator) GenerateOperatorShareSnapshots(snapshotDate string) return results, nil } -func (r *RewardsCalculator) GenerateAndInsertOperatorShareSnapshots(snapshotDate string) error { - snapshots, err := r.GenerateOperatorShareSnapshots(snapshotDate) +func (r *RewardsCalculator) GenerateAndInsertOperatorShareSnapshots(startDate string, snapshotDate string) error { + snapshots, err := r.GenerateOperatorShareSnapshots(startDate, snapshotDate) if err != nil { r.logger.Sugar().Errorw("Failed to generate operator share snapshots", "error", err) return err @@ -114,19 +95,3 @@ func (r *RewardsCalculator) GenerateAndInsertOperatorShareSnapshots(snapshotDate return nil } - -func (r *RewardsCalculator) CreateOperatorSharesSnapshotsTable() error { - res := r.grm.Exec(` - CREATE TABLE IF NOT EXISTS operator_share_snapshots ( - operator TEXT, - strategy TEXT, - shares TEXT, - snapshot TEXT - ) - `) - if res.Error != nil { - r.logger.Error("Failed to create operator share snapshots table", zap.Error(res.Error)) - return res.Error - } - return nil -} diff --git a/pkg/rewards/operatorShareSnapshots_test.go b/pkg/rewards/operatorShareSnapshots_test.go index 25727723..41f5c713 100644 --- a/pkg/rewards/operatorShareSnapshots_test.go +++ b/pkg/rewards/operatorShareSnapshots_test.go @@ -4,13 +4,13 @@ import ( "fmt" "github.com/Layr-Labs/go-sidecar/internal/config" "github.com/Layr-Labs/go-sidecar/internal/logger" - "github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations" + "github.com/Layr-Labs/go-sidecar/internal/postgres" "github.com/Layr-Labs/go-sidecar/internal/tests" - "github.com/Layr-Labs/go-sidecar/internal/tests/sqlite" "github.com/stretchr/testify/assert" "go.uber.org/zap" "gorm.io/gorm" "testing" + "time" ) func setupOperatorShareSnapshot() ( @@ -20,30 +20,39 @@ func setupOperatorShareSnapshot() ( *zap.Logger, error, ) { + testContext := getRewardsTestContext() cfg := tests.GetConfig() + switch testContext { + case "testnet": + cfg.Chain = config.Chain_Holesky + case "testnet-reduced": + cfg.Chain = config.Chain_Holesky + case "mainnet-reduced": + cfg.Chain = config.Chain_Mainnet + default: + return "", nil, nil, nil, fmt.Errorf("Unknown test context") + } + + cfg.DatabaseConfig = *tests.GetDbConfigFromEnv() + l, _ := logger.NewLogger(&logger.LoggerConfig{Debug: cfg.Debug}) - dbFileName, db, err := sqlite.GetFileBasedSqliteDatabaseConnection(l) + dbname, _, grm, err := postgres.GetTestPostgresDatabase(cfg.DatabaseConfig, l) if err != nil { - panic(err) - } - sqliteMigrator := migrations.NewSqliteMigrator(db, l) - if err := sqliteMigrator.MigrateAll(); err != nil { - l.Sugar().Fatalw("Failed to migrate", "error", err) + return dbname, nil, nil, nil, err } - return dbFileName, cfg, db, l, err + return dbname, cfg, grm, l, nil } -func teardownOperatorShareSnapshot(grm *gorm.DB) { - queries := []string{ - `delete from operator_shares`, - `delete from blocks`, - } - for _, query := range queries { - if res := grm.Exec(query); res.Error != nil { - fmt.Printf("Failed to run query: %v\n", res.Error) - } +func teardownOperatorShareSnapshot(dbname string, cfg *config.Config, db *gorm.DB, l *zap.Logger) { + rawDb, _ := db.DB() + _ = rawDb.Close() + + pgConfig := postgres.PostgresConfigFromDbConfig(&cfg.DatabaseConfig) + + if err := postgres.DeleteTestDatabase(pgConfig, dbname); err != nil { + l.Sugar().Errorw("Failed to delete test database", "error", err) } } @@ -81,6 +90,8 @@ func Test_OperatorShareSnapshots(t *testing.T) { t.Fatal(err) } + startDate := "1970-01-01" + t.Run("Should hydrate dependency tables", func(t *testing.T) { if _, err = hydrateAllBlocksTable(grm, l); err != nil { t.Error(err) @@ -93,7 +104,7 @@ func Test_OperatorShareSnapshots(t *testing.T) { rewards, _ := NewRewardsCalculator(l, grm, cfg) t.Log("Generating operator share snapshots") - snapshots, err := rewards.GenerateOperatorShareSnapshots(snapshotDate) + snapshots, err := rewards.GenerateOperatorShareSnapshots(startDate, snapshotDate) assert.Nil(t, err) t.Log("Loading expected results") @@ -116,8 +127,9 @@ func Test_OperatorShareSnapshots(t *testing.T) { // Go line-by-line in the snapshot results and find the corresponding line in the expected results. // If one doesnt exist, add it to the missing list. for _, snapshot := range snapshots { + snapshotStr := snapshot.Snapshot.Format(time.DateOnly) - slotId := fmt.Sprintf("%s_%s_%s", snapshot.Operator, snapshot.Strategy, snapshot.Snapshot) + slotId := fmt.Sprintf("%s_%s_%s", snapshot.Operator, snapshot.Strategy, snapshotStr) found, ok := mappedExpectedResults[slotId] if !ok { @@ -139,7 +151,6 @@ func Test_OperatorShareSnapshots(t *testing.T) { } }) t.Cleanup(func() { - teardownOperatorShareSnapshot(grm) - tests.DeleteTestSqliteDB(dbFileName) + teardownOperatorShareSnapshot(dbFileName, cfg, grm, l) }) } diff --git a/pkg/rewards/rewards.go b/pkg/rewards/rewards.go index 8050c07f..89b1dec1 100644 --- a/pkg/rewards/rewards.go +++ b/pkg/rewards/rewards.go @@ -100,7 +100,6 @@ func (rc *RewardsCalculator) getMostRecentDistributionRoot() (*submittedDistribu func (rc *RewardsCalculator) initializeRewardsSchema() error { funcs := []func() error{ - rc.CreateOperatorSharesSnapshotsTable, rc.CreateStakerShareSnapshotsTable, rc.CreateStakerDelegationSnapshotsTable, @@ -144,7 +143,7 @@ func (rc *RewardsCalculator) generateSnapshotData(startDate string, snapshotDate } rc.logger.Sugar().Debugw("Generated operator AVS strategy snapshots") - if err = rc.GenerateAndInsertOperatorShareSnapshots(snapshotDate); err != nil { + if err = rc.GenerateAndInsertOperatorShareSnapshots(startDate, snapshotDate); err != nil { rc.logger.Sugar().Errorw("Failed to generate operator share snapshots", "error", err) return err } diff --git a/pkg/rewards/tables.go b/pkg/rewards/tables.go index f5ccb526..0fe60d04 100644 --- a/pkg/rewards/tables.go +++ b/pkg/rewards/tables.go @@ -31,3 +31,10 @@ type OperatorAvsStrategySnapshot struct { Strategy string Snapshot time.Time } + +type OperatorShareSnapshots struct { + Operator string + Strategy string + Shares string + Snapshot time.Time +}