From 82ddc0fd23639cf7d60e47ad38701d3a9f3a9630 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Thu, 7 Oct 2021 15:47:50 +0200 Subject: [PATCH 1/6] core: improve shutdown synchronization in BlockChain --- core/blockchain.go | 156 +++++++++++++++++++++++++--------------- core/blockchain_test.go | 5 +- internal/syncx/mutex.go | 64 +++++++++++++++++ 3 files changed, 164 insertions(+), 61 deletions(-) create mode 100644 internal/syncx/mutex.go diff --git a/core/blockchain.go b/core/blockchain.go index 87cdaded4..880fc3bd4 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -39,6 +39,7 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/internal/syncx" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" @@ -80,6 +81,7 @@ var ( blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) errInsertionInterrupted = errors.New("insertion is interrupted") + errChainStopped = errors.New("blockchain is stopped") ) const ( @@ -183,7 +185,9 @@ type BlockChain struct { scope event.SubscriptionScope genesisBlock *types.Block - chainmu sync.RWMutex // blockchain insertion lock + // This mutex synchronizes chain write operations. + // Readers don't need to take it, they can just read the database. + chainmu *syncx.ClosableMutex currentBlock atomic.Value // Current head of the block chain currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) @@ -196,8 +200,8 @@ type BlockChain struct { txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. futureBlocks *lru.Cache // future blocks are blocks added for later processing - quit chan struct{} // blockchain quit channel - wg sync.WaitGroup // chain processing wait group for shutting down + wg sync.WaitGroup // + quit chan struct{} // shutdown signal, closed in Stop. running int32 // 0 if chain is running, 1 when stopped procInterrupt int32 // interrupt signaler for block processing @@ -243,6 +247,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par Preimages: cacheConfig.Preimages, }), quit: make(chan struct{}), + chainmu: syncx.NewClosableMutex(), shouldPreserve: shouldPreserve, bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -288,6 +293,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par if err := bc.loadLastState(); err != nil { return nil, err } + // Make sure the state associated with the block is available head := bc.CurrentBlock() if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil { @@ -316,6 +322,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } } + // Ensure that a previous crash in SetHead doesn't leave extra ancients if frozen, err := bc.db.Ancients(); err == nil && frozen > 0 { var ( @@ -367,6 +374,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } } + // Load any existing snapshot, regenerating it if loading failed if bc.cacheConfig.SnapshotLimit > 0 { // If the chain was rewound past the snapshot persistent layer (causing @@ -382,14 +390,19 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } bc.snaps, _ = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, head.Root(), !bc.cacheConfig.SnapshotWait, true, recover) } - // Take ownership of this particular state - go bc.update() + + // Start future block processor. + bc.wg.Add(1) + go bc.futureBlocksLoop() + + // Start tx indexer/unindexer. if txLookupLimit != nil { bc.txLookupLimit = *txLookupLimit bc.wg.Add(1) go bc.maintainTxIndex(txIndexBlock) } + // If periodic cache journal is required, spin it up. if bc.cacheConfig.TrieCleanRejournal > 0 { if bc.cacheConfig.TrieCleanRejournal < time.Minute { @@ -498,7 +511,9 @@ func (bc *BlockChain) SetHead(head uint64) error { // // The method returns the block number where the requested root cap was found. func (bc *BlockChain) SetHeadBeyondRoot(head uint64, root common.Hash) (uint64, error) { - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } defer bc.chainmu.Unlock() // Track the block number of the requested root hash @@ -646,8 +661,11 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { if _, err := trie.NewSecure(block.Root(), bc.stateCache.TrieDB()); err != nil { return err } - // If all checks out, manually set the head block - bc.chainmu.Lock() + + // If all checks out, manually set the head block. + if !bc.chainmu.TryLock() { + return errChainStopped + } bc.currentBlock.Store(block) headBlockGauge.Update(int64(block.NumberU64())) bc.chainmu.Unlock() @@ -720,7 +738,9 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { if err := bc.SetHead(0); err != nil { return err } - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return errChainStopped + } defer bc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain @@ -750,8 +770,10 @@ func (bc *BlockChain) Export(w io.Writer) error { // ExportN writes a subset of the active chain to the given writer. func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { - bc.chainmu.RLock() - defer bc.chainmu.RUnlock() + if !bc.chainmu.TryLock() { + return errChainStopped + } + defer bc.chainmu.Unlock() if first > last { return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last) @@ -1004,10 +1026,21 @@ func (bc *BlockChain) Stop() { if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { return } - // Unsubscribe all subscriptions registered from blockchain + + // Unsubscribe all subscriptions registered from blockchain. bc.scope.Close() + + // Signal shutdown to all goroutines. close(bc.quit) bc.StopInsert() + + // Now wait for all chain modifications to end and persistent goroutines to exit. + // + // Note: Close waits for the mutex to become available, i.e. any running chain + // modification will have exited when Close returns. Since we also called StopInsert, + // the mutex should become available quickly. It cannot be taken again after Close has + // returned. + bc.chainmu.Close() bc.wg.Wait() // Ensure that the entirety of the state snapshot is journalled to disk. @@ -1018,6 +1051,7 @@ func (bc *BlockChain) Stop() { log.Error("Failed to journal state snapshot", "err", err) } } + // Ensure the state of a recent block is also stored to disk before exiting. // We're writing three different states to catch different restart scenarios: // - HEAD: So we don't need to reprocess any blocks in the general case @@ -1172,7 +1206,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // updateHead updates the head fast sync block if the inserted blocks are better // and returns an indicator whether the inserted blocks are canonical. updateHead := func(head *types.Block) bool { - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return false + } + defer bc.chainmu.Unlock() // Rewind may have occurred, skip in that case. if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 { @@ -1419,8 +1456,9 @@ var lastWrite uint64 // but does not write any state. This is used to construct competing side forks // up to the point where they exceed the canonical total difficulty. func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) { - bc.wg.Add(1) - defer bc.wg.Done() + if bc.insertStopped() { + return errInsertionInterrupted + } batch := bc.db.NewBatch() rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td) @@ -1434,9 +1472,6 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e // writeKnownBlock updates the head block flag with a known block // and introduces chain reorg if necessary. func (bc *BlockChain) writeKnownBlock(block *types.Block) error { - bc.wg.Add(1) - defer bc.wg.Done() - current := bc.CurrentBlock() if block.ParentHash() != current.Hash() { if err := bc.reorg(current, block); err != nil { @@ -1449,17 +1484,19 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error { // WriteBlockWithState writes the block and all associated state to the database. func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return NonStatTy, errInsertionInterrupted + } defer bc.chainmu.Unlock() - return bc.writeBlockWithState(block, receipts, logs, state, emitHeadEvent) } // writeBlockWithState writes the block and all associated state to the database, // but is expects the chain mutex to be held. func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { - bc.wg.Add(1) - defer bc.wg.Done() + if bc.insertStopped() { + return NonStatTy, errInsertionInterrupted + } // Calculate the total difficulty of the block ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1) @@ -1668,31 +1705,28 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { bc.blockProcFeed.Send(true) defer bc.blockProcFeed.Send(false) - // Remove already known canon-blocks - var ( - block, prev *types.Block - ) - // Do a sanity check that the provided chain is actually ordered and linked + // Do a sanity check that the provided chain is actually ordered and linked. for i := 1; i < len(chain); i++ { - block = chain[i] - prev = chain[i-1] + block, prev := chain[i], chain[i-1] if block.NumberU64() != prev.NumberU64()+1 || block.ParentHash() != prev.Hash() { - // Chain broke ancestry, log a message (programming error) and skip insertion - log.Error("Non contiguous block insert", "number", block.Number(), "hash", block.Hash(), - "parent", block.ParentHash(), "prevnumber", prev.Number(), "prevhash", prev.Hash()) - + log.Error("Non contiguous block insert", + "number", block.Number(), + "hash", block.Hash(), + "parent", block.ParentHash(), + "prevnumber", prev.Number(), + "prevhash", prev.Hash(), + ) return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, prev.NumberU64(), prev.Hash().Bytes()[:4], i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4]) } } - // Pre-checks passed, start the full block imports - bc.wg.Add(1) - bc.chainmu.Lock() - n, err := bc.insertChain(chain, true) - bc.chainmu.Unlock() - bc.wg.Done() - return n, err + // Pre-check passed, start the full block imports. + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } + defer bc.chainmu.Unlock() + return bc.insertChain(chain, true) } // InsertChainWithoutSealVerification works exactly the same @@ -1701,14 +1735,11 @@ func (bc *BlockChain) InsertChainWithoutSealVerification(block *types.Block) (in bc.blockProcFeed.Send(true) defer bc.blockProcFeed.Send(false) - // Pre-checks passed, start the full block imports - bc.wg.Add(1) - bc.chainmu.Lock() - n, err := bc.insertChain(types.Blocks([]*types.Block{block}), false) - bc.chainmu.Unlock() - bc.wg.Done() - - return n, err + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } + defer bc.chainmu.Unlock() + return bc.insertChain(types.Blocks([]*types.Block{block}), false) } // insertChain is the internal implementation of InsertChain, which assumes that @@ -1720,10 +1751,11 @@ func (bc *BlockChain) InsertChainWithoutSealVerification(block *types.Block) (in // is imported, but then new canon-head is added before the actual sidechain // completes, then the historic state could be pruned again func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, error) { - // If the chain is terminating, don't even bother starting up - if atomic.LoadInt32(&bc.procInterrupt) == 1 { + // If the chain is terminating, don't even bother starting up. + if bc.insertStopped() { return 0, nil } + // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) @@ -1758,8 +1790,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // First block (and state) is known // 1. We did a roll-back, and should now do a re-import // 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot - // from the canonical chain, which has not been verified. - // Skip all known blocks that are behind us + // from the canonical chain, which has not been verified. + // Skip all known blocks that are behind us. var ( current = bc.CurrentBlock() localTd = bc.GetTd(current.Hash(), current.NumberU64()) @@ -1883,9 +1915,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er lastCanon = block continue } + // Retrieve the parent block and it's state to execute on top start := time.Now() - parent := it.previous() if parent == nil { parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) @@ -1894,6 +1926,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er if err != nil { return it.index, err } + // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") activeState = statedb @@ -1915,6 +1948,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er }(time.Now(), followup, throwaway, &followupInterrupt) } } + // Process block using the parent state as reference point substart := time.Now() receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) @@ -2004,6 +2038,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er dirty, _ := bc.stateCache.TrieDB().Size() stats.report(chain, it.index, dirty) } + // Any blocks remaining here? The only ones we care about are the future ones if block != nil && errors.Is(err, consensus.ErrFutureBlock) { if err := bc.addFutureBlock(block); err != nil { @@ -2319,7 +2354,10 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return nil } -func (bc *BlockChain) update() { +// futureBlocksLoop processes the 'future block' queue. +func (bc *BlockChain) futureBlocksLoop() { + defer bc.wg.Done() + futureTimer := time.NewTicker(5 * time.Second) defer futureTimer.Stop() for { @@ -2356,6 +2394,7 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) { } rawdb.IndexTransactions(bc.db, from, ancients, bc.quit) } + // indexBlocks reindexes or unindexes transactions depending on user configuration indexBlocks := func(tail *uint64, head uint64, done chan struct{}) { defer func() { done <- struct{}{} }() @@ -2388,6 +2427,7 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) { rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit) } } + // Any reindexing done, start listening to chain events and moving the index window var ( done chan struct{} // Non-nil if background unindexing or reindexing routine is active. @@ -2455,12 +2495,10 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i return i, err } - // Make sure only one thread manipulates the chain at once - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } defer bc.chainmu.Unlock() - - bc.wg.Add(1) - defer bc.wg.Done() _, err := bc.hc.InsertHeaderChain(chain, start) return 0, err } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 4e5df633b..8f6069a5c 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -163,7 +163,8 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { blockchain.reportBlock(block, receipts, err) return err } - blockchain.chainmu.Lock() + + blockchain.chainmu.MustLock() rawdb.WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash()))) rawdb.WriteBlock(blockchain.db, block) statedb.Commit(false) @@ -181,7 +182,7 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error return err } // Manually insert the header into the database, but don't reorganise (allows subsequent testing) - blockchain.chainmu.Lock() + blockchain.chainmu.MustLock() rawdb.WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash))) rawdb.WriteHeader(blockchain.db, header) blockchain.chainmu.Unlock() diff --git a/internal/syncx/mutex.go b/internal/syncx/mutex.go new file mode 100644 index 000000000..96a21986c --- /dev/null +++ b/internal/syncx/mutex.go @@ -0,0 +1,64 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package syncx contains exotic synchronization primitives. +package syncx + +// ClosableMutex is a mutex that can also be closed. +// Once closed, it can never be taken again. +type ClosableMutex struct { + ch chan struct{} +} + +func NewClosableMutex() *ClosableMutex { + ch := make(chan struct{}, 1) + ch <- struct{}{} + return &ClosableMutex{ch} +} + +// TryLock attempts to lock cm. +// If the mutex is closed, TryLock returns false. +func (cm *ClosableMutex) TryLock() bool { + _, ok := <-cm.ch + return ok +} + +// MustLock locks cm. +// If the mutex is closed, MustLock panics. +func (cm *ClosableMutex) MustLock() { + _, ok := <-cm.ch + if !ok { + panic("mutex closed") + } +} + +// Unlock unlocks cm. +func (cm *ClosableMutex) Unlock() { + select { + case cm.ch <- struct{}{}: + default: + panic("Unlock of already-unlocked ClosableMutex") + } +} + +// Close locks the mutex, then closes it. +func (cm *ClosableMutex) Close() { + _, ok := <-cm.ch + if !ok { + panic("Close of already-closed ClosableMutex") + } + close(cm.ch) +} From 469cc5939d96dce56bcc732cf4971af8e926aa49 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Fri, 8 Oct 2021 20:12:52 +0200 Subject: [PATCH 2/6] miner: fix data race during shutdown (#23435) This fixes a data race on worker.current by moving the call to StopPrefetcher into the main loop. The commit also contains fixes for two other races in unit tests of unrelated packages. --- eth/gasprice/gasprice_test.go | 2 +- eth/handler_eth_test.go | 1 - miner/worker.go | 8 +++++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/eth/gasprice/gasprice_test.go b/eth/gasprice/gasprice_test.go index feecfddec..ced9010bc 100644 --- a/eth/gasprice/gasprice_test.go +++ b/eth/gasprice/gasprice_test.go @@ -151,7 +151,7 @@ func newTestBackend(t *testing.T, londonBlock *big.Int, pending bool) *testBacke // Construct testing chain diskdb := rawdb.NewMemoryDatabase() gspec.Commit(diskdb) - chain, err := core.NewBlockChain(diskdb, nil, gspec.Config, engine, vm.Config{}, nil, nil) + chain, err := core.NewBlockChain(diskdb, &core.CacheConfig{TrieCleanNoPrefetch: true}, gspec.Config, engine, vm.Config{}, nil, nil) if err != nil { t.Fatalf("Failed to create local chain, %v", err) } diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 039091244..b8db5039c 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -486,7 +486,6 @@ func TestCheckpointChallenge(t *testing.T) { } func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpoint bool, timeout bool, empty bool, match bool, drop bool) { - t.Parallel() // Reduce the checkpoint handshake challenge timeout defer func(old time.Duration) { syncChallengeTimeout = old }(syncChallengeTimeout) diff --git a/miner/worker.go b/miner/worker.go index 8bdb1eff7..b9935fbbf 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -318,9 +318,6 @@ func (w *worker) isRunning() bool { // close terminates all background threads maintained by the worker. // Note the worker does not support being closed multiple times. func (w *worker) close() { - if w.current != nil && w.current.state != nil { - w.current.state.StopPrefetcher() - } atomic.StoreInt32(&w.running, 0) close(w.exitCh) } @@ -449,6 +446,11 @@ func (w *worker) mainLoop() { defer w.txsSub.Unsubscribe() defer w.chainHeadSub.Unsubscribe() defer w.chainSideSub.Unsubscribe() + defer func() { + if w.current != nil && w.current.state != nil { + w.current.state.StopPrefetcher() + } + }() for { select { From 30220944cde0a2d783cc2678b5918cef0d851b63 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 8 Oct 2021 18:36:58 +0200 Subject: [PATCH 3/6] eth: close miner on exit (instead of just stopping) (#21992) This ensures that all miner goroutines have exited before stopping the blockchain. Co-authored-by: Felix Lange --- eth/backend.go | 2 +- miner/miner.go | 8 +++++++- miner/worker.go | 8 ++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 273fe9fa5..ecd6a1b5f 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -585,7 +585,7 @@ func (s *Ethereum) Stop() error { close(s.closeBloomHandler) s.txPool.Stop() - s.miner.Stop() + s.miner.Close() s.blockchain.Stop() s.engine.Close() diff --git a/miner/miner.go b/miner/miner.go index a4a01b9f4..1c33b3bd2 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -20,6 +20,7 @@ package miner import ( "fmt" "math/big" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -63,6 +64,8 @@ type Miner struct { exitCh chan struct{} startCh chan common.Address stopCh chan struct{} + + wg sync.WaitGroup } func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool) *Miner { @@ -75,8 +78,8 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *even stopCh: make(chan struct{}), worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true), } + miner.wg.Add(1) go miner.update() - return miner } @@ -85,6 +88,8 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *even // the loop is exited. This to prevent a major security vuln where external parties can DOS you with blocks // and halt your mining operation for as long as the DOS continues. func (miner *Miner) update() { + defer miner.wg.Done() + events := miner.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{}) defer func() { if !events.Closed() { @@ -154,6 +159,7 @@ func (miner *Miner) Stop() { func (miner *Miner) Close() { close(miner.exitCh) + miner.wg.Wait() } func (miner *Miner) Mining() bool { diff --git a/miner/worker.go b/miner/worker.go index b9935fbbf..8a34ac117 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -150,6 +150,8 @@ type worker struct { resubmitIntervalCh chan time.Duration resubmitAdjustCh chan *intervalAdjust + wg sync.WaitGroup + current *environment // An environment for current running cycle. localUncles map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks. remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks. @@ -225,6 +227,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus recommit = minRecommitInterval } + worker.wg.Add(4) go worker.mainLoop() go worker.newWorkLoop(recommit) go worker.resultLoop() @@ -320,6 +323,7 @@ func (w *worker) isRunning() bool { func (w *worker) close() { atomic.StoreInt32(&w.running, 0) close(w.exitCh) + w.wg.Wait() } // recalcRecommit recalculates the resubmitting interval upon feedback. @@ -346,6 +350,7 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t // newWorkLoop is a standalone goroutine to submit new mining work upon received events. func (w *worker) newWorkLoop(recommit time.Duration) { + defer w.wg.Done() var ( interrupt *int32 minRecommit = recommit // minimal resubmit interval specified by user. @@ -443,6 +448,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { // mainLoop is a standalone goroutine to regenerate the sealing task based on the received event. func (w *worker) mainLoop() { + defer w.wg.Done() defer w.txsSub.Unsubscribe() defer w.chainHeadSub.Unsubscribe() defer w.chainSideSub.Unsubscribe() @@ -550,6 +556,7 @@ func (w *worker) mainLoop() { // taskLoop is a standalone goroutine to fetch sealing task from the generator and // push them to consensus engine. func (w *worker) taskLoop() { + defer w.wg.Done() var ( stopCh chan struct{} prev common.Hash @@ -597,6 +604,7 @@ func (w *worker) taskLoop() { // resultLoop is a standalone goroutine to handle sealing result submitting // and flush relative data to the database. func (w *worker) resultLoop() { + defer w.wg.Done() for { select { case block := <-w.resultCh: From cb2b934900115c51ab7df60969b006ae0ed48138 Mon Sep 17 00:00:00 2001 From: bladehan1 Date: Tue, 25 Apr 2023 12:17:40 +0800 Subject: [PATCH 4/6] merge: dependent forward modification --- core/blockchain.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 880fc3bd4..09ba345a4 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1218,11 +1218,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ rawdb.WriteHeadFastBlockHash(bc.db, head.Hash()) bc.currentFastBlock.Store(head) headFastBlockGauge.Update(int64(head.NumberU64())) - bc.chainmu.Unlock() return true } } - bc.chainmu.Unlock() return false } // writeAncient writes blockchain and corresponding receipt chain into ancient store. From 6d06d879968fedfbc57baf72ae458c286fdc397d Mon Sep 17 00:00:00 2001 From: bladehan1 Date: Fri, 19 May 2023 15:17:42 +0800 Subject: [PATCH 5/6] fix: validator verify --- consensus/bor/bor.go | 78 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index da76ea943..60dcd7d6e 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -410,6 +410,31 @@ func (c *Bor) verifyCascadingFields(chain consensus.ChainHeaderReader, header *t if err != nil { return err } + // Verify the validator list match the local contract + if isSprintStart(number+1, c.config.Sprint) { + newValidators, err := c.GetCurrentValidatorsByBlockNrOrHash(context.Background(), rpc.BlockNumberOrHashWithNumber(rpc.LatestBlockNumber), number+1) + if err != nil { + return err + } + + sort.Sort(ValidatorsByAddress(newValidators)) + + headerVals, err := ParseValidators(header.Extra[extraVanity : len(header.Extra)-extraSeal]) + + if err != nil { + return err + } + + if len(newValidators) != len(headerVals) { + return errInvalidSpanValidators + } + + for i, val := range newValidators { + if !bytes.Equal(val.HeaderBytes(), headerVals[i].HeaderBytes()) { + return errInvalidSpanValidators + } + } + } // verify the validator list in the last sprint block if isSprintStart(number, c.config.Sprint) { @@ -1005,6 +1030,59 @@ func (c *Bor) GetCurrentValidators(headerHash common.Hash, blockNumber uint64) ( return valz, nil } +// GetCurrentValidatorsByBlockNrOrHash get current validators +func (c *Bor) GetCurrentValidatorsByBlockNrOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash, blockNumber uint64) ([]*Validator, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // method + const method = "getBorValidators" + + data, err := c.validatorSetABI.Pack(method, big.NewInt(0).SetUint64(blockNumber)) + if err != nil { + log.Error("Unable to pack tx for getValidator", "error", err) + return nil, err + } + + // call + msgData := (hexutil.Bytes)(data) + toAddress := common.HexToAddress(c.config.ValidatorContract) + gas := (hexutil.Uint64)(uint64(math.MaxUint64 / 2)) + + result, err := c.ethAPI.Call(ctx, ethapi.TransactionArgs{ + Gas: &gas, + To: &toAddress, + Data: &msgData, + }, blockNrOrHash, nil) + if err != nil { + return nil, err + } + + var ( + ret0 = new([]common.Address) + ret1 = new([]*big.Int) + ) + + out := &[]interface{}{ + ret0, + ret1, + } + + if err := c.validatorSetABI.UnpackIntoInterface(out, method, result); err != nil { + return nil, err + } + + valz := make([]*Validator, len(*ret0)) + for i, a := range *ret0 { + valz[i] = &Validator{ + Address: a, + VotingPower: (*ret1)[i].Int64(), + } + } + + return valz, nil +} + func (c *Bor) checkAndCommitSpan( state *state.StateDB, header *types.Header, From 8be8f42b307b0acde94f595e54c08e34069a10af Mon Sep 17 00:00:00 2001 From: yuekun Date: Fri, 26 May 2023 13:47:14 +0800 Subject: [PATCH 6/6] update version number to 1.0.8 --- params/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/params/version.go b/params/version.go index 6b5f60319..04145892d 100644 --- a/params/version.go +++ b/params/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 0 // Minor version component of the current release - VersionPatch = 7 // Patch version component of the current release + VersionPatch = 8 // Patch version component of the current release VersionMeta = "stable" // Version metadata to append to the version string )