From a630f1626c19010d2fed0d3d8fb6e190914dd88e Mon Sep 17 00:00:00 2001 From: setunapo Date: Thu, 10 Mar 2022 16:54:42 +0800 Subject: [PATCH] Parallel: handle fixup & code review & enhancement No fundamental change, some improvements, include: ** Add a new type ParallelStateProcessor; ** move Parallel Config to BlockChain ** more precious ParallelNum set ** Add EnableParallelProcessor() ** remove panic() ** remove useless: redo flag, ** change waitChan from `chan int` to `chan struct {}` and communicate by close() ** dispatch policy: queue `from` ahead of `to` ** pre-allocate allLogs ** disable parallel processor is snapshot is not enabled ** others: rename... --- cmd/utils/flags.go | 46 ++-- core/blockchain.go | 45 ++-- core/state/journal.go | 2 +- core/state/state_object.go | 2 - core/state/statedb.go | 283 ++++++++++-------------- core/state_processor.go | 438 +++++++++++++++++-------------------- core/types.go | 4 - eth/backend.go | 2 + eth/ethconfig/config.go | 5 +- 9 files changed, 384 insertions(+), 443 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 87affa96c4..b73d17e5f8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -26,6 +26,7 @@ import ( "math/big" "os" "path/filepath" + "runtime" godebug "runtime/debug" "strconv" "strings" @@ -804,17 +805,16 @@ var ( } ParallelTxFlag = cli.BoolFlag{ Name: "parallel", - Usage: "Enable the experimental parallel transaction execution mode (default = false)", + Usage: "Enable the experimental parallel transaction execution mode, only valid in full sync mode (default = false)", } ParallelTxNumFlag = cli.IntFlag{ Name: "parallel.num", - Usage: "Number of slot for transaction execution, only valid in parallel mode (default: CPUNum - 1)", - Value: core.ParallelExecNum, + Usage: "Number of slot for transaction execution, only valid in parallel mode (runtime calculated, no fixed default value)", } ParallelTxQueueSizeFlag = cli.IntFlag{ Name: "parallel.queuesize", - Usage: "Max number of Tx that can be queued to a slot, only valid in parallel mode", - Value: core.MaxPendingQueueSize, + Usage: "Max number of Tx that can be queued to a slot, only valid in parallel mode (advanced option)", + Value: 20, } // Init network @@ -1336,16 +1336,6 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) { if ctx.GlobalIsSet(InsecureUnlockAllowedFlag.Name) { cfg.InsecureUnlockAllowed = ctx.GlobalBool(InsecureUnlockAllowedFlag.Name) } - if ctx.GlobalIsSet(ParallelTxFlag.Name) { - core.ParallelTxMode = true - } - if ctx.GlobalIsSet(ParallelTxNumFlag.Name) { - core.ParallelExecNum = ctx.GlobalInt(ParallelTxNumFlag.Name) - } - if ctx.GlobalIsSet(ParallelTxQueueSizeFlag.Name) { - core.MaxPendingQueueSize = ctx.GlobalInt(ParallelTxQueueSizeFlag.Name) - } - } func setSmartCard(ctx *cli.Context, cfg *node.Config) { @@ -1666,6 +1656,32 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.GlobalIsSet(RangeLimitFlag.Name) { cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name) } + if ctx.GlobalIsSet(ParallelTxFlag.Name) { + cfg.ParallelTxMode = ctx.GlobalBool(ParallelTxFlag.Name) + // The best prallel num will be tuned later, we do a simple parallel num set here + numCpu := runtime.NumCPU() + var parallelNum int + if ctx.GlobalIsSet(ParallelTxNumFlag.Name) { + // first of all, we use "--parallel.num", but "--parallel.num 0" is not allowed + parallelNum = ctx.GlobalInt(ParallelTxNumFlag.Name) + if parallelNum < 1 { + parallelNum = 1 + } + } else if numCpu == 1 { + parallelNum = 1 // single CPU core + } else 
if numCpu < 10 { + parallelNum = numCpu - 1 + } else { + parallelNum = 8 // we found concurrency 8 is slightly better than 15 + } + cfg.ParallelTxNum = parallelNum + // set up queue size, it is an advanced option + if ctx.GlobalIsSet(ParallelTxQueueSizeFlag.Name) { + cfg.ParallelTxQueueSize = ctx.GlobalInt(ParallelTxQueueSizeFlag.Name) + } else { + cfg.ParallelTxQueueSize = 20 // default queue size, will be optimized + } + } // Read the value from the flag no matter if it's set or not. cfg.Preimages = ctx.GlobalBool(CachePreimagesFlag.Name) if cfg.NoPruning && !cfg.Preimages { diff --git a/core/blockchain.go b/core/blockchain.go index dcbecdbabe..96c607a0ef 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -79,7 +79,6 @@ var ( errInsertionInterrupted = errors.New("insertion is interrupted") errStateRootVerificationFailed = errors.New("state root verification failed") - ParallelTxMode = false // parallel transaction execution ) const ( @@ -241,12 +240,13 @@ type BlockChain struct { running int32 // 0 if chain is running, 1 when stopped procInterrupt int32 // interrupt signaler for block processing - engine consensus.Engine - prefetcher Prefetcher - validator Validator // Block and state validator interface - processor Processor // Block transaction processor interface - vmConfig vm.Config - pipeCommit bool + engine consensus.Engine + prefetcher Prefetcher + validator Validator // Block and state validator interface + processor Processor // Block transaction processor interface + vmConfig vm.Config + pipeCommit bool + parallelExecution bool shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. 
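As a reading aid for the SetEthConfig change above: the sketch below restates the slot-count heuristic on its own so the branches are easier to follow. It is a standalone illustration with made-up names (pickParallelNum, userSet/userNum stand in for the ctx.GlobalIsSet/ctx.GlobalInt calls on ParallelTxNumFlag); it is not code from this patch.

package main

import (
	"fmt"
	"runtime"
)

// pickParallelNum mirrors the branches in SetEthConfig: honor --parallel.num
// when set (but never below 1), otherwise derive the slot count from the CPU
// count, leaving one core for the dispatcher and capping at 8 slots.
func pickParallelNum(userSet bool, userNum int) int {
	numCPU := runtime.NumCPU()
	switch {
	case userSet:
		if userNum < 1 {
			return 1 // "--parallel.num 0" is not allowed
		}
		return userNum
	case numCPU == 1:
		return 1 // single CPU core
	case numCPU < 10:
		return numCPU - 1 // leave one core for the dispatcher
	default:
		return 8 // measured to be slightly better than 15 concurrent slots
	}
}

func main() {
	fmt.Println("derived:", pickParallelNum(false, 0))
	fmt.Println("user-set 0 clamped to:", pickParallelNum(true, 0))
}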
@@ -311,9 +311,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bc.prefetcher = NewStatePrefetcher(chainConfig, bc, engine) bc.validator = NewBlockValidator(chainConfig, bc, engine) bc.processor = NewStateProcessor(chainConfig, bc, engine) - if ParallelTxMode { - bc.processor.InitParallelOnce() - } var err error bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped) @@ -2105,12 +2102,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er if parent == nil { parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) } - statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps) if err != nil { return it.index, err } - bc.updateHighestVerifiedHeader(block.Header()) // Enable prefetching to pull in trie node paths while processing transactions @@ -2118,7 +2113,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er var followupInterrupt uint32 // For diff sync, it may fallback to full sync, so we still do prefetch // parallel mode has a pipeline, similar to this prefetch, to save CPU we disable this prefetch for parallel - if !ParallelTxMode { + if !bc.parallelExecution { if len(block.Transactions()) >= prefetchTxNumber { throwaway := statedb.Copy() go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) { @@ -2132,16 +2127,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er statedb.EnablePipeCommit() } statedb.SetExpectedStateRoot(block.Root()) - - var receipts types.Receipts - var logs []*types.Log - var usedGas uint64 - if ParallelTxMode { - statedb, receipts, logs, usedGas, err = bc.processor.ProcessParallel(block, statedb, bc.vmConfig) - } else { - statedb, receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig) - } - + statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) atomic.StoreUint32(&followupInterrupt, 1) activeState = statedb if err != nil { @@ -3118,3 +3104,16 @@ func EnablePersistDiff(limit uint64) BlockChainOption { return chain } } + +func EnableParallelProcessor(parallelNum int, queueSize int) BlockChainOption { + return func(chain *BlockChain) *BlockChain { + if chain.snaps == nil { + // disable parallel processor if snapshot is not enabled to avoid concurrent issue for SecureTrie + log.Info("parallel processor is not enabled since snapshot is not enabled") + return chain + } + chain.parallelExecution = true + chain.processor = NewParallelStateProcessor(chain.Config(), chain, chain.engine, parallelNum, queueSize) + return chain + } +} diff --git a/core/state/journal.go b/core/state/journal.go index 487e79a57d..b3a2956f75 100644 --- a/core/state/journal.go +++ b/core/state/journal.go @@ -145,7 +145,7 @@ func (ch createObjectChange) revert(s *StateDB) { if s.parallel.isSlotDB { delete(s.parallel.dirtiedStateObjectsInSlot, *ch.account) } else { - s.deleteStateObjectFromStateDB(*ch.account) + s.deleteStateObj(*ch.account) } delete(s.stateObjectsDirty, *ch.account) } diff --git a/core/state/state_object.go b/core/state/state_object.go index a809d2a565..36adf786d6 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -37,8 +37,6 @@ func (c Code) String() string { return string(c) //strings.Join(Disassemble(c), " ") } -type StorageKeys map[common.Hash]struct{} - type Storage map[common.Hash]common.Hash func (s Storage) String() (str string) { diff --git a/core/state/statedb.go b/core/state/statedb.go index 
b4e60811ee..3a4297ea2f 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -85,18 +85,17 @@ func (s *StateObjectSyncMap) StoreStateObject(addr common.Address, stateObject * s.Store(addr, stateObject) } -// loadStateObjectFromStateDB is the entry for loading state object from stateObjects in StateDB or stateObjects in parallel -func (s *StateDB) loadStateObjectFromStateDB(addr common.Address) (*StateObject, bool) { +// loadStateObj is the entry for loading state object from stateObjects in StateDB or stateObjects in parallel +func (s *StateDB) loadStateObj(addr common.Address) (*StateObject, bool) { if s.isParallel { return s.parallel.stateObjects.LoadStateObject(addr) - } else { - obj, ok := s.stateObjects[addr] - return obj, ok } + obj, ok := s.stateObjects[addr] + return obj, ok } -// storeStateObjectToStateDB is the entry for storing state object to stateObjects in StateDB or stateObjects in parallel -func (s *StateDB) storeStateObjectToStateDB(addr common.Address, stateObject *StateObject) { +// storeStateObj is the entry for storing state object to stateObjects in StateDB or stateObjects in parallel +func (s *StateDB) storeStateObj(addr common.Address, stateObject *StateObject) { if s.isParallel { s.parallel.stateObjects.Store(addr, stateObject) } else { @@ -104,8 +103,8 @@ func (s *StateDB) storeStateObjectToStateDB(addr common.Address, stateObject *St } } -// deleteStateObjectFromStateDB is the entry for deleting state object to stateObjects in StateDB or stateObjects in parallel -func (s *StateDB) deleteStateObjectFromStateDB(addr common.Address) { +// deleteStateObj is the entry for deleting state object to stateObjects in StateDB or stateObjects in parallel +func (s *StateDB) deleteStateObj(addr common.Address) { if s.isParallel { s.parallel.stateObjects.Delete(addr) } else { @@ -115,7 +114,7 @@ func (s *StateDB) deleteStateObjectFromStateDB(addr common.Address) { // For parallel mode only, keep the change list for later conflict detect type SlotChangeList struct { - TxIndex int // the tx index of change list + TxIndex int StateObjectSuicided map[common.Address]struct{} StateChangeSet map[common.Address]StateKeys BalanceChangeSet map[common.Address]struct{} @@ -131,7 +130,8 @@ type ParallelState struct { // stateObjects holds the state objects in the base slot db // the reason for using stateObjects instead of stateObjects on the outside is // we need a thread safe map to hold state objects since there are many slots will read - // state objects from this and in the same time we will change this when merging slot db to the base slot db + // state objects from it; + // And we will merge all the changes made by the concurrent slot into it. stateObjects *StateObjectSyncMap baseTxIndex int // slotDB is created base on this tx index. @@ -144,7 +144,7 @@ type ParallelState struct { stateReadsInSlot map[common.Address]StateKeys stateChangesInSlot map[common.Address]StateKeys // no need record value // Actions such as SetCode, Suicide will change address's state. - // Later call like Exist(), Empty(), HasSuicided() depond on the address's state. + // Later call like Exist(), Empty(), HasSuicided() depend on the address's state. 
addrStateReadsInSlot map[common.Address]struct{} addrStateChangesInSlot map[common.Address]struct{} stateObjectsSuicidedInSlot map[common.Address]struct{} @@ -241,10 +241,9 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) return newStateDB(root, db, snaps) } -// NewSlotDB creates a new slot stateDB base on the provided stateDB. -// With parallel, each execute slot would have its own stateDB. +// NewSlotDB creates a new State DB based on the provided StateDB. +// With parallel, each execution slot would have its own StateDB. func NewSlotDB(db *StateDB, systemAddr common.Address, baseTxIndex int, keepSystem bool) *StateDB { - log.Debug("NewSlotDB", "baseTxIndex", baseTxIndex) slotDB := db.CopyForSlot() slotDB.originalRoot = db.originalRoot slotDB.parallel.baseTxIndex = baseTxIndex @@ -252,8 +251,12 @@ func NewSlotDB(db *StateDB, systemAddr common.Address, baseTxIndex int, keepSyst slotDB.parallel.systemAddressOpsCount = 0 slotDB.parallel.keepSystemAddressBalance = keepSystem - // clear the slotDB's validator's balance first - // for slotDB, systemAddr's value is the tx's gas fee + // All transactions will pay gas fee to the systemAddr at the end, this address is + // deemed to conflict, we handle it specially, clear it now and set it back to the main + // StateDB later; + // But there are transactions that will try to read systemAddr's balance, such as: + // https://bscscan.com/tx/0xcd69755be1d2f55af259441ff5ee2f312830b8539899e82488a21e85bc121a2a. + // It will trigger transaction redo and keepSystem will be marked as true. if !keepSystem { slotDB.SetBalance(systemAddr, big.NewInt(0)) } @@ -300,13 +303,12 @@ func (s *StateDB) getStateObjectFromStateObjects(addr common.Address) (*StateObj return obj, ok } } - return s.loadStateObjectFromStateDB(addr) + return s.loadStateObj(addr) } -// If the transaction execution is failed, keep its read list for conflict detect -// and discard its state changed, execept its own balance change. +// RevertSlotDB keep its read list for conflict detect and discard its state changes except its own balance change, +// if the transaction execution is reverted, func (s *StateDB) RevertSlotDB(from common.Address) { - log.Debug("RevertSlotDB", "addr", from, "txIndex", s.txIndex) s.parallel.stateObjectsSuicidedInSlot = make(map[common.Address]struct{}) s.parallel.stateChangesInSlot = make(map[common.Address]StateKeys) s.parallel.balanceChangesInSlot = make(map[common.Address]struct{}, 1) @@ -315,26 +317,25 @@ func (s *StateDB) RevertSlotDB(from common.Address) { s.parallel.nonceChangesInSlot = make(map[common.Address]struct{}) } -// PrepareForParallel prepares for state db to be used in parallel process. +// PrepareForParallel prepares for state db to be used in parallel execution mode. func (s *StateDB) PrepareForParallel() { s.isParallel = true s.parallel.stateObjects = &StateObjectSyncMap{} } -// MergeSlotDB is for Parallel TX, when the TX is finalized(dirty -> pending) -// A bit similar to StateDB.Copy(), -// mainly copy stateObjects, since slotDB has been finalized. -// return and keep the slot's change list for later conflict detect. +// MergeSlotDB is for Parallel execution mode, when the transaction has been +// finalized(dirty -> pending) on execution slot, the execution results should be +// merged back to the main StateDB. +// And it will return and keep the slot's change list for later conflict detect. 
func (s *StateDB) MergeSlotDB(slotDb *StateDB, slotReceipt *types.Receipt, txIndex int) SlotChangeList { - // receipt.Logs with unified log Index within a block - // align slotDB's logs Index to the block stateDB's logSize + // receipt.Logs use unified log index within a block + // align slotDB's log index to the block stateDB's logSize for _, l := range slotReceipt.Logs { l.Index += s.logSize } s.logSize += slotDb.logSize - // before merge, do validator reward first: AddBalance to consensus.SystemAddress - // object of SystemAddress is take care specially + // before merge, pay the gas fee first: AddBalance to consensus.SystemAddress systemAddress := slotDb.parallel.systemAddress if slotDb.parallel.keepSystemAddressBalance { s.SetBalance(systemAddress, slotDb.GetBalance(systemAddress)) @@ -356,26 +357,21 @@ func (s *StateDB) MergeSlotDB(slotDb *StateDB, slotReceipt *types.Receipt, txInd // stateObjects: KV, balance, nonce... dirtyObj, ok := slotDb.getStateObjectFromStateObjects(addr) if !ok { - panic(fmt.Sprintf("MergeSlotDB dirty object not exist! (txIndex: %d, addr: %s)", slotDb.txIndex, addr.String())) + log.Error("parallel merge, but dirty object not exist!", "txIndex:", slotDb.txIndex, "addr", addr) + continue } - mainObj, exist := s.loadStateObjectFromStateDB(addr) - - log.Debug("MergeSlotDB", "txIndex", slotDb.txIndex, "addr", addr, - "exist", exist, "dirtyObj.deleted", dirtyObj.deleted) + mainObj, exist := s.loadStateObj(addr) if !exist { // addr not exist on main DB, do ownership transfer dirtyObj.db = s dirtyObj.finalise(true) // true: prefetch on dispatcher - s.storeStateObjectToStateDB(addr, dirtyObj) + s.storeStateObj(addr, dirtyObj) delete(slotDb.parallel.dirtiedStateObjectsInSlot, addr) // transfer ownership } else { // addr already in main DB, do merge: balance, KV, code, State(create, suicide) // can not do copy or ownership transfer directly, since dirtyObj could have outdated // data(may be update within the conflict window) - // Do deepCopy a temporary *StateObject for safety, - // since slot could read the address, dispatch should avoid overwrite the StateObject directly - // otherwise, it could crash for: concurrent map iteration and map write var newMainObj *StateObject if _, created := slotDb.parallel.addrStateChangesInSlot[addr]; created { // there are 3 kinds of state change: @@ -397,21 +393,24 @@ func (s *StateDB) MergeSlotDB(slotDb *StateDB, slotReceipt *types.Receipt, txInd delete(s.snapStorage, addr) } } else { - // do merge: balance, KV, code... 
+ // deepCopy a temporary *StateObject for safety, since slot could read the address, + // dispatch should avoid overwrite the StateObject directly otherwise, it could + // crash for: concurrent map iteration and map write newMainObj = mainObj.deepCopy(s) if _, balanced := slotDb.parallel.balanceChangesInSlot[addr]; balanced { - log.Debug("MergeSlotDB state object merge: state merge: balance", + log.Debug("merge state object: Balance", "newMainObj.Balance()", newMainObj.Balance(), "dirtyObj.Balance()", dirtyObj.Balance()) newMainObj.SetBalance(dirtyObj.Balance()) } if _, coded := slotDb.parallel.codeChangesInSlot[addr]; coded { - log.Debug("MergeSlotDB state object merge: state merge: code") + log.Debug("merge state object: Code") newMainObj.code = dirtyObj.code newMainObj.data.CodeHash = dirtyObj.data.CodeHash newMainObj.dirtyCode = true } if keys, stated := slotDb.parallel.stateChangesInSlot[addr]; stated { + log.Debug("merge state object: KV") newMainObj.MergeSlotObject(s.db, dirtyObj, keys) } // dirtyObj.Nonce() should not be less than newMainObj @@ -419,7 +418,7 @@ func (s *StateDB) MergeSlotDB(slotDb *StateDB, slotReceipt *types.Receipt, txInd } newMainObj.finalise(true) // true: prefetch on dispatcher // update the object - s.storeStateObjectToStateDB(addr, newMainObj) + s.storeStateObj(addr, newMainObj) } addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure } @@ -440,62 +439,44 @@ func (s *StateDB) MergeSlotDB(slotDb *StateDB, slotReceipt *types.Receipt, txInd s.preimages[hash] = preimage } if s.accessList != nil { - // fixme: accessList is not enabled yet, should use merged rather than overwrite + // fixme: accessList is not enabled yet, but it should use merge rather than overwrite Copy s.accessList = slotDb.accessList.Copy() } if slotDb.snaps != nil { for k := range slotDb.snapDestructs { // There could be a race condition for parallel transaction execution - // One add balance 0 to an empty address, it will delete it(delete empty is enabled,). + // One transaction add balance 0 to an empty address, will delete it(delete empty is enabled). // While another concurrent transaction could add a none-zero balance to it, make it not empty // We fixed it by add a addr state read record for add balance 0 s.snapDestructs[k] = struct{}{} } - for k, v := range slotDb.snapAccounts { - s.snapAccounts[k] = v - } - for k, v := range slotDb.snapStorage { - temp := make(map[string][]byte) - for kk, vv := range v { - temp[kk] = vv - } - s.snapStorage[k] = temp - } + + // slotDb.snapAccounts should be empty, comment out and to be deleted later + // for k, v := range slotDb.snapAccounts { + // s.snapAccounts[k] = v + // } + // slotDb.snapStorage should be empty, comment out and to be deleted later + // for k, v := range slotDb.snapStorage { + // temp := make(map[string][]byte) + // for kk, vv := range v { + // temp[kk] = vv + // } + // s.snapStorage[k] = temp + // } } - // we have to create a new object to store change list for conflict detect, since - // StateDB could be reused and its elements could be overwritten + // to create a new object to store change list for conflict detect, + // since slot db reuse is disabled, we do not need to do copy. 
changeList := SlotChangeList{ TxIndex: txIndex, - StateObjectSuicided: make(map[common.Address]struct{}, len(slotDb.parallel.stateObjectsSuicidedInSlot)), - StateChangeSet: make(map[common.Address]StateKeys, len(slotDb.parallel.stateChangesInSlot)), - BalanceChangeSet: make(map[common.Address]struct{}, len(slotDb.parallel.balanceChangesInSlot)), - CodeChangeSet: make(map[common.Address]struct{}, len(slotDb.parallel.codeChangesInSlot)), - AddrStateChangeSet: make(map[common.Address]struct{}, len(slotDb.parallel.addrStateChangesInSlot)), - NonceChangeSet: make(map[common.Address]struct{}, len(slotDb.parallel.nonceChangesInSlot)), + StateObjectSuicided: slotDb.parallel.stateObjectsSuicidedInSlot, + StateChangeSet: slotDb.parallel.stateChangesInSlot, + BalanceChangeSet: slotDb.parallel.balanceChangesInSlot, + CodeChangeSet: slotDb.parallel.codeChangesInSlot, + AddrStateChangeSet: slotDb.parallel.addrStateChangesInSlot, + NonceChangeSet: slotDb.parallel.nonceChangesInSlot, } - for addr := range slotDb.parallel.stateObjectsSuicidedInSlot { - changeList.StateObjectSuicided[addr] = struct{}{} - } - for addr, storage := range slotDb.parallel.stateChangesInSlot { - changeList.StateChangeSet[addr] = storage - } - for addr := range slotDb.parallel.balanceChangesInSlot { - changeList.BalanceChangeSet[addr] = struct{}{} - } - for addr := range slotDb.parallel.codeChangesInSlot { - changeList.CodeChangeSet[addr] = struct{}{} - } - for addr := range slotDb.parallel.addrStateChangesInSlot { - changeList.AddrStateChangeSet[addr] = struct{}{} - } - for addr := range slotDb.parallel.nonceChangesInSlot { - changeList.NonceChangeSet[addr] = struct{}{} - } - - // the slot DB's is valid now, move baseTxIndex forward, since it could be reused. - slotDb.parallel.baseTxIndex = txIndex return changeList } @@ -503,9 +484,6 @@ func (s *StateDB) MergeSlotDB(slotDb *StateDB, slotReceipt *types.Receipt, txInd // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. func (s *StateDB) StartPrefetcher(namespace string) { - if s.parallel.isSlotDB { - log.Warn("StartPrefetcher should not be called by slot DB") - } s.prefetcherLock.Lock() defer s.prefetcherLock.Unlock() if s.prefetcher != nil { @@ -520,9 +498,6 @@ func (s *StateDB) StartPrefetcher(namespace string) { // StopPrefetcher terminates a running prefetcher and reports any leftover stats // from the gathered metrics. func (s *StateDB) StopPrefetcher() { - if s.parallel.isSlotDB { - log.Warn("StopPrefetcher should not be called by slot DB") - } s.prefetcherLock.Lock() defer s.prefetcherLock.Unlock() if s.prefetcher != nil { @@ -713,6 +688,12 @@ func (s *StateDB) StateReadsInSlot() map[common.Address]StateKeys { func (s *StateDB) BalanceReadsInSlot() map[common.Address]struct{} { return s.parallel.balanceReadsInSlot } + +// For most of the transactions, systemAddressOpsCount should be 2: +// one for SetBalance(0) on NewSlotDB() +// the other is for AddBalance(GasFee) at the end. +// (systemAddressOpsCount > 2) means the transaction tries to access systemAddress, in +// this case, we should redo and keep its balance on NewSlotDB() func (s *StateDB) SystemAddressRedo() bool { return s.parallel.systemAddressOpsCount > 2 } @@ -854,11 +835,9 @@ func (s *StateDB) HasSuicided(addr common.Address) bool { // AddBalance adds amount to the account associated with addr. 
func (s *StateDB) AddBalance(addr common.Address, amount *big.Int) { if s.parallel.isSlotDB { - // just in case other tx creates this account, - // we will miss this if we only add this account when found if amount.Sign() != 0 { s.parallel.balanceChangesInSlot[addr] = struct{}{} - // add balance will perform a read operation first, empty object will be deleted + // add balance will perform a read operation first s.parallel.balanceReadsInSlot[addr] = struct{}{} } else { // if amount == 0, no balance change, but there is still an empty check. @@ -877,20 +856,16 @@ func (s *StateDB) AddBalance(addr common.Address, amount *big.Int) { newStateObject := stateObject.deepCopy(s) newStateObject.AddBalance(amount) s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject - } else { - stateObject.AddBalance(amount) + return } - } else { - stateObject.AddBalance(amount) } + stateObject.AddBalance(amount) } } // SubBalance subtracts amount from the account associated with addr. func (s *StateDB) SubBalance(addr common.Address, amount *big.Int) { if s.parallel.isSlotDB { - // just in case other tx creates this account, - // we will miss this if we only add this account when found if amount.Sign() != 0 { s.parallel.balanceChangesInSlot[addr] = struct{}{} // unlike add, sub 0 balance will not touch empty object @@ -908,12 +883,10 @@ func (s *StateDB) SubBalance(addr common.Address, amount *big.Int) { newStateObject := stateObject.deepCopy(s) newStateObject.SubBalance(amount) s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject - } else { - stateObject.SubBalance(amount) + return } - } else { - stateObject.SubBalance(amount) } + stateObject.SubBalance(amount) } } @@ -921,20 +894,19 @@ func (s *StateDB) SetBalance(addr common.Address, amount *big.Int) { stateObject := s.GetOrNewStateObject(addr) if stateObject != nil { if s.parallel.isSlotDB { + s.parallel.balanceChangesInSlot[addr] = struct{}{} + if addr == s.parallel.systemAddress { + s.parallel.systemAddressOpsCount++ + } + if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { newStateObject := stateObject.deepCopy(s) newStateObject.SetBalance(amount) s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject - } else { - stateObject.SetBalance(amount) + return } - s.parallel.balanceChangesInSlot[addr] = struct{}{} - if addr == s.parallel.systemAddress { - s.parallel.systemAddressOpsCount++ - } - } else { - stateObject.SetBalance(amount) } + stateObject.SetBalance(amount) } } @@ -958,12 +930,10 @@ func (s *StateDB) SetNonce(addr common.Address, nonce uint64) { newStateObject := stateObject.deepCopy(s) newStateObject.SetNonce(nonce) s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject - } else { - stateObject.SetNonce(nonce) + return } - } else { - stateObject.SetNonce(nonce) } + stateObject.SetNonce(nonce) } } @@ -971,18 +941,16 @@ func (s *StateDB) SetCode(addr common.Address, code []byte) { stateObject := s.GetOrNewStateObject(addr) if stateObject != nil { if s.parallel.isSlotDB { + s.parallel.codeChangesInSlot[addr] = struct{}{} + if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { newStateObject := stateObject.deepCopy(s) newStateObject.SetCode(crypto.Keccak256Hash(code), code) s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject - } else { - stateObject.SetCode(crypto.Keccak256Hash(code), code) + return } - - s.parallel.codeChangesInSlot[addr] = struct{}{} - } else { - stateObject.SetCode(crypto.Keccak256Hash(code), code) } + stateObject.SetCode(crypto.Keccak256Hash(code), code) } } @@ -999,21 +967,20 @@ 
func (s *StateDB) SetState(addr common.Address, key, value common.Hash) { return } } - if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { - newStateObject := stateObject.deepCopy(s) - newStateObject.SetState(s.db, key, value) - s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject - } else { - stateObject.SetState(s.db, key, value) - } if s.parallel.stateChangesInSlot[addr] == nil { s.parallel.stateChangesInSlot[addr] = make(StateKeys, defaultNumOfSlots) } s.parallel.stateChangesInSlot[addr][key] = struct{}{} - } else { - stateObject.SetState(s.db, key, value) + + if _, ok := s.parallel.dirtiedStateObjectsInSlot[addr]; !ok { + newStateObject := stateObject.deepCopy(s) + newStateObject.SetState(s.db, key, value) + s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject + return + } } + stateObject.SetState(s.db, key, value) } } @@ -1033,10 +1000,6 @@ func (s *StateDB) SetStorage(addr common.Address, storage map[common.Hash]common // getStateObject will return a non-nil account after Suicide. func (s *StateDB) Suicide(addr common.Address) bool { stateObject := s.getStateObject(addr) - - if s.parallel.isSlotDB { - s.parallel.addrStateReadsInSlot[addr] = struct{}{} - } if stateObject == nil { return false } @@ -1056,15 +1019,12 @@ func (s *StateDB) Suicide(addr common.Address) bool { newStateObject.markSuicided() newStateObject.data.Balance = new(big.Int) s.parallel.dirtiedStateObjectsInSlot[addr] = newStateObject - } else { - stateObject.markSuicided() - stateObject.data.Balance = new(big.Int) + return true } - } else { - stateObject.markSuicided() - stateObject.data.Balance = new(big.Int) } + stateObject.markSuicided() + stateObject.data.Balance = new(big.Int) return true } @@ -1266,7 +1226,7 @@ func (s *StateDB) SetStateObject(object *StateObject) { if s.parallel.isSlotDB { s.parallel.dirtiedStateObjectsInSlot[object.Address()] = object } else { - s.storeStateObjectToStateDB(object.Address(), object) + s.storeStateObj(object.Address(), object) } } @@ -1283,8 +1243,8 @@ func (s *StateDB) GetOrNewStateObject(addr common.Address) *StateObject { // the given address, it is overwritten and returned as the second return value. func (s *StateDB) createObject(addr common.Address) (newobj, prev *StateObject) { if s.parallel.isSlotDB { - s.parallel.addrStateReadsInSlot[addr] = struct{}{} // fixme: may not necessary - s.parallel.addrStateChangesInSlot[addr] = struct{}{} // address created. + s.parallel.addrStateReadsInSlot[addr] = struct{}{} // will try to get the previous object. + s.parallel.addrStateChangesInSlot[addr] = struct{}{} } prev = s.getDeletedStateObject(addr) // Note, prev might have been deleted, we need that! @@ -1293,8 +1253,8 @@ func (s *StateDB) createObject(addr common.Address) (newobj, prev *StateObject) if s.snap != nil && prev != nil { _, prevdestruct = s.snapDestructs[prev.address] if !prevdestruct { - // createObject for deleted object is ok, - // it will destroy the previous trie node and update with the new object on block commit + // createObject for deleted object will destroy the previous trie node first + // and update the trie tree with the new object on block commit. 
s.snapDestructs[prev.address] = struct{}{} } } @@ -1325,10 +1285,10 @@ func (s *StateDB) createObject(addr common.Address) (newobj, prev *StateObject) func (s *StateDB) CreateAccount(addr common.Address) { newObj, prev := s.createObject(addr) if prev != nil { - newObj.setBalance(prev.data.Balance) // this read + newObj.setBalance(prev.data.Balance) } if s.parallel.isSlotDB { - s.parallel.balanceReadsInSlot[addr] = struct{}{} + s.parallel.balanceReadsInSlot[addr] = struct{}{} // read the balance of previous object s.parallel.dirtiedStateObjectsInSlot[addr] = newObj } } @@ -1366,10 +1326,6 @@ func (s *StateDB) ForEachStorage(addr common.Address, cb func(key, value common. // Snapshots of the copied state cannot be applied to the copy. func (s *StateDB) Copy() *StateDB { // Copy all the basic fields, initialize the memory ones - parallel := ParallelState{ - isSlotDB: false, - } - state := &StateDB{ db: s.db, trie: s.db.CopyTrie(s.trie), @@ -1382,7 +1338,7 @@ func (s *StateDB) Copy() *StateDB { preimages: make(map[common.Hash][]byte, len(s.preimages)), journal: newJournal(), hasher: crypto.NewKeccakState(), - parallel: parallel, + parallel: ParallelState{}, } // Copy the dirty states, logs, and preimages for addr := range s.journal.dirties { @@ -1394,7 +1350,7 @@ func (s *StateDB) Copy() *StateDB { // Even though the original object is dirty, we are not copying the journal, // so we need to make sure that anyside effect the journal would have caused // during a commit (or similar op) is already applied to the copy. - state.storeStateObjectToStateDB(addr, object.deepCopy(state)) + state.storeStateObj(addr, object.deepCopy(state)) state.stateObjectsDirty[addr] = struct{}{} // Mark the copy dirty to force internal (code/state) commits state.stateObjectsPending[addr] = struct{}{} // Mark the copy pending to force external (account) commits @@ -1406,14 +1362,14 @@ func (s *StateDB) Copy() *StateDB { for addr := range s.stateObjectsPending { if _, exist := state.getStateObjectFromStateObjects(addr); !exist { object, _ := s.getStateObjectFromStateObjects(addr) - state.storeStateObjectToStateDB(addr, object.deepCopy(state)) + state.storeStateObj(addr, object.deepCopy(state)) } state.stateObjectsPending[addr] = struct{}{} } for addr := range s.stateObjectsDirty { if _, exist := state.getStateObjectFromStateObjects(addr); !exist { object, _ := s.getStateObjectFromStateObjects(addr) - state.storeStateObjectToStateDB(addr, object.deepCopy(state)) + state.storeStateObj(addr, object.deepCopy(state)) } state.stateObjectsDirty[addr] = struct{}{} } @@ -1471,32 +1427,32 @@ func (s *StateDB) Copy() *StateDB { return state } +// Copy all the basic fields, initialize the memory ones func (s *StateDB) CopyForSlot() *StateDB { - // Copy all the basic fields, initialize the memory ones parallel := ParallelState{ - // Share base slot db's stateObjects + // use base(dispatcher) slot db's stateObjects. 
// It is a SyncMap, only readable to slot, not writable stateObjects: s.parallel.stateObjects, - stateObjectsSuicidedInSlot: make(map[common.Address]struct{}, defaultNumOfSlots), + stateObjectsSuicidedInSlot: make(map[common.Address]struct{}, 10), codeReadsInSlot: make(map[common.Address]struct{}, defaultNumOfSlots), - codeChangesInSlot: make(map[common.Address]struct{}, defaultNumOfSlots), + codeChangesInSlot: make(map[common.Address]struct{}, 10), stateChangesInSlot: make(map[common.Address]StateKeys, defaultNumOfSlots), stateReadsInSlot: make(map[common.Address]StateKeys, defaultNumOfSlots), balanceChangesInSlot: make(map[common.Address]struct{}, defaultNumOfSlots), balanceReadsInSlot: make(map[common.Address]struct{}, defaultNumOfSlots), addrStateReadsInSlot: make(map[common.Address]struct{}, defaultNumOfSlots), - addrStateChangesInSlot: make(map[common.Address]struct{}, defaultNumOfSlots), - nonceChangesInSlot: make(map[common.Address]struct{}, defaultNumOfSlots), + addrStateChangesInSlot: make(map[common.Address]struct{}, 10), + nonceChangesInSlot: make(map[common.Address]struct{}, 10), isSlotDB: true, dirtiedStateObjectsInSlot: make(map[common.Address]*StateObject, defaultNumOfSlots), } state := &StateDB{ db: s.db, trie: s.db.CopyTrie(s.trie), - stateObjects: make(map[common.Address]*StateObject, defaultNumOfSlots), + stateObjects: make(map[common.Address]*StateObject), // replaced by parallel.stateObjects in parallel mode stateObjectsPending: make(map[common.Address]struct{}, defaultNumOfSlots), stateObjectsDirty: make(map[common.Address]struct{}, defaultNumOfSlots), - refund: s.refund, + refund: s.refund, // should be 0 logs: make(map[common.Hash][]*types.Log, defaultNumOfSlots), logSize: 0, preimages: make(map[common.Hash][]byte, len(s.preimages)), @@ -1525,6 +1481,7 @@ func (s *StateDB) CopyForSlot() *StateDB { for k, v := range s.snapDestructs { state.snapDestructs[k] = v } + // state.snapAccounts = make(map[common.Address][]byte) for k, v := range s.snapAccounts { state.snapAccounts[k] = v diff --git a/core/state_processor.go b/core/state_processor.go index 14cf549f65..38fe88ef99 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -24,7 +24,6 @@ import ( "math/rand" "runtime" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -50,9 +49,6 @@ const ( farDiffLayerTimeout = 2 ) -var MaxPendingQueueSize = 20 // parallel slot's maximum number of pending Txs -var ParallelExecNum = runtime.NumCPU() - 1 // leave a CPU to dispatcher - // StateProcessor is a basic Processor, which takes care of transitioning // state from one point to another. // @@ -61,17 +57,8 @@ type StateProcessor struct { config *params.ChainConfig // Chain configuration options bc *BlockChain // Canonical block chain engine consensus.Engine // Consensus engine used for block rewards - - // add for parallel execute - paraInitialized int32 - paraTxResultChan chan *ParallelTxResult // to notify dispatcher that a tx is done - slotState []*SlotState // idle, or pending messages - mergedTxIndex int // the latest finalized tx index - debugErrorRedoNum int - debugConflictRedoNum int } -// NewStateProcessor initialises a new StateProcessor. 
func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *StateProcessor { return &StateProcessor{ config: config, @@ -80,6 +67,28 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen } } +// add for parallel executions +type ParallelStateProcessor struct { + StateProcessor + parallelNum int // leave a CPU to dispatcher + queueSize int // parallel slot's maximum number of pending Txs + txResultChan chan *ParallelTxResult // to notify dispatcher that a tx is done + slotState []*SlotState // idle, or pending messages + mergedTxIndex int // the latest finalized tx index + debugErrorRedoNum int + debugConflictRedoNum int +} + +func NewParallelStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine, parallelNum int, queueSize int) *ParallelStateProcessor { + processor := &ParallelStateProcessor{ + StateProcessor: *NewStateProcessor(config, bc, engine), + parallelNum: parallelNum, + queueSize: queueSize, + } + processor.init() + return processor +} + type LightStateProcessor struct { check int64 StateProcessor @@ -389,65 +398,55 @@ type SlotState struct { pendingTxReqList []*ParallelTxRequest // maintained by dispatcher for dispatch policy mergedChangeList []state.SlotChangeList slotdbChan chan *state.StateDB // dispatch will create and send this slotDB to slot - // conflict check uses conflict window - // conflict check will check all state changes from (cfWindowStart + 1) to the previous Tx } type ParallelTxResult struct { - redo bool // for redo, dispatch will wait new tx result - updateSlotDB bool // for redo and pending tx quest, slot needs new slotDB, - keepSystem bool // for redo, should keep system address's balance - txIndex int + updateSlotDB bool // for redo and pending tx quest, slot needs new slotDB, + keepSystem bool // for redo, should keep system address's balance slotIndex int // slot index err error // to describe error message? - tx *types.Transaction txReq *ParallelTxRequest receipt *types.Receipt - slotDB *state.StateDB + slotDB *state.StateDB // if updated, it is not equal to txReq.slotDB } type ParallelTxRequest struct { - txIndex int - tx *types.Transaction - slotDB *state.StateDB - gp *GasPool - msg types.Message - block *types.Block - vmConfig vm.Config - bloomProcessors *AsyncReceiptBloomGenerator - usedGas *uint64 - waitTxChan chan int // "int" represents the tx index - curTxChan chan int // "int" represents the tx index + txIndex int + tx *types.Transaction + slotDB *state.StateDB + gasLimit uint64 + msg types.Message + block *types.Block + vmConfig vm.Config + bloomProcessor *AsyncReceiptBloomGenerator + usedGas *uint64 + waitTxChan chan struct{} + curTxChan chan struct{} } -func (p *StateProcessor) InitParallelOnce() { - // to create and start the execution slot goroutines - if !atomic.CompareAndSwapInt32(&p.paraInitialized, 0, 1) { // not swapped means already initialized. - return - } - log.Info("Parallel execution mode is used and initialized", "Parallel Num", ParallelExecNum) - p.paraTxResultChan = make(chan *ParallelTxResult, ParallelExecNum) // fixme: use blocked chan? 
- p.slotState = make([]*SlotState, ParallelExecNum) - - wg := sync.WaitGroup{} // make sure all goroutines are created and started - for i := 0; i < ParallelExecNum; i++ { - p.slotState[i] = new(SlotState) - p.slotState[i].slotdbChan = make(chan *state.StateDB, 1) - p.slotState[i].pendingTxReqChan = make(chan *ParallelTxRequest, MaxPendingQueueSize) - - wg.Add(1) +// to create and start the execution slot goroutines +func (p *ParallelStateProcessor) init() { + log.Info("Parallel execution mode is enabled", "Parallel Num", p.parallelNum, + "CPUNum", runtime.NumCPU(), + "QueueSize", p.queueSize) + p.txResultChan = make(chan *ParallelTxResult, p.parallelNum) + p.slotState = make([]*SlotState, p.parallelNum) + + for i := 0; i < p.parallelNum; i++ { + p.slotState[i] = &SlotState{ + slotdbChan: make(chan *state.StateDB, 1), + pendingTxReqChan: make(chan *ParallelTxRequest, p.queueSize), + } // start the slot's goroutine go func(slotIndex int) { - wg.Done() p.runSlotLoop(slotIndex) // this loop will be permanent live - log.Error("runSlotLoop exit!", "Slot", slotIndex) }(i) } - wg.Wait() } -// if any state in readDb is updated in changeList, then it has state conflict -func (p *StateProcessor) hasStateConflict(readDb *state.StateDB, changeList state.SlotChangeList) bool { +// conflict check uses conflict window, it will check all state changes from (cfWindowStart + 1) +// to the previous Tx, if any state in readDb is updated in changeList, then it is conflicted +func (p *ParallelStateProcessor) hasStateConflict(readDb *state.StateDB, changeList state.SlotChangeList) bool { // check KV change reads := readDb.StateReadsInSlot() writes := changeList.StateChangeSet @@ -525,7 +524,7 @@ func (p *StateProcessor) hasStateConflict(readDb *state.StateDB, changeList stat // for parallel execute, we put contracts of same address in a slot, // since these txs probably would have conflicts -func (p *StateProcessor) queueSameToAddress(txReq *ParallelTxRequest) bool { +func (p *ParallelStateProcessor) queueSameToAddress(txReq *ParallelTxRequest) bool { txToAddr := txReq.tx.To() // To() == nil means contract creation, no same To address if txToAddr == nil { @@ -560,7 +559,7 @@ func (p *StateProcessor) queueSameToAddress(txReq *ParallelTxRequest) bool { // for parallel execute, we put contracts of same address in a slot, // since these txs probably would have conflicts -func (p *StateProcessor) queueSameFromAddress(txReq *ParallelTxRequest) bool { +func (p *ParallelStateProcessor) queueSameFromAddress(txReq *ParallelTxRequest) bool { txFromAddr := txReq.msg.From() for i, slot := range p.slotState { if slot.tailTxReq == nil { // this slot is idle @@ -586,7 +585,7 @@ func (p *StateProcessor) queueSameFromAddress(txReq *ParallelTxRequest) bool { } // if there is idle slot, dispatch the msg to the first idle slot -func (p *StateProcessor) dispatchToIdleSlot(statedb *state.StateDB, txReq *ParallelTxRequest) bool { +func (p *ParallelStateProcessor) dispatchToIdleSlot(statedb *state.StateDB, txReq *ParallelTxRequest) bool { for i, slot := range p.slotState { if slot.tailTxReq == nil { if len(slot.mergedChangeList) == 0 { @@ -604,14 +603,14 @@ func (p *StateProcessor) dispatchToIdleSlot(statedb *state.StateDB, txReq *Paral } // wait until the next Tx is executed and its result is merged to the main stateDB -func (p *StateProcessor) waitUntilNextTxDone(statedb *state.StateDB) *ParallelTxResult { +func (p *ParallelStateProcessor) waitUntilNextTxDone(statedb *state.StateDB, gp *GasPool) *ParallelTxResult { var result 
*ParallelTxResult for { - result = <-p.paraTxResultChan - // slot may request new slotDB, if it think its slotDB is outdated + result = <-p.txResultChan + // slot may request new slotDB, if slotDB is outdated // such as: // tx in pending tx request, previous tx in same queue is likely "damaged" the slotDB - // tx redo for confict + // tx redo for conflict // tx stage 1 failed, nonce out of order... if result.updateSlotDB { // the target slot is waiting for new slotDB @@ -620,15 +619,17 @@ func (p *StateProcessor) waitUntilNextTxDone(statedb *state.StateDB) *ParallelTx slotState.slotdbChan <- slotDB continue } - if result.redo { - // wait result of redo - continue - } // ok, the tx result is valid and can be merged break } + + if err := gp.SubGas(result.receipt.GasUsed); err != nil { + log.Error("gas limit reached", "block", result.txReq.block.Number(), + "txIndex", result.txReq.txIndex, "GasUsed", result.receipt.GasUsed, "gp.Gas", gp.Gas()) + } + resultSlotIndex := result.slotIndex - resultTxIndex := result.txIndex + resultTxIndex := result.txReq.txIndex resultSlotState := p.slotState[resultSlotIndex] resultSlotState.pendingTxReqList = resultSlotState.pendingTxReqList[1:] if resultSlotState.tailTxReq.txIndex == resultTxIndex { @@ -643,29 +644,28 @@ func (p *StateProcessor) waitUntilNextTxDone(statedb *state.StateDB) *ParallelTx resultSlotState.mergedChangeList = append(resultSlotState.mergedChangeList, changeList) if resultTxIndex != p.mergedTxIndex+1 { - log.Warn("ProcessParallel tx result out of order", "resultTxIndex", resultTxIndex, + log.Error("ProcessParallel tx result out of order", "resultTxIndex", resultTxIndex, "p.mergedTxIndex", p.mergedTxIndex) - panic("ProcessParallel tx result out of order") } p.mergedTxIndex = resultTxIndex // notify the following Tx, it is merged, - // fixme: what if no wait or next tx is in same slot? - result.txReq.curTxChan <- resultTxIndex + // todo(optimize): if next tx is in same slot, it do not need to wait; save this channel cost. + close(result.txReq.curTxChan) return result } -func (p *StateProcessor) execInParallelSlot(slotIndex int, txReq *ParallelTxRequest) *ParallelTxResult { +func (p *ParallelStateProcessor) execInSlot(slotIndex int, txReq *ParallelTxRequest) *ParallelTxResult { txIndex := txReq.txIndex tx := txReq.tx slotDB := txReq.slotDB - gp := txReq.gp // goroutine unsafe + slotGasLimit := txReq.gasLimit // not accurate, but it is ok for block import. msg := txReq.msg block := txReq.block header := block.Header() cfg := txReq.vmConfig - bloomProcessors := txReq.bloomProcessors + bloomProcessor := txReq.bloomProcessor - blockContext := NewEVMBlockContext(header, p.bc, nil) // fixme: share blockContext within a block? 
+ blockContext := NewEVMBlockContext(header, p.bc, nil) // can share blockContext within a block for efficiency vmenv := vm.NewEVM(blockContext, vm.TxContext{}, slotDB, p.config, cfg) var receipt *types.Receipt @@ -676,7 +676,6 @@ func (p *StateProcessor) execInParallelSlot(slotIndex int, txReq *ParallelTxRequ slotDB.Prepare(tx.Hash(), block.Hash(), txIndex) log.Debug("exec In Slot", "Slot", slotIndex, "txIndex", txIndex, "slotDB.baseTxIndex", slotDB.BaseTxIndex()) - slotGasLimit := gp.Gas() gpSlot := new(GasPool).AddGas(slotGasLimit) // each slot would use its own gas pool, and will do gaslimit check later evm, result, err = applyTransactionStageExecution(msg, gpSlot, slotDB, vmenv) log.Debug("Stage Execution done", "Slot", slotIndex, "txIndex", txIndex, "slotDB.baseTxIndex", slotDB.BaseTxIndex()) @@ -684,58 +683,43 @@ func (p *StateProcessor) execInParallelSlot(slotIndex int, txReq *ParallelTxRequ // wait until the previous tx is finalized. if txReq.waitTxChan != nil { log.Debug("Stage wait previous Tx done", "Slot", slotIndex, "txIndex", txIndex) - waitTxIndex := <-txReq.waitTxChan - if waitTxIndex != txIndex-1 { - log.Error("Stage wait tx index mismatch", "expect", txIndex-1, "actual", waitTxIndex) - panic(fmt.Sprintf("wait tx index mismatch expect:%d, actual:%d", txIndex-1, waitTxIndex)) - } + <-txReq.waitTxChan // close the channel } - // in parallel, tx can run into trouble - // for example: err="nonce too high" - // in this case, we will do re-run. + // in parallel mode, tx can run into trouble, for example: err="nonce too high" + // in these cases, we will wait and re-run. if err != nil { p.debugErrorRedoNum++ log.Debug("Stage Execution err", "Slot", slotIndex, "txIndex", txIndex, "current slotDB.baseTxIndex", slotDB.BaseTxIndex(), "err", err) redoResult := &ParallelTxResult{ - redo: true, updateSlotDB: true, - txIndex: txIndex, slotIndex: slotIndex, - tx: tx, txReq: txReq, receipt: receipt, err: err, } - p.paraTxResultChan <- redoResult + p.txResultChan <- redoResult slotDB = <-p.slotState[slotIndex].slotdbChan slotDB.Prepare(tx.Hash(), block.Hash(), txIndex) - // vmenv.Reset(vm.TxContext{}, slotDB) log.Debug("Stage Execution get new slotdb to redo", "Slot", slotIndex, "txIndex", txIndex, "new slotDB.baseTxIndex", slotDB.BaseTxIndex()) - slotGasLimit = gp.Gas() gpSlot = new(GasPool).AddGas(slotGasLimit) evm, result, err = applyTransactionStageExecution(msg, gpSlot, slotDB, vmenv) if err != nil { - panic(fmt.Sprintf("Stage Execution redo, error %v", err)) + log.Error("Stage Execution redo, error", err) } } - // fixme: - // parallel mode can not precheck, - // precheck should be replace by postCheck when previous Tx is finalized - // do conflict detect hasConflict := false systemAddrConflict := false - log.Debug("Stage Execution done, do conflict check", "Slot", slotIndex, "txIndex", txIndex) if slotDB.SystemAddressRedo() { hasConflict = true systemAddrConflict = true } else { - for index := 0; index < ParallelExecNum; index++ { + for index := 0; index < p.parallelNum; index++ { if index == slotIndex { continue } @@ -743,7 +727,6 @@ func (p *StateProcessor) execInParallelSlot(slotIndex int, txReq *ParallelTxRequ // check all finalizedDb from current slot's for _, changeList := range p.slotState[index].mergedChangeList { if changeList.TxIndex <= slotDB.BaseTxIndex() { - // log.Debug("skip finalized DB which is out of the conflict window", "finDb.txIndex", finDb.txIndex, "slotDB.baseTxIndex", slotDB.baseTxIndex) continue } if p.hasStateConflict(slotDB, changeList) { @@ -764,25 
+747,20 @@ func (p *StateProcessor) execInParallelSlot(slotIndex int, txReq *ParallelTxRequ p.debugConflictRedoNum++ // re-run should not have conflict, since it has the latest world state. redoResult := &ParallelTxResult{ - redo: true, updateSlotDB: true, keepSystem: systemAddrConflict, - txIndex: txIndex, slotIndex: slotIndex, - tx: tx, txReq: txReq, receipt: receipt, err: err, } - p.paraTxResultChan <- redoResult + p.txResultChan <- redoResult slotDB = <-p.slotState[slotIndex].slotdbChan slotDB.Prepare(tx.Hash(), block.Hash(), txIndex) - // vmenv.Reset(vm.TxContext{}, slotDB) - slotGasLimit = gp.Gas() gpSlot = new(GasPool).AddGas(slotGasLimit) evm, result, err = applyTransactionStageExecution(msg, gpSlot, slotDB, vmenv) if err != nil { - panic(fmt.Sprintf("Stage Execution conflict redo, error %v", err)) + log.Error("Stage Execution conflict redo, error", err) } } @@ -791,18 +769,12 @@ func (p *StateProcessor) execInParallelSlot(slotIndex int, txReq *ParallelTxRequ if gasConsumed != result.UsedGas { log.Error("gasConsumed != result.UsedGas mismatch", "gasConsumed", gasConsumed, "result.UsedGas", result.UsedGas) - panic(fmt.Sprintf("gas consume mismatch, consumed:%d, result.UsedGas:%d", gasConsumed, result.UsedGas)) - } - - if err := gp.SubGas(gasConsumed); err != nil { - log.Error("gas limit reached", "gasConsumed", gasConsumed, "gp", gp.Gas()) - panic(fmt.Sprintf("gas limit reached, gasConsumed:%d, gp.Gas():%d", gasConsumed, gp.Gas())) } log.Debug("ok to finalize this TX", "Slot", slotIndex, "txIndex", txIndex, "result.UsedGas", result.UsedGas, "txReq.usedGas", *txReq.usedGas) // ok, time to do finalize, stage2 should not be parallel - receipt, err = applyTransactionStageFinalization(evm, result, msg, p.config, slotDB, header, tx, txReq.usedGas, bloomProcessors) + receipt, err = applyTransactionStageFinalization(evm, result, msg, p.config, slotDB, header, tx, txReq.usedGas, bloomProcessor) if result.Failed() { // if Tx is reverted, all its state change will be discarded @@ -811,11 +783,8 @@ func (p *StateProcessor) execInParallelSlot(slotIndex int, txReq *ParallelTxRequ } return &ParallelTxResult{ - redo: false, updateSlotDB: false, - txIndex: txIndex, slotIndex: slotIndex, - tx: tx, txReq: txReq, receipt: receipt, slotDB: slotDB, @@ -823,10 +792,9 @@ func (p *StateProcessor) execInParallelSlot(slotIndex int, txReq *ParallelTxRequ } } -func (p *StateProcessor) runSlotLoop(slotIndex int) { +func (p *ParallelStateProcessor) runSlotLoop(slotIndex int) { curSlot := p.slotState[slotIndex] for { - // log.Info("parallel slot waiting", "Slot", slotIndex) // wait for new TxReq txReq := <-curSlot.pendingTxReqChan // receive a dispatched message @@ -839,22 +807,21 @@ func (p *StateProcessor) runSlotLoop(slotIndex int) { // it is better to create a new SlotDB, since COW is used. if txReq.slotDB == nil { result := &ParallelTxResult{ - redo: false, updateSlotDB: true, slotIndex: slotIndex, err: nil, } - p.paraTxResultChan <- result + p.txResultChan <- result txReq.slotDB = <-curSlot.slotdbChan } - result := p.execInParallelSlot(slotIndex, txReq) + result := p.execInSlot(slotIndex, txReq) log.Debug("SlotLoop the TxReq is done", "Slot", slotIndex, "err", result.err) - p.paraTxResultChan <- result + p.txResultChan <- result } } // clear slot state for each block. 
-func (p *StateProcessor) resetParallelState(txNum int, statedb *state.StateDB) { +func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) { if txNum == 0 { return } @@ -871,6 +838,114 @@ func (p *StateProcessor) resetParallelState(txNum int, statedb *state.StateDB) { } } +// Implement BEP-130: Parallel Transaction Execution. +func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) { + var ( + usedGas = new(uint64) + header = block.Header() + gp = new(GasPool).AddGas(block.GasLimit()) + ) + var receipts = make([]*types.Receipt, 0) + txNum := len(block.Transactions()) + p.resetState(txNum, statedb) + + // Iterate over and process the individual transactions + posa, isPoSA := p.engine.(consensus.PoSA) + commonTxs := make([]*types.Transaction, 0, txNum) + // usually do have two tx, one for validator set contract, another for system reward contract. + systemTxs := make([]*types.Transaction, 0, 2) + + signer, _, bloomProcessor := p.preExecute(block, statedb, cfg, true) + var waitTxChan, curTxChan chan struct{} + for i, tx := range block.Transactions() { + if isPoSA { + if isSystemTx, err := posa.IsSystemTransaction(tx, block.Header()); err != nil { + return statedb, nil, nil, 0, err + } else if isSystemTx { + systemTxs = append(systemTxs, tx) + continue + } + } + + // can be moved it into slot for efficiency, but signer is not concurrent safe + msg, err := tx.AsMessage(signer) + if err != nil { + return statedb, nil, nil, 0, err + } + + // parallel start, wrap an exec message, which will be dispatched to a slot + waitTxChan = curTxChan // can be nil, if this is the tx of first batch, otherwise, it is previous Tx's wait channel + curTxChan = make(chan struct{}, 1) + + txReq := &ParallelTxRequest{ + txIndex: i, + tx: tx, + slotDB: nil, + gasLimit: gp.Gas(), + msg: msg, + block: block, + vmConfig: cfg, + bloomProcessor: bloomProcessor, + usedGas: usedGas, + waitTxChan: waitTxChan, + curTxChan: curTxChan, + } + + // to optimize the for { for {} } loop code style? it is ok right now. + for { + if p.queueSameFromAddress(txReq) { + break + } + + if p.queueSameToAddress(txReq) { + break + } + // if idle slot available, just dispatch and process next tx. + if p.dispatchToIdleSlot(statedb, txReq) { + break + } + log.Debug("ProcessParallel no slot available, wait", "txIndex", txReq.txIndex) + // no idle slot, wait until a tx is executed and merged. 
+			result := p.waitUntilNextTxDone(statedb, gp)
+
+			// update tx result
+			if result.err != nil {
+				log.Warn("ProcessParallel a failed tx", "resultSlotIndex", result.slotIndex,
+					"resultTxIndex", result.txReq.txIndex, "result.err", result.err)
+				return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", result.txReq.txIndex, result.txReq.tx.Hash().Hex(), result.err)
+			}
+
+			commonTxs = append(commonTxs, result.txReq.tx)
+			receipts = append(receipts, result.receipt)
+		}
+	}
+
+	// wait until all tx requests are done
+	for len(commonTxs)+len(systemTxs) < txNum {
+		result := p.waitUntilNextTxDone(statedb, gp)
+		// update tx result
+		if result.err != nil {
+			log.Warn("ProcessParallel a failed tx", "resultSlotIndex", result.slotIndex,
+				"resultTxIndex", result.txReq.txIndex, "result.err", result.err)
+			return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", result.txReq.txIndex, result.txReq.tx.Hash().Hex(), result.err)
+		}
+		commonTxs = append(commonTxs, result.txReq.tx)
+		receipts = append(receipts, result.receipt)
+	}
+
+	// len(commonTxs) could be 0, such as: https://bscscan.com/block/14580486
+	if len(commonTxs) > 0 {
+		log.Info("ProcessParallel tx all done", "block", header.Number, "usedGas", *usedGas,
+			"txNum", txNum,
+			"len(commonTxs)", len(commonTxs),
+			"errorNum", p.debugErrorRedoNum,
+			"conflictNum", p.debugConflictRedoNum,
+			"redoRate(%)", 100*(p.debugErrorRedoNum+p.debugConflictRedoNum)/len(commonTxs))
+	}
+	allLogs, err := p.postExecute(block, statedb, &commonTxs, &receipts, &systemTxs, usedGas, bloomProcessor)
+	return statedb, receipts, allLogs, *usedGas, err
+}
+
 // Before transactions are executed, do shared preparation for Process() & ProcessParallel()
 func (p *StateProcessor) preExecute(block *types.Block, statedb *state.StateDB, cfg vm.Config, parallel bool) (types.Signer, *vm.EVM, *AsyncReceiptBloomGenerator) {
 	signer := types.MakeSigner(p.bc.chainConfig, block.Number())
@@ -882,25 +957,25 @@ func (p *StateProcessor) preExecute(block *types.Block, statedb *state.StateDB,
 	// Handle upgrade build-in system contract code
 	systemcontracts.UpgradeBuildInSystemContract(p.config, block.Number(), statedb)
 
-	blockContext := NewEVMBlockContext(block.Header(), p.bc, nil)
 	// with parallel mode, vmenv will be created inside of slot
 	var vmenv *vm.EVM
 	if !parallel {
+		blockContext := NewEVMBlockContext(block.Header(), p.bc, nil)
 		vmenv = vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
 	}
 
 	// initialise bloom processors
-	bloomProcessors := NewAsyncReceiptBloomGenerator(len(block.Transactions()))
+	bloomProcessor := NewAsyncReceiptBloomGenerator(len(block.Transactions()))
 	statedb.MarkFullProcessed()
 
-	return signer, vmenv, bloomProcessors
+	return signer, vmenv, bloomProcessor
 }
 
 func (p *StateProcessor) postExecute(block *types.Block, statedb *state.StateDB, commonTxs *[]*types.Transaction,
-	receipts *[]*types.Receipt, systemTxs *[]*types.Transaction, usedGas *uint64, bloomProcessors *AsyncReceiptBloomGenerator) ([]*types.Log, error) {
-	var allLogs []*types.Log
+	receipts *[]*types.Receipt, systemTxs *[]*types.Transaction, usedGas *uint64, bloomProcessor *AsyncReceiptBloomGenerator) ([]*types.Log, error) {
+	allLogs := make([]*types.Log, 0, len(*receipts))
 
-	bloomProcessors.Close()
+	bloomProcessor.Close()
 
 	// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
 	err := p.engine.Finalize(p.bc, block.Header(), statedb, commonTxs, block.Uncles(), receipts, systemTxs, usedGas)
@@ -934,7 +1009,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
 	// usually do have two tx, one for validator set contract, another for system reward contract.
 	systemTxs := make([]*types.Transaction, 0, 2)
 
-	signer, vmenv, bloomProcessors := p.preExecute(block, statedb, cfg, false)
+	signer, vmenv, bloomProcessor := p.preExecute(block, statedb, cfg, false)
 	for i, tx := range block.Transactions() {
 		if isPoSA {
 			if isSystemTx, err := posa.IsSystemTransaction(tx, block.Header()); err != nil {
@@ -950,7 +1025,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
 			return statedb, nil, nil, 0, err
 		}
 		statedb.Prepare(tx.Hash(), block.Hash(), i)
-		receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, header, tx, usedGas, vmenv, bloomProcessors)
+		receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, header, tx, usedGas, vmenv, bloomProcessor)
 		if err != nil {
 			return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
 		}
@@ -959,112 +1034,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
 		receipts = append(receipts, receipt)
 	}
 
-	allLogs, err := p.postExecute(block, statedb, &commonTxs, &receipts, &systemTxs, usedGas, bloomProcessors)
-	return statedb, receipts, allLogs, *usedGas, err
-}
-
-func (p *StateProcessor) ProcessParallel(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) {
-	var (
-		usedGas = new(uint64)
-		header  = block.Header()
-		gp      = new(GasPool).AddGas(block.GasLimit())
-	)
-	var receipts = make([]*types.Receipt, 0)
-	txNum := len(block.Transactions())
-	p.resetParallelState(txNum, statedb)
-
-	// Iterate over and process the individual transactions
-	posa, isPoSA := p.engine.(consensus.PoSA)
-	commonTxs := make([]*types.Transaction, 0, txNum)
-	// usually do have two tx, one for validator set contract, another for system reward contract.
-	systemTxs := make([]*types.Transaction, 0, 2)
-
-	signer, _, bloomProcessors := p.preExecute(block, statedb, cfg, true)
-	var waitTxChan, curTxChan chan int
-	for i, tx := range block.Transactions() {
-		if isPoSA {
-			if isSystemTx, err := posa.IsSystemTransaction(tx, block.Header()); err != nil {
-				return statedb, nil, nil, 0, err
-			} else if isSystemTx {
-				systemTxs = append(systemTxs, tx)
-				continue
-			}
-		}
-
-		msg, err := tx.AsMessage(signer) // fixme: move it into slot.
-		if err != nil {
-			return statedb, nil, nil, 0, err
-		}
-
-		// parallel start, wrap an exec message, which will be dispatched to a slot
-		waitTxChan = curTxChan // can be nil, if this is the tx of first batch, otherwise, it is previous Tx's wait channel
-		curTxChan = make(chan int, 1)
-
-		txReq := &ParallelTxRequest{
-			txIndex:         i,
-			tx:              tx,
-			slotDB:          nil,
-			gp:              gp,
-			msg:             msg,
-			block:           block,
-			vmConfig:        cfg,
-			bloomProcessors: bloomProcessors,
-			usedGas:         usedGas,
-			waitTxChan:      waitTxChan,
-			curTxChan:       curTxChan,
-		}
-
-		// fixme: to optimize the for { for {} } loop code style
-		for {
-			if p.queueSameToAddress(txReq) {
-				break
-			}
-			if p.queueSameFromAddress(txReq) {
-				break
-			}
-			// if idle slot available, just dispatch and process next tx.
-			if p.dispatchToIdleSlot(statedb, txReq) {
-				// log.Info("ProcessParallel dispatch to idle slot", "txIndex", txReq.txIndex)
-				break
-			}
-			log.Debug("ProcessParallel no slot available, wait", "txIndex", txReq.txIndex)
-			// no idle slot, wait until a tx is executed and merged.
-			result := p.waitUntilNextTxDone(statedb)
-
-			// update tx result
-			if result.err != nil {
-				log.Warn("ProcessParallel a failed tx", "resultSlotIndex", result.slotIndex,
-					"resultTxIndex", result.txIndex, "result.err", result.err)
-				return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", result.txIndex, result.tx.Hash().Hex(), result.err)
-			}
-			commonTxs = append(commonTxs, result.tx)
-			receipts = append(receipts, result.receipt)
-		}
-	}
-
-	// wait until all tx request are done
-	for len(commonTxs)+len(systemTxs) < txNum {
-		result := p.waitUntilNextTxDone(statedb)
-		// update tx result
-		if result.err != nil {
-			log.Warn("ProcessParallel a failed tx", "resultSlotIndex", result.slotIndex,
-				"resultTxIndex", result.txIndex, "result.err", result.err)
-			return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", result.txIndex, result.tx.Hash().Hex(), result.err)
-		}
-		commonTxs = append(commonTxs, result.tx)
-		receipts = append(receipts, result.receipt)
-	}
-
-	// len(commonTxs) could be 0, such as: https://bscscan.com/block/14580486
-	if len(commonTxs) > 0 {
-		log.Info("ProcessParallel tx all done", "block", header.Number, "usedGas", *usedGas,
-			"txNum", txNum,
-			"len(commonTxs)", len(commonTxs),
-			"errorNum", p.debugErrorRedoNum,
-			"conflictNum", p.debugConflictRedoNum,
-			"redoRate(%)", 100*(p.debugErrorRedoNum+p.debugConflictRedoNum)/len(commonTxs))
-	}
-	allLogs, err := p.postExecute(block, statedb, &commonTxs, &receipts, &systemTxs, usedGas, bloomProcessors)
+	allLogs, err := p.postExecute(block, statedb, &commonTxs, &receipts, &systemTxs, usedGas, bloomProcessor)
 	return statedb, receipts, allLogs, *usedGas, err
 }
diff --git a/core/types.go b/core/types.go
index 0038ce916b..5ed4817e68 100644
--- a/core/types.go
+++ b/core/types.go
@@ -48,8 +48,4 @@ type Processor interface {
 	// the transaction messages using the statedb and applying any rewards to both
 	// the processor (coinbase) and any included uncles.
 	Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error)
-
-	// Implement BEP-130: Parallel Transaction Execution.
-	InitParallelOnce()
-	ProcessParallel(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error)
 }
diff --git a/eth/backend.go b/eth/backend.go
index ab93006437..8551b57eaa 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -202,6 +202,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 	bcOps := make([]core.BlockChainOption, 0)
 	if config.DiffSync {
 		bcOps = append(bcOps, core.EnableLightProcessor)
+	} else if config.ParallelTxMode {
+		bcOps = append(bcOps, core.EnableParallelProcessor(config.ParallelTxNum, config.ParallelTxQueueSize))
 	}
 	if config.PipeCommit {
 		bcOps = append(bcOps, core.EnablePipelineCommit)
diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go
index 09baad1e1c..94998acd6c 100644
--- a/eth/ethconfig/config.go
+++ b/eth/ethconfig/config.go
@@ -135,10 +135,13 @@ type Config struct {
 	NoPruning           bool // Whether to disable pruning and flush everything to disk
 	DirectBroadcast     bool
-	DisableSnapProtocol bool //Whether disable snap protocol
+	DisableSnapProtocol bool // Whether to disable the snap protocol
 	DiffSync            bool // Whether support diff sync
 	PipeCommit          bool
 	RangeLimit          bool
+	ParallelTxMode      bool // Whether to execute transactions in parallel mode during full sync
+	ParallelTxNum       int  // Number of slots for parallel transaction execution
+	ParallelTxQueueSize int  // Max number of Tx that can be queued to a slot
 
 	TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
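
Note for reviewers: the dispatch loop in the new ParallelStateProcessor.Process chains transactions through waitTxChan/curTxChan so that results are merged in block order, and this revision switches the channels to chan struct{} signalled by close(), as the commit message describes. The standalone sketch below only illustrates that hand-off pattern; txJob, waitDone and curDone are illustrative names, not identifiers from this patch.

package main

import "fmt"

// txJob mimics the per-transaction hand-off: each job waits for the previous
// job's channel before it is allowed to "merge", then closes its own channel
// to release the next one.
type txJob struct {
	index    int
	waitDone <-chan struct{} // previous tx's channel; nil for the first tx
	curDone  chan struct{}   // closed once this tx has been merged
}

func main() {
	const txNum = 4
	results := make(chan int, txNum)

	var prev chan struct{}
	for i := 0; i < txNum; i++ {
		job := txJob{index: i, waitDone: prev, curDone: make(chan struct{})}
		prev = job.curDone

		go func(j txJob) {
			// speculative execution could happen here, out of order
			if j.waitDone != nil {
				<-j.waitDone // block until the previous tx has been merged
			}
			results <- j.index // "merge" in transaction order
			close(j.curDone)   // release the next tx by closing the channel
		}(job)
	}

	for i := 0; i < txNum; i++ {
		fmt.Println("merged tx", <-results)
	}
}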
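The inner for loop above now tries queueSameFromAddress first, then queueSameToAddress, then dispatchToIdleSlot, and only waits for a merge when all three fail. The toy dispatcher below mirrors that priority order only; its types and maps are invented for illustration and do not reflect the real slot bookkeeping.

package main

import "fmt"

// request and dispatcher are illustrative stand-ins, not patch identifiers.
type request struct{ from, to string }

type dispatcher struct {
	bySender    map[string][]request
	byRecipient map[string][]request
	idleSlots   int
}

// dispatch applies the same ordering as the loop in the patch:
// same sender first, then same recipient, then an idle slot, else wait.
func (d *dispatcher) dispatch(r request) string {
	if _, ok := d.bySender[r.from]; ok {
		d.bySender[r.from] = append(d.bySender[r.from], r)
		return "queued behind same sender"
	}
	if _, ok := d.byRecipient[r.to]; ok {
		d.byRecipient[r.to] = append(d.byRecipient[r.to], r)
		return "queued behind same recipient"
	}
	if d.idleSlots > 0 {
		d.idleSlots--
		d.bySender[r.from] = []request{r}
		d.byRecipient[r.to] = []request{r}
		return "dispatched to idle slot"
	}
	return "no slot available, wait for a merge"
}

func main() {
	d := &dispatcher{bySender: map[string][]request{}, byRecipient: map[string][]request{}, idleSlots: 1}
	fmt.Println(d.dispatch(request{from: "A", to: "B"})) // idle slot
	fmt.Println(d.dispatch(request{from: "A", to: "C"})) // same sender wins over same recipient
	fmt.Println(d.dispatch(request{from: "D", to: "B"})) // same recipient
	fmt.Println(d.dispatch(request{from: "E", to: "F"})) // nothing left but to wait
}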
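The eth/backend.go hunk wires the parallel processor in through a core.BlockChainOption appended to bcOps instead of package-level globals. The sketch below shows the general functional-option shape this relies on, under the assumption that an option is simply a function applied to the chain during construction; the actual core.BlockChainOption signature and EnableParallelProcessor implementation may differ.

package main

import "fmt"

// blockChain and blockChainOption stand in for core.BlockChain and
// core.BlockChainOption; the real types in the repository may differ.
type blockChain struct {
	parallel          bool
	parallelNum       int
	parallelQueueSize int
}

type blockChainOption func(*blockChain) *blockChain

// enableParallelProcessor mirrors the spirit of core.EnableParallelProcessor:
// it captures the CLI-derived settings and returns an option to apply later.
func enableParallelProcessor(num, queueSize int) blockChainOption {
	return func(bc *blockChain) *blockChain {
		bc.parallel = true
		bc.parallelNum = num
		bc.parallelQueueSize = queueSize
		return bc
	}
}

func newBlockChain(opts ...blockChainOption) *blockChain {
	bc := &blockChain{}
	for _, opt := range opts {
		bc = opt(bc)
	}
	return bc
}

func main() {
	// e.g. values derived from --parallel.num and --parallel.queuesize
	opts := []blockChainOption{enableParallelProcessor(7, 20)}
	bc := newBlockChain(opts...)
	fmt.Printf("parallel=%v num=%d queue=%d\n", bc.parallel, bc.parallelNum, bc.parallelQueueSize)
}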