Skip to content

Commit

Permalink
Migrate stakerDelegationSnapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcgary committed Oct 24, 2024
1 parent 1236822 commit 799a457
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 101 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package _202410241450_stakerDelegationSnapshots

import (
"database/sql"
"gorm.io/gorm"
)

type Migration struct {
}

func (m *Migration) Up(db *sql.DB, grm *gorm.DB) error {
queries := []string{
`CREATE TABLE IF NOT EXISTS staker_delegation_snapshots (
staker varchar not null,
operator varchar not null,
snapshot date not null
)
`,
`create index idx_staker_delegation_snapshots_operator_snapshot on staker_delegation_snapshots (operator, snapshot)`,
}
for _, query := range queries {
if _, err := db.Exec(query); err != nil {
return err
}
}
return nil
}

func (m *Migration) GetName() string {
return "202410241450_stakerDelegationSnapshots"
}
2 changes: 2 additions & 0 deletions internal/postgres/migrations/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
_202410241313_operatorAvsRegistrationSnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241313_operatorAvsRegistrationSnapshots"
_202410241417_operatorAvsStrategySnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241417_operatorAvsStrategySnapshots"
_202410241431_operatorShareSnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241431_operatorShareSnapshots"
_202410241450_stakerDelegationSnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241450_stakerDelegationSnapshots"
"go.uber.org/zap"
"gorm.io/gorm"
"time"
Expand Down Expand Up @@ -59,6 +60,7 @@ func (m *Migrator) MigrateAll() error {
&_202410241313_operatorAvsRegistrationSnapshots.Migration{},
&_202410241417_operatorAvsStrategySnapshots.Migration{},
&_202410241431_operatorShareSnapshots.Migration{},
&_202410241450_stakerDelegationSnapshots.Migration{},
}

for _, migration := range migrations {
Expand Down
4 changes: 3 additions & 1 deletion pkg/rewards/operatorAvsRegistrationSnaphots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"gorm.io/gorm"
"slices"
"testing"
"time"
)

func setupOperatorAvsRegistrationSnapshot() (
Expand Down Expand Up @@ -161,7 +162,8 @@ func Test_OperatorAvsRegistrationSnapshots(t *testing.T) {
t.Logf("Operator/AVS not found in results: %+v\n", window)
lacksExpectedResult = append(lacksExpectedResult, window)
} else {
if !slices.Contains(found, window.Snapshot) {
snapshotStr := window.Snapshot.Format(time.DateOnly)
if !slices.Contains(found, snapshotStr) {
t.Logf("Found operator/AVS, but no snapshot: %+v - %+v\n", window, found)
lacksExpectedResult = append(lacksExpectedResult, window)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/rewards/rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func (rc *RewardsCalculator) getMostRecentDistributionRoot() (*submittedDistribu
func (rc *RewardsCalculator) initializeRewardsSchema() error {
funcs := []func() error{
rc.CreateStakerShareSnapshotsTable,
rc.CreateStakerDelegationSnapshotsTable,

// Gold tables
rc.CreateGold1ActiveRewardsTable,
Expand Down Expand Up @@ -155,7 +154,7 @@ func (rc *RewardsCalculator) generateSnapshotData(startDate string, snapshotDate
}
rc.logger.Sugar().Debugw("Generated staker share snapshots")

if err = rc.GenerateAndInsertStakerDelegationSnapshots(snapshotDate); err != nil {
if err = rc.GenerateAndInsertStakerDelegationSnapshots(startDate, snapshotDate); err != nil {
rc.logger.Sugar().Errorw("Failed to generate staker delegation snapshots", "error", err)
return err
}
Expand Down
112 changes: 37 additions & 75 deletions pkg/rewards/stakerDelegationSnapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,75 +14,59 @@ with staker_delegations_with_block_info as (
from staker_delegation_changes as sdc
left join blocks as b on (b.number = sdc.block_number)
),
ranked_staker_records as (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY staker, block_date ORDER BY block_time DESC, log_index desc) AS rn
FROM staker_delegations_with_block_info
ranked_delegations as (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY staker, cast(block_time AS DATE) ORDER BY block_time DESC, log_index DESC) AS rn
FROM staker_delegations_with_block_info
),
-- Get the latest record for each day & round up to the snapshot day
snapshotted_records as (
SELECT
staker,
operator,
block_time,
DATE(block_date, '+1 day') as snapshot_time
from ranked_staker_records
where rn = 1
SELECT
staker,
operator,
block_time,
date_trunc('day', block_time) + INTERVAL '1' day AS snapshot_time
from ranked_delegations
where rn = 1
),
-- Get the range for each operator, strategy pairing
staker_share_windows as (
SELECT
staker, operator, snapshot_time as start_time,
CASE
-- If the range does not have the end, use the current timestamp truncated to 0 UTC
WHEN LEAD(snapshot_time) OVER (PARTITION BY staker ORDER BY snapshot_time) is null THEN DATE(@cutoffDate)
ELSE LEAD(snapshot_time) OVER (PARTITION BY staker ORDER BY snapshot_time)
END AS end_time
FROM snapshotted_records
-- Get the range for each staker
staker_delegation_windows as (
SELECT
staker, operator, snapshot_time as start_time,
CASE
-- If the range does not have the end, use the cutoff date truncated to 0 UTC
WHEN LEAD(snapshot_time) OVER (PARTITION BY staker ORDER BY snapshot_time) is null THEN date_trunc('day', DATE(@cutoffDate))
ELSE LEAD(snapshot_time) OVER (PARTITION BY staker ORDER BY snapshot_time)
END AS end_time
FROM snapshotted_records
),
cleaned_records as (
SELECT * FROM staker_share_windows
SELECT * FROM staker_delegation_windows
WHERE start_time < end_time
),
date_bounds as (
select
min(start_time) as min_start,
max(end_time) as max_end
from cleaned_records
),
day_series AS (
with RECURSIVE day_series_inner AS (
SELECT DATE(min_start) AS day
FROM date_bounds
UNION ALL
SELECT DATE(day, '+1 day')
FROM day_series_inner
WHERE day < (SELECT max_end FROM date_bounds)
)
select * from day_series_inner
),
final_results as (
SELECT
staker,
operator,
day as snapshot
FROM cleaned_records
cross join day_series
where DATE(day) between DATE(start_time) and DATE(end_time, '-1 day')
cast(day AS DATE) AS snapshot
FROM
cleaned_records
CROSS JOIN
generate_series(DATE(start_time), DATE(end_time) - interval '1' day, interval '1' day) AS day
)
select * from final_results
where
snapshot >= @startDate
and snapshot < @cutoffDate
`

type StakerDelegationSnapshot struct {
Staker string
Operator string
Snapshot string
}

func (r *RewardsCalculator) GenerateStakerDelegationSnapshots(snapshotDate string) ([]*StakerDelegationSnapshot, error) {
func (r *RewardsCalculator) GenerateStakerDelegationSnapshots(startDate string, snapshotDate string) ([]*StakerDelegationSnapshot, error) {
results := make([]*StakerDelegationSnapshot, 0)

res := r.grm.Raw(stakerDelegationSnapshotsQuery, sql.Named("cutoffDate", snapshotDate)).Scan(&results)
res := r.grm.Raw(stakerDelegationSnapshotsQuery,
sql.Named("startDate", startDate),
sql.Named("cutoffDate", snapshotDate),
).Scan(&results)

if res.Error != nil {
r.logger.Sugar().Errorw("Failed to generate staker delegation snapshots", "error", res.Error)
Expand All @@ -91,8 +75,8 @@ func (r *RewardsCalculator) GenerateStakerDelegationSnapshots(snapshotDate strin
return results, nil
}

func (r *RewardsCalculator) GenerateAndInsertStakerDelegationSnapshots(snapshotDate string) error {
snapshots, err := r.GenerateStakerDelegationSnapshots(snapshotDate)
func (r *RewardsCalculator) GenerateAndInsertStakerDelegationSnapshots(startDate string, snapshotDate string) error {
snapshots, err := r.GenerateStakerDelegationSnapshots(startDate, snapshotDate)
if err != nil {
r.logger.Sugar().Errorw("Failed to generate staker delegation snapshots", "error", err)
return err
Expand All @@ -107,25 +91,3 @@ func (r *RewardsCalculator) GenerateAndInsertStakerDelegationSnapshots(snapshotD

return nil
}

func (r *RewardsCalculator) CreateStakerDelegationSnapshotsTable() error {
queries := []string{
`
CREATE TABLE IF NOT EXISTS staker_delegation_snapshots (
staker TEXT,
operator TEXT,
snapshot TEXT
)
`,
`create index idx_staker_delegation_snapshots_operator_snapshot on staker_delegation_snapshots (operator, snapshot)`,
}

for _, query := range queries {
res := r.grm.Exec(query)
if res.Error != nil {
r.logger.Sugar().Errorw("Failed to create staker delegation snapshots table", "error", res.Error)
return res.Error
}
}
return nil
}
55 changes: 33 additions & 22 deletions pkg/rewards/stakerDelegationSnapshots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"fmt"
"github.com/Layr-Labs/go-sidecar/internal/config"
"github.com/Layr-Labs/go-sidecar/internal/logger"
"github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations"
"github.com/Layr-Labs/go-sidecar/internal/postgres"
"github.com/Layr-Labs/go-sidecar/internal/tests"
"github.com/Layr-Labs/go-sidecar/internal/tests/sqlite"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"gorm.io/gorm"
"slices"
"testing"
"time"
)

func setupStakerDelegationSnapshot() (
Expand All @@ -21,30 +21,39 @@ func setupStakerDelegationSnapshot() (
*zap.Logger,
error,
) {
testContext := getRewardsTestContext()
cfg := tests.GetConfig()
switch testContext {
case "testnet":
cfg.Chain = config.Chain_Holesky
case "testnet-reduced":
cfg.Chain = config.Chain_Holesky
case "mainnet-reduced":
cfg.Chain = config.Chain_Mainnet
default:
return "", nil, nil, nil, fmt.Errorf("Unknown test context")
}

cfg.DatabaseConfig = *tests.GetDbConfigFromEnv()

l, _ := logger.NewLogger(&logger.LoggerConfig{Debug: cfg.Debug})

dbFileName, db, err := sqlite.GetFileBasedSqliteDatabaseConnection(l)
dbname, _, grm, err := postgres.GetTestPostgresDatabase(cfg.DatabaseConfig, l)
if err != nil {
panic(err)
}
sqliteMigrator := migrations.NewSqliteMigrator(db, l)
if err := sqliteMigrator.MigrateAll(); err != nil {
l.Sugar().Fatalw("Failed to migrate", "error", err)
return dbname, nil, nil, nil, err
}

return dbFileName, cfg, db, l, err
return dbname, cfg, grm, l, nil
}

func teardownStakerDelegationSnapshot(grm *gorm.DB) {
queries := []string{
`delete from staker_delegation_changes`,
`delete from blocks`,
}
for _, query := range queries {
if res := grm.Exec(query); res.Error != nil {
fmt.Printf("Failed to run query: %v\n", res.Error)
}
func teardownStakerDelegationSnapshot(dbname string, cfg *config.Config, db *gorm.DB, l *zap.Logger) {
rawDb, _ := db.DB()
_ = rawDb.Close()

pgConfig := postgres.PostgresConfigFromDbConfig(&cfg.DatabaseConfig)

if err := postgres.DeleteTestDatabase(pgConfig, dbname); err != nil {
l.Sugar().Errorw("Failed to delete test database", "error", err)
}
}

Expand Down Expand Up @@ -82,6 +91,8 @@ func Test_StakerDelegationSnapshots(t *testing.T) {
t.Fatal(err)
}

startDate := "1970-01-01"

t.Run("Should hydrate dependency tables", func(t *testing.T) {
if _, err := hydrateAllBlocksTable(grm, l); err != nil {
t.Error(err)
Expand All @@ -94,7 +105,7 @@ func Test_StakerDelegationSnapshots(t *testing.T) {
rewards, _ := NewRewardsCalculator(l, grm, cfg)

t.Log("Generating staker delegation snapshots")
snapshots, err := rewards.GenerateStakerDelegationSnapshots(snapshotDate)
snapshots, err := rewards.GenerateStakerDelegationSnapshots(startDate, snapshotDate)
assert.Nil(t, err)

t.Log("Getting expected results")
Expand Down Expand Up @@ -125,7 +136,8 @@ func Test_StakerDelegationSnapshots(t *testing.T) {
t.Logf("Staker/operator not found in results: %+v\n", snapshot)
lacksExpectedResult = append(lacksExpectedResult, snapshot)
} else {
if !slices.Contains(found, snapshot.Snapshot) {
snapshotStr := snapshot.Snapshot.Format(time.DateOnly)
if !slices.Contains(found, snapshotStr) {
t.Logf("Found staker operator, but no snapshot: %+v - %+v\n", snapshot, found)
lacksExpectedResult = append(lacksExpectedResult, snapshot)
}
Expand All @@ -141,7 +153,6 @@ func Test_StakerDelegationSnapshots(t *testing.T) {
}
})
t.Cleanup(func() {
teardownStakerDelegationSnapshot(grm)
tests.DeleteTestSqliteDB(dbFileName)
teardownStakerDelegationSnapshot(dbFileName, cfg, grm, l)
})
}
8 changes: 7 additions & 1 deletion pkg/rewards/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type CombinedRewards struct {
type OperatorAvsRegistrationSnapshots struct {
Avs string
Operator string
Snapshot string
Snapshot time.Time
}

type OperatorAvsStrategySnapshot struct {
Expand All @@ -38,3 +38,9 @@ type OperatorShareSnapshots struct {
Shares string
Snapshot time.Time
}

type StakerDelegationSnapshot struct {
Staker string
Operator string
Snapshot time.Time
}

0 comments on commit 799a457

Please sign in to comment.