Skip to content

Commit

Permalink
refactor: cleanup scroll_worker.go (#823)
Browse files Browse the repository at this point in the history
  • Loading branch information
omerfirmak authored Jun 14, 2024
1 parent 3161380 commit 94c6fc0
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 80 deletions.
87 changes: 9 additions & 78 deletions miner/scroll_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,6 @@ const (

// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
chainHeadChanSize = 10

// chainSideChanSize is the size of channel listening to ChainSideEvent.
chainSideChanSize = 10

// minRecommitInterval is the minimal time interval to recreate the mining block with
// any newly arrived transactions.
minRecommitInterval = 1 * time.Second
)

var (
Expand All @@ -77,12 +70,6 @@ var (
commitGasCounter = metrics.NewRegisteredCounter("miner/commit_gas", nil)
)

// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
type newWorkReq struct {
noempty bool
timestamp int64
}

// prioritizedTransaction represents a single transaction that
// should be processed as the first transaction in the next block.
type prioritizedTransaction struct {
Expand All @@ -108,13 +95,10 @@ type worker struct {
txsSub event.Subscription
chainHeadCh chan core.ChainHeadEvent
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
chainSideSub event.Subscription

// Channels
newWorkCh chan *newWorkReq
startCh chan struct{}
exitCh chan struct{}
startCh chan struct{}
exitCh chan struct{}

wg sync.WaitGroup

Expand All @@ -131,9 +115,8 @@ type worker struct {
snapshotState *state.StateDB

// atomic status counters
running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting.
newL1Msgs int32 // New arrival L1 message count since last sealing work submitting.
running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting.

// noempty is the flag used to control whether the feature of pre-seal empty
// block is enabled. The default value is false(pre-seal is enabled by default).
Expand Down Expand Up @@ -163,8 +146,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
isLocalBlock: isLocalBlock,
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
exitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
circuitCapacityChecker: circuitcapacitychecker.NewCircuitCapacityChecker(true),
Expand All @@ -176,24 +157,15 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus

// Subscribe events for blockchain
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)

// Sanitize recommit interval if the user-specified one is too short.
recommit := worker.config.Recommit
if recommit < minRecommitInterval {
log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
recommit = minRecommitInterval
}

// Sanitize account fetch limit.
if worker.config.MaxAccountsNum == 0 {
log.Warn("Sanitizing miner account fetch limit", "provided", worker.config.MaxAccountsNum, "updated", math.MaxInt)
worker.config.MaxAccountsNum = math.MaxInt
}

worker.wg.Add(2)
worker.wg.Add(1)
go worker.mainLoop()
go worker.newWorkLoop(recommit)

// Submit first work to initialize pending state.
if init {
Expand Down Expand Up @@ -289,44 +261,11 @@ func (w *worker) close() {
w.wg.Wait()
}

// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done()
var (
timestamp int64 // timestamp for each round of mining.
)

// commit aborts in-flight transaction execution with given signal and resubmits a new one.
commit := func(noempty bool) {
select {
case w.newWorkCh <- &newWorkReq{noempty: noempty, timestamp: timestamp}:
case <-w.exitCh:
return
}
atomic.StoreInt32(&w.newTxs, 0)
atomic.StoreInt32(&w.newL1Msgs, 0)
}

for {
select {
case <-w.startCh:
timestamp = time.Now().Unix()
commit(false)
case <-w.chainHeadCh:
timestamp = time.Now().Unix()
commit(true)
case <-w.exitCh:
return
}
}
}

// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
func (w *worker) mainLoop() {
defer w.wg.Done()
defer w.txsSub.Unsubscribe()
defer w.chainHeadSub.Unsubscribe()
defer w.chainSideSub.Unsubscribe()

deadCh := make(chan *pipeline.Result)
pipelineResultCh := func() <-chan *pipeline.Result {
Expand All @@ -338,8 +277,10 @@ func (w *worker) mainLoop() {

for {
select {
case req := <-w.newWorkCh:
w.startNewPipeline(req.timestamp)
case <-w.startCh:
w.startNewPipeline(time.Now().Unix())
case <-w.chainHeadCh:
w.startNewPipeline(time.Now().Unix())
case result := <-pipelineResultCh():
w.handlePipelineResult(result)
case ev := <-w.txsCh:
Expand Down Expand Up @@ -369,8 +310,6 @@ func (w *worker) mainLoop() {
return
case <-w.chainHeadSub.Err():
return
case <-w.chainSideSub.Err():
return
}
}
}
Expand Down Expand Up @@ -798,14 +737,6 @@ func copyReceipts(receipts []*types.Receipt) []*types.Receipt {
return result
}

// postSideBlock fires a side chain event, only use it for testing.
func (w *worker) postSideBlock(event core.ChainSideEvent) {
select {
case w.chainSideCh <- event:
case <-w.exitCh:
}
}

func (w *worker) onTxFailingInPipeline(txIndex int, tx *types.Transaction, err error) bool {
if !w.isRunning() {
return false
Expand Down
2 changes: 0 additions & 2 deletions miner/scroll_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,6 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) {
for i := 0; i < 5; i++ {
b.txPool.AddLocal(b.newRandomTx(true))
b.txPool.AddLocal(b.newRandomTx(false))
w.postSideBlock(core.ChainSideEvent{Block: b.newRandomUncle()})
w.postSideBlock(core.ChainSideEvent{Block: b.newRandomUncle()})

select {
case ev := <-sub.Chan():
Expand Down

0 comments on commit 94c6fc0

Please sign in to comment.