Skip to content

Commit

Permalink
Migrate stakerShareSnapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcgary committed Oct 24, 2024
1 parent 799a457 commit 34bf438
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 97 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package _202410241456_stakerShareSnapshots

import (
"database/sql"
"gorm.io/gorm"
)

type Migration struct {
}

func (m *Migration) Up(db *sql.DB, grm *gorm.DB) error {
queries := []string{
`CREATE TABLE IF NOT EXISTS staker_share_snapshots (
staker varchar not null,
strategy varchar not null,
shares numeric not null,
snapshot date not null
)
`,
`create index idx_staker_share_snapshots_staker_strategy_snapshot on staker_share_snapshots (staker, strategy, snapshot)`,
`create index idx_staker_share_snapshots_strategy_snapshot on staker_share_snapshots (strategy, snapshot)`,
}
for _, query := range queries {
if _, err := db.Exec(query); err != nil {
return err
}
}
return nil
}

func (m *Migration) GetName() string {
return "202410241456_stakerShareSnapshots"
}
2 changes: 2 additions & 0 deletions internal/postgres/migrations/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
_202410241417_operatorAvsStrategySnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241417_operatorAvsStrategySnapshots"
_202410241431_operatorShareSnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241431_operatorShareSnapshots"
_202410241450_stakerDelegationSnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241450_stakerDelegationSnapshots"
_202410241456_stakerShareSnapshots "github.com/Layr-Labs/go-sidecar/internal/postgres/migrations/202410241456_stakerShareSnapshots"
"go.uber.org/zap"
"gorm.io/gorm"
"time"
Expand Down Expand Up @@ -61,6 +62,7 @@ func (m *Migrator) MigrateAll() error {
&_202410241417_operatorAvsStrategySnapshots.Migration{},
&_202410241431_operatorShareSnapshots.Migration{},
&_202410241450_stakerDelegationSnapshots.Migration{},
&_202410241456_stakerShareSnapshots.Migration{},
}

for _, migration := range migrations {
Expand Down
3 changes: 1 addition & 2 deletions pkg/rewards/rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func (rc *RewardsCalculator) getMostRecentDistributionRoot() (*submittedDistribu

func (rc *RewardsCalculator) initializeRewardsSchema() error {
funcs := []func() error{
rc.CreateStakerShareSnapshotsTable,

// Gold tables
rc.CreateGold1ActiveRewardsTable,
Expand Down Expand Up @@ -148,7 +147,7 @@ func (rc *RewardsCalculator) generateSnapshotData(startDate string, snapshotDate
}
rc.logger.Sugar().Debugw("Generated operator share snapshots")

if err = rc.GenerateAndInsertStakerShareSnapshots(snapshotDate); err != nil {
if err = rc.GenerateAndInsertStakerShareSnapshots(startDate, snapshotDate); err != nil {
rc.logger.Sugar().Errorw("Failed to generate staker share snapshots", "error", err)
return err
}
Expand Down
107 changes: 34 additions & 73 deletions pkg/rewards/stakerShareSnapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,77 +15,60 @@ with staker_shares_with_block_info as (
left join blocks as b on (b.number = ss.block_number)
),
ranked_staker_records as (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY staker, strategy, block_date ORDER BY block_time DESC) AS rn
FROM staker_shares_with_block_info
SELECT *,
ROW_NUMBER() OVER (PARTITION BY staker, strategy, cast(block_time AS DATE) ORDER BY block_time DESC) AS rn
FROM staker_shares_with_block_info
),
-- Get the latest record for each day & round up to the snapshot day
snapshotted_records as (
SELECT
staker,
strategy,
shares,
block_time,
DATE(block_date, '+1 day') as snapshot_time
from ranked_staker_records
where rn = 1
SELECT
staker,
strategy,
shares,
block_time,
date_trunc('day', block_time) + INTERVAL '1' day AS snapshot_time
from ranked_staker_records
where rn = 1
),
-- Get the range for each operator, strategy pairing
staker_share_windows as (
SELECT
staker, strategy, shares, snapshot_time as start_time,
CASE
-- If the range does not have the end, use the current timestamp truncated to 0 UTC
WHEN LEAD(snapshot_time) OVER (PARTITION BY staker, strategy ORDER BY snapshot_time) is null THEN DATE(@cutoffDate)
ELSE LEAD(snapshot_time) OVER (PARTITION BY staker, strategy ORDER BY snapshot_time)
END AS end_time
FROM snapshotted_records
SELECT
staker, strategy, shares, snapshot_time as start_time,
CASE
-- If the range does not have the end, use the current timestamp truncated to 0 UTC
WHEN LEAD(snapshot_time) OVER (PARTITION BY staker, strategy ORDER BY snapshot_time) is null THEN date_trunc('day', DATE(@cutoffDate))
ELSE LEAD(snapshot_time) OVER (PARTITION BY staker, strategy ORDER BY snapshot_time)
END AS end_time
FROM snapshotted_records
),
cleaned_records as (
SELECT * FROM staker_share_windows
WHERE start_time < end_time
),
date_bounds as (
select
min(start_time) as min_start,
max(end_time) as max_end
from cleaned_records
),
day_series AS (
with RECURSIVE day_series_inner AS (
SELECT DATE(min_start) AS day
FROM date_bounds
UNION ALL
SELECT DATE(day, '+1 day')
FROM day_series_inner
WHERE day < (SELECT max_end FROM date_bounds)
)
select * from day_series_inner
),
final_results as (
SELECT
staker,
strategy,
shares,
day as snapshot
FROM cleaned_records
cross join day_series
where DATE(day) between DATE(start_time) and DATE(end_time, '-1 day')
cast(day AS DATE) AS snapshot
FROM
cleaned_records
CROSS JOIN
generate_series(DATE(start_time), DATE(end_time) - interval '1' day, interval '1' day) AS day
)
select * from final_results
where
snapshot >= @startDate
and snapshot < @cutoffDate
`

type StakerShareSnapshot struct {
Staker string
Strategy string
Snapshot string
Shares string
}

func (r *RewardsCalculator) GenerateStakerShareSnapshots(snapshotDate string) ([]*StakerShareSnapshot, error) {
func (r *RewardsCalculator) GenerateStakerShareSnapshots(startDate string, snapshotDate string) ([]*StakerShareSnapshot, error) {
results := make([]*StakerShareSnapshot, 0)

res := r.grm.Raw(stakerShareSnapshotsQuery, sql.Named("cutoffDate", snapshotDate)).Scan(&results)
res := r.grm.Raw(stakerShareSnapshotsQuery,
sql.Named("startDate", startDate),
sql.Named("cutoffDate", snapshotDate),
).Scan(&results)

if res.Error != nil {
r.logger.Sugar().Errorw("Failed to generate staker share snapshots", "error", res.Error)
Expand All @@ -94,8 +77,8 @@ func (r *RewardsCalculator) GenerateStakerShareSnapshots(snapshotDate string) ([
return results, nil
}

func (r *RewardsCalculator) GenerateAndInsertStakerShareSnapshots(snapshotDate string) error {
snapshots, err := r.GenerateStakerShareSnapshots(snapshotDate)
func (r *RewardsCalculator) GenerateAndInsertStakerShareSnapshots(startDate string, snapshotDate string) error {
snapshots, err := r.GenerateStakerShareSnapshots(startDate, snapshotDate)
if err != nil {
r.logger.Sugar().Errorw("Failed to generate staker share snapshots", "error", err)
return err
Expand All @@ -109,25 +92,3 @@ func (r *RewardsCalculator) GenerateAndInsertStakerShareSnapshots(snapshotDate s
}
return nil
}

func (r *RewardsCalculator) CreateStakerShareSnapshotsTable() error {
queries := []string{
`CREATE TABLE IF NOT EXISTS staker_share_snapshots (
staker TEXT,
strategy TEXT,
shares TEXT,
snapshot TEXT
)
`,
`create index idx_staker_share_snapshots_staker_strategy_snapshot on staker_share_snapshots (staker, strategy, snapshot)`,
`create index idx_staker_share_snapshots_strategy_snapshot on staker_share_snapshots (strategy, snapshot)`,
}
for _, query := range queries {
res := r.grm.Exec(query)
if res.Error != nil {
r.logger.Sugar().Errorw("Failed to create staker share snapshots table", "error", res.Error)
return res.Error
}
}
return nil
}
55 changes: 33 additions & 22 deletions pkg/rewards/stakerShareSnapshots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"fmt"
"github.com/Layr-Labs/go-sidecar/internal/config"
"github.com/Layr-Labs/go-sidecar/internal/logger"
"github.com/Layr-Labs/go-sidecar/internal/sqlite/migrations"
"github.com/Layr-Labs/go-sidecar/internal/postgres"
"github.com/Layr-Labs/go-sidecar/internal/tests"
"github.com/Layr-Labs/go-sidecar/internal/tests/sqlite"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"gorm.io/gorm"
"testing"
"time"
)

func setupStakerShareSnapshot() (
Expand All @@ -20,30 +20,39 @@ func setupStakerShareSnapshot() (
*zap.Logger,
error,
) {
testContext := getRewardsTestContext()
cfg := tests.GetConfig()
switch testContext {
case "testnet":
cfg.Chain = config.Chain_Holesky
case "testnet-reduced":
cfg.Chain = config.Chain_Holesky
case "mainnet-reduced":
cfg.Chain = config.Chain_Mainnet
default:
return "", nil, nil, nil, fmt.Errorf("Unknown test context")
}

cfg.DatabaseConfig = *tests.GetDbConfigFromEnv()

l, _ := logger.NewLogger(&logger.LoggerConfig{Debug: cfg.Debug})

dbFileName, db, err := sqlite.GetFileBasedSqliteDatabaseConnection(l)
dbname, _, grm, err := postgres.GetTestPostgresDatabase(cfg.DatabaseConfig, l)
if err != nil {
panic(err)
}
sqliteMigrator := migrations.NewSqliteMigrator(db, l)
if err := sqliteMigrator.MigrateAll(); err != nil {
l.Sugar().Fatalw("Failed to migrate", "error", err)
return dbname, nil, nil, nil, err
}

return dbFileName, cfg, db, l, err
return dbname, cfg, grm, l, nil
}

func teardownStakerShareSnapshot(grm *gorm.DB) {
queries := []string{
`delete from staker_shares`,
`delete from blocks`,
}
for _, query := range queries {
if res := grm.Exec(query); res.Error != nil {
fmt.Printf("Failed to run query: %v\n", res.Error)
}
func teardownStakerShareSnapshot(dbname string, cfg *config.Config, db *gorm.DB, l *zap.Logger) {
rawDb, _ := db.DB()
_ = rawDb.Close()

pgConfig := postgres.PostgresConfigFromDbConfig(&cfg.DatabaseConfig)

if err := postgres.DeleteTestDatabase(pgConfig, dbname); err != nil {
l.Sugar().Errorw("Failed to delete test database", "error", err)
}
}

Expand Down Expand Up @@ -78,6 +87,8 @@ func Test_StakerShareSnapshots(t *testing.T) {

snapshotDate, err := getSnapshotDate()

startDate := "1970-01-01"

t.Run("Should hydrate dependency tables", func(t *testing.T) {
if _, err = hydrateAllBlocksTable(grm, l); err != nil {
t.Error(err)
Expand All @@ -90,7 +101,7 @@ func Test_StakerShareSnapshots(t *testing.T) {
rewards, _ := NewRewardsCalculator(l, grm, cfg)

t.Log("Generating staker share snapshots")
snapshots, err := rewards.GenerateStakerShareSnapshots(snapshotDate)
snapshots, err := rewards.GenerateStakerShareSnapshots(startDate, snapshotDate)
assert.Nil(t, err)

t.Log("Getting expected results")
Expand All @@ -113,7 +124,8 @@ func Test_StakerShareSnapshots(t *testing.T) {
// Go line-by-line in the snapshot results and find the corresponding line in the expected results.
// If one doesnt exist, add it to the missing list.
for _, snapshot := range snapshots {
slotId := fmt.Sprintf("%s_%s_%s", snapshot.Staker, snapshot.Strategy, snapshot.Snapshot)
snapshotStr := snapshot.Snapshot.Format(time.DateOnly)
slotId := fmt.Sprintf("%s_%s_%s", snapshot.Staker, snapshot.Strategy, snapshotStr)

found, ok := mappedExpectedResults[slotId]
if !ok {
Expand All @@ -135,7 +147,6 @@ func Test_StakerShareSnapshots(t *testing.T) {
}
})
t.Cleanup(func() {
teardownStakerShareSnapshot(grm)
tests.DeleteTestSqliteDB(dbFileName)
teardownStakerShareSnapshot(dbFileName, cfg, grm, l)
})
}
7 changes: 7 additions & 0 deletions pkg/rewards/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,10 @@ type StakerDelegationSnapshot struct {
Operator string
Snapshot time.Time
}

type StakerShareSnapshot struct {
Staker string
Strategy string
Snapshot time.Time
Shares string
}

0 comments on commit 34bf438

Please sign in to comment.