Skip to content

Commit

Permalink
Merge pull request #6909 from onflow/leo/refactor-last-executed-block
Browse files Browse the repository at this point in the history
Replace HighestExecutedBlock with LastExecutedBlock
  • Loading branch information
zhangchiqing authored Jan 17, 2025
2 parents 8c170e3 + 0615fbb commit 48984c8
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 96 deletions.
10 changes: 5 additions & 5 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (exeNode *ExecutionNode) LoadExecutionMetrics(node *NodeConfig) error {
// the root block as executed block
var height uint64
var blockID flow.Identifier
err := node.DB.View(procedure.GetHighestExecutedBlock(&height, &blockID))
err := node.DB.View(procedure.GetLastExecutedBlock(&height, &blockID))
if err != nil {
// database has not been bootstrapped yet
if errors.Is(err, storageerr.ErrNotFound) {
Expand Down Expand Up @@ -590,7 +590,7 @@ func (exeNode *ExecutionNode) LoadProviderEngine(

// Get latest executed block and a view at that block
ctx := context.Background()
height, blockID, err := exeNode.executionState.GetHighestExecutedBlockID(ctx)
height, blockID, err := exeNode.executionState.GetLastExecutedBlockID(ctx)
if err != nil {
return nil, fmt.Errorf(
"cannot get the latest executed block id at height %v: %w",
Expand Down Expand Up @@ -762,12 +762,12 @@ func (exeNode *ExecutionNode) LoadExecutionState(
exeNode.exeConf.enableStorehouse,
)

height, _, err := exeNode.executionState.GetHighestExecutedBlockID(context.Background())
height, _, err := exeNode.executionState.GetLastExecutedBlockID(context.Background())
if err != nil {
return nil, fmt.Errorf("could not get highest executed block: %w", err)
return nil, fmt.Errorf("could not get last executed block: %w", err)
}

log.Info().Msgf("execution state highest executed block height: %v", height)
log.Info().Msgf("execution state last executed block height: %v", height)
exeNode.collector.ExecutionLastExecutedBlockHeight(height)

return &module.NoopReadyDoneAware{}, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/cmd/find-inconsistent-result/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func findLastExecutedAndSealedHeight(state protocol.State, db *badger.DB) (uint6

var blockID flow.Identifier
var lastExecuted uint64
err = db.View(procedure.GetHighestExecutedBlock(&lastExecuted, &blockID))
err = db.View(procedure.GetLastExecutedBlock(&lastExecuted, &blockID))
if err != nil {
return 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion engine/execution/checker/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c *Core) findLastSealedBlock() (*flow.Header, *flow.Header, *flow.Seal, er

// findLastExecutedBlockHeight finds the last executed block height
func (c *Core) findLastExecutedBlockHeight() (uint64, error) {
height, _, err := c.execState.GetHighestExecutedBlockID(context.Background())
height, _, err := c.execState.GetLastExecutedBlockID(context.Background())
if err != nil {
return 0, fmt.Errorf("could not get the last executed block: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions engine/execution/checker/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestCheckPassIfLastSealedIsNotExecutedAndLastExecutedMatch(t *testing.T) {
mockUnexecutedBlock(t, es, lastSealed)

// mock the last sealed and is also executed
es.On("GetHighestExecutedBlockID", mock.Anything).Return(lastExecuted.Height, lastExecuted.ID(), nil)
es.On("GetLastExecutedBlockID", mock.Anything).Return(lastExecuted.Height, lastExecuted.ID(), nil)
lastSealedResultAtExecutedHeight, _ := mockSealedBlockAtHeight(t, state, lastExecuted.Height, lastSealedExecuted)
mockAtBlockID(t, state, lastSealedExecuted)

Expand All @@ -143,7 +143,7 @@ func TestCheckFailIfLastSealedIsNotExecutedAndLastExecutedMismatch(t *testing.T)
mockUnexecutedBlock(t, es, lastSealed)

// mock the last sealed and is also executed
es.On("GetHighestExecutedBlockID", mock.Anything).Return(lastExecuted.Height, lastExecuted.ID(), nil)
es.On("GetLastExecutedBlockID", mock.Anything).Return(lastExecuted.Height, lastExecuted.ID(), nil)
mockSealedBlockAtHeight(t, state, lastExecuted.Height, lastSealedExecuted)
mockAtBlockID(t, state, lastSealedExecuted)

Expand Down
2 changes: 1 addition & 1 deletion engine/execution/ingestion/loader/unexecuted_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (e *UnexecutedLoader) LoadUnexecuted(ctx context.Context) ([]flow.Identifie
// a root block will fail, because the root block doesn't have a parent block, and could not
// get the result of it.
// TODO: remove this, when saving a executed block is transactional
lastExecutedHeight, lastExecutedID, err := e.execState.GetHighestExecutedBlockID(ctx)
lastExecutedHeight, lastExecutedID, err := e.execState.GetLastExecutedBlockID(ctx)
if err != nil {
return nil, fmt.Errorf("could not get last executed: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newMockExecutionState(seal *flow.Seal, genesis *flow.Header) *mockExecution
es := &mockExecutionState{
commits: commits,
}
es.On("GetHighestExecutedBlockID", mock.Anything).Return(genesis.Height, genesis.ID(), nil)
es.On("GetLastExecutedBlockID", mock.Anything).Return(genesis.Height, genesis.ID(), nil)
return es
}

Expand Down
68 changes: 34 additions & 34 deletions engine/execution/state/mock/execution_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions engine/execution/state/mock/read_only_execution_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 8 additions & 13 deletions engine/execution/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type ReadOnlyExecutionState interface {

GetExecutionResultID(context.Context, flow.Identifier) (flow.Identifier, error)

GetHighestExecutedBlockID(context.Context) (uint64, flow.Identifier, error)
GetLastExecutedBlockID(context.Context) (uint64, flow.Identifier, error)
}

// ScriptExecutionState is a subset of the `state.ExecutionState` interface purposed to only access the state
Expand Down Expand Up @@ -79,7 +79,7 @@ type FinalizedExecutionState interface {
type ExecutionState interface {
ReadOnlyExecutionState

UpdateHighestExecutedBlockIfHigher(context.Context, *flow.Header) error
UpdateLastExecutedBlock(context.Context, *flow.Header) error

SaveExecutionResults(
ctx context.Context,
Expand Down Expand Up @@ -385,7 +385,7 @@ func (s *state) SaveExecutionResults(
}

//outside batch because it requires read access
err = s.UpdateHighestExecutedBlockIfHigher(childCtx, result.ExecutableBlock.Block.Header)
err = s.UpdateLastExecutedBlock(childCtx, result.ExecutableBlock.Block.Header)
if err != nil {
return fmt.Errorf("cannot update highest executed block: %w", err)
}
Expand Down Expand Up @@ -473,17 +473,12 @@ func (s *state) saveExecutionResults(
return nil
}

func (s *state) UpdateHighestExecutedBlockIfHigher(ctx context.Context, header *flow.Header) error {
if s.tracer != nil {
span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXEUpdateHighestExecutedBlockIfHigher)
defer span.End()
}

return operation.RetryOnConflict(s.db.Update, procedure.UpdateHighestExecutedBlockIfHigher(header))
func (s *state) UpdateLastExecutedBlock(ctx context.Context, header *flow.Header) error {
return operation.RetryOnConflict(s.db.Update, procedure.UpdateLastExecutedBlock(header))
}

// deprecated by storehouse's GetHighestFinalizedExecuted
func (s *state) GetHighestExecutedBlockID(ctx context.Context) (uint64, flow.Identifier, error) {
func (s *state) GetLastExecutedBlockID(ctx context.Context) (uint64, flow.Identifier, error) {
if s.enableRegisterStore {
// when storehouse is enabled, the highest executed block is consisted as
// the highest finalized and executed block
Expand All @@ -501,7 +496,7 @@ func (s *state) GetHighestExecutedBlockID(ctx context.Context) (uint64, flow.Ide

var blockID flow.Identifier
var height uint64
err := s.db.View(procedure.GetHighestExecutedBlock(&height, &blockID))
err := s.db.View(procedure.GetLastExecutedBlock(&height, &blockID))
if err != nil {
return 0, flow.ZeroID, err
}
Expand All @@ -522,7 +517,7 @@ func (s *state) GetHighestFinalizedExecuted() (uint64, error) {
}

// last executed height
executedHeight, _, err := s.GetHighestExecutedBlockID(context.Background())
executedHeight, _, err := s.GetLastExecutedBlockID(context.Background())
if err != nil {
return 0, fmt.Errorf("could not get highest executed block: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion engine/testutil/mock/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (en ExecutionNode) Done(cancelFunc context.CancelFunc) {
}

func (en ExecutionNode) AssertHighestExecutedBlock(t *testing.T, header *flow.Header) {
height, blockID, err := en.ExecutionState.GetHighestExecutedBlockID(context.Background())
height, blockID, err := en.ExecutionState.GetLastExecutedBlockID(context.Background())
require.NoError(t, err)

require.Equal(t, header.ID(), blockID)
Expand Down
28 changes: 5 additions & 23 deletions storage/badger/procedure/executed.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,10 @@ import (
"github.com/onflow/flow-go/storage/badger/operation"
)

// UpdateHighestExecutedBlockIfHigher updates the latest executed block to be the input block
// if the input block has a greater height than the currently stored latest executed block.
// The executed block index must have been initialized before calling this function.
// Returns storage.ErrNotFound if the input block does not exist in storage.
func UpdateHighestExecutedBlockIfHigher(header *flow.Header) func(txn *badger.Txn) error {
// UpdateLastExecutedBlock updates the latest executed block to be the input block
func UpdateLastExecutedBlock(header *flow.Header) func(txn *badger.Txn) error {
return func(txn *badger.Txn) error {
var blockID flow.Identifier
err := operation.RetrieveExecutedBlock(&blockID)(txn)
if err != nil {
return fmt.Errorf("cannot lookup executed block: %w", err)
}

var highest flow.Header
err = operation.RetrieveHeader(blockID, &highest)(txn)
if err != nil {
return fmt.Errorf("cannot retrieve executed header: %w", err)
}

if header.Height <= highest.Height {
return nil
}
err = operation.UpdateExecutedBlock(header.ID())(txn)
err := operation.UpdateExecutedBlock(header.ID())(txn)
if err != nil {
return fmt.Errorf("cannot update highest executed block: %w", err)
}
Expand All @@ -41,9 +23,9 @@ func UpdateHighestExecutedBlockIfHigher(header *flow.Header) func(txn *badger.Tx
}
}

// GetHighestExecutedBlock retrieves the height and ID of the latest block executed by this node.
// GetLastExecutedBlock retrieves the height and ID of the latest block executed by this node.
// Returns storage.ErrNotFound if no latest executed block has been stored.
func GetHighestExecutedBlock(height *uint64, blockID *flow.Identifier) func(tx *badger.Txn) error {
func GetLastExecutedBlock(height *uint64, blockID *flow.Identifier) func(tx *badger.Txn) error {
return func(tx *badger.Txn) error {
var highest flow.Header
err := operation.RetrieveExecutedBlock(blockID)(tx)
Expand Down
22 changes: 11 additions & 11 deletions storage/badger/procedure/executed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestInsertExecuted(t *testing.T) {
var height uint64
var blockID flow.Identifier
require.NoError(t,
db.View(GetHighestExecutedBlock(&height, &blockID)),
db.View(GetLastExecutedBlock(&height, &blockID)),
)

require.Equal(t, root.ID(), blockID)
Expand All @@ -37,13 +37,13 @@ func TestInsertExecuted(t *testing.T) {
t.Run("insert and get", func(t *testing.T) {
header1 := chain[1].Header
require.NoError(t,
db.Update(UpdateHighestExecutedBlockIfHigher(header1)),
db.Update(UpdateLastExecutedBlock(header1)),
)

var height uint64
var blockID flow.Identifier
require.NoError(t,
db.View(GetHighestExecutedBlock(&height, &blockID)),
db.View(GetLastExecutedBlock(&height, &blockID)),
)

require.Equal(t, header1.ID(), blockID)
Expand All @@ -54,15 +54,15 @@ func TestInsertExecuted(t *testing.T) {
header2 := chain[2].Header
header3 := chain[3].Header
require.NoError(t,
db.Update(UpdateHighestExecutedBlockIfHigher(header2)),
db.Update(UpdateLastExecutedBlock(header2)),
)
require.NoError(t,
db.Update(UpdateHighestExecutedBlockIfHigher(header3)),
db.Update(UpdateLastExecutedBlock(header3)),
)
var height uint64
var blockID flow.Identifier
require.NoError(t,
db.View(GetHighestExecutedBlock(&height, &blockID)),
db.View(GetLastExecutedBlock(&height, &blockID)),
)

require.Equal(t, header3.ID(), blockID)
Expand All @@ -73,19 +73,19 @@ func TestInsertExecuted(t *testing.T) {
header5 := chain[5].Header
header4 := chain[4].Header
require.NoError(t,
db.Update(UpdateHighestExecutedBlockIfHigher(header5)),
db.Update(UpdateLastExecutedBlock(header5)),
)
require.NoError(t,
db.Update(UpdateHighestExecutedBlockIfHigher(header4)),
db.Update(UpdateLastExecutedBlock(header4)),
)
var height uint64
var blockID flow.Identifier
require.NoError(t,
db.View(GetHighestExecutedBlock(&height, &blockID)),
db.View(GetLastExecutedBlock(&height, &blockID)),
)

require.Equal(t, header5.ID(), blockID)
require.Equal(t, header5.Height, height)
require.Equal(t, header4.ID(), blockID)
require.Equal(t, header4.Height, height)
})
})
}

0 comments on commit 48984c8

Please sign in to comment.