Skip to content

Commit

Permalink
Gracefully shutdown to prevent data corruption
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcgary committed Sep 9, 2024
1 parent 6b2b93f commit c0ed9b7
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 39 deletions.
64 changes: 33 additions & 31 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,39 @@ jobs:
go-version: 1.22
- name: Run tests
run: make test
#build:
# # needs: test
# runs-on: ubuntu-latest
# steps:
# - name: Checkout
# uses: actions/checkout@v4
# - name: Configure AWS credentials
# uses: aws-actions/configure-aws-credentials@v4
# with:
# aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
# aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
# aws-region: ${{ secrets.AWS_REGION }}
# - name: Login to Amazon ECR
# id: login-ecr
# uses: aws-actions/amazon-ecr-login@v2
# - name: Set up QEMU
# uses: docker/setup-qemu-action@v2
# - name: Set up Docker Buildx
# uses: docker/setup-buildx-action@v3
# - name: Build, tag, and push docker image to Amazon ECR
# env:
# REGISTRY: "767397703211.dkr.ecr.us-east-1.amazonaws.com"
# REPOSITORY: ${{ github.event.repository.name }}
# IMAGE_TAG: ${{ github.sha }}
# PLATFORMS: "linux/amd64"
# run: |
# if [[ $GITHUB_REF == refs/heads/master ]]; then
# docker buildx build --platform $PLATFORMS -t $REGISTRY/$REPOSITORY:$IMAGE_TAG -t $REGISTRY/$REPOSITORY:latest --push .
# else
# docker buildx build --platform $PLATFORMS -t $REGISTRY/$REPOSITORY:$IMAGE_TAG --push .
# fi
build:
# needs: test
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ secrets.AWS_REGION }}
- name: Login to Amazon ECR
id: login-ecr-public
uses: aws-actions/amazon-ecr-login@v2
with:
registry-type: public
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build, tag, and push docker image to Amazon ECR
env:
REGISTRY: "public.ecr.aws/z6g0f8n7"
REPOSITORY: ${{ github.event.repository.name }}
IMAGE_TAG: ${{ github.sha }}
PLATFORMS: "linux/amd64,linux/arm64"
run: |
if [[ $GITHUB_REF == refs/heads/master ]]; then
docker buildx build --platform $PLATFORMS -t $REGISTRY/$REPOSITORY:$IMAGE_TAG -t $REGISTRY/$REPOSITORY:latest --push .
else
docker buildx build --platform $PLATFORMS -t $REGISTRY/$REPOSITORY:$IMAGE_TAG --push .
fi
#build-mainnet:
# # needs: test
# if: github.ref == 'refs/heads/master'
Expand Down
1 change: 1 addition & 0 deletions cmd/sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,6 @@ func main() {
shutdown.ListenForShutdown(gracefulShutdown, done, func() {
l.Sugar().Info("Shutting down...")
rpcChannel <- true
sidecar.ShutdownChan <- true
}, time.Second*5, l)
}
11 changes: 11 additions & 0 deletions internal/eigenState/stateManager/stateManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ func (e *EigenStateManager) CommitFinalState(blockNumber uint64) error {
return nil
}

func (e *EigenStateManager) CleanupBlock(blockNumber uint64) error {
for _, index := range e.GetSortedModelIndexes() {
state := e.StateModels[index]
err := state.ClearAccumulatedState(blockNumber)
if err != nil {
return err
}
}
return nil
}

func (e *EigenStateManager) GenerateStateRoot(blockNumber uint64, blockHash string) (types.StateRoot, error) {
sortedIndexes := e.GetSortedModelIndexes()
roots := [][]byte{
Expand Down
18 changes: 10 additions & 8 deletions internal/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewPipeline(
}

func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error {
p.Logger.Sugar().Infow("Running pipeline for block", zap.Uint64("blockNumber", blockNumber))
p.Logger.Sugar().Debugw("Running pipeline for block", zap.Uint64("blockNumber", blockNumber))

/*
- Fetch block
Expand Down Expand Up @@ -63,7 +63,7 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error {
p.Logger.Sugar().Errorw("Failed to init processing for block", zap.Uint64("blockNumber", blockNumber), zap.Error(err))
return err
}
p.Logger.Sugar().Infow("Initialized processing for block", zap.Uint64("blockNumber", blockNumber))
p.Logger.Sugar().Debugw("Initialized processing for block", zap.Uint64("blockNumber", blockNumber))

// Parse all transactions and logs for the block.
// - If a transaction is not calling to a contract, it is ignored
Expand All @@ -77,7 +77,7 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error {
)
return err
}
p.Logger.Sugar().Infow("Parsed transactions", zap.Uint64("blockNumber", blockNumber), zap.Int("count", len(parsedTransactions)))
p.Logger.Sugar().Debugw("Parsed transactions", zap.Uint64("blockNumber", blockNumber), zap.Int("count", len(parsedTransactions)))

// With only interesting transactions/logs parsed, insert them into the database
for _, pt := range parsedTransactions {
Expand All @@ -90,7 +90,7 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error {
)
return err
}
p.Logger.Debug("Indexed transaction", zap.Uint64("blockNumber", blockNumber), zap.String("transactionHash", indexedTransaction.TransactionHash))
p.Logger.Sugar().Debugw("Indexed transaction", zap.Uint64("blockNumber", blockNumber), zap.String("transactionHash", indexedTransaction.TransactionHash))

for _, log := range pt.Logs {
indexedLog, err := p.Indexer.IndexLog(
Expand Down Expand Up @@ -162,12 +162,14 @@ func (p *Pipeline) RunForBlock(ctx context.Context, blockNumber uint64) error {
return err
}

if sr, err := p.stateManager.WriteStateRoot(blockNumber, block.Block.Hash.Value(), stateRoot); err != nil {
sr, err := p.stateManager.WriteStateRoot(blockNumber, block.Block.Hash.Value(), stateRoot)
if err != nil {
p.Logger.Sugar().Errorw("Failed to write state root", zap.Uint64("blockNumber", blockNumber), zap.Error(err))
return err
} else {
p.Logger.Sugar().Infow("Wrote state root", zap.Uint64("blockNumber", blockNumber), zap.Any("stateRoot", sr))
p.Logger.Sugar().Debugw("Wrote state root", zap.Uint64("blockNumber", blockNumber), zap.Any("stateRoot", sr))
}

return nil
_ = p.stateManager.CleanupBlock(blockNumber)

return err
}
55 changes: 55 additions & 0 deletions internal/sidecar/blockIndexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sidecar

import (
"context"
"fmt"
"go.uber.org/zap"
"sync"
"time"
Expand Down Expand Up @@ -48,6 +49,8 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error {
if latestBlock == 0 {
s.Logger.Sugar().Infow("No blocks indexed, starting from genesis block", zap.Uint64("genesisBlock", s.Config.GenesisBlockNumber))
latestBlock = int64(s.Config.GenesisBlockNumber)
} else {
latestBlock += 1
}

blockNumber, err := s.EthereumClient.GetBlockNumberUint64(ctx)
Expand All @@ -63,9 +66,25 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error {

ct := currentTip{CurrentTip: blockNumber}

shouldShutdown := false

go func() {
for {
select {
case <-s.ShutdownChan:
s.Logger.Sugar().Infow("Received shutdown signal")
shouldShutdown = true
}
}
}()

go func() {
for {
time.Sleep(time.Second * 30)
if shouldShutdown {
s.Logger.Sugar().Infow("Shutting down block listener...")
return
}
latestTip, err := s.EthereumClient.GetBlockNumberUint64(ctx)
if err != nil {
s.Logger.Sugar().Errorw("Failed to get latest tip", zap.Error(err))
Expand All @@ -80,14 +99,50 @@ func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error {
}
}
}()
blocksProcessed := 0
runningAvg := 0
totalDurationMs := 0
for i := latestBlock; i <= int64(ct.Get()); i++ {
if shouldShutdown {
s.Logger.Sugar().Infow("Shutting down block processor")
return nil
}
tip := ct.Get()
pctComplete := float64(i) / float64(tip) * 100
blocksRemaining := tip - uint64(i)
estTimeRemainingMs := runningAvg * int(blocksRemaining)
estTimeRemainingHours := float64(estTimeRemainingMs) / 1000 / 60 / 60

if i%10 == 0 {
s.Logger.Sugar().Infow("Progress",
zap.String("percentComplete", fmt.Sprintf("%.2f", pctComplete)),
zap.Uint64("blocksRemaining", blocksRemaining),
zap.Float64("estimatedTimeRemaining (hrs)", estTimeRemainingHours),
zap.Float64("avgBlockProcessTime (ms)", float64(runningAvg)),
)
}

startTime := time.Now()
if err := s.Pipeline.RunForBlock(ctx, uint64(i)); err != nil {
s.Logger.Sugar().Errorw("Failed to run pipeline for block",
zap.Int64("currentBlockNumber", i),
zap.Error(err),
)
return err
}
delta := time.Since(startTime).Milliseconds()
blocksProcessed++

totalDurationMs += int(delta)
runningAvg = totalDurationMs / blocksProcessed

s.Logger.Sugar().Debugw("Processed block",
zap.Int64("blockNumber", i),
zap.Int64("duration", delta),
zap.Int("avgDuration", runningAvg),
)
}

// TODO(seanmcgary): transition to listening for new blocks
return nil
}
2 changes: 2 additions & 0 deletions internal/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Sidecar struct {
Storage storage.BlockStore
Pipeline *pipeline.Pipeline
EthereumClient *ethereum.Client
ShutdownChan chan bool
}

func NewSidecar(
Expand All @@ -52,6 +53,7 @@ func NewSidecar(
Storage: s,
Pipeline: p,
EthereumClient: ethClient,
ShutdownChan: make(chan bool),
}
}

Expand Down
3 changes: 3 additions & 0 deletions internal/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ func NewGormSqliteFromSqlite(sqlite gorm.Dialector) (*gorm.DB, error) {
return nil, err
}

// https://phiresky.github.io/blog/2020/sqlite-performance-tuning/
pragmas := []string{
`PRAGMA foreign_keys = ON;`,
`PRAGMA journal_mode = WAL;`,
`PRAGMA synchronous = normal;`,
`pragma mmap_size = 30000000000;`,
}

for _, pragma := range pragmas {
Expand Down

0 comments on commit c0ed9b7

Please sign in to comment.