Skip to content

Commit

Permalink
Merge branch 'master' into AndriiSlisarchuk/6573-pending-should-retur…
Browse files Browse the repository at this point in the history
…n-immediately
  • Loading branch information
Guitarheroua authored Jan 20, 2025
2 parents d1d0466 + a3c2cea commit 4aa5d14
Show file tree
Hide file tree
Showing 15 changed files with 121 additions and 155 deletions.
10 changes: 5 additions & 5 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (exeNode *ExecutionNode) LoadExecutionMetrics(node *NodeConfig) error {
// the root block as executed block
var height uint64
var blockID flow.Identifier
err := node.DB.View(procedure.GetHighestExecutedBlock(&height, &blockID))
err := node.DB.View(procedure.GetLastExecutedBlock(&height, &blockID))
if err != nil {
// database has not been bootstrapped yet
if errors.Is(err, storageerr.ErrNotFound) {
Expand Down Expand Up @@ -590,7 +590,7 @@ func (exeNode *ExecutionNode) LoadProviderEngine(

// Get latest executed block and a view at that block
ctx := context.Background()
height, blockID, err := exeNode.executionState.GetHighestExecutedBlockID(ctx)
height, blockID, err := exeNode.executionState.GetLastExecutedBlockID(ctx)
if err != nil {
return nil, fmt.Errorf(
"cannot get the latest executed block id at height %v: %w",
Expand Down Expand Up @@ -762,12 +762,12 @@ func (exeNode *ExecutionNode) LoadExecutionState(
exeNode.exeConf.enableStorehouse,
)

height, _, err := exeNode.executionState.GetHighestExecutedBlockID(context.Background())
height, _, err := exeNode.executionState.GetLastExecutedBlockID(context.Background())
if err != nil {
return nil, fmt.Errorf("could not get highest executed block: %w", err)
return nil, fmt.Errorf("could not get last executed block: %w", err)
}

log.Info().Msgf("execution state highest executed block height: %v", height)
log.Info().Msgf("execution state last executed block height: %v", height)
exeNode.collector.ExecutionLastExecutedBlockHeight(height)

return &module.NoopReadyDoneAware{}, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/cmd/find-inconsistent-result/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func findLastExecutedAndSealedHeight(state protocol.State, db *badger.DB) (uint6

var blockID flow.Identifier
var lastExecuted uint64
err = db.View(procedure.GetHighestExecutedBlock(&lastExecuted, &blockID))
err = db.View(procedure.GetLastExecutedBlock(&lastExecuted, &blockID))
if err != nil {
return 0, err
}
Expand Down
67 changes: 20 additions & 47 deletions engine/access/rpc/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestProxyAccessAPI(t *testing.T) {
metrics := metrics.NewNoopCollector()

// create a collection node
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -75,15 +75,15 @@ func TestProxyAccessAPI(t *testing.T) {
// make the call to the collection node
resp, err := client.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

func TestProxyExecutionAPI(t *testing.T) {
logger := unittest.Logger()
metrics := metrics.NewNoopCollector()

// create an execution node
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
defer en.stop(t)

Expand Down Expand Up @@ -124,15 +124,15 @@ func TestProxyExecutionAPI(t *testing.T) {
// make the call to the execution node
resp, err := client.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

func TestProxyAccessAPIConnectionReuse(t *testing.T) {
logger := unittest.Logger()
metrics := metrics.NewNoopCollector()

// create a collection node
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -186,15 +186,15 @@ func TestProxyAccessAPIConnectionReuse(t *testing.T) {
ctx := context.Background()
resp, err := accessAPIClient.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

func TestProxyExecutionAPIConnectionReuse(t *testing.T) {
logger := unittest.Logger()
metrics := metrics.NewNoopCollector()

// create an execution node
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
defer en.stop(t)

Expand Down Expand Up @@ -248,7 +248,7 @@ func TestProxyExecutionAPIConnectionReuse(t *testing.T) {
ctx := context.Background()
resp, err := executionAPIClient.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

// TestExecutionNodeClientTimeout tests that the execution API client times out after the timeout duration
Expand All @@ -259,7 +259,7 @@ func TestExecutionNodeClientTimeout(t *testing.T) {
timeout := 10 * time.Millisecond

// create an execution node
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
defer en.stop(t)

Expand Down Expand Up @@ -316,7 +316,7 @@ func TestCollectionNodeClientTimeout(t *testing.T) {
timeout := 10 * time.Millisecond

// create a collection node
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -371,31 +371,14 @@ func TestConnectionPoolFull(t *testing.T) {
metrics := metrics.NewNoopCollector()

// create a collection node
cn1, cn2, cn3 := new(collectionNode), new(collectionNode), new(collectionNode)
cn1, cn2, cn3 := newCollectionNode(t), newCollectionNode(t), newCollectionNode(t)
cn1.start(t)
cn2.start(t)
cn3.start(t)
defer cn1.stop(t)
defer cn2.stop(t)
defer cn3.stop(t)

expected := &access.PingResponse{}
cn1.handler.
On("Ping",
testifymock.Anything,
testifymock.AnythingOfType("*access.PingRequest")).
Return(expected, nil)
cn2.handler.
On("Ping",
testifymock.Anything,
testifymock.AnythingOfType("*access.PingRequest")).
Return(expected, nil)
cn3.handler.
On("Ping",
testifymock.Anything,
testifymock.AnythingOfType("*access.PingRequest")).
Return(expected, nil)

// create the factory
connectionFactory := new(ConnectionFactoryImpl)
// set the collection grpc port
Expand Down Expand Up @@ -467,7 +450,7 @@ func TestConnectionPoolStale(t *testing.T) {
metrics := metrics.NewNoopCollector()

// create a collection node
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -534,7 +517,7 @@ func TestConnectionPoolStale(t *testing.T) {
ctx = context.Background()
resp, err := accessAPIClient.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

// TestExecutionNodeClientClosedGracefully tests the scenario where the execution node client is closed gracefully.
Expand All @@ -550,7 +533,7 @@ func TestExecutionNodeClientClosedGracefully(t *testing.T) {

// Add createExecNode function to recreate it each time for rapid test
createExecNode := func() (*executionNode, func()) {
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
return en, func() {
en.stop(t)
Expand Down Expand Up @@ -650,7 +633,7 @@ func TestEvictingCacheClients(t *testing.T) {
metrics := metrics.NewNoopCollector()

// Create a new collection node for testing
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand All @@ -674,10 +657,6 @@ func TestEvictingCacheClients(t *testing.T) {
func(context.Context, *access.PingRequest) error { return nil },
)

netReq := &access.GetNetworkParametersRequest{}
netResp := &access.GetNetworkParametersResponse{}
cn.handler.On("GetNetworkParameters", testifymock.Anything, netReq).Return(netResp, nil)

// Create the connection factory
connectionFactory := new(ConnectionFactoryImpl)
// Set the gRPC port
Expand Down Expand Up @@ -740,7 +719,7 @@ func TestEvictingCacheClients(t *testing.T) {
}, 100*time.Millisecond, 10*time.Millisecond, "client timed out closing connection")

// Call a gRPC method on the client, requests should be blocked since the connection is invalidated
resp, err := client.GetNetworkParameters(ctx, netReq)
resp, err := client.GetNetworkParameters(ctx, &access.GetNetworkParametersRequest{})
assert.Equal(t, status.Errorf(codes.Unavailable, "the connection to %s was closed", clientAddress), err)
assert.Nil(t, resp)

Expand All @@ -749,9 +728,7 @@ func TestEvictingCacheClients(t *testing.T) {

// Call a gRPC method on the client
_, err = client.Ping(ctx, pingReq)
// Check that Ping was called
cn.handler.AssertCalled(t, "Ping", testifymock.Anything, pingReq)
assert.NoError(t, err)
require.NoError(t, err)

// Wait for the client connection to change state from "Ready" to "Shutdown" as connection was closed.
require.Eventually(t, func() bool {
Expand All @@ -770,7 +747,7 @@ func TestConcurrentConnections(t *testing.T) {

// Add createExecNode function to recreate it each time for rapid test
createExecNode := func() (*executionNode, func()) {
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
return en, func() {
en.stop(t)
Expand Down Expand Up @@ -886,7 +863,7 @@ func TestCircuitBreakerExecutionNode(t *testing.T) {
circuitBreakerRestoreTimeout := 1500 * time.Millisecond

// Create an execution node for testing.
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
defer en.stop(t)

Expand Down Expand Up @@ -934,8 +911,6 @@ func TestCircuitBreakerExecutionNode(t *testing.T) {

// Make the call to the execution node.
_, err = client.Ping(ctx, req)
en.handler.AssertCalled(t, "Ping", testifymock.Anything, req)

return time.Since(start), err
}

Expand Down Expand Up @@ -1005,7 +980,7 @@ func TestCircuitBreakerCollectionNode(t *testing.T) {
circuitBreakerRestoreTimeout := 1500 * time.Millisecond

// Create a collection node for testing.
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -1053,8 +1028,6 @@ func TestCircuitBreakerCollectionNode(t *testing.T) {

// Make the call to the collection node.
_, err = client.Ping(ctx, req)
cn.handler.AssertCalled(t, "Ping", testifymock.Anything, req)

return time.Since(start), err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func BenchmarkWithDeflateCompression(b *testing.B) {
// runBenchmark is a helper function that performs the benchmarking for different compressors.
func runBenchmark(b *testing.B, compressorName string) {
// create an execution node
en := new(executionNode)
en := newExecutionNode(b)
en.start(b)
defer en.stop(b)

Expand Down
38 changes: 27 additions & 11 deletions engine/access/rpc/connection/node_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,19 @@ type executionNode struct {
handler *mock.ExecutionAPIServer
}

func newExecutionNode(tb testing.TB) *executionNode {
return &executionNode{
handler: mock.NewExecutionAPIServer(tb),
}
}

func (en *executionNode) start(tb testing.TB) {
if en.handler == nil {
tb.Fatalf("executionNode must be initialized using newExecutionNode")
}

en.setupNode(tb)
handler := new(mock.ExecutionAPIServer)
execution.RegisterExecutionAPIServer(en.server, handler)
en.handler = handler
execution.RegisterExecutionAPIServer(en.server, en.handler)
en.node.start(tb)
}

Expand All @@ -81,14 +89,22 @@ type collectionNode struct {
handler *mock.AccessAPIServer
}

func (cn *collectionNode) start(t *testing.T) {
cn.setupNode(t)
handler := new(mock.AccessAPIServer)
access.RegisterAccessAPIServer(cn.server, handler)
cn.handler = handler
cn.node.start(t)
func newCollectionNode(tb testing.TB) *collectionNode {
return &collectionNode{
handler: mock.NewAccessAPIServer(tb),
}
}

func (cn *collectionNode) start(tb testing.TB) {
if cn.handler == nil {
tb.Fatalf("collectionNode must be initialized using newCollectionNode")
}

cn.setupNode(tb)
access.RegisterAccessAPIServer(cn.server, cn.handler)
cn.node.start(tb)
}

func (cn *collectionNode) stop(t *testing.T) {
cn.node.stop(t)
func (cn *collectionNode) stop(tb testing.TB) {
cn.node.stop(tb)
}
2 changes: 1 addition & 1 deletion engine/execution/checker/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c *Core) findLastSealedBlock() (*flow.Header, *flow.Header, *flow.Seal, er

// findLastExecutedBlockHeight finds the last executed block height
func (c *Core) findLastExecutedBlockHeight() (uint64, error) {
height, _, err := c.execState.GetHighestExecutedBlockID(context.Background())
height, _, err := c.execState.GetLastExecutedBlockID(context.Background())
if err != nil {
return 0, fmt.Errorf("could not get the last executed block: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions engine/execution/checker/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestCheckPassIfLastSealedIsNotExecutedAndLastExecutedMatch(t *testing.T) {
mockUnexecutedBlock(t, es, lastSealed)

// mock the last sealed and is also executed
es.On("GetHighestExecutedBlockID", mock.Anything).Return(lastExecuted.Height, lastExecuted.ID(), nil)
es.On("GetLastExecutedBlockID", mock.Anything).Return(lastExecuted.Height, lastExecuted.ID(), nil)
lastSealedResultAtExecutedHeight, _ := mockSealedBlockAtHeight(t, state, lastExecuted.Height, lastSealedExecuted)
mockAtBlockID(t, state, lastSealedExecuted)

Expand All @@ -143,7 +143,7 @@ func TestCheckFailIfLastSealedIsNotExecutedAndLastExecutedMismatch(t *testing.T)
mockUnexecutedBlock(t, es, lastSealed)

// mock the last sealed and is also executed
es.On("GetHighestExecutedBlockID", mock.Anything).Return(lastExecuted.Height, lastExecuted.ID(), nil)
es.On("GetLastExecutedBlockID", mock.Anything).Return(lastExecuted.Height, lastExecuted.ID(), nil)
mockSealedBlockAtHeight(t, state, lastExecuted.Height, lastSealedExecuted)
mockAtBlockID(t, state, lastSealedExecuted)

Expand Down
2 changes: 1 addition & 1 deletion engine/execution/ingestion/loader/unexecuted_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (e *UnexecutedLoader) LoadUnexecuted(ctx context.Context) ([]flow.Identifie
// a root block will fail, because the root block doesn't have a parent block, and could not
// get the result of it.
// TODO: remove this, when saving a executed block is transactional
lastExecutedHeight, lastExecutedID, err := e.execState.GetHighestExecutedBlockID(ctx)
lastExecutedHeight, lastExecutedID, err := e.execState.GetLastExecutedBlockID(ctx)
if err != nil {
return nil, fmt.Errorf("could not get last executed: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newMockExecutionState(seal *flow.Seal, genesis *flow.Header) *mockExecution
es := &mockExecutionState{
commits: commits,
}
es.On("GetHighestExecutedBlockID", mock.Anything).Return(genesis.Height, genesis.ID(), nil)
es.On("GetLastExecutedBlockID", mock.Anything).Return(genesis.Height, genesis.ID(), nil)
return es
}

Expand Down
Loading

0 comments on commit 4aa5d14

Please sign in to comment.