Skip to content

Commit

Permalink
Merge pull request #4851 from onflow/petera/4751-use-local-events
Browse files Browse the repository at this point in the history
[Access] Use local event for AccessAPI get events endpoints
  • Loading branch information
peterargue authored Dec 22, 2023
2 parents 7a2318a + f8dc637 commit a1b17ac
Show file tree
Hide file tree
Showing 25 changed files with 999 additions and 657 deletions.
31 changes: 24 additions & 7 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
MaxFailures: 5,
MaxRequests: 1,
},
ScriptExecutionMode: backend.ScriptExecutionModeExecutionNodesOnly.String(), // default to ENs only for now
ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly.String(), // default to ENs only for now
EventQueryMode: backend.IndexQueryModeExecutionNodesOnly.String(), // default to ENs only for now
},
RestConfig: rest.Config{
ListenAddress: "",
Expand Down Expand Up @@ -669,10 +670,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
return nil
}).
Module("transaction results storage", func(node *cmd.NodeConfig) error {
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)
return nil
Expand Down Expand Up @@ -825,7 +822,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
broadcaster,
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
builder.RegistersAsyncStore)
builder.RegistersAsyncStore,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream backend: %w", err)
}
Expand Down Expand Up @@ -1063,6 +1061,11 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.StringVar(&builder.registersDBPath, "execution-state-dir", defaultConfig.registersDBPath, "directory to use for execution-state database")
flags.StringVar(&builder.checkpointFile, "execution-state-checkpoint", defaultConfig.checkpointFile, "execution-state checkpoint file")

flags.StringVar(&builder.rpcConf.BackendConfig.EventQueryMode,
"event-query-mode",
defaultConfig.rpcConf.BackendConfig.EventQueryMode,
"mode to use when querying events. one of [local-only, execution-nodes-only(default), failover]")

// Script Execution
flags.StringVar(&builder.rpcConf.BackendConfig.ScriptExecutionMode,
"script-execution-mode",
Expand Down Expand Up @@ -1402,6 +1405,10 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.RegistersAsyncStore = execution.NewRegistersAsyncStore()
return nil
}).
Module("events storage", func(node *cmd.NodeConfig) error {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
return nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
config := builder.rpcConf
backendConfig := config.BackendConfig
Expand Down Expand Up @@ -1434,17 +1441,26 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
),
}

scriptExecMode, err := backend.ParseScriptExecutionMode(config.BackendConfig.ScriptExecutionMode)
scriptExecMode, err := backend.ParseIndexQueryMode(config.BackendConfig.ScriptExecutionMode)
if err != nil {
return nil, fmt.Errorf("could not parse script execution mode: %w", err)
}

eventQueryMode, err := backend.ParseIndexQueryMode(config.BackendConfig.EventQueryMode)
if err != nil {
return nil, fmt.Errorf("could not parse script execution mode: %w", err)
}
if eventQueryMode == backend.IndexQueryModeCompare {
return nil, fmt.Errorf("event query mode 'compare' is not supported")
}

nodeBackend, err := backend.New(backend.Params{
State: node.State,
CollectionRPC: builder.CollectionRPC,
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Events: node.Storage.Events,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
Expand All @@ -1463,6 +1479,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
TxErrorMessagesCacheSize: builder.TxErrorMessagesCacheSize,
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
EventQueryMode: eventQueryMode,
})
if err != nil {
return nil, fmt.Errorf("could not initialize backend: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ func (suite *Suite) TestExecuteScript() {
Log: suite.log,
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
ScriptExecutionMode: backend.ScriptExecutionModeExecutionNodesOnly,
ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly,
TxErrorMessagesCacheSize: 1000,
})
require.NoError(suite.T(), err)
Expand Down
2 changes: 2 additions & 0 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type SameGRPCPortTestSuite struct {
// storage
blocks *storagemock.Blocks
headers *storagemock.Headers
events *storagemock.Events
collections *storagemock.Collections
transactions *storagemock.Transactions
receipts *storagemock.ExecutionReceipts
Expand Down Expand Up @@ -101,6 +102,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
suite.snapshot.On("Epochs").Return(suite.epochQuery).Maybe()
suite.blocks = new(storagemock.Blocks)
suite.headers = new(storagemock.Headers)
suite.events = new(storagemock.Events)
suite.transactions = new(storagemock.Transactions)
suite.collections = new(storagemock.Collections)
suite.receipts = new(storagemock.ExecutionReceipts)
Expand Down
26 changes: 14 additions & 12 deletions engine/access/rest/routes/subscribe_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ type testType struct {
headers http.Header
}

var chainID = flow.Testnet
var testEventTypes = []flow.EventType{
"A.0123456789abcdef.flow.event",
"B.0123456789abcdef.flow.event",
"C.0123456789abcdef.flow.event",
unittest.EventTypeFixture(chainID),
unittest.EventTypeFixture(chainID),
unittest.EventTypeFixture(chainID),
}

type SubscribeEventsSuite struct {
Expand Down Expand Up @@ -83,6 +84,8 @@ func (s *SubscribeEventsSuite) SetupTest() {
// update payloads with valid CCF encoded data
for i := range blockEvents.Events {
blockEvents.Events[i].Payload = eventsGenerator.New().Payload

s.T().Logf("block events %d %v => %v", block.Header.Height, block.ID(), blockEvents.Events[i].Type)
}

s.blocks = append(s.blocks, block)
Expand Down Expand Up @@ -143,7 +146,6 @@ func (s *SubscribeEventsSuite) TestSubscribeEvents() {
},
},
}
chain := flow.MonotonicEmulator.Chain()

// create variations for each of the base test
tests := make([]testType, 0, len(testVectors)*2)
Expand All @@ -159,7 +161,7 @@ func (s *SubscribeEventsSuite) TestSubscribeEvents() {

t3 := test
t3.name = fmt.Sprintf("%s - non existing events", test.name)
t3.eventTypes = []string{"A.0123456789abcdff.flow.event"}
t3.eventTypes = []string{fmt.Sprintf("%s_new", testEventTypes[0])}
tests = append(tests, t3)
}

Expand All @@ -170,7 +172,7 @@ func (s *SubscribeEventsSuite) TestSubscribeEvents() {

filter, err := state_stream.NewEventFilter(
state_stream.DefaultEventFilterConfig,
chain,
chainID.Chain(),
test.eventTypes,
test.addresses,
test.contracts)
Expand Down Expand Up @@ -245,9 +247,9 @@ func (s *SubscribeEventsSuite) TestSubscribeEvents() {
// closing the connection after 1 second
go func() {
time.Sleep(1 * time.Second)
close(respRecorder.closed)
respRecorder.Close()
}()
executeWsRequest(req, stateStreamBackend, respRecorder)
executeWsRequest(req, stateStreamBackend, respRecorder, chainID.Chain())
requireResponse(s.T(), respRecorder, expectedEventsResponses)
})
}
Expand All @@ -259,7 +261,7 @@ func (s *SubscribeEventsSuite) TestSubscribeEventsHandlesErrors() {
req, err := getSubscribeEventsRequest(s.T(), s.blocks[0].ID(), s.blocks[0].Header.Height, nil, nil, nil, 1, nil)
require.NoError(s.T(), err)
respRecorder := newTestHijackResponseRecorder()
executeWsRequest(req, stateStreamBackend, respRecorder)
executeWsRequest(req, stateStreamBackend, respRecorder, chainID.Chain())
requireError(s.T(), respRecorder, "can only provide either block ID or start height")
})

Expand All @@ -284,7 +286,7 @@ func (s *SubscribeEventsSuite) TestSubscribeEventsHandlesErrors() {
req, err := getSubscribeEventsRequest(s.T(), invalidBlock.ID(), request.EmptyHeight, nil, nil, nil, 1, nil)
require.NoError(s.T(), err)
respRecorder := newTestHijackResponseRecorder()
executeWsRequest(req, stateStreamBackend, respRecorder)
executeWsRequest(req, stateStreamBackend, respRecorder, chainID.Chain())
requireError(s.T(), respRecorder, "stream encountered an error: subscription error")
})

Expand All @@ -293,7 +295,7 @@ func (s *SubscribeEventsSuite) TestSubscribeEventsHandlesErrors() {
req, err := getSubscribeEventsRequest(s.T(), s.blocks[0].ID(), request.EmptyHeight, []string{"foo"}, nil, nil, 1, nil)
require.NoError(s.T(), err)
respRecorder := newTestHijackResponseRecorder()
executeWsRequest(req, stateStreamBackend, respRecorder)
executeWsRequest(req, stateStreamBackend, respRecorder, chainID.Chain())
requireError(s.T(), respRecorder, "invalid event type format")
})

Expand All @@ -318,7 +320,7 @@ func (s *SubscribeEventsSuite) TestSubscribeEventsHandlesErrors() {
req, err := getSubscribeEventsRequest(s.T(), s.blocks[0].ID(), request.EmptyHeight, nil, nil, nil, 1, nil)
require.NoError(s.T(), err)
respRecorder := newTestHijackResponseRecorder()
executeWsRequest(req, stateStreamBackend, respRecorder)
executeWsRequest(req, stateStreamBackend, respRecorder, chainID.Chain())
requireError(s.T(), respRecorder, "subscription channel closed")
})
}
Expand Down
14 changes: 12 additions & 2 deletions engine/access/rest/routes/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (c fakeNetConn) Close() error {
}
return nil
}

func (c fakeNetConn) LocalAddr() net.Addr { return localAddr }
func (c fakeNetConn) RemoteAddr() net.Addr { return remoteAddr }
func (c fakeNetConn) SetDeadline(t time.Time) error { return nil }
Expand Down Expand Up @@ -101,6 +102,15 @@ func (w *testHijackResponseRecorder) Hijack() (net.Conn, *bufio.ReadWriter, erro
return fakeNetConn{w.responseBuff, w.closed}, bufio.NewReadWriter(br, bw), nil
}

func (w *testHijackResponseRecorder) Close() error {
select {
case <-w.closed:
default:
close(w.closed)
}
return nil
}

// newTestHijackResponseRecorder creates a new instance of testHijackResponseRecorder.
func newTestHijackResponseRecorder() *testHijackResponseRecorder {
return &testHijackResponseRecorder{
Expand All @@ -122,7 +132,7 @@ func executeRequest(req *http.Request, backend access.API) *httptest.ResponseRec
return rr
}

func executeWsRequest(req *http.Request, stateStreamApi state_stream.API, responseRecorder *testHijackResponseRecorder) {
func executeWsRequest(req *http.Request, stateStreamApi state_stream.API, responseRecorder *testHijackResponseRecorder, chain flow.Chain) {
restCollector := metrics.NewNoopCollector()

config := backend.Config{
Expand All @@ -133,7 +143,7 @@ func executeWsRequest(req *http.Request, stateStreamApi state_stream.API, respon

router := NewRouterBuilder(unittest.Logger(), restCollector).AddWsRoutes(
stateStreamApi,
flow.Testnet.Chain(), config).Build()
chain, config).Build()
router.ServeHTTP(responseRecorder, req)
}

Expand Down
15 changes: 10 additions & 5 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Params struct {
HistoricalAccessNodes []accessproto.AccessAPIClient
Blocks storage.Blocks
Headers storage.Headers
Events storage.Events
Collections storage.Collections
Transactions storage.Transactions
ExecutionReceipts storage.ExecutionReceipts
Expand All @@ -101,7 +102,8 @@ type Params struct {
TxResultCacheSize uint
TxErrorMessagesCacheSize uint
ScriptExecutor execution.ScriptExecutor
ScriptExecutionMode ScriptExecutionMode
ScriptExecutionMode IndexQueryMode
EventQueryMode IndexQueryMode
}

// New creates backend instance
Expand Down Expand Up @@ -146,18 +148,19 @@ func New(params Params) (*Backend, error) {
state: params.State,
// create the sub-backends
backendScripts: backendScripts{
log: params.Log,
headers: params.Headers,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
state: params.State,
log: params.Log,
metrics: params.AccessMetrics,
loggedScripts: loggedScripts,
nodeCommunicator: params.Communicator,
scriptExecutor: params.ScriptExecutor,
scriptExecMode: params.ScriptExecutionMode,
},
backendTransactions: backendTransactions{
log: params.Log,
staticCollectionRPC: params.CollectionRPC,
state: params.State,
chainID: params.ChainID,
Expand All @@ -171,19 +174,21 @@ func New(params Params) (*Backend, error) {
retry: retry,
connFactory: params.ConnFactory,
previousAccessNodes: params.HistoricalAccessNodes,
log: params.Log,
nodeCommunicator: params.Communicator,
txResultCache: txResCache,
txErrorMessagesCache: txErrorMessagesCache,
},
backendEvents: backendEvents{
log: params.Log,
chain: params.ChainID.Chain(),
state: params.State,
headers: params.Headers,
events: params.Events,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
log: params.Log,
maxHeightRange: params.MaxHeightRange,
nodeCommunicator: params.Communicator,
queryMode: params.EventQueryMode,
},
backendBlockHeaders: backendBlockHeaders{
headers: params.Headers,
Expand All @@ -194,11 +199,11 @@ func New(params Params) (*Backend, error) {
state: params.State,
},
backendAccounts: backendAccounts{
log: params.Log,
state: params.State,
headers: params.Headers,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
log: params.Log,
nodeCommunicator: params.Communicator,
scriptExecutor: params.ScriptExecutor,
scriptExecMode: params.ScriptExecutionMode,
Expand Down
10 changes: 5 additions & 5 deletions engine/access/rpc/backend/backend_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type backendAccounts struct {
connFactory connection.ConnectionFactory
nodeCommunicator Communicator
scriptExecutor execution.ScriptExecutor
scriptExecMode ScriptExecutionMode
scriptExecMode IndexQueryMode
}

// GetAccount returns the account details at the latest sealed block.
Expand Down Expand Up @@ -93,13 +93,13 @@ func (b *backendAccounts) getAccountAtBlock(
height uint64,
) (*flow.Account, error) {
switch b.scriptExecMode {
case ScriptExecutionModeExecutionNodesOnly:
case IndexQueryModeExecutionNodesOnly:
return b.getAccountFromAnyExeNode(ctx, address, blockID)

case ScriptExecutionModeLocalOnly:
case IndexQueryModeLocalOnly:
return b.getAccountFromLocalStorage(ctx, address, height)

case ScriptExecutionModeFailover:
case IndexQueryModeFailover:
localResult, localErr := b.getAccountFromLocalStorage(ctx, address, height)
if localErr == nil {
return localResult, nil
Expand All @@ -110,7 +110,7 @@ func (b *backendAccounts) getAccountAtBlock(

return execResult, execErr

case ScriptExecutionModeCompare:
case IndexQueryModeCompare:
execResult, execErr := b.getAccountFromAnyExeNode(ctx, address, blockID)
// Only compare actual get account errors from the EN, not system errors
if execErr != nil && !isInvalidArgumentError(execErr) {
Expand Down
Loading

0 comments on commit a1b17ac

Please sign in to comment.