Skip to content

Commit

Permalink
Update state manager
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcgary committed Sep 8, 2024
1 parent 5c52762 commit aa501de
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 31 deletions.
2 changes: 1 addition & 1 deletion cmd/sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {

idxr := indexer.NewIndexer(mds, contractStore, etherscanClient, cm, client, fetchr, l, cfg)

p := pipeline.NewPipeline(fetchr, idxr, mds, l)
p := pipeline.NewPipeline(fetchr, idxr, mds, sm, l)

// Create new sidecar instance
sidecar := sidecar.NewSidecar(&sidecar.SidecarConfig{
Expand Down
4 changes: 2 additions & 2 deletions internal/eigenState/stateManager/stateManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ func (e *EigenStateManager) GenerateStateRoot(blockNumber uint64) (types.StateRo
func (e *EigenStateManager) WriteStateRoot(
blockNumber uint64,
blockHash string,
stateroot string,
stateroot types.StateRoot,
) (*StateRoot, error) {
root := &StateRoot{
EthBlockNumber: blockNumber,
EthBlockHash: blockHash,
StateRoot: stateroot,
StateRoot: string(stateroot),
}

result := e.Db.Model(&StateRoot{}).Clauses(clause.Returning{}).Create(&root)
Expand Down
5 changes: 3 additions & 2 deletions internal/eigenState/stateManager/stateManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stateManager

import (
"github.com/Layr-Labs/sidecar/internal/config"
"github.com/Layr-Labs/sidecar/internal/eigenState/types"
"github.com/Layr-Labs/sidecar/internal/logger"
"github.com/Layr-Labs/sidecar/internal/sqlite/migrations"
"github.com/Layr-Labs/sidecar/internal/tests"
Expand Down Expand Up @@ -58,13 +59,13 @@ func Test_StateManager(t *testing.T) {

blockNumber := uint64(200)
blockHash := "0x123"
stateRoot := "0x456"
stateRoot := types.StateRoot("0x456")

root, err := esm.WriteStateRoot(blockNumber, blockHash, stateRoot)
assert.Nil(t, err)
assert.Equal(t, blockNumber, root.EthBlockNumber)
assert.Equal(t, blockHash, root.EthBlockHash)
assert.Equal(t, stateRoot, root.StateRoot)
assert.Equal(t, string(stateRoot), root.StateRoot)
insertedStateRoots = append(insertedStateRoots, root)
})
t.Run("Should read a state root from the db", func(t *testing.T) {
Expand Down
80 changes: 54 additions & 26 deletions internal/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,34 @@ package pipeline

import (
"context"
"github.com/Layr-Labs/sidecar/internal/eigenState/stateManager"
"github.com/Layr-Labs/sidecar/internal/fetcher"
"github.com/Layr-Labs/sidecar/internal/indexer"
"github.com/Layr-Labs/sidecar/internal/storage"
"go.uber.org/zap"
"gorm.io/gorm"
)

type Pipeline struct {
Fetcher *fetcher.Fetcher
Indexer *indexer.Indexer
BlockStore storage.BlockStore
Logger *zap.Logger
Fetcher *fetcher.Fetcher
Indexer *indexer.Indexer
BlockStore storage.BlockStore
Logger *zap.Logger
stateManager *stateManager.EigenStateManager
}

func NewPipeline(f *fetcher.Fetcher, i *indexer.Indexer, bs storage.BlockStore, l *zap.Logger) *Pipeline {
func NewPipeline(
f *fetcher.Fetcher,
i *indexer.Indexer,
bs storage.BlockStore,
sm *stateManager.EigenStateManager,
l *zap.Logger,
) *Pipeline {
return &Pipeline{
Fetcher: f,
Indexer: i,
Logger: l,
BlockStore: bs,
Fetcher: f,
Indexer: i,
Logger: l,
stateManager: sm,
BlockStore: bs,
}
}

Expand Down Expand Up @@ -51,6 +59,11 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error {
p.Logger.Sugar().Infow("Block already indexed", zap.Uint64("blockNumber", blockNumber))
}

if err := p.stateManager.InitProcessingForBlock(blockNumber); err != nil {
p.Logger.Sugar().Errorw("Failed to init processing for block", zap.Uint64("blockNumber", blockNumber), zap.Error(err))
return err
}

// Parse all transactions and logs for the block.
// - If a transaction is not calling to a contract, it is ignored
// - If a transaction has 0 interesting logs and itself is not interesting, it is ignored
Expand Down Expand Up @@ -79,7 +92,7 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error {
p.Logger.Debug("Indexed transaction", zap.Uint64("blockNumber", blockNumber), zap.String("transactionHash", indexedTransaction.TransactionHash))

for _, log := range pt.Logs {
_, err := p.Indexer.IndexLog(
indexedLog, err := p.Indexer.IndexLog(
ctx,
indexedBlock.Number,
indexedTransaction.TransactionHash,
Expand All @@ -100,6 +113,21 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error {
zap.String("transactionHash", indexedTransaction.TransactionHash),
zap.Uint64("logIndex", log.LogIndex),
)

p.Logger.Sugar().Debugw("Handling log state change",
zap.Uint64("blockNumber", blockNumber),
zap.String("transactionHash", pt.Transaction.Hash.Value()),
zap.Uint64("logIndex", log.LogIndex),
)
if err := p.stateManager.HandleLogStateChange(indexedLog); err != nil {
p.Logger.Sugar().Errorw("Failed to handle log state change",
zap.Uint64("blockNumber", blockNumber),
zap.String("transactionHash", pt.Transaction.Hash.Value()),
zap.Uint64("logIndex", log.LogIndex),
zap.Error(err),
)
return err
}
}
// Check the logs for any contract upgrades.
upgradedLogs := p.Indexer.FindContractUpgradedLogs(pt.Logs)
Expand All @@ -122,23 +150,23 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error {
}
p.Indexer.FindAndHandleContractCreationForTransactions(interestingTransactions, block.TxReceipts, block.ContractStorage, blockNumber)

// if err := p.CloneAggregatedStateTablesFromPreviousBlock(blockNumber); err != nil {
// p.Logger.Sugar().Errorw("Failed to clone aggregated state tables", zap.Uint64("blockNumber", blockNumber), zap.Error(err))
// return err
// }
// if err := p.GenerateStateTransactionsFromLogs(blockNumber, nil, nil); err != nil {
// p.Logger.Sugar().Errorw("Failed to generate state transactions from logs", zap.Uint64("blockNumber", blockNumber), zap.Error(err))
// return err
// }

return nil
}
if err := p.stateManager.CommitFinalState(blockNumber); err != nil {
p.Logger.Sugar().Errorw("Failed to commit final state", zap.Uint64("blockNumber", blockNumber), zap.Error(err))
return err
}

func (p *Pipeline) CloneAggregatedStateTablesFromPreviousBlock(currentBlock uint64) error {
return nil
}
stateRoot, err := p.stateManager.GenerateStateRoot(blockNumber)
if err != nil {
p.Logger.Sugar().Errorw("Failed to generate state root", zap.Uint64("blockNumber", blockNumber), zap.Error(err))
return err
}

func (p *Pipeline) GenerateStateTransactionsFromLogs(currentBlock uint64, db *gorm.DB, tx *gorm.DB) error {
if sr, err := p.stateManager.WriteStateRoot(blockNumber, block.Block.Hash.Value(), stateRoot); err != nil {
p.Logger.Sugar().Errorw("Failed to write state root", zap.Uint64("blockNumber", blockNumber), zap.Error(err))
return err
} else {
p.Logger.Sugar().Infow("Wrote state root", zap.Uint64("blockNumber", blockNumber), zap.Any("stateRoot", sr))
}

return nil
}

0 comments on commit aa501de

Please sign in to comment.