From aa501de7f2874059d28839d0ff3150f106ae051e Mon Sep 17 00:00:00 2001 From: Sean McGary Date: Sun, 8 Sep 2024 13:35:56 -0500 Subject: [PATCH] Update state manager --- cmd/sidecar/main.go | 2 +- .../eigenState/stateManager/stateManager.go | 4 +- .../stateManager/stateManager_test.go | 5 +- internal/pipeline/pipeline.go | 80 +++++++++++++------ 4 files changed, 60 insertions(+), 31 deletions(-) diff --git a/cmd/sidecar/main.go b/cmd/sidecar/main.go index 7a3f90ee..74624597 100644 --- a/cmd/sidecar/main.go +++ b/cmd/sidecar/main.go @@ -72,7 +72,7 @@ func main() { idxr := indexer.NewIndexer(mds, contractStore, etherscanClient, cm, client, fetchr, l, cfg) - p := pipeline.NewPipeline(fetchr, idxr, mds, l) + p := pipeline.NewPipeline(fetchr, idxr, mds, sm, l) // Create new sidecar instance sidecar := sidecar.NewSidecar(&sidecar.SidecarConfig{ diff --git a/internal/eigenState/stateManager/stateManager.go b/internal/eigenState/stateManager/stateManager.go index a7bbc648..130d7c28 100644 --- a/internal/eigenState/stateManager/stateManager.go +++ b/internal/eigenState/stateManager/stateManager.go @@ -110,12 +110,12 @@ func (e *EigenStateManager) GenerateStateRoot(blockNumber uint64) (types.StateRo func (e *EigenStateManager) WriteStateRoot( blockNumber uint64, blockHash string, - stateroot string, + stateroot types.StateRoot, ) (*StateRoot, error) { root := &StateRoot{ EthBlockNumber: blockNumber, EthBlockHash: blockHash, - StateRoot: stateroot, + StateRoot: string(stateroot), } result := e.Db.Model(&StateRoot{}).Clauses(clause.Returning{}).Create(&root) diff --git a/internal/eigenState/stateManager/stateManager_test.go b/internal/eigenState/stateManager/stateManager_test.go index 842bb082..4d65a7da 100644 --- a/internal/eigenState/stateManager/stateManager_test.go +++ b/internal/eigenState/stateManager/stateManager_test.go @@ -2,6 +2,7 @@ package stateManager import ( "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/internal/eigenState/types" "github.com/Layr-Labs/sidecar/internal/logger" "github.com/Layr-Labs/sidecar/internal/sqlite/migrations" "github.com/Layr-Labs/sidecar/internal/tests" @@ -58,13 +59,13 @@ func Test_StateManager(t *testing.T) { blockNumber := uint64(200) blockHash := "0x123" - stateRoot := "0x456" + stateRoot := types.StateRoot("0x456") root, err := esm.WriteStateRoot(blockNumber, blockHash, stateRoot) assert.Nil(t, err) assert.Equal(t, blockNumber, root.EthBlockNumber) assert.Equal(t, blockHash, root.EthBlockHash) - assert.Equal(t, stateRoot, root.StateRoot) + assert.Equal(t, string(stateRoot), root.StateRoot) insertedStateRoots = append(insertedStateRoots, root) }) t.Run("Should read a state root from the db", func(t *testing.T) { diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 06d16a98..516ca0b2 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -2,26 +2,34 @@ package pipeline import ( "context" + "github.com/Layr-Labs/sidecar/internal/eigenState/stateManager" "github.com/Layr-Labs/sidecar/internal/fetcher" "github.com/Layr-Labs/sidecar/internal/indexer" "github.com/Layr-Labs/sidecar/internal/storage" "go.uber.org/zap" - "gorm.io/gorm" ) type Pipeline struct { - Fetcher *fetcher.Fetcher - Indexer *indexer.Indexer - BlockStore storage.BlockStore - Logger *zap.Logger + Fetcher *fetcher.Fetcher + Indexer *indexer.Indexer + BlockStore storage.BlockStore + Logger *zap.Logger + stateManager *stateManager.EigenStateManager } -func NewPipeline(f *fetcher.Fetcher, i *indexer.Indexer, bs storage.BlockStore, l *zap.Logger) *Pipeline { +func NewPipeline( + f *fetcher.Fetcher, + i *indexer.Indexer, + bs storage.BlockStore, + sm *stateManager.EigenStateManager, + l *zap.Logger, +) *Pipeline { return &Pipeline{ - Fetcher: f, - Indexer: i, - Logger: l, - BlockStore: bs, + Fetcher: f, + Indexer: i, + Logger: l, + stateManager: sm, + BlockStore: bs, } } @@ -51,6 +59,11 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error { p.Logger.Sugar().Infow("Block already indexed", zap.Uint64("blockNumber", blockNumber)) } + if err := p.stateManager.InitProcessingForBlock(blockNumber); err != nil { + p.Logger.Sugar().Errorw("Failed to init processing for block", zap.Uint64("blockNumber", blockNumber), zap.Error(err)) + return err + } + // Parse all transactions and logs for the block. // - If a transaction is not calling to a contract, it is ignored // - If a transaction has 0 interesting logs and itself is not interesting, it is ignored @@ -79,7 +92,7 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error { p.Logger.Debug("Indexed transaction", zap.Uint64("blockNumber", blockNumber), zap.String("transactionHash", indexedTransaction.TransactionHash)) for _, log := range pt.Logs { - _, err := p.Indexer.IndexLog( + indexedLog, err := p.Indexer.IndexLog( ctx, indexedBlock.Number, indexedTransaction.TransactionHash, @@ -100,6 +113,21 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error { zap.String("transactionHash", indexedTransaction.TransactionHash), zap.Uint64("logIndex", log.LogIndex), ) + + p.Logger.Sugar().Debugw("Handling log state change", + zap.Uint64("blockNumber", blockNumber), + zap.String("transactionHash", pt.Transaction.Hash.Value()), + zap.Uint64("logIndex", log.LogIndex), + ) + if err := p.stateManager.HandleLogStateChange(indexedLog); err != nil { + p.Logger.Sugar().Errorw("Failed to handle log state change", + zap.Uint64("blockNumber", blockNumber), + zap.String("transactionHash", pt.Transaction.Hash.Value()), + zap.Uint64("logIndex", log.LogIndex), + zap.Error(err), + ) + return err + } } // Check the logs for any contract upgrades. upgradedLogs := p.Indexer.FindContractUpgradedLogs(pt.Logs) @@ -122,23 +150,23 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error { } p.Indexer.FindAndHandleContractCreationForTransactions(interestingTransactions, block.TxReceipts, block.ContractStorage, blockNumber) - // if err := p.CloneAggregatedStateTablesFromPreviousBlock(blockNumber); err != nil { - // p.Logger.Sugar().Errorw("Failed to clone aggregated state tables", zap.Uint64("blockNumber", blockNumber), zap.Error(err)) - // return err - // } - // if err := p.GenerateStateTransactionsFromLogs(blockNumber, nil, nil); err != nil { - // p.Logger.Sugar().Errorw("Failed to generate state transactions from logs", zap.Uint64("blockNumber", blockNumber), zap.Error(err)) - // return err - // } - - return nil -} + if err := p.stateManager.CommitFinalState(blockNumber); err != nil { + p.Logger.Sugar().Errorw("Failed to commit final state", zap.Uint64("blockNumber", blockNumber), zap.Error(err)) + return err + } -func (p *Pipeline) CloneAggregatedStateTablesFromPreviousBlock(currentBlock uint64) error { - return nil -} + stateRoot, err := p.stateManager.GenerateStateRoot(blockNumber) + if err != nil { + p.Logger.Sugar().Errorw("Failed to generate state root", zap.Uint64("blockNumber", blockNumber), zap.Error(err)) + return err + } -func (p *Pipeline) GenerateStateTransactionsFromLogs(currentBlock uint64, db *gorm.DB, tx *gorm.DB) error { + if sr, err := p.stateManager.WriteStateRoot(blockNumber, block.Block.Hash.Value(), stateRoot); err != nil { + p.Logger.Sugar().Errorw("Failed to write state root", zap.Uint64("blockNumber", blockNumber), zap.Error(err)) + return err + } else { + p.Logger.Sugar().Infow("Wrote state root", zap.Uint64("blockNumber", blockNumber), zap.Any("stateRoot", sr)) + } return nil }