Skip to content

Commit

Permalink
Handle potentially corrupt data after bad shutdown
Browse files Browse the repository at this point in the history
On startup:
* Pull the block number of the last stateroot
* If that block number is less than the latest block in the blocks table, delete everything > stateroot block
* Start indexing from stateroot block + 1

Closes #19
  • Loading branch information
seanmcgary committed Sep 9, 2024
1 parent 26c4cc1 commit d0355cf
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 2 deletions.
2 changes: 1 addition & 1 deletion cmd/sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func main() {
// Create new sidecar instance
sidecar := sidecar.NewSidecar(&sidecar.SidecarConfig{
GenesisBlockNumber: cfg.GetGenesisBlockNumber(),
}, cfg, mds, p, l, client)
}, cfg, mds, p, sm, l, client)

// RPC channel to notify the RPC server to shutdown gracefully
rpcChannel := make(chan bool)
Expand Down
4 changes: 4 additions & 0 deletions internal/eigenState/avsOperators/avsOperators.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ func (a *AvsOperatorsModel) merkelizeState(blockNumber uint64, avsOperators []Re
)
}

// DeleteState removes this model's rows for the given block range by
// delegating to BaseEigenState.DeleteState against the
// registered_avs_operators table.
//
// startBlockNumber is the first block to delete (inclusive). endBlockNumber
// is the last block to delete (inclusive); 0 means no upper bound.
func (a *AvsOperatorsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
	const tableName = "registered_avs_operators"
	return a.BaseEigenState.DeleteState(tableName, startBlockNumber, endBlockNumber, a.Db)
}

// encodeOperatorLeaf serializes an operator and its registration flag as the
// bytes of "<operator>:<registered>", where the flag is rendered as "true" or
// "false".
func encodeOperatorLeaf(operator string, registered bool) []byte {
	return fmt.Appendf(nil, "%s:%t", operator, registered)
}
Expand Down
30 changes: 30 additions & 0 deletions internal/eigenState/base/baseEigenState.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package base

import (
"database/sql"
"encoding/json"
"fmt"
"github.com/Layr-Labs/go-sidecar/internal/parser"
"github.com/Layr-Labs/go-sidecar/internal/storage"
"go.uber.org/zap"
"gorm.io/gorm"
"slices"
"strings"
)
Expand Down Expand Up @@ -61,3 +63,31 @@ func (b *BaseEigenState) IsInterestingLog(contractsEvents map[string][]string, l
}
return false
}

// DeleteState deletes every row of tableName whose block_number lies in the
// range [startBlockNumber, endBlockNumber], both ends inclusive.
//
// An endBlockNumber of 0 means "no upper bound": all rows with
// block_number >= startBlockNumber are deleted. A non-zero endBlockNumber
// smaller than startBlockNumber is rejected with an error and nothing is
// deleted.
//
// tableName is interpolated directly into the SQL text because a table name
// cannot be supplied as a bound parameter. It must therefore only ever be a
// trusted, hard-coded identifier and never user-controlled input.
func (b *BaseEigenState) DeleteState(tableName string, startBlockNumber uint64, endBlockNumber uint64, db *gorm.DB) error {
	if endBlockNumber != 0 && endBlockNumber < startBlockNumber {
		b.Logger.Sugar().Errorw("Invalid block range",
			zap.Uint64("startBlockNumber", startBlockNumber),
			zap.Uint64("endBlockNumber", endBlockNumber),
		)
		return fmt.Errorf("Invalid block range; endBlockNumber must be greater than or equal to startBlockNumber")
	}

	query := fmt.Sprintf(`
		delete from %s
		where block_number >= @startBlockNumber
	`, tableName)

	// Bind only the parameters the statement actually references: the table
	// name is already part of the SQL text, and the upper bound is bound only
	// when its clause is appended.
	args := []any{sql.Named("startBlockNumber", startBlockNumber)}
	if endBlockNumber > 0 {
		query += " and block_number <= @endBlockNumber"
		args = append(args, sql.Named("endBlockNumber", endBlockNumber))
	}

	if res := db.Exec(query, args...); res.Error != nil {
		b.Logger.Sugar().Errorw("Failed to delete state", zap.Error(res.Error))
		// Wrap so callers can tell which table failed while errors.Is/As
		// still reach the underlying database error.
		return fmt.Errorf("deleting state from table %q: %w", tableName, res.Error)
	}
	return nil
}
4 changes: 4 additions & 0 deletions internal/eigenState/operatorShares/operatorShares.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,7 @@ func encodeStratTree(strategy string, operatorTreeRoot []byte) []byte {
strategyBytes := []byte(strategy)
return append(strategyBytes, operatorTreeRoot[:]...)
}

// DeleteState removes this model's rows for the given block range by
// delegating to BaseEigenState.DeleteState against the operator_shares table.
//
// startBlockNumber is the first block to delete (inclusive). endBlockNumber
// is the last block to delete (inclusive); 0 means no upper bound.
func (osm *OperatorSharesModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
	const tableName = "operator_shares"
	return osm.BaseEigenState.DeleteState(tableName, startBlockNumber, endBlockNumber, osm.Db)
}
4 changes: 4 additions & 0 deletions internal/eigenState/stakerDelegations/stakerDelegations.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,7 @@ func encodeStakerLeaf(staker string, delegated bool) []byte {
// encodeOperatorLeaf builds a leaf value by concatenating the raw bytes of the
// operator string with operatorStakersRoot. The result is a freshly allocated
// slice; operatorStakersRoot is not modified.
func encodeOperatorLeaf(operator string, operatorStakersRoot []byte) []byte {
	leaf := make([]byte, 0, len(operator)+len(operatorStakersRoot))
	leaf = append(leaf, operator...)
	leaf = append(leaf, operatorStakersRoot...)
	return leaf
}

// DeleteState removes this model's rows for the given block range by
// delegating to BaseEigenState.DeleteState against the delegated_stakers
// table.
//
// startBlockNumber is the first block to delete (inclusive). endBlockNumber
// is the last block to delete (inclusive); 0 means no upper bound.
func (s *StakerDelegationsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
	const tableName = "delegated_stakers"
	return s.BaseEigenState.DeleteState(tableName, startBlockNumber, endBlockNumber, s.Db)
}
4 changes: 4 additions & 0 deletions internal/eigenState/stakerShares/stakerShares.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,3 +576,7 @@ func encodeStratTree(strategy string, stakerTreeRoot []byte) []byte {
strategyBytes := []byte(strategy)
return append(strategyBytes, stakerTreeRoot[:]...)
}

// DeleteState removes this model's rows for the given block range by
// delegating to BaseEigenState.DeleteState against the staker_shares table.
//
// startBlockNumber is the first block to delete (inclusive). endBlockNumber
// is the last block to delete (inclusive); 0 means no upper bound.
func (ss *StakerSharesModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
	const tableName = "staker_shares"
	return ss.BaseEigenState.DeleteState(tableName, startBlockNumber, endBlockNumber, ss.Db)
}
28 changes: 28 additions & 0 deletions internal/eigenState/stateManager/stateManager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stateManager

import (
"errors"
"fmt"
"github.com/Layr-Labs/go-sidecar/internal/eigenState/types"
"github.com/Layr-Labs/go-sidecar/internal/storage"
Expand Down Expand Up @@ -169,3 +170,30 @@ func (e *EigenStateManager) GetSortedModelIndexes() []int {
slices.Sort(indexes)
return indexes
}

// GetLatestStateRoot returns the stored StateRoot with the highest
// eth_block_number.
//
// When no state root has been stored yet it returns (nil, nil), so callers
// must check the returned pointer for nil before dereferencing it. Any other
// database error is returned as-is.
func (e *EigenStateManager) GetLatestStateRoot() (*StateRoot, error) {
	var latest StateRoot
	err := e.Db.Model(&StateRoot{}).Order("eth_block_number desc").First(&latest).Error
	switch {
	case err == nil:
		return &latest, nil
	case errors.Is(err, gorm.ErrRecordNotFound):
		// An empty table is not an error; it is reported as "no root".
		return nil, nil
	default:
		return nil, err
	}
}

// DeleteCorruptedState asks every registered state model, in sorted index
// order, to delete its stored state for the given block range. It stops at
// the first model that fails and returns that model's error; models handled
// before the failure are not rolled back here.
//
// @param startBlock the first block number to delete state for (inclusive)
// @param endBlock the last block number to delete state for (inclusive). If 0, delete all state from startBlock onwards
func (e *EigenStateManager) DeleteCorruptedState(startBlock uint64, endBlock uint64) error {
	for _, modelIndex := range e.GetSortedModelIndexes() {
		if err := e.StateModels[modelIndex].DeleteState(startBlock, endBlock); err != nil {
			return err
		}
	}
	return nil
}
7 changes: 7 additions & 0 deletions internal/eigenState/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type IEigenStateModel interface {
// ClearAccumulatedState
// Clear the accumulated state for the model to free up memory
ClearAccumulatedState(blockNumber uint64) error

// DeleteState deletes stored state that may be incomplete or corrupted
// so that the affected blocks can be reprocessed
//
// @param startBlockNumber the block number to start deleting state from (inclusive)
// @param endBlockNumber the block number to end deleting state from (inclusive). If 0, delete all state from startBlockNumber
DeleteState(startBlockNumber uint64, endBlockNumber uint64) error
}

// StateTransitions
Expand Down
33 changes: 32 additions & 1 deletion internal/sidecar/blockIndexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,46 @@ func (ct *currentTip) Set(tip uint64) {
}

func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error {

latestBlock, err := s.GetLastIndexedBlock()
if err != nil {
return err
}

latestStateRoot, err := s.StateManager.GetLatestStateRoot()
if err != nil {
s.Logger.Sugar().Errorw("Failed to get latest state root", zap.Error(err))
return err
}

s.Logger.Sugar().Infow("Comparing latest block and latest state root",
zap.Int64("latestBlock", latestBlock),
zap.Uint64("latestStateRootBlock", latestStateRoot.EthBlockNumber),
)

if latestBlock == 0 {
s.Logger.Sugar().Infow("No blocks indexed, starting from genesis block", zap.Uint64("genesisBlock", s.Config.GenesisBlockNumber))
latestBlock = int64(s.Config.GenesisBlockNumber)
} else {
latestBlock += 1
// if the latest state root is behind the latest block, delete the corrupted state and set the
// latest block to the latest state root + 1
if latestStateRoot != nil && latestStateRoot.EthBlockNumber < uint64(latestBlock) {
s.Logger.Sugar().Infow("Latest state root is behind latest block, deleting corrupted state",
zap.Uint64("latestStateRoot", latestStateRoot.EthBlockNumber),
zap.Int64("latestBlock", latestBlock),
)
if err := s.StateManager.DeleteCorruptedState(latestStateRoot.EthBlockNumber+1, uint64(latestBlock)); err != nil {
s.Logger.Sugar().Errorw("Failed to delete corrupted state", zap.Error(err))
return err
}
if err := s.Storage.DeleteCorruptedState(uint64(latestStateRoot.EthBlockNumber+1), uint64(latestBlock)); err != nil {
s.Logger.Sugar().Errorw("Failed to delete corrupted state", zap.Error(err))
return err
}
} else {
// otherwise, start from the latest block + 1
latestBlock += 1
}
}

blockNumber, err := s.EthereumClient.GetBlockNumberUint64(ctx)
Expand Down
3 changes: 3 additions & 0 deletions internal/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Sidecar struct {
Storage storage.BlockStore
Pipeline *pipeline.Pipeline
EthereumClient *ethereum.Client
StateManager *stateManager.EigenStateManager
ShutdownChan chan bool
}

Expand All @@ -43,6 +44,7 @@ func NewSidecar(
gCfg *config.Config,
s storage.BlockStore,
p *pipeline.Pipeline,
em *stateManager.EigenStateManager,
l *zap.Logger,
ethClient *ethereum.Client,
) *Sidecar {
Expand All @@ -53,6 +55,7 @@ func NewSidecar(
Storage: s,
Pipeline: p,
EthereumClient: ethClient,
StateManager: em,
ShutdownChan: make(chan bool),
}
}
Expand Down
48 changes: 48 additions & 0 deletions internal/storage/sqlite/storage.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sqlite

import (
"database/sql"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -203,3 +204,50 @@ func (s *SqliteBlockStore) GetLatestActiveAvsOperators(blockNumber uint64, avsDi
}
return rows, nil
}

// DeleteCorruptedState deletes rows for the given block range from the
// transaction_logs, transactions and blocks tables, in that order.
//
// startBlockNumber is the first block to delete (inclusive). endBlockNumber
// is the last block to delete (inclusive); 0 means no upper bound. A non-zero
// endBlockNumber smaller than startBlockNumber is rejected with an error
// before anything is deleted.
//
// The statements are executed one after another; if one fails, the error is
// returned immediately and the remaining tables are left untouched.
func (s *SqliteBlockStore) DeleteCorruptedState(startBlockNumber uint64, endBlockNumber uint64) error {
	if endBlockNumber != 0 && endBlockNumber < startBlockNumber {
		s.Logger.Sugar().Errorw("Invalid block range",
			zap.Uint64("startBlockNumber", startBlockNumber),
			zap.Uint64("endBlockNumber", endBlockNumber),
		)
		return fmt.Errorf("Invalid block range; endBlockNumber must be greater than or equal to startBlockNumber")
	}

	// Each target pairs a table with the column that stores its block number:
	// the blocks table keys on "number", the others on "block_number".
	targets := []struct {
		table       string
		blockColumn string
	}{
		{table: "transaction_logs", blockColumn: "block_number"},
		{table: "transactions", blockColumn: "block_number"},
		{table: "blocks", blockColumn: "number"},
	}

	hasUpperBound := endBlockNumber > 0
	for _, target := range targets {
		stmt := fmt.Sprintf(`
			delete from %s
			where %s >= @startBlockNumber
		`, target.table, target.blockColumn)
		if hasUpperBound {
			stmt += " and " + target.blockColumn + " <= @endBlockNumber"
		}

		res := s.Db.Exec(stmt,
			sql.Named("startBlockNumber", startBlockNumber),
			sql.Named("endBlockNumber", endBlockNumber),
		)
		if res.Error != nil {
			return xerrors.Errorf("Failed to delete corrupted state from table '%s': %w", target.table, res.Error)
		}
	}
	return nil
}
6 changes: 6 additions & 0 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ type BlockStore interface {

// Less generic functions
GetLatestActiveAvsOperators(blockNumber uint64, avsDirectoryAddress string) ([]*ActiveAvsOperator, error)

// DeleteCorruptedState deletes all the corrupted state from the database
//
// @param startBlockNumber: The block number from which to start (inclusive)
// @param endBlockNumber: The block number at which to end (inclusive). If 0, it will delete all the corrupted state from the startBlock
DeleteCorruptedState(startBlockNumber uint64, endBlockNumber uint64) error
}

// Tables
Expand Down

0 comments on commit d0355cf

Please sign in to comment.