diff --git a/cmd/database.go b/cmd/database.go index 1009845a..95397133 100644 --- a/cmd/database.go +++ b/cmd/database.go @@ -15,6 +15,7 @@ import ( "github.com/Layr-Labs/sidecar/pkg/eventBus" "github.com/Layr-Labs/sidecar/pkg/fetcher" "github.com/Layr-Labs/sidecar/pkg/indexer" + "github.com/Layr-Labs/sidecar/pkg/metaState/metaStateManager" "github.com/Layr-Labs/sidecar/pkg/pipeline" "github.com/Layr-Labs/sidecar/pkg/postgres" "github.com/Layr-Labs/sidecar/pkg/postgres/migrations" @@ -87,6 +88,7 @@ var runDatabaseCmd = &cobra.Command{ } sm := stateManager.NewEigenStateManager(l, grm) + msm := metaStateManager.NewMetaStateManager(grm, l, cfg) if err := eigenState.LoadEigenStateModels(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to load eigen state models", zap.Error(err)) @@ -107,7 +109,7 @@ var runDatabaseCmd = &cobra.Command{ rcq := rewardsCalculatorQueue.NewRewardsCalculatorQueue(rc, l) - _ = pipeline.NewPipeline(fetchr, idxr, mds, sm, rc, rcq, cfg, sdc, eb, l) + _ = pipeline.NewPipeline(fetchr, idxr, mds, sm, msm, rc, rcq, cfg, sdc, eb, l) l.Sugar().Infow("Done") }, diff --git a/cmd/debugger/main.go b/cmd/debugger/main.go index 556ee807..ea9255fa 100644 --- a/cmd/debugger/main.go +++ b/cmd/debugger/main.go @@ -11,6 +11,7 @@ import ( "github.com/Layr-Labs/sidecar/pkg/eventBus" "github.com/Layr-Labs/sidecar/pkg/fetcher" "github.com/Layr-Labs/sidecar/pkg/indexer" + "github.com/Layr-Labs/sidecar/pkg/metaState/metaStateManager" "github.com/Layr-Labs/sidecar/pkg/pipeline" "github.com/Layr-Labs/sidecar/pkg/postgres" "github.com/Layr-Labs/sidecar/pkg/proofs" @@ -80,6 +81,7 @@ func main() { } sm := stateManager.NewEigenStateManager(l, grm) + msm := metaStateManager.NewMetaStateManager(grm, l, cfg) if err := eigenState.LoadEigenStateModels(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to load eigen state models", zap.Error(err)) @@ -100,14 +102,14 @@ func main() { rcq := rewardsCalculatorQueue.NewRewardsCalculatorQueue(rc, l) - p := pipeline.NewPipeline(fetchr, idxr, mds, sm, rc, rcq, cfg, sdc, eb, l) + p := pipeline.NewPipeline(fetchr, idxr, mds, sm, msm, rc, rcq, cfg, sdc, eb, l) rps := proofs.NewRewardsProofsStore(rc, l) // Create new sidecar instance // Create new sidecar instance _ = sidecar.NewSidecar(&sidecar.SidecarConfig{ GenesisBlockNumber: cfg.GetGenesisBlockNumber(), - }, cfg, mds, p, sm, rc, rcq, rps, l, client) + }, cfg, mds, p, sm, msm, rc, rcq, rps, l, client) rpc := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{ GrpcPort: cfg.RpcConfig.GrpcPort, diff --git a/cmd/run.go b/cmd/run.go index c0b96f2d..dbb246db 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -13,6 +13,7 @@ import ( "github.com/Layr-Labs/sidecar/pkg/eventBus" "github.com/Layr-Labs/sidecar/pkg/fetcher" "github.com/Layr-Labs/sidecar/pkg/indexer" + "github.com/Layr-Labs/sidecar/pkg/metaState/metaStateManager" "github.com/Layr-Labs/sidecar/pkg/pipeline" "github.com/Layr-Labs/sidecar/pkg/postgres" "github.com/Layr-Labs/sidecar/pkg/proofs" @@ -97,6 +98,7 @@ var runCmd = &cobra.Command{ } sm := stateManager.NewEigenStateManager(l, grm) + msm := metaStateManager.NewMetaStateManager(grm, l, cfg) if err := eigenState.LoadEigenStateModels(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to load eigen state models", zap.Error(err)) @@ -121,12 +123,12 @@ var runCmd = &cobra.Command{ go rcq.Process() - p := pipeline.NewPipeline(fetchr, idxr, mds, sm, rc, rcq, cfg, sdc, eb, l) + p := pipeline.NewPipeline(fetchr, idxr, mds, sm, msm, rc, rcq, cfg, sdc, eb, l) // Create new sidecar instance sidecar := sidecar.NewSidecar(&sidecar.SidecarConfig{ GenesisBlockNumber: cfg.GetGenesisBlockNumber(), - }, cfg, mds, p, sm, rc, rcq, rps, l, client) + }, cfg, mds, p, sm, msm, rc, rcq, rps, l, client) rpc := rpcServer.NewRpcServer(&rpcServer.RpcServerConfig{ GrpcPort: cfg.RpcConfig.GrpcPort, diff --git a/pkg/metaState/baseModel/baseModel.go b/pkg/metaState/baseModel/baseModel.go new file mode 100644 index 00000000..46133e57 --- /dev/null +++ b/pkg/metaState/baseModel/baseModel.go @@ -0,0 +1,88 @@ +package baseModel + +import ( + "database/sql" + "encoding/json" + "errors" + "fmt" + "github.com/Layr-Labs/sidecar/pkg/parser" + "github.com/Layr-Labs/sidecar/pkg/storage" + "go.uber.org/zap" + "gorm.io/gorm" + "slices" + "strings" +) + +func IsInterestingLog(contractsEvents map[string][]string, log *storage.TransactionLog) bool { + logAddress := strings.ToLower(log.Address) + if eventNames, ok := contractsEvents[logAddress]; ok { + if slices.Contains(eventNames, log.EventName) { + return true + } + } + return false +} + +func ParseLogArguments(log *storage.TransactionLog, l *zap.Logger) ([]parser.Argument, error) { + arguments := make([]parser.Argument, 0) + err := json.Unmarshal([]byte(log.Arguments), &arguments) + if err != nil { + l.Sugar().Errorw("Failed to unmarshal arguments", + zap.Error(err), + zap.String("transactionHash", log.TransactionHash), + zap.Uint64("transactionIndex", log.TransactionIndex), + ) + return nil, err + } + return arguments, nil +} + +func ParseLogOutput[T any](log *storage.TransactionLog, l *zap.Logger) (*T, error) { + var outputData *T + err := json.Unmarshal([]byte(log.OutputData), &outputData) + if err != nil { + l.Sugar().Errorw("Failed to unmarshal outputData", + zap.Error(err), + zap.String("transactionHash", log.TransactionHash), + zap.Uint64("transactionIndex", log.TransactionIndex), + ) + return nil, err + } + return outputData, nil +} + +func DeleteState(tableName string, startBlockNumber uint64, endBlockNumber uint64, db *gorm.DB, l *zap.Logger) error { + if endBlockNumber != 0 && endBlockNumber < startBlockNumber { + l.Sugar().Errorw("Invalid block range", + zap.Uint64("startBlockNumber", startBlockNumber), + zap.Uint64("endBlockNumber", endBlockNumber), + ) + return errors.New("Invalid block range; endBlockNumber must be greater than or equal to startBlockNumber") + } + + // tokenizing the table name apparently doesnt work, so we need to use Sprintf to include it. + query := fmt.Sprintf(` + delete from %s + where block_number >= @startBlockNumber + `, tableName) + if endBlockNumber > 0 { + query += " and block_number <= @endBlockNumber" + } + res := db.Exec(query, + sql.Named("tableName", tableName), + sql.Named("startBlockNumber", startBlockNumber), + sql.Named("endBlockNumber", endBlockNumber)) + if res.Error != nil { + l.Sugar().Errorw("Failed to delete state", zap.Error(res.Error)) + return res.Error + } + return nil +} + +func CastCommittedStateToInterface[T any](committedState []*T) []interface{} { + state := make([]interface{}, len(committedState)) + for i, v := range committedState { + state[i] = *v + } + return state +} diff --git a/pkg/metaState/metaState.go b/pkg/metaState/metaState.go new file mode 100644 index 00000000..34b96d6e --- /dev/null +++ b/pkg/metaState/metaState.go @@ -0,0 +1,23 @@ +package metaState + +import ( + "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/pkg/metaState/metaStateManager" + "github.com/Layr-Labs/sidecar/pkg/metaState/rewardsClaimed" + "go.uber.org/zap" + "gorm.io/gorm" +) + +func LoadMetaStateModels( + msm *metaStateManager.MetaStateManager, + db *gorm.DB, + l *zap.Logger, + cfg *config.Config, +) error { + if _, err := rewardsClaimed.NewRewardsClaimedModel(db, l, cfg, msm); err != nil { + l.Sugar().Errorw("Failed to create RewardsClaimedModel", zap.Error(err)) + return err + } + + return nil +} diff --git a/pkg/metaState/metaStateManager/metaStateManager.go b/pkg/metaState/metaStateManager/metaStateManager.go new file mode 100644 index 00000000..9b4b76a2 --- /dev/null +++ b/pkg/metaState/metaStateManager/metaStateManager.go @@ -0,0 +1,90 @@ +package metaStateManager + +import ( + "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/pkg/metaState/types" + "github.com/Layr-Labs/sidecar/pkg/storage" + "go.uber.org/zap" + "gorm.io/gorm" +) + +type MetaStateManager struct { + db *gorm.DB + logger *zap.Logger + globalConfig *config.Config + metaStateModels []types.IMetaStateModel +} + +func NewMetaStateManager(db *gorm.DB, l *zap.Logger, gc *config.Config) *MetaStateManager { + return &MetaStateManager{ + db: db, + logger: l, + globalConfig: gc, + metaStateModels: make([]types.IMetaStateModel, 0), + } +} + +func (msm *MetaStateManager) RegisterMetaStateModel(model types.IMetaStateModel) { + msm.metaStateModels = append(msm.metaStateModels, model) +} + +func (msm *MetaStateManager) InitProcessingForBlock(blockNumber uint64) error { + for _, model := range msm.metaStateModels { + if err := model.SetupStateForBlock(blockNumber); err != nil { + msm.logger.Sugar().Errorw("Failed to setup state for block", + "blockNumber", blockNumber, + "model", model, + "error", err, + ) + return err + } + } + return nil +} + +func (msm *MetaStateManager) CleanupProcessedStateForBlock(blockNumber uint64) error { + for _, model := range msm.metaStateModels { + if err := model.CleanupProcessedStateForBlock(blockNumber); err != nil { + msm.logger.Sugar().Errorw("Failed to cleanup state for block", + "blockNumber", blockNumber, + "model", model, + "error", err, + ) + return err + } + } + return nil +} + +func (msm *MetaStateManager) HandleTransactionLog(log *storage.TransactionLog) error { + for _, model := range msm.metaStateModels { + if model.IsInterestingLog(log) { + if _, err := model.HandleTransactionLog(log); err != nil { + msm.logger.Sugar().Errorw("Failed to handle log", + "log", log, + "model", model, + "error", err, + ) + return err + } + } + } + return nil +} + +func (msm *MetaStateManager) CommitFinalState(blockNumber uint64) (map[string][]interface{}, error) { + committedState := make(map[string][]interface{}) + for _, model := range msm.metaStateModels { + state, err := model.CommitFinalState(blockNumber) + if err != nil { + msm.logger.Sugar().Errorw("Failed to commit final state", + "blockNumber", blockNumber, + "model", model, + "error", err, + ) + return nil, err + } + committedState[model.ModelName()] = state + } + return committedState, nil +} diff --git a/pkg/metaState/rewardsClaimed/rewardsClaimed.go b/pkg/metaState/rewardsClaimed/rewardsClaimed.go new file mode 100644 index 00000000..bebd2451 --- /dev/null +++ b/pkg/metaState/rewardsClaimed/rewardsClaimed.go @@ -0,0 +1,135 @@ +package rewardsClaimed + +import ( + "encoding/json" + "fmt" + "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/pkg/metaState/baseModel" + "github.com/Layr-Labs/sidecar/pkg/metaState/metaStateManager" + "github.com/Layr-Labs/sidecar/pkg/metaState/types" + "github.com/Layr-Labs/sidecar/pkg/storage" + "github.com/Layr-Labs/sidecar/pkg/utils" + "go.uber.org/zap" + "gorm.io/gorm" + "gorm.io/gorm/clause" + "strings" +) + +type RewardsClaimedModel struct { + db *gorm.DB + logger *zap.Logger + globalConfig *config.Config + + accumulatedState map[uint64][]*types.RewardsClaimed +} + +func NewRewardsClaimedModel( + db *gorm.DB, + logger *zap.Logger, + globalConfig *config.Config, + msm *metaStateManager.MetaStateManager, +) (*RewardsClaimedModel, error) { + model := &RewardsClaimedModel{ + db: db, + logger: logger, + globalConfig: globalConfig, + accumulatedState: make(map[uint64][]*types.RewardsClaimed), + } + msm.RegisterMetaStateModel(model) + return model, nil +} + +const RewardsClaimedModelName = "rewards_claimed" + +func (rcm *RewardsClaimedModel) ModelName() string { + return RewardsClaimedModelName +} + +func (rcm *RewardsClaimedModel) SetupStateForBlock(blockNumber uint64) error { + rcm.accumulatedState[blockNumber] = make([]*types.RewardsClaimed, 0) + return nil +} + +func (rcm *RewardsClaimedModel) CleanupProcessedStateForBlock(blockNumber uint64) error { + delete(rcm.accumulatedState, blockNumber) + return nil +} + +func (rcm *RewardsClaimedModel) getContractAddressesForEnvironment() map[string][]string { + contracts := rcm.globalConfig.GetContractsMapForChain() + return map[string][]string{ + contracts.RewardsCoordinator: { + "RewardsClaimed", + }, + } +} + +func (rcm *RewardsClaimedModel) IsInterestingLog(log *storage.TransactionLog) bool { + contracts := rcm.getContractAddressesForEnvironment() + return baseModel.IsInterestingLog(contracts, log) +} + +type LogOutput struct { + Root []byte `json:"root"` + Token string `json:"token"` + ClaimedAmount json.Number `json:"claimedAmount"` +} + +func (rcm *RewardsClaimedModel) HandleTransactionLog(log *storage.TransactionLog) (interface{}, error) { + arguments, err := baseModel.ParseLogArguments(log, rcm.logger) + if err != nil { + return nil, err + } + outputData, err := baseModel.ParseLogOutput[LogOutput](log, rcm.logger) + if err != nil { + return nil, err + } + + rootString := utils.ConvertBytesToString(outputData.Root) + + var recipient string + if arguments[3].Value != nil { + recipient = arguments[3].Value.(string) + } else { + recipient = "" + } + + claimed := &types.RewardsClaimed{ + Root: rootString, + Earner: strings.ToLower(arguments[1].Value.(string)), + Claimer: strings.ToLower(arguments[2].Value.(string)), + Recipient: recipient, + Token: outputData.Token, + ClaimedAmount: outputData.ClaimedAmount.String(), + BlockNumber: log.BlockNumber, + TransactionHash: log.TransactionHash, + LogIndex: log.LogIndex, + } + + rcm.accumulatedState[log.BlockNumber] = append(rcm.accumulatedState[log.BlockNumber], claimed) + return claimed, nil +} + +func (rcm *RewardsClaimedModel) CommitFinalState(blockNumber uint64) ([]interface{}, error) { + rowsToInsert, ok := rcm.accumulatedState[blockNumber] + if !ok { + return nil, fmt.Errorf("block number not initialized in accumulatedState %d", blockNumber) + } + + if len(rowsToInsert) == 0 { + rcm.logger.Sugar().Debugf("No rewards claimed to insert for block %d", blockNumber) + return nil, nil + } + + res := rcm.db.Model(&types.RewardsClaimed{}).Clauses(clause.Returning{}).Create(&rowsToInsert) + if res.Error != nil { + rcm.logger.Sugar().Errorw("Failed to insert rewards claimed records", zap.Error(res.Error)) + return nil, res.Error + } + + return baseModel.CastCommittedStateToInterface(rowsToInsert), nil +} + +func (rcm *RewardsClaimedModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error { + return baseModel.DeleteState(rcm.ModelName(), startBlockNumber, endBlockNumber, rcm.db, rcm.logger) +} diff --git a/pkg/metaState/rewardsClaimed/rewardsClaimed_test.go b/pkg/metaState/rewardsClaimed/rewardsClaimed_test.go new file mode 100644 index 00000000..c42c9b4b --- /dev/null +++ b/pkg/metaState/rewardsClaimed/rewardsClaimed_test.go @@ -0,0 +1,169 @@ +package rewardsClaimed + +import ( + "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/internal/logger" + "github.com/Layr-Labs/sidecar/internal/tests" + "github.com/Layr-Labs/sidecar/pkg/metaState/metaStateManager" + "github.com/Layr-Labs/sidecar/pkg/metaState/types" + "github.com/Layr-Labs/sidecar/pkg/postgres" + "github.com/Layr-Labs/sidecar/pkg/storage" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "gorm.io/gorm" + "os" + "testing" + "time" +) + +func setup() ( + string, + *gorm.DB, + *zap.Logger, + *config.Config, + error, +) { + cfg := config.NewConfig() + cfg.Chain = config.Chain_Mainnet + cfg.Debug = os.Getenv(config.Debug) == "true" + cfg.DatabaseConfig = *tests.GetDbConfigFromEnv() + + l, _ := logger.NewLogger(&logger.LoggerConfig{Debug: cfg.Debug}) + + dbname, _, grm, err := postgres.GetTestPostgresDatabase(cfg.DatabaseConfig, cfg, l) + if err != nil { + return dbname, nil, nil, nil, err + } + + return dbname, grm, l, cfg, nil +} + +func Test_RewardsClaimed(t *testing.T) { + dbName, grm, l, cfg, err := setup() + + if err != nil { + t.Fatal(err) + } + + msm := metaStateManager.NewMetaStateManager(grm, l, cfg) + + rewardsClaimedModel, err := NewRewardsClaimedModel(grm, l, cfg, msm) + assert.Nil(t, err) + + t.Run("Should insert a rewardsClaimed event with a null recipient", func(t *testing.T) { + block := &storage.Block{ + Number: 20535299, + Hash: "", + BlockTime: time.Time{}, + } + res := grm.Model(&storage.Block{}).Create(&block) + if res.Error != nil { + t.Fatal(res.Error) + } + log := &storage.TransactionLog{ + TransactionHash: "0x767e002f6f3a7942b22e38f2434ecd460fb2111b7ea584d16adb71692b856801", + TransactionIndex: 77, + Address: "0x7750d328b314effa365a0402ccfd489b80b0adda", + Arguments: `[{"Name": "root", "Type": "bytes32", "Value": "0x0000000000000000000000003449fe2810b0a5f6dffc62b8b6ee6b732dfe4438", "Indexed": false}, {"Name": "earner", "Type": "address", "Value": "0x3449fe2810b0a5f6dffc62b8b6ee6b732dfe4438", "Indexed": true}, {"Name": "claimer", "Type": "address", "Value": "0x3449fe2810b0a5f6dffc62b8b6ee6b732dfe4438", "Indexed": true}, {"Name": "recipient", "Type": "address", "Value": null, "Indexed": true}, {"Name": "token", "Type": "address", "Value": null, "Indexed": false}, {"Name": "claimedAmount", "Type": "uint256", "Value": null, "Indexed": false}]`, + EventName: "RewardsClaimed", + OutputData: `{"root": [200, 194, 94, 171, 12, 231, 185, 90, 53, 50, 87, 206, 179, 62, 194, 139, 92, 52, 159, 42, 165, 234, 249, 2, 180, 77, 155, 202, 81, 229, 100, 188], "token": "0x127500cd2030577f66d1b79600d30dcdba2ed32d", "claimedAmount": 306564275428435710000000}`, + LogIndex: 270, + BlockNumber: block.Number, + CreatedAt: time.Time{}, + UpdatedAt: time.Time{}, + DeletedAt: time.Time{}, + } + + err := rewardsClaimedModel.SetupStateForBlock(block.Number) + assert.Nil(t, err) + + isInteresting := rewardsClaimedModel.IsInterestingLog(log) + assert.True(t, isInteresting) + + state, err := rewardsClaimedModel.HandleTransactionLog(log) + assert.Nil(t, err) + + typedState := state.(*types.RewardsClaimed) + assert.Equal(t, "0xc8c25eab0ce7b95a353257ceb33ec28b5c349f2aa5eaf902b44d9bca51e564bc", typedState.Root) + assert.Equal(t, "0x3449fe2810b0a5f6dffc62b8b6ee6b732dfe4438", typedState.Earner) + assert.Equal(t, "0x3449fe2810b0a5f6dffc62b8b6ee6b732dfe4438", typedState.Claimer) + assert.Equal(t, "", typedState.Recipient) + assert.Equal(t, "0x127500cd2030577f66d1b79600d30dcdba2ed32d", typedState.Token) + assert.Equal(t, "306564275428435710000000", typedState.ClaimedAmount) + assert.Equal(t, block.Number, typedState.BlockNumber) + assert.Equal(t, log.TransactionHash, typedState.TransactionHash) + assert.Equal(t, log.LogIndex, typedState.LogIndex) + + _, err = rewardsClaimedModel.CommitFinalState(block.Number) + assert.Nil(t, err) + + // Check if the rewardsClaimed event was inserted + var rewardsClaimed types.RewardsClaimed + res = grm.Model(&types.RewardsClaimed{}).Where("block_number = ?", block.Number).First(&rewardsClaimed) + assert.Nil(t, res.Error) + + err = rewardsClaimedModel.CleanupProcessedStateForBlock(block.Number) + assert.Nil(t, err) + + }) + + t.Run("Should insert a rewardsClaimed event with a not null", func(t *testing.T) { + block := &storage.Block{ + Number: 20535362, + Hash: "", + BlockTime: time.Time{}, + } + res := grm.Model(&storage.Block{}).Create(&block) + if res.Error != nil { + t.Fatal(res.Error) + } + log := &storage.TransactionLog{ + TransactionHash: "0x767e002f6f3a7942b22e38f2434ecd460fb2111b7ea584d16adb71692b856801", + TransactionIndex: 42, + Address: "0x7750d328b314effa365a0402ccfd489b80b0adda", + Arguments: `[{"Name": "root", "Type": "bytes32", "Value": null, "Indexed": false}, {"Name": "earner", "Type": "address", "Value": "0x769e73da377876dd688b23d51ed01b7c7b154c65", "Indexed": true}, {"Name": "claimer", "Type": "address", "Value": "0x769e73da377876dd688b23d51ed01b7c7b154c65", "Indexed": true}, {"Name": "recipient", "Type": "address", "Value": "0x769e73da377876dd688b23d51ed01b7c7b154c65", "Indexed": true}, {"Name": "token", "Type": "address", "Value": null, "Indexed": false}, {"Name": "claimedAmount", "Type": "uint256", "Value": null, "Indexed": false}]`, + EventName: "RewardsClaimed", + OutputData: `{"root": [200, 194, 94, 171, 12, 231, 185, 90, 53, 50, 87, 206, 179, 62, 194, 139, 92, 52, 159, 42, 165, 234, 249, 2, 180, 77, 155, 202, 81, 229, 100, 188], "token": "0x127500cd2030577f66d1b79600d30dcdba2ed32d", "claimedAmount": 134162726422194540000000}`, + LogIndex: 200, + BlockNumber: block.Number, + CreatedAt: time.Time{}, + UpdatedAt: time.Time{}, + DeletedAt: time.Time{}, + } + + err := rewardsClaimedModel.SetupStateForBlock(block.Number) + assert.Nil(t, err) + + isInteresting := rewardsClaimedModel.IsInterestingLog(log) + assert.True(t, isInteresting) + + state, err := rewardsClaimedModel.HandleTransactionLog(log) + assert.Nil(t, err) + + typedState := state.(*types.RewardsClaimed) + assert.Equal(t, "0xc8c25eab0ce7b95a353257ceb33ec28b5c349f2aa5eaf902b44d9bca51e564bc", typedState.Root) + assert.Equal(t, "0x769e73da377876dd688b23d51ed01b7c7b154c65", typedState.Earner) + assert.Equal(t, "0x769e73da377876dd688b23d51ed01b7c7b154c65", typedState.Claimer) + assert.Equal(t, "0x769e73da377876dd688b23d51ed01b7c7b154c65", typedState.Recipient) + assert.Equal(t, "0x127500cd2030577f66d1b79600d30dcdba2ed32d", typedState.Token) + assert.Equal(t, "134162726422194540000000", typedState.ClaimedAmount) + assert.Equal(t, block.Number, typedState.BlockNumber) + assert.Equal(t, log.TransactionHash, typedState.TransactionHash) + assert.Equal(t, log.LogIndex, typedState.LogIndex) + + _, err = rewardsClaimedModel.CommitFinalState(block.Number) + assert.Nil(t, err) + + // Check if the rewardsClaimed event was inserted + var rewardsClaimed types.RewardsClaimed + res = grm.Model(&types.RewardsClaimed{}).Where("block_number = ?", block.Number).First(&rewardsClaimed) + assert.Nil(t, res.Error) + + err = rewardsClaimedModel.CleanupProcessedStateForBlock(block.Number) + assert.Nil(t, err) + }) + + t.Cleanup(func() { + postgres.TeardownTestDatabase(dbName, cfg, grm, l) + }) +} diff --git a/pkg/metaState/types/tables.go b/pkg/metaState/types/tables.go new file mode 100644 index 00000000..717fbead --- /dev/null +++ b/pkg/metaState/types/tables.go @@ -0,0 +1,17 @@ +package types + +type RewardsClaimed struct { + Root string + Earner string + Claimer string + Recipient string + Token string + ClaimedAmount string + TransactionHash string + BlockNumber uint64 + LogIndex uint64 +} + +func (*RewardsClaimed) TableName() string { + return "rewards_claimed" +} diff --git a/pkg/metaState/types/types.go b/pkg/metaState/types/types.go new file mode 100644 index 00000000..35c2d71d --- /dev/null +++ b/pkg/metaState/types/types.go @@ -0,0 +1,19 @@ +package types + +import "github.com/Layr-Labs/sidecar/pkg/storage" + +type IMetaStateModel interface { + ModelName() string + + SetupStateForBlock(blockNumber uint64) error + + CleanupProcessedStateForBlock(blockNumber uint64) error + + IsInterestingLog(log *storage.TransactionLog) bool + + HandleTransactionLog(log *storage.TransactionLog) (interface{}, error) + + CommitFinalState(blockNumber uint64) ([]interface{}, error) + + DeleteState(startBlockNumber uint64, endBlockNumber uint64) error +} diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index c5d32117..4f223444 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -9,6 +9,7 @@ import ( "github.com/Layr-Labs/sidecar/pkg/eventBus/eventBusTypes" "github.com/Layr-Labs/sidecar/pkg/fetcher" "github.com/Layr-Labs/sidecar/pkg/indexer" + "github.com/Layr-Labs/sidecar/pkg/metaState/metaStateManager" "github.com/Layr-Labs/sidecar/pkg/rewards" "github.com/Layr-Labs/sidecar/pkg/rewardsCalculatorQueue" "github.com/Layr-Labs/sidecar/pkg/storage" @@ -27,6 +28,7 @@ type Pipeline struct { BlockStore storage.BlockStore Logger *zap.Logger stateManager *stateManager.EigenStateManager + metaStateManager *metaStateManager.MetaStateManager rewardsCalculator *rewards.RewardsCalculator rcq *rewardsCalculatorQueue.RewardsCalculatorQueue globalConfig *config.Config @@ -39,6 +41,7 @@ func NewPipeline( i *indexer.Indexer, bs storage.BlockStore, sm *stateManager.EigenStateManager, + msm *metaStateManager.MetaStateManager, rc *rewards.RewardsCalculator, rcq *rewardsCalculatorQueue.RewardsCalculatorQueue, gc *config.Config, @@ -51,6 +54,7 @@ func NewPipeline( Indexer: i, Logger: l, stateManager: sm, + metaStateManager: msm, rewardsCalculator: rc, rcq: rcq, BlockStore: bs, @@ -103,6 +107,10 @@ func (p *Pipeline) RunForFetchedBlock(ctx context.Context, block *fetcher.Fetche p.Logger.Sugar().Errorw("Failed to init processing for block", zap.Uint64("blockNumber", blockNumber), zap.Error(err)) return err } + if err := p.metaStateManager.InitProcessingForBlock(blockNumber); err != nil { + p.Logger.Sugar().Errorw("MetaStateManager: Failed to init processing for block", zap.Uint64("blockNumber", blockNumber), zap.Error(err)) + return err + } p.Logger.Sugar().Debugw("Initialized processing for block", zap.Uint64("blockNumber", blockNumber)) p.Logger.Sugar().Debugw("Handling parsed transactions", zap.Int("count", len(parsedTransactions)), zap.Uint64("blockNumber", blockNumber)) @@ -163,6 +171,16 @@ func (p *Pipeline) RunForFetchedBlock(ctx context.Context, block *fetcher.Fetche ) return err } + + if err := p.metaStateManager.HandleTransactionLog(indexedLog); err != nil { + p.Logger.Sugar().Errorw("MetaStateManager: Failed to handle log state change", + zap.Uint64("blockNumber", blockNumber), + zap.String("transactionHash", pt.Transaction.Hash.Value()), + zap.Uint64("logIndex", log.LogIndex), + zap.Error(err), + ) + return err + } } p.Logger.Sugar().Debugw("Handled log state changes", zap.Uint64("blockNumber", blockNumber), @@ -189,6 +207,11 @@ func (p *Pipeline) RunForFetchedBlock(ctx context.Context, block *fetcher.Fetche p.Logger.Sugar().Errorw("Failed to commit final state", zap.Uint64("blockNumber", blockNumber), zap.Error(err)) return err } + _, err = p.metaStateManager.CommitFinalState(blockNumber) + if err != nil { + p.Logger.Sugar().Errorw("MetaStateManager: Failed to commit final state", zap.Uint64("blockNumber", blockNumber), zap.Error(err)) + return err + } p.Logger.Sugar().Debugw("Committed final state", zap.Uint64("blockNumber", blockNumber), zap.Duration("indexTime", time.Since(blockFetchTime))) p.Logger.Sugar().Debugw("Checking for rewards to validate", zap.Uint64("blockNumber", blockNumber)) @@ -325,6 +348,7 @@ func (p *Pipeline) RunForFetchedBlock(ctx context.Context, block *fetcher.Fetche // Push cleanup to the background since it doesnt need to be blocking go func() { _ = p.stateManager.CleanupProcessedStateForBlock(blockNumber) + _ = p.metaStateManager.CleanupProcessedStateForBlock(blockNumber) }() return err diff --git a/pkg/pipeline/pipelineIntegration_test.go b/pkg/pipeline/pipelineIntegration_test.go index 2f16c389..932a6d8c 100644 --- a/pkg/pipeline/pipelineIntegration_test.go +++ b/pkg/pipeline/pipelineIntegration_test.go @@ -17,6 +17,7 @@ import ( "github.com/Layr-Labs/sidecar/pkg/eventBus" "github.com/Layr-Labs/sidecar/pkg/fetcher" "github.com/Layr-Labs/sidecar/pkg/indexer" + "github.com/Layr-Labs/sidecar/pkg/metaState/metaStateManager" "github.com/Layr-Labs/sidecar/pkg/postgres" "github.com/Layr-Labs/sidecar/pkg/rewards" "github.com/Layr-Labs/sidecar/pkg/rewards/stakerOperators" @@ -40,6 +41,7 @@ func setup(ethConfig *ethereum.EthereumClientConfig) ( *indexer.Indexer, storage.BlockStore, *stateManager.EigenStateManager, + *metaStateManager.MetaStateManager, *rewards.RewardsCalculator, *rewardsCalculatorQueue.RewardsCalculatorQueue, *config.Config, @@ -89,6 +91,7 @@ func setup(ethConfig *ethereum.EthereumClientConfig) ( mds := pgStorage.NewPostgresBlockStore(grm, l, cfg) sm := stateManager.NewEigenStateManager(l, grm) + msm := metaStateManager.NewMetaStateManager(grm, l, cfg) if err := eigenState.LoadEigenStateModels(sm, grm, l, cfg); err != nil { l.Sugar().Fatalw("Failed to load eigen state models", zap.Error(err)) @@ -106,7 +109,7 @@ func setup(ethConfig *ethereum.EthereumClientConfig) ( eb := eventBus.NewEventBus(l) - return fetchr, idxr, mds, sm, rc, rcq, cfg, l, sdc, grm, eb, dbname + return fetchr, idxr, mds, sm, msm, rc, rcq, cfg, l, sdc, grm, eb, dbname } @@ -114,10 +117,10 @@ func Test_PipelineIntegration(t *testing.T) { t.Run("Should index a block, transaction with logs using native batched ethereum client", func(t *testing.T) { ethConfig := ethereum.DefaultNativeCallEthereumClientConfig() - fetchr, idxr, mds, sm, rc, rcq, cfg, l, sdc, grm, eb, dbName := setup(ethConfig) + fetchr, idxr, mds, sm, msm, rc, rcq, cfg, l, sdc, grm, eb, dbName := setup(ethConfig) blockNumber := uint64(20386320) - p := NewPipeline(fetchr, idxr, mds, sm, rc, rcq, cfg, sdc, eb, l) + p := NewPipeline(fetchr, idxr, mds, sm, msm, rc, rcq, cfg, sdc, eb, l) err := p.RunForBlockBatch(context.Background(), blockNumber, blockNumber+1, true) assert.Nil(t, err) @@ -143,10 +146,10 @@ func Test_PipelineIntegration(t *testing.T) { }) t.Run("Should index a block, transaction with logs using chunked ethereum client", func(t *testing.T) { ethConfig := ethereum.DefaultChunkedCallEthereumClientConfig() - fetchr, idxr, mds, sm, rc, rcq, cfg, l, sdc, grm, eb, dbName := setup(ethConfig) + fetchr, idxr, mds, sm, msm, rc, rcq, cfg, l, sdc, grm, eb, dbName := setup(ethConfig) blockNumber := uint64(20386320) - p := NewPipeline(fetchr, idxr, mds, sm, rc, rcq, cfg, sdc, eb, l) + p := NewPipeline(fetchr, idxr, mds, sm, msm, rc, rcq, cfg, sdc, eb, l) err := p.RunForBlockBatch(context.Background(), blockNumber, blockNumber+1, true) assert.Nil(t, err) diff --git a/pkg/postgres/migrations/202501151039_rewardsClaimed/up.go b/pkg/postgres/migrations/202501151039_rewardsClaimed/up.go new file mode 100644 index 00000000..86335c2e --- /dev/null +++ b/pkg/postgres/migrations/202501151039_rewardsClaimed/up.go @@ -0,0 +1,61 @@ +package _202501151039_rewardsClaimed + +import ( + "database/sql" + "github.com/Layr-Labs/sidecar/internal/config" + "gorm.io/gorm" +) + +type Migration struct { +} + +func (m *Migration) Up(db *sql.DB, grm *gorm.DB, cfg *config.Config) error { + query := ` + CREATE TABLE IF NOT EXISTS rewards_claimed ( + root varchar not null, + earner varchar not null, + claimer varchar not null, + recipient varchar default null, + token varchar not null, + claimed_amount numeric not null, + transaction_hash varchar not null, + block_number bigint not null, + log_index bigint not null, + unique(transaction_hash, log_index), + foreign key (block_number) references blocks(number) on delete cascade + ); + ` + res := grm.Exec(query) + if res.Error != nil { + return res.Error + } + query = ` + insert into rewards_claimed (root, token, claimed_amount, earner, claimer, recipient, transaction_hash, block_number, log_index) + select + concat('0x', ( + SELECT lower(string_agg(lpad(to_hex(elem::int), 2, '0'), '')) + FROM jsonb_array_elements_text(tl.output_data->'root') AS elem + )) AS root, + lower(tl.output_data->>'token'::text) as token, + cast(tl.output_data->>'claimedAmount' as numeric) as claimed_amount, + lower(tl.arguments #>> '{1, Value}') as earner, + lower(tl.arguments #>> '{2, Value}') as claimer, + lower(coalesce(tl.arguments #>> '{3, Value}', '')) as recipient, + tl.transaction_hash, + tl.block_number, + tl.log_index + from transaction_logs as tl + where + tl.address = @rewardsCoordinatorAddress + and tl.event_name = 'RewardsClaimed' + order by tl.block_number asc + on conflict do nothing + ` + contractAddresses := cfg.GetContractsMapForChain() + res = grm.Exec(query, sql.Named("rewardsCoordinatorAddress", contractAddresses.RewardsCoordinator)) + return res.Error +} + +func (m *Migration) GetName() string { + return "202501151039_rewardsClaimed" +} diff --git a/pkg/postgres/migrations/migrator.go b/pkg/postgres/migrations/migrator.go index 7ef3cbf8..5c92558c 100644 --- a/pkg/postgres/migrations/migrator.go +++ b/pkg/postgres/migrations/migrator.go @@ -37,6 +37,7 @@ import ( _202412021311_stakerOperatorTables "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202412021311_stakerOperatorTables" _202412061553_addBlockNumberIndexes "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202412061553_addBlockNumberIndexes" _202412061626_operatorRestakedStrategiesConstraint "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202412061626_operatorRestakedStrategiesConstraint" + _202501151039_rewardsClaimed "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202501151039_rewardsClaimed" "go.uber.org/zap" "gorm.io/gorm" "time" @@ -113,6 +114,7 @@ func (m *Migrator) MigrateAll() error { &_202412021311_stakerOperatorTables.Migration{}, &_202412061553_addBlockNumberIndexes.Migration{}, &_202412061626_operatorRestakedStrategiesConstraint.Migration{}, + &_202501151039_rewardsClaimed.Migration{}, } for _, migration := range migrations { diff --git a/pkg/sidecar/sidecar.go b/pkg/sidecar/sidecar.go index 4c75cfbf..f8ae5223 100644 --- a/pkg/sidecar/sidecar.go +++ b/pkg/sidecar/sidecar.go @@ -5,6 +5,7 @@ import ( "github.com/Layr-Labs/sidecar/internal/config" "github.com/Layr-Labs/sidecar/pkg/clients/ethereum" "github.com/Layr-Labs/sidecar/pkg/eigenState/stateManager" + "github.com/Layr-Labs/sidecar/pkg/metaState/metaStateManager" "github.com/Layr-Labs/sidecar/pkg/pipeline" "github.com/Layr-Labs/sidecar/pkg/proofs" "github.com/Layr-Labs/sidecar/pkg/rewards" @@ -26,6 +27,7 @@ type Sidecar struct { Pipeline *pipeline.Pipeline EthereumClient *ethereum.Client StateManager *stateManager.EigenStateManager + MetaStateManager *metaStateManager.MetaStateManager RewardsCalculator *rewards.RewardsCalculator RewardsCalculatorQueue *rewardsCalculatorQueue.RewardsCalculatorQueue RewardProofs *proofs.RewardsProofsStore @@ -39,6 +41,7 @@ func NewSidecar( s storage.BlockStore, p *pipeline.Pipeline, em *stateManager.EigenStateManager, + msm *metaStateManager.MetaStateManager, rc *rewards.RewardsCalculator, rcq *rewardsCalculatorQueue.RewardsCalculatorQueue, rp *proofs.RewardsProofsStore, @@ -57,6 +60,7 @@ func NewSidecar( RewardsCalculator: rc, RewardsCalculatorQueue: rcq, RewardProofs: rp, + MetaStateManager: msm, StateManager: em, ShutdownChan: make(chan bool), shouldShutdown: shouldShutdown,