Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine scheduling algorithm #7

Merged
merged 6 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 106 additions & 24 deletions ebp/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,48 @@ type TxRange struct {
end uint64
}

type rwList struct {
rList []uint64
wList []uint64
}

func newRWList() rwList {
return rwList{
rList: make([]uint64, 0, 32),
wList: make([]uint64, 0, 32),
}
}

func (rwl *rwList) reset() {
rwl.wList = rwl.wList[:0]
rwl.rList = rwl.rList[:0]
}

func (rwl *rwList) add(k uint64, isWrite bool) {
if isWrite {
rwl.wList = append(rwl.wList, k)
} else {
rwl.rList = append(rwl.rList, k)
}
}

func (rwl rwList) conflictsWith(touchedSet map[uint64]struct{}) bool {
for _, l := range [2][]uint64{rwl.rList, rwl.wList} {
for _, k := range l {
if _, ok := touchedSet[k]; ok {
return true
}
}
}
return false
}

func (rwl rwList) updateTouchedSet(touchedSet map[uint64]struct{}) {
for _, k := range rwl.wList {
touchedSet[k] = struct{}{}
}
}

type txEngine struct {
// How many parallel execution round are performed for each block
roundNum int //consensus parameter
Expand All @@ -54,6 +96,9 @@ type txEngine struct {
signer gethtypes.Signer
currentBlock *types.BlockInfo

rwListMap map[common.Hash]rwList
checkRWInLoading bool

cumulativeGasUsed uint64
cumulativeFeeRefund *uint256.Int
cumulativeGasFee *uint256.Int
Expand All @@ -62,6 +107,9 @@ type txEngine struct {
aotReloadInterval int64

logger log.Logger

// for ut
txExecutedCount int
}

func (exec *txEngine) Context() *types.Context {
Expand Down Expand Up @@ -176,6 +224,10 @@ func NewEbpTxExec(exeRoundCount, runnerNumber, parallelNum, defaultTxListCap int
}
}

func (exec *txEngine) SetCheckRWInLoading(b bool) {
exec.checkRWInLoading = b
}

func (exec *txEngine) SetAotParam(aotDir string, aotReloadInterval int64) {
exec.aotDir = aotDir
exec.aotReloadInterval = aotReloadInterval
Expand Down Expand Up @@ -450,6 +502,7 @@ func (exec *txEngine) Execute(currBlock *types.BlockInfo) {
exec.cumulativeFeeRefund = uint256.NewInt(0)
exec.cumulativeGasFee = uint256.NewInt(0)
exec.currentBlock = currBlock
exec.rwListMap = make(map[common.Hash]rwList, 1024)
startKey, endKey := exec.getStandbyQueueRange()
if startKey == endKey {
return
Expand All @@ -458,6 +511,7 @@ func (exec *txEngine) Execute(currBlock *types.BlockInfo) {
start: startKey,
end: endKey,
}
exec.txExecutedCount = 0
committableRunnerList := make([]*TxRunner, 0, 4096)
// Repeat exec.roundNum round for execute txs in standby q. At the end of each round
// modifications made by TXs are written to world state. So TXs in later rounds can
Expand All @@ -467,6 +521,10 @@ func (exec *txEngine) Execute(currBlock *types.BlockInfo) {
break
}
numTx := exec.executeOneRound(txRange, exec.currentBlock)
exec.txExecutedCount += numTx
if numTx == 0 && exec.checkRWInLoading {
break
}
for i := 0; i < numTx; i++ {
if Runners[i] == nil {
continue // the TX is not committable and needs re-execution
Expand Down Expand Up @@ -504,44 +562,58 @@ func (exec *txEngine) setStandbyQueueRange(start, end uint64) {

// Execute 'runnerNumber' transactions in parallel and commit the ones without any interdependency
func (exec *txEngine) executeOneRound(txRange *TxRange, currBlock *types.BlockInfo) int {
txBundle := exec.loadStandbyTxs(txRange)
kvCount := exec.runTxInParallel(txRange, txBundle, currBlock)
exec.checkTxDepsAndUptStandbyQ(txRange, txBundle, int(kvCount))
txBundle, ignoreList := exec.loadStandbyTxs(txRange)
if exec.checkRWInLoading && len(txBundle) == 0 {
return 0
}
kvCount := exec.runTxInParallel(txRange, txBundle, len(ignoreList), currBlock)
exec.checkTxDepsAndUptStandbyQ(txRange, txBundle, ignoreList, int(kvCount))
return len(txBundle)
}

// Load at most 'exec.runnerNumber' transactions from standby queue
func (exec *txEngine) loadStandbyTxs(txRange *TxRange) (txBundle []types.TxToRun) {
func (exec *txEngine) loadStandbyTxs(txRange *TxRange) (txBundle, ignoreList []types.TxToRun) {
touchedSet := make(map[uint64]struct{}, 4096)
ctx := exec.cleanCtx.WithRbtCopy()
end := txRange.end
if end > txRange.start+uint64(exec.runnerNumber) { // load at most exec.runnerNumber
end = txRange.start + uint64(exec.runnerNumber)
}
txBundle = make([]types.TxToRun, end-txRange.start)
for i := txRange.start; i < end; i++ {
txBundle = make([]types.TxToRun, 0, exec.runnerNumber)
ignoreList = make([]types.TxToRun, 0, 2*exec.runnerNumber)
for i := txRange.start; i < txRange.end && len(txBundle) < exec.runnerNumber && len(ignoreList) < 2*exec.runnerNumber; i++ {
k := types.GetStandbyTxKey(i)
bz := ctx.Rbt.GetBaseStore().Get(k)
txBundle[i-txRange.start].FromBytes(bz)
var txToRun types.TxToRun
txToRun.FromBytes(bz)
rwList, isRecorded := exec.rwListMap[txToRun.HashID]
hasConflicts := exec.checkRWInLoading && isRecorded && rwList.conflictsWith(touchedSet)
if hasConflicts {
ignoreList = append(ignoreList, txToRun)
} else {
rwList.updateTouchedSet(touchedSet)
txBundle = append(txBundle, txToRun)
}
}
ctx.Close(false)
return
}

// Assign the transactions to global 'Runners' and run them in parallel.
// Record the count of touched KV pairs and return it as a hint for checkTxDepsAndUptStandbyQ
func (exec *txEngine) runTxInParallel(txRange *TxRange, txBundle []types.TxToRun, currBlock *types.BlockInfo) (kvCount int64) {
func (exec *txEngine) runTxInParallel(txRange *TxRange, txBundle []types.TxToRun, ignoreLen int, currBlock *types.BlockInfo) (kvCount int64) {
sharedIdx := int64(-1)
trunk := exec.cleanCtx.Rbt.GetBaseStore()
dt.ParallelRun(exec.parallelNum, func(_ int) {
for {
myIdx := atomic.AddInt64(&sharedIdx, 1)
if myIdx >= int64(len(txBundle)) {
if myIdx >= int64(len(txBundle)+ignoreLen) {
return
}
Runners[myIdx] = NewTxRunner(exec.cleanCtx.WithRbtCopy(), &txBundle[myIdx])
k := types.GetStandbyTxKey(txRange.start + uint64(myIdx))
Runners[myIdx].Ctx.Rbt.GetBaseStore().PrepareForDeletion(k) // remove it from the standby queue
trunk.PrepareForDeletion(k) // remove it from the standby queue
k = types.GetStandbyTxKey(txRange.end + uint64(myIdx))
Runners[myIdx].Ctx.Rbt.GetBaseStore().PrepareForUpdate(k) //warm up
trunk.PrepareForUpdate(k) //warm up
if myIdx >= int64(len(txBundle)) {
continue
}
Runners[myIdx] = NewTxRunner(exec.cleanCtx.WithRbtCopy(), &txBundle[myIdx])
if myIdx > 0 && txBundle[myIdx-1].From == txBundle[myIdx].From {
// In reorderInfoList, we placed the tx with same 'From' back-to-back
// same from-address as previous transaction, cannot run in same round
Expand All @@ -562,7 +634,7 @@ type indexAndBool struct {

// Check interdependency of TXs using 'touchedSet'. The ones with dependency with former committed TXs cannot
// be committed and should be inserted back into the standby queue.
func (exec *txEngine) checkTxDepsAndUptStandbyQ(txRange *TxRange, txBundle []types.TxToRun, kvCount int) {
func (exec *txEngine) checkTxDepsAndUptStandbyQ(txRange *TxRange, txBundle, ignoreList []types.TxToRun, kvCount int) {
touchedSet := make(map[uint64]struct{}, kvCount)
var wg sync.WaitGroup
idxChan := make(chan indexAndBool, 10)
Expand All @@ -577,10 +649,12 @@ func (exec *txEngine) checkTxDepsAndUptStandbyQ(txRange *TxRange, txBundle []typ
}
wg.Done()
}()
rwList := newRWList()
for idx := range txBundle {
canCommit := true
Runners[idx].Ctx.Rbt.ScanAllShortKeys(func(key [rabbit.KeySize]byte, dirty bool) (stop bool) {
k := binary.LittleEndian.Uint64(key[:])
rwList.add(k, dirty)
if _, ok := touchedSet[k]; ok {
canCommit = false // cannot commit if conflicts with touched KV set
Runners[idx].Status = types.FAILED_TO_COMMIT
Expand All @@ -590,13 +664,13 @@ func (exec *txEngine) checkTxDepsAndUptStandbyQ(txRange *TxRange, txBundle []typ
}
})
if canCommit { // record the dirty KVs written by a committable TX into toucchedSet
Runners[idx].Ctx.Rbt.ScanAllShortKeys(func(key [rabbit.KeySize]byte, dirty bool) (stop bool) {
if dirty {
k := binary.LittleEndian.Uint64(key[:])
touchedSet[k] = struct{}{}
}
return false
})
rwList.updateTouchedSet(touchedSet)
}
if exec.checkRWInLoading {
exec.rwListMap[Runners[idx].Tx.HashID] = rwList
rwList = newRWList()
} else {
rwList.reset()
}
idxChan <- indexAndBool{idx, canCommit}
}
Expand All @@ -622,6 +696,14 @@ func (exec *txEngine) checkTxDepsAndUptStandbyQ(txRange *TxRange, txBundle []typ
Runners[idx] = nil
}
}
for _, tx := range ignoreList {
k := types.GetStandbyTxKey(txRange.start)
store.Delete(k)
txRange.start++
newK := types.GetStandbyTxKey(txRange.end)
txRange.end++
store.Set(newK, tx.ToBytes())
}
})
}

Expand Down
Loading
Loading