Skip to content

Commit

Permalink
eth, miner: use miner for post-merge block production (ethereum#23256)
Browse files Browse the repository at this point in the history
* eth, miner: remove duplicated code

* eth/catalyst: remove unneeded code

* miner: keep update pending state even the Merge is happened

* eth, miner: rebase

* miner: fix tests

* eth, miner: address comments from marius

* miner: use empty zero randomness for pending blocks after the merge

* eth/catalyst: gofmt

* miner: add warning log for state recovery

* miner: ignore uncles for post-merge blocks

Co-authored-by: Péter Szilágyi <[email protected]>
  • Loading branch information
rjl493456442 and karalabe authored Jan 24, 2022
1 parent bd615e0 commit 78636ee
Show file tree
Hide file tree
Showing 8 changed files with 598 additions and 426 deletions.
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}

eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock, merger)
eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))

eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
Expand Down
252 changes: 65 additions & 187 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,14 @@ import (
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/beacon"
"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
chainParams "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie"
)
Expand Down Expand Up @@ -83,97 +77,28 @@ type ConsensusAPI struct {
light bool
eth *eth.Ethereum
les *les.LightEthereum
engine consensus.Engine // engine is the post-merge consensus engine, only for block creation
preparedBlocks *payloadQueue // preparedBlocks caches payloads (*ExecutableDataV1) by payload ID (PayloadID)
preparedBlocks *payloadQueue // preparedBlocks caches payloads (*ExecutableDataV1) by payload ID (PayloadID)
}

func NewConsensusAPI(eth *eth.Ethereum, les *les.LightEthereum) *ConsensusAPI {
var engine consensus.Engine
if eth == nil {
if les.BlockChain().Config().TerminalTotalDifficulty == nil {
panic("Catalyst started without valid total difficulty")
}
if b, ok := les.Engine().(*beacon.Beacon); ok {
engine = beacon.New(b.InnerEngine())
} else {
engine = beacon.New(les.Engine())
}
} else {
if eth.BlockChain().Config().TerminalTotalDifficulty == nil {
panic("Catalyst started without valid total difficulty")
}
if b, ok := eth.Engine().(*beacon.Beacon); ok {
engine = beacon.New(b.InnerEngine())
} else {
engine = beacon.New(eth.Engine())
}
}

return &ConsensusAPI{
light: eth == nil,
eth: eth,
les: les,
engine: engine,
preparedBlocks: newPayloadQueue(),
}
}

// blockExecutionEnv gathers all the data required to execute
// a block, either when assembling it or when inserting it.
type blockExecutionEnv struct {
chain *core.BlockChain
state *state.StateDB
tcount int
gasPool *core.GasPool

header *types.Header
txs []*types.Transaction
receipts []*types.Receipt
}

func (env *blockExecutionEnv) commitTransaction(tx *types.Transaction, coinbase common.Address) error {
vmConfig := *env.chain.GetVMConfig()
snap := env.state.Snapshot()
receipt, err := core.ApplyTransaction(env.chain.Config(), env.chain, &coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, vmConfig)
if err != nil {
env.state.RevertToSnapshot(snap)
return err
}
env.txs = append(env.txs, tx)
env.receipts = append(env.receipts, receipt)
return nil
}

func (api *ConsensusAPI) makeEnv(parent *types.Block, header *types.Header) (*blockExecutionEnv, error) {
// The parent state might be missing. It can be the special scenario
// that consensus layer tries to build a new block based on the very
// old side chain block and the relevant state is already pruned. So
// try to retrieve the live state from the chain, if it's not existent,
// do the necessary recovery work.
var (
err error
state *state.StateDB
)
if api.eth.BlockChain().HasState(parent.Root()) {
state, err = api.eth.BlockChain().StateAt(parent.Root())
} else {
// The maximum acceptable reorg depth can be limited by the
// finalised block somehow. TODO(rjl493456442) fix the hard-
// coded number here later.
state, err = api.eth.StateAtBlock(parent, 1000, nil, false, false)
}
if err != nil {
return nil, err
}
env := &blockExecutionEnv{
chain: api.eth.BlockChain(),
state: state,
header: header,
gasPool: new(core.GasPool).AddGas(header.GasLimit),
}
return env, nil
}

func (api *ConsensusAPI) GetPayloadV1(payloadID PayloadID) (*ExecutableDataV1, error) {
log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID)
data := api.preparedBlocks.get(payloadID)
Expand All @@ -183,36 +108,51 @@ func (api *ConsensusAPI) GetPayloadV1(payloadID PayloadID) (*ExecutableDataV1, e
return data, nil
}

func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads ForkchoiceStateV1, PayloadAttributes *PayloadAttributesV1) (ForkChoiceResponse, error) {
func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads ForkchoiceStateV1, payloadAttributes *PayloadAttributesV1) (ForkChoiceResponse, error) {
log.Trace("Engine API request received", "method", "ForkChoiceUpdated", "head", heads.HeadBlockHash, "finalized", heads.FinalizedBlockHash, "safe", heads.SafeBlockHash)
if heads.HeadBlockHash == (common.Hash{}) {
return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: nil}, nil
}
if err := api.checkTerminalTotalDifficulty(heads.HeadBlockHash); err != nil {
if block := api.eth.BlockChain().GetBlockByHash(heads.HeadBlockHash); block == nil {
// TODO (MariusVanDerWijden) trigger sync
return SYNCING, nil
if api.light {
if header := api.les.BlockChain().GetHeaderByHash(heads.HeadBlockHash); header == nil {
// TODO (MariusVanDerWijden) trigger sync
return SYNCING, nil
}
return INVALID, err
} else {
if block := api.eth.BlockChain().GetBlockByHash(heads.HeadBlockHash); block == nil {
// TODO (MariusVanDerWijden) trigger sync
return SYNCING, nil
}
return INVALID, err
}
return INVALID, err
}
// If the finalized block is set, check if it is in our blockchain
if heads.FinalizedBlockHash != (common.Hash{}) {
if block := api.eth.BlockChain().GetBlockByHash(heads.FinalizedBlockHash); block == nil {
// TODO (MariusVanDerWijden) trigger sync
return SYNCING, nil
if api.light {
if header := api.les.BlockChain().GetHeaderByHash(heads.FinalizedBlockHash); header == nil {
// TODO (MariusVanDerWijden) trigger sync
return SYNCING, nil
}
} else {
if block := api.eth.BlockChain().GetBlockByHash(heads.FinalizedBlockHash); block == nil {
// TODO (MariusVanDerWijden) trigger sync
return SYNCING, nil
}
}
}
// SetHead
if err := api.setHead(heads.HeadBlockHash); err != nil {
return INVALID, err
}
// Assemble block (if needed)
if PayloadAttributes != nil {
data, err := api.assembleBlock(heads.HeadBlockHash, PayloadAttributes)
// Assemble block (if needed). It only works for full node.
if !api.light && payloadAttributes != nil {
data, err := api.assembleBlock(heads.HeadBlockHash, payloadAttributes)
if err != nil {
return INVALID, err
}
id := computePayloadId(heads.HeadBlockHash, PayloadAttributes)
id := computePayloadId(heads.HeadBlockHash, payloadAttributes)
api.preparedBlocks.put(id, data)
log.Info("Created payload", "payloadID", id)
return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: &id}, nil
Expand Down Expand Up @@ -247,13 +187,28 @@ func (api *ConsensusAPI) ExecutePayloadV1(params ExecutableDataV1) (ExecutePaylo
return api.invalid(), err
}
if api.light {
if !api.les.BlockChain().HasHeader(block.ParentHash(), block.NumberU64()-1) {
/*
TODO (MariusVanDerWijden) reenable once sync is merged
if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), block.Header()); err != nil {
return SYNCING, err
}
*/
// TODO (MariusVanDerWijden) we should return nil here not empty hash
return ExecutePayloadResponse{Status: SYNCING.Status, LatestValidHash: common.Hash{}}, nil
}
parent := api.les.BlockChain().GetHeaderByHash(params.ParentHash)
if parent == nil {
return api.invalid(), fmt.Errorf("could not find parent %x", params.ParentHash)
td := api.les.BlockChain().GetTd(parent.Hash(), block.NumberU64()-1)
ttd := api.les.BlockChain().Config().TerminalTotalDifficulty
if td.Cmp(ttd) < 0 {
return api.invalid(), fmt.Errorf("can not execute payload on top of block with low td got: %v threshold %v", td, ttd)
}
if err = api.les.BlockChain().InsertHeader(block.Header()); err != nil {
return api.invalid(), err
}
if merger := api.merger(); !merger.TDDReached() {
merger.ReachTTD()
}
return ExecutePayloadResponse{Status: VALID.Status, LatestValidHash: block.Hash()}, nil
}
if !api.eth.BlockChain().HasBlock(block.ParentHash(), block.NumberU64()-1) {
Expand Down Expand Up @@ -290,99 +245,11 @@ func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *PayloadAt
return nil, errors.New("not supported")
}
log.Info("Producing block", "parentHash", parentHash)

bc := api.eth.BlockChain()
parent := bc.GetBlockByHash(parentHash)
if parent == nil {
log.Warn("Cannot assemble block with parent hash to unknown block", "parentHash", parentHash)
return nil, fmt.Errorf("cannot assemble block with unknown parent %s", parentHash)
}

if params.Timestamp <= parent.Time() {
return nil, fmt.Errorf("invalid timestamp: child's %d <= parent's %d", params.Timestamp, parent.Time())
}
if now := uint64(time.Now().Unix()); params.Timestamp > now+1 {
diff := time.Duration(params.Timestamp-now) * time.Second
log.Warn("Producing block too far in the future", "diff", common.PrettyDuration(diff))
}
pending := api.eth.TxPool().Pending(true)
coinbase := params.SuggestedFeeRecipient
num := parent.Number()
header := &types.Header{
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
Coinbase: coinbase,
GasLimit: parent.GasLimit(), // Keep the gas limit constant in this prototype
Extra: []byte{}, // TODO (MariusVanDerWijden) properly set extra data
Time: params.Timestamp,
MixDigest: params.Random,
}
if config := api.eth.BlockChain().Config(); config.IsLondon(header.Number) {
header.BaseFee = misc.CalcBaseFee(config, parent.Header())
}
if err := api.engine.Prepare(bc, header); err != nil {
return nil, err
}
env, err := api.makeEnv(parent, header)
block, err := api.eth.Miner().GetSealingBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random)
if err != nil {
return nil, err
}
var (
signer = types.MakeSigner(bc.Config(), header.Number)
txHeap = types.NewTransactionsByPriceAndNonce(signer, pending, nil)
transactions []*types.Transaction
)
for {
if env.gasPool.Gas() < chainParams.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", chainParams.TxGas)
break
}
tx := txHeap.Peek()
if tx == nil {
break
}

// The sender is only for logging purposes, and it doesn't really matter if it's correct.
from, _ := types.Sender(signer, tx)

// Execute the transaction
env.state.Prepare(tx.Hash(), env.tcount)
err = env.commitTransaction(tx, coinbase)
switch err {
case core.ErrGasLimitReached:
// Pop the current out-of-gas transaction without shifting in the next from the account
log.Trace("Gas limit exceeded for current block", "sender", from)
txHeap.Pop()

case core.ErrNonceTooLow:
// New head notification data race between the transaction pool and miner, shift
log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
txHeap.Shift()

case core.ErrNonceTooHigh:
// Reorg notification data race between the transaction pool and miner, skip account =
log.Trace("Skipping account with high nonce", "sender", from, "nonce", tx.Nonce())
txHeap.Pop()

case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
env.tcount++
txHeap.Shift()
transactions = append(transactions, tx)

default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
txHeap.Shift()
}
}
// Create the block.
block, err := api.engine.FinalizeAndAssemble(bc, header, env.state, transactions, nil /* uncles */, env.receipts)
if err != nil {
return nil, err
}
return BlockToExecutableData(block, params.Random), nil
return BlockToExecutableData(block), nil
}

func encodeTransactions(txs []*types.Transaction) [][]byte {
Expand Down Expand Up @@ -413,8 +280,6 @@ func ExecutableDataToBlock(params ExecutableDataV1) (*types.Block, error) {
if len(params.ExtraData) > 32 {
return nil, fmt.Errorf("invalid extradata length: %v", len(params.ExtraData))
}
number := big.NewInt(0)
number.SetUint64(params.Number)
header := &types.Header{
ParentHash: params.ParentHash,
UncleHash: types.EmptyUncleHash,
Expand All @@ -424,7 +289,7 @@ func ExecutableDataToBlock(params ExecutableDataV1) (*types.Block, error) {
ReceiptHash: params.ReceiptsRoot,
Bloom: types.BytesToBloom(params.LogsBloom),
Difficulty: common.Big0,
Number: number,
Number: new(big.Int).SetUint64(params.Number),
GasLimit: params.GasLimit,
GasUsed: params.GasUsed,
Time: params.Timestamp,
Expand All @@ -439,7 +304,9 @@ func ExecutableDataToBlock(params ExecutableDataV1) (*types.Block, error) {
return block, nil
}

func BlockToExecutableData(block *types.Block, random common.Hash) *ExecutableDataV1 {
// BlockToExecutableData constructs the executableDataV1 structure by filling the
// fields from the given block. It assumes the given block is post-merge block.
func BlockToExecutableData(block *types.Block) *ExecutableDataV1 {
return &ExecutableDataV1{
BlockHash: block.Hash(),
ParentHash: block.ParentHash(),
Expand All @@ -453,7 +320,7 @@ func BlockToExecutableData(block *types.Block, random common.Hash) *ExecutableDa
ReceiptsRoot: block.ReceiptHash(),
LogsBloom: block.Bloom().Bytes(),
Transactions: encodeTransactions(block.Transactions()),
Random: random,
Random: block.MixDigest(),
ExtraData: block.Extra(),
}
}
Expand All @@ -471,6 +338,18 @@ func (api *ConsensusAPI) checkTerminalTotalDifficulty(head common.Hash) error {
if api.merger().PoSFinalized() {
return nil
}
if api.light {
// make sure the parent has enough terminal total difficulty
header := api.les.BlockChain().GetHeaderByHash(head)
if header == nil {
return &GenericServerError
}
td := api.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
if td != nil && td.Cmp(api.les.BlockChain().Config().TerminalTotalDifficulty) < 0 {
return &InvalidTB
}
return nil
}
// make sure the parent has enough terminal total difficulty
newHeadBlock := api.eth.BlockChain().GetBlockByHash(head)
if newHeadBlock == nil {
Expand Down Expand Up @@ -499,8 +378,7 @@ func (api *ConsensusAPI) setHead(newHead common.Hash) error {
return err
}
// Trigger the transition if it's the first `NewHead` event.
merger := api.merger()
if !merger.PoSFinalized() {
if merger := api.merger(); !merger.PoSFinalized() {
merger.FinalizePoS()
}
return nil
Expand Down
Loading

0 comments on commit 78636ee

Please sign in to comment.