Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
0xrajath committed Nov 15, 2024
1 parent 03433c1 commit c9a7384
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 133 deletions.
20 changes: 10 additions & 10 deletions pkg/eigenState/eigenState.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package eigenState

import (
"github.com/Layr-Labs/go-sidecar/internal/config"
"github.com/Layr-Labs/go-sidecar/pkg/eigenState/avsOperators"
"github.com/Layr-Labs/go-sidecar/pkg/eigenState/disabledDistributionRoots"
"github.com/Layr-Labs/go-sidecar/pkg/eigenState/operatorDirectedRewardSubmissions"
"github.com/Layr-Labs/go-sidecar/pkg/eigenState/operatorShares"
"github.com/Layr-Labs/go-sidecar/pkg/eigenState/rewardSubmissions"
"github.com/Layr-Labs/go-sidecar/pkg/eigenState/stakerDelegations"
"github.com/Layr-Labs/go-sidecar/pkg/eigenState/stakerShares"
"github.com/Layr-Labs/go-sidecar/pkg/eigenState/stateManager"
"github.com/Layr-Labs/go-sidecar/pkg/eigenState/submittedDistributionRoots"
"github.com/Layr-Labs/sidecar/internal/config"
"github.com/Layr-Labs/sidecar/pkg/eigenState/avsOperators"
"github.com/Layr-Labs/sidecar/pkg/eigenState/disabledDistributionRoots"
"github.com/Layr-Labs/sidecar/pkg/eigenState/operatorDirectedRewardSubmissions"
"github.com/Layr-Labs/sidecar/pkg/eigenState/operatorShares"
"github.com/Layr-Labs/sidecar/pkg/eigenState/rewardSubmissions"
"github.com/Layr-Labs/sidecar/pkg/eigenState/stakerDelegations"
"github.com/Layr-Labs/sidecar/pkg/eigenState/stakerShares"
"github.com/Layr-Labs/sidecar/pkg/eigenState/stateManager"
"github.com/Layr-Labs/sidecar/pkg/eigenState/submittedDistributionRoots"
"go.uber.org/zap"
"gorm.io/gorm"
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,61 +1,51 @@
package operatorDirectedRewardSubmissions

import (
"database/sql"
"encoding/json"
"fmt"
"slices"
"sort"
"strings"
"time"

"github.com/Layr-Labs/go-sidecar/pkg/storage"
"github.com/Layr-Labs/go-sidecar/pkg/types/numbers"
"github.com/Layr-Labs/go-sidecar/pkg/utils"
"github.com/Layr-Labs/sidecar/pkg/storage"
"github.com/Layr-Labs/sidecar/pkg/types/numbers"
"github.com/Layr-Labs/sidecar/pkg/utils"

"github.com/Layr-Labs/go-sidecar/internal/config"
"github.com/Layr-Labs/go-sidecar/pkg/eigenState/base"
"github.com/Layr-Labs/go-sidecar/pkg/eigenState/stateManager"
"github.com/Layr-Labs/go-sidecar/pkg/eigenState/types"
"github.com/Layr-Labs/sidecar/internal/config"
"github.com/Layr-Labs/sidecar/pkg/eigenState/base"
"github.com/Layr-Labs/sidecar/pkg/eigenState/stateManager"
"github.com/Layr-Labs/sidecar/pkg/eigenState/types"
"go.uber.org/zap"
"golang.org/x/xerrors"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type OperatorDirectedRewardSubmission struct {
Avs string
RewardHash string
Token string
Operator string
OperatorIndex uint64
Amount string
Strategy string
StrategyIndex uint64
Multiplier string
StartTimestamp *time.Time
EndTimestamp *time.Time
Duration uint64
BlockNumber uint64
Avs string
RewardHash string
Token string
Amount string
Strategy string
StrategyIndex uint64
Multiplier string
StartTimestamp *time.Time
EndTimestamp *time.Time
Duration uint64
BlockNumber uint64
RewardType string // avs, all_stakers, all_earners
TransactionHash string
LogIndex uint64
}

type RewardSubmissionDiff struct {
OperatorDirectedRewardSubmission *OperatorDirectedRewardSubmission
IsNew bool
IsNoLongerActive bool
}

type OperatorDirectedRewardSubmissions struct {
Submissions []*OperatorDirectedRewardSubmission
}

func NewSlotID(rewardHash string, strategy string, operator string) types.SlotID {
return types.SlotID(fmt.Sprintf("%s_%s_%s", rewardHash, strategy, operator))
func NewSlotID(transactionHash string, logIndex uint64, rewardHash string, strategyIndex uint64) types.SlotID {
return base.NewSlotIDWithSuffix(transactionHash, logIndex, fmt.Sprintf("%s_%d", rewardHash, strategyIndex))
}

type OperatorDirectedRewardSubmissionsModel struct {
base.BaseEigenState
StateTransitions types.StateTransitions[OperatorDirectedRewardSubmission]
StateTransitions types.StateTransitions[[]*OperatorDirectedRewardSubmission]
DB *gorm.DB
Network config.Network
Environment config.Environment
Expand All @@ -82,7 +72,7 @@ func NewOperatorDirectedRewardSubmissionsModel(
stateAccumulator: make(map[uint64]map[types.SlotID]*OperatorDirectedRewardSubmission),
}

esm.RegisterState(model, 7)
esm.RegisterState(model, 5)
return model, nil
}

Expand All @@ -101,13 +91,13 @@ type genericRewardPaymentData struct {
} `json:"strategiesAndMultipliers"`
}

type rewardSubmissionOutputData struct {
type operatorDirectedRewardSubmissionOutputData struct {
RewardsSubmission *genericRewardPaymentData `json:"rewardsSubmission"`
RangePayment *genericRewardPaymentData `json:"rangePayment"`
}

func parseRewardSubmissionOutputData(outputDataStr string) (*rewardSubmissionOutputData, error) {
outputData := &rewardSubmissionOutputData{}
func parseRewardSubmissionOutputData(outputDataStr string) (*operatorDirectedRewardSubmissionOutputData, error) {
outputData := &operatorDirectedRewardSubmissionOutputData{}
decoder := json.NewDecoder(strings.NewReader(outputDataStr))
decoder.UseNumber()

Expand All @@ -119,7 +109,7 @@ func parseRewardSubmissionOutputData(outputDataStr string) (*rewardSubmissionOut
return outputData, err
}

func (odrs *OperatorDirectedRewardSubmissionsModel) handleRewardSubmissionCreatedEvent(log *storage.TransactionLog) (*OperatorDirectedRewardSubmissions, error) {
func (odrs *OperatorDirectedRewardSubmissionsModel) handleRewardSubmissionCreatedEvent(log *storage.TransactionLog) ([]*OperatorDirectedRewardSubmission, error) {
arguments, err := odrs.ParseLogArguments(log)
if err != nil {
return nil, err
Expand All @@ -139,7 +129,7 @@ func (odrs *OperatorDirectedRewardSubmissionsModel) handleRewardSubmissionCreate

rewardSubmissions := make([]*OperatorDirectedRewardSubmission, 0)

for _, strategyAndMultiplier := range actualOuputData.StrategiesAndMultipliers {
for i, strategyAndMultiplier := range actualOuputData.StrategiesAndMultipliers {
startTimestamp := time.Unix(int64(actualOuputData.StartTimestamp), 0)
endTimestamp := startTimestamp.Add(time.Duration(actualOuputData.Duration) * time.Second)

Expand All @@ -165,38 +155,42 @@ func (odrs *OperatorDirectedRewardSubmissionsModel) handleRewardSubmissionCreate
}

rewardSubmission := &OperatorDirectedRewardSubmission{
Avs: strings.ToLower(arguments[0].Value.(string)),
RewardHash: strings.ToLower(arguments[2].Value.(string)),
Token: strings.ToLower(actualOuputData.Token),
Amount: amountBig.String(),
Strategy: strategyAndMultiplier.Strategy,
Multiplier: multiplierBig.String(),
StartTimestamp: &startTimestamp,
EndTimestamp: &endTimestamp,
Duration: actualOuputData.Duration,
BlockNumber: log.BlockNumber,
RewardType: rewardType,
Avs: strings.ToLower(arguments[0].Value.(string)),
RewardHash: strings.ToLower(arguments[2].Value.(string)),
Token: strings.ToLower(actualOuputData.Token),
Amount: amountBig.String(),
Strategy: strategyAndMultiplier.Strategy,
Multiplier: multiplierBig.String(),
StartTimestamp: &startTimestamp,
EndTimestamp: &endTimestamp,
Duration: actualOuputData.Duration,
BlockNumber: log.BlockNumber,
RewardType: rewardType,
TransactionHash: log.TransactionHash,
LogIndex: log.LogIndex,
StrategyIndex: uint64(i),
}
rewardSubmissions = append(rewardSubmissions, rewardSubmission)
}

return &OperatorDirectedRewardSubmissions{Submissions: rewardSubmissions}, nil
return rewardSubmissions, nil
}

func (odrs *OperatorDirectedRewardSubmissionsModel) GetStateTransitions() (types.StateTransitions[OperatorDirectedRewardSubmissions], []uint64) {
stateChanges := make(types.StateTransitions[OperatorDirectedRewardSubmissions])
func (odrs *OperatorDirectedRewardSubmissionsModel) GetStateTransitions() (types.StateTransitions[[]*OperatorDirectedRewardSubmission], []uint64) {
stateChanges := make(types.StateTransitions[[]*OperatorDirectedRewardSubmission])

stateChanges[0] = func(log *storage.TransactionLog) (*OperatorDirectedRewardSubmissions, error) {
stateChanges[0] = func(log *storage.TransactionLog) ([]*OperatorDirectedRewardSubmission, error) {
rewardSubmissions, err := odrs.handleRewardSubmissionCreatedEvent(log)
if err != nil {
return nil, err
}

for _, rewardSubmission := range rewardSubmissions.Submissions {
slotId := NewSlotID(rewardSubmission.RewardHash, rewardSubmission.Strategy, rewardSubmission.Operator)
for _, rewardSubmission := range rewardSubmissions {
slotId := NewSlotID(rewardSubmission.TransactionHash, rewardSubmission.LogIndex, rewardSubmission.RewardHash, rewardSubmission.StrategyIndex)

_, ok := odrs.stateAccumulator[log.BlockNumber][slotId]
if ok {
fmt.Printf("Submissions: %+v\n", odrs.stateAccumulator[log.BlockNumber])
err := xerrors.Errorf("Duplicate distribution root submitted for slot %s at block %d", slotId, log.BlockNumber)
odrs.logger.Sugar().Errorw("Duplicate distribution root submitted", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -225,7 +219,11 @@ func (odrs *OperatorDirectedRewardSubmissionsModel) getContractAddressesForEnvir
contracts := odrs.globalConfig.GetContractsMapForChain()
return map[string][]string{
contracts.RewardsCoordinator: {
"OperatorDirectedAVSRewardsSubmissionCreated",
"RangePaymentForAllCreated",
"RewardsSubmissionForAllCreated",
"RangePaymentCreated",
"AVSRewardsSubmissionCreated",
"RewardsSubmissionForAllEarnersCreated",
},
}
}
Expand Down Expand Up @@ -266,76 +264,31 @@ func (odrs *OperatorDirectedRewardSubmissionsModel) HandleStateChange(log *stora
}

// prepareState prepares the state for commit by adding the new state to the existing state.
func (odrs *OperatorDirectedRewardSubmissionsModel) prepareState(blockNumber uint64) ([]*RewardSubmissionDiff, []*RewardSubmissionDiff, error) {
func (odrs *OperatorDirectedRewardSubmissionsModel) prepareState(blockNumber uint64) ([]*OperatorDirectedRewardSubmission, error) {
accumulatedState, ok := odrs.stateAccumulator[blockNumber]
if !ok {
err := xerrors.Errorf("No accumulated state found for block %d", blockNumber)
odrs.logger.Sugar().Errorw(err.Error(), zap.Error(err), zap.Uint64("blockNumber", blockNumber))
return nil, nil, err
}

currentBlock := &storage.Block{}
err := odrs.DB.Where("number = ?", blockNumber).First(currentBlock).Error
if err != nil {
odrs.logger.Sugar().Errorw("Failed to fetch block", zap.Error(err), zap.Uint64("blockNumber", blockNumber))
return nil, nil, err
}

inserts := make([]*RewardSubmissionDiff, 0)
for _, change := range accumulatedState {
if change == nil {
continue
}

inserts = append(inserts, &RewardSubmissionDiff{
OperatorDirectedRewardSubmission: change,
IsNew: true,
})
}

// find all the records that are no longer active
noLongerActiveSubmissions := make([]*OperatorDirectedRewardSubmission, 0)
query := `
select
*
from reward_submissions
where
block_number = @previousBlock
and end_timestamp <= @blockTime
`
res := odrs.DB.
Model(&OperatorDirectedRewardSubmission{}).
Raw(query,
sql.Named("previousBlock", blockNumber-1),
sql.Named("blockTime", currentBlock.BlockTime),
).
Find(&noLongerActiveSubmissions)

if res.Error != nil {
odrs.logger.Sugar().Errorw("Failed to fetch no longer active submissions", zap.Error(res.Error))
return nil, nil, res.Error
return nil, err
}

deletes := make([]*RewardSubmissionDiff, 0)
for _, submission := range noLongerActiveSubmissions {
deletes = append(deletes, &RewardSubmissionDiff{
OperatorDirectedRewardSubmission: submission,
IsNoLongerActive: true,
})
recordsToInsert := make([]*OperatorDirectedRewardSubmission, 0)
for _, submission := range accumulatedState {
recordsToInsert = append(recordsToInsert, submission)
}
return inserts, deletes, nil
return recordsToInsert, nil
}

// CommitFinalState commits the final state for the given block number.
func (odrs *OperatorDirectedRewardSubmissionsModel) CommitFinalState(blockNumber uint64) error {
recordsToInsert, _, err := odrs.prepareState(blockNumber)
recordsToInsert, err := odrs.prepareState(blockNumber)
if err != nil {
return err
}

if len(recordsToInsert) > 0 {
for _, record := range recordsToInsert {
res := odrs.DB.Model(&OperatorDirectedRewardSubmission{}).Clauses(clause.Returning{}).Create(&record.OperatorDirectedRewardSubmission)
res := odrs.DB.Model(&OperatorDirectedRewardSubmission{}).Clauses(clause.Returning{}).Create(&record)
if res.Error != nil {
odrs.logger.Sugar().Errorw("Failed to insert records", zap.Error(res.Error))
return res.Error
Expand All @@ -347,16 +300,12 @@ func (odrs *OperatorDirectedRewardSubmissionsModel) CommitFinalState(blockNumber

// GenerateStateRoot generates the state root for the given block number using the results of the state changes.
func (odrs *OperatorDirectedRewardSubmissionsModel) GenerateStateRoot(blockNumber uint64) (types.StateRoot, error) {
inserts, deletes, err := odrs.prepareState(blockNumber)
inserts, err := odrs.prepareState(blockNumber)
if err != nil {
return "", err
}

combinedResults := make([]*RewardSubmissionDiff, 0)
combinedResults = append(combinedResults, inserts...)
combinedResults = append(combinedResults, deletes...)

inputs := odrs.sortValuesForMerkleTree(combinedResults)
inputs := odrs.sortValuesForMerkleTree(inserts)

if len(inputs) == 0 {
return "", nil
Expand All @@ -374,14 +323,11 @@ func (odrs *OperatorDirectedRewardSubmissionsModel) GenerateStateRoot(blockNumbe
return types.StateRoot(utils.ConvertBytesToString(fullTree.Root())), nil
}

func (odrs *OperatorDirectedRewardSubmissionsModel) sortValuesForMerkleTree(submissions []*RewardSubmissionDiff) []*base.MerkleTreeInput {
func (odrs *OperatorDirectedRewardSubmissionsModel) sortValuesForMerkleTree(submissions []*OperatorDirectedRewardSubmission) []*base.MerkleTreeInput {
inputs := make([]*base.MerkleTreeInput, 0)
for _, submission := range submissions {
slotID := NewSlotID(submission.OperatorDirectedRewardSubmission.RewardHash, submission.OperatorDirectedRewardSubmission.Strategy, submission.OperatorDirectedRewardSubmission.Operator)
slotID := NewSlotID(submission.TransactionHash, submission.LogIndex, submission.RewardHash, submission.StrategyIndex)
value := "added"
if submission.IsNoLongerActive {
value = "removed"
}
inputs = append(inputs, &base.MerkleTreeInput{
SlotID: slotID,
Value: []byte(value),
Expand All @@ -396,5 +342,5 @@ func (odrs *OperatorDirectedRewardSubmissionsModel) sortValuesForMerkleTree(subm
}

func (odrs *OperatorDirectedRewardSubmissionsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return odrs.BaseEigenState.DeleteState("operator_directed_reward_submissions", startBlockNumber, endBlockNumber, odrs.DB)
return odrs.BaseEigenState.DeleteState("reward_submissions", startBlockNumber, endBlockNumber, odrs.DB)
}

0 comments on commit c9a7384

Please sign in to comment.