diff --git a/access/api.go b/access/api.go index 2e1bf8d4425..76cc6f6abde 100644 --- a/access/api.go +++ b/access/api.go @@ -203,41 +203,19 @@ type API interface { // // If invalid parameters will be supplied SubscribeBlockDigestsFromLatest will return a failed subscription. SubscribeBlockDigestsFromLatest(ctx context.Context, blockStatus flow.BlockStatus) subscription.Subscription - // SubscribeTransactionStatusesFromStartBlockID subscribes to transaction status updates for a given transaction ID. - // Monitoring begins from the specified block ID. The subscription streams status updates until the transaction - // reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of - // these final statuses, the subscription will automatically terminate. - // - // Parameters: - // - ctx: The context to manage the subscription's lifecycle, including cancellation. - // - txID: The identifier of the transaction to monitor. - // - startBlockID: The block ID from which to start monitoring. - // - requiredEventEncodingVersion: The version of event encoding required for the subscription. - SubscribeTransactionStatusesFromStartBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription - // SubscribeTransactionStatusesFromStartHeight subscribes to transaction status updates for a given transaction ID. - // Monitoring begins from the specified block height. The subscription streams status updates until the transaction - // reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of - // these final statuses, the subscription will automatically terminate. + // SubscribeTransactionStatuses subscribes to transaction status updates for a given transaction ID. Monitoring begins + // from the last block ID. The subscription streams status updates until the transaction reaches the final state + // ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). When the transaction reaches one of these + // final states, the subscription will automatically terminate. // // Parameters: // - ctx: The context to manage the subscription's lifecycle, including cancellation. // - txID: The unique identifier of the transaction to monitor. - // - startHeight: The block height from which to start monitoring. // - requiredEventEncodingVersion: The version of event encoding required for the subscription. - SubscribeTransactionStatusesFromStartHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription - // SubscribeTransactionStatusesFromLatest subscribes to transaction status updates for a given transaction ID. - // Monitoring begins from the latest block. The subscription streams status updates until the transaction - // reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of - // these final statuses, the subscription will automatically terminate. - // - // Parameters: - // - ctx: The context to manage the subscription's lifecycle, including cancellation. - // - txID: The unique identifier of the transaction to monitor. - // - requiredEventEncodingVersion: The version of event encoding required for the subscription. - SubscribeTransactionStatusesFromLatest(ctx context.Context, txID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription - // SendAndSubscribeTransactionStatuses sends a transaction to the network and subscribes to its status updates. + SubscribeTransactionStatuses(ctx context.Context, txID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription + // SendAndSubscribeTransactionStatuses sends a transaction to the execution node and subscribes to its status updates. // Monitoring begins from the reference block saved in the transaction itself and streams status updates until the transaction - // reaches a final state (TransactionStatusSealed or TransactionStatusExpired). Once a final status is reached, the subscription + // reaches the final state ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). Once the final status has been reached, the subscription // automatically terminates. // // Parameters: @@ -261,6 +239,14 @@ type TransactionResult struct { BlockHeight uint64 } +func (r *TransactionResult) IsExecuted() bool { + return r.Status == flow.TransactionStatusExecuted || r.Status == flow.TransactionStatusSealed +} + +func (r *TransactionResult) IsFinal() bool { + return r.Status == flow.TransactionStatusSealed || r.Status == flow.TransactionStatusExpired +} + func TransactionResultToMessage(result *TransactionResult) *access.TransactionResultResponse { return &access.TransactionResultResponse{ Status: entities.TransactionStatus(result.Status), diff --git a/access/mock/api.go b/access/mock/api.go index 13c35b293d3..5dd3065c1be 100644 --- a/access/mock/api.go +++ b/access/mock/api.go @@ -1363,12 +1363,12 @@ func (_m *API) SubscribeBlocksFromStartHeight(ctx context.Context, startHeight u return r0 } -// SubscribeTransactionStatusesFromLatest provides a mock function with given fields: ctx, txID, requiredEventEncodingVersion -func (_m *API) SubscribeTransactionStatusesFromLatest(ctx context.Context, txID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription { +// SubscribeTransactionStatuses provides a mock function with given fields: ctx, txID, requiredEventEncodingVersion +func (_m *API) SubscribeTransactionStatuses(ctx context.Context, txID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription { ret := _m.Called(ctx, txID, requiredEventEncodingVersion) if len(ret) == 0 { - panic("no return value specified for SubscribeTransactionStatusesFromLatest") + panic("no return value specified for SubscribeTransactionStatuses") } var r0 subscription.Subscription @@ -1383,46 +1383,6 @@ func (_m *API) SubscribeTransactionStatusesFromLatest(ctx context.Context, txID return r0 } -// SubscribeTransactionStatusesFromStartBlockID provides a mock function with given fields: ctx, txID, startBlockID, requiredEventEncodingVersion -func (_m *API) SubscribeTransactionStatusesFromStartBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription { - ret := _m.Called(ctx, txID, startBlockID, requiredEventEncodingVersion) - - if len(ret) == 0 { - panic("no return value specified for SubscribeTransactionStatusesFromStartBlockID") - } - - var r0 subscription.Subscription - if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier, flow.Identifier, entities.EventEncodingVersion) subscription.Subscription); ok { - r0 = rf(ctx, txID, startBlockID, requiredEventEncodingVersion) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(subscription.Subscription) - } - } - - return r0 -} - -// SubscribeTransactionStatusesFromStartHeight provides a mock function with given fields: ctx, txID, startHeight, requiredEventEncodingVersion -func (_m *API) SubscribeTransactionStatusesFromStartHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription { - ret := _m.Called(ctx, txID, startHeight, requiredEventEncodingVersion) - - if len(ret) == 0 { - panic("no return value specified for SubscribeTransactionStatusesFromStartHeight") - } - - var r0 subscription.Subscription - if rf, ok := ret.Get(0).(func(context.Context, flow.Identifier, uint64, entities.EventEncodingVersion) subscription.Subscription); ok { - r0 = rf(ctx, txID, startHeight, requiredEventEncodingVersion) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(subscription.Subscription) - } - } - - return r0 -} - // NewAPI creates a new instance of API. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewAPI(t interface { diff --git a/cmd/util/cmd/run-script/cmd.go b/cmd/util/cmd/run-script/cmd.go index 59646d0687a..98b5b0a3def 100644 --- a/cmd/util/cmd/run-script/cmd.go +++ b/cmd/util/cmd/run-script/cmd.go @@ -27,6 +27,8 @@ import ( "github.com/onflow/flow-go/module/metrics" ) +var ErrNotImplemented = errors.New("not implemented") + var ( flagPayloads string flagState string @@ -532,35 +534,18 @@ func (*api) SubscribeBlockDigestsFromLatest( return nil } -func (a *api) SubscribeTransactionStatusesFromStartBlockID( - _ context.Context, - _ flow.Identifier, - _ flow.Identifier, - _ entities.EventEncodingVersion, -) subscription.Subscription { - return nil -} - -func (a *api) SubscribeTransactionStatusesFromStartHeight( +func (a *api) SubscribeTransactionStatuses( _ context.Context, _ flow.Identifier, - _ uint64, _ entities.EventEncodingVersion, ) subscription.Subscription { - return nil + return subscription.NewFailedSubscription(ErrNotImplemented, "failed to call SubscribeTransactionStatuses") } -func (a *api) SubscribeTransactionStatusesFromLatest( - _ context.Context, - _ flow.Identifier, - _ entities.EventEncodingVersion, -) subscription.Subscription { - return nil -} func (a *api) SendAndSubscribeTransactionStatuses( _ context.Context, _ *flow.TransactionBody, _ entities.EventEncodingVersion, ) subscription.Subscription { - return nil + return subscription.NewFailedSubscription(ErrNotImplemented, "failed to call SendAndSubscribeTransactionStatuses") } diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go index 918daf5e49b..f40448bb7a5 100644 --- a/engine/access/rest/websockets/data_providers/factory_test.go +++ b/engine/access/rest/websockets/data_providers/factory_test.go @@ -136,7 +136,7 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() { topic: TransactionStatusesTopic, arguments: models.Arguments{}, setupSubscription: func() { - s.setupSubscription(s.accessApi.On("SubscribeTransactionStatusesFromLatest", mock.Anything, mock.Anything, mock.Anything)) + s.setupSubscription(s.accessApi.On("SubscribeTransactionStatuses", mock.Anything, mock.Anything, mock.Anything)) }, assertExpectations: func() { s.stateStreamApi.AssertExpectations(s.T()) diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index bacbe18da4d..dc0a970a406 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -11,7 +11,6 @@ import ( "github.com/onflow/flow-go/access" commonmodels "github.com/onflow/flow-go/engine/access/rest/common/models" "github.com/onflow/flow-go/engine/access/rest/common/parser" - "github.com/onflow/flow-go/engine/access/rest/http/request" "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/model/flow" @@ -22,9 +21,7 @@ import ( // transactionStatusesArguments contains the arguments required for subscribing to transaction statuses type transactionStatusesArguments struct { - TxID flow.Identifier // ID of the transaction to monitor. - StartBlockID flow.Identifier // ID of the block to start subscription from - StartBlockHeight uint64 // Height of the block to start subscription from + TxID flow.Identifier // ID of the transaction to monitor. } // TransactionStatusesDataProvider is responsible for providing tx statuses @@ -86,15 +83,7 @@ func (p *TransactionStatusesDataProvider) createSubscription( ctx context.Context, args transactionStatusesArguments, ) subscription.Subscription { - if args.StartBlockID != flow.ZeroID { - return p.api.SubscribeTransactionStatusesFromStartBlockID(ctx, args.TxID, args.StartBlockID, entities.EventEncodingVersion_JSON_CDC_V0) - } - - if args.StartBlockHeight != request.EmptyHeight { - return p.api.SubscribeTransactionStatusesFromStartHeight(ctx, args.TxID, args.StartBlockHeight, entities.EventEncodingVersion_JSON_CDC_V0) - } - - return p.api.SubscribeTransactionStatusesFromLatest(ctx, args.TxID, entities.EventEncodingVersion_JSON_CDC_V0) + return p.api.SubscribeTransactionStatuses(ctx, args.TxID, entities.EventEncodingVersion_JSON_CDC_V0) } // handleResponse processes a tx statuses and sends the formatted response. @@ -130,14 +119,6 @@ func parseTransactionStatusesArguments( ) (transactionStatusesArguments, error) { var args transactionStatusesArguments - // Parse block arguments - startBlockID, startBlockHeight, err := ParseStartBlock(arguments) - if err != nil { - return args, err - } - args.StartBlockID = startBlockID - args.StartBlockHeight = startBlockHeight - if txIDIn, ok := arguments["tx_id"]; ok && txIDIn != "" { result, ok := txIDIn.(string) if !ok { diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go index 82667d288b9..1c59bc4203d 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -2,8 +2,6 @@ package data_providers import ( "context" - "fmt" - "strconv" "testing" "time" @@ -95,43 +93,11 @@ func (s *TransactionStatusesProviderSuite) subscribeTransactionStatusesDataProvi return []testType{ { - name: "SubscribeTransactionStatusesFromStartBlockID happy path", - arguments: models.Arguments{ - "start_block_id": s.rootBlock.ID().String(), - }, - setupBackend: func(sub *ssmock.Subscription) { - s.api.On( - "SubscribeTransactionStatusesFromStartBlockID", - mock.Anything, - mock.Anything, - s.rootBlock.ID(), - entities.EventEncodingVersion_JSON_CDC_V0, - ).Return(sub).Once() - }, - expectedResponses: expectedResponses, - }, - { - name: "SubscribeTransactionStatusesFromStartHeight happy path", - arguments: models.Arguments{ - "start_block_height": strconv.FormatUint(s.rootBlock.Header.Height, 10), - }, - setupBackend: func(sub *ssmock.Subscription) { - s.api.On( - "SubscribeTransactionStatusesFromStartHeight", - mock.Anything, - mock.Anything, - s.rootBlock.Header.Height, - entities.EventEncodingVersion_JSON_CDC_V0, - ).Return(sub).Once() - }, - expectedResponses: expectedResponses, - }, - { - name: "SubscribeTransactionStatusesFromLatest happy path", + name: "SubscribeTransactionStatuses happy path", arguments: models.Arguments{}, setupBackend: func(sub *ssmock.Subscription) { s.api.On( - "SubscribeTransactionStatusesFromLatest", + "SubscribeTransactionStatuses", mock.Anything, mock.Anything, entities.EventEncodingVersion_JSON_CDC_V0, @@ -198,6 +164,52 @@ func (s *TransactionStatusesProviderSuite) expectedTransactionStatusesResponses( return expectedResponses } +// TestTransactionStatusesDataProvider_InvalidArguments tests the behavior of the transaction statuses data provider +// when invalid arguments are provided. It verifies that appropriate errors are returned +// for missing or conflicting arguments. +func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_InvalidArguments() { + ctx := context.Background() + send := make(chan interface{}) + + topic := TransactionStatusesTopic + + for _, test := range invalidTransactionStatusesArgumentsTestCases() { + s.Run(test.name, func() { + provider, err := NewTransactionStatusesDataProvider( + ctx, + s.log, + s.api, + "dummy-id", + s.linkGenerator, + topic, + test.arguments, + send, + ) + s.Require().Nil(provider) + s.Require().Error(err) + s.Require().Contains(err.Error(), test.expectedErrorMsg) + }) + } +} + +// invalidTransactionStatusesArgumentsTestCases returns a list of test cases with invalid argument combinations +// for testing the behavior of transaction statuses data providers. Each test case includes a name, +// a set of input arguments, and the expected error message that should be returned. +// +// The test cases cover scenarios such as: +// 1. Providing invalid 'tx_id' value. +func invalidTransactionStatusesArgumentsTestCases() []testErrType { + return []testErrType{ + { + name: "invalid 'tx_id' argument", + arguments: map[string]interface{}{ + "tx_id": "invalid_tx_id", + }, + expectedErrorMsg: "invalid ID format", + }, + } +} + // TestMessageIndexTransactionStatusesProviderResponse_HappyPath tests that MessageIndex values in response are strictly increasing. func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesProviderResponse_HappyPath() { ctx := context.Background() @@ -214,8 +226,7 @@ func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesPr sub.On("Err").Return(nil).Once() s.api.On( - "SubscribeTransactionStatusesFromStartBlockID", - mock.Anything, + "SubscribeTransactionStatuses", mock.Anything, mock.Anything, entities.EventEncodingVersion_JSON_CDC_V0, @@ -295,74 +306,3 @@ func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesPr s.Require().Equal(prevIndex+1, currentIndex, "Expected MessageIndex to increment by 1") } } - -// TestTransactionStatusesDataProvider_InvalidArguments tests the behavior of the transaction statuses data provider -// when invalid arguments are provided. It verifies that appropriate errors are returned -// for missing or conflicting arguments. -func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_InvalidArguments() { - ctx := context.Background() - send := make(chan interface{}) - - topic := TransactionStatusesTopic - - for _, test := range invalidTransactionStatusesArgumentsTestCases() { - s.Run(test.name, func() { - provider, err := NewTransactionStatusesDataProvider( - ctx, - s.log, - s.api, - "dummy-id", - s.linkGenerator, - topic, - test.arguments, - send, - ) - s.Require().Nil(provider) - s.Require().Error(err) - s.Require().Contains(err.Error(), test.expectedErrorMsg) - }) - } -} - -// invalidTransactionStatusesArgumentsTestCases returns a list of test cases with invalid argument combinations -// for testing the behavior of transaction statuses data providers. Each test case includes a name, -// a set of input arguments, and the expected error message that should be returned. -// -// The test cases cover scenarios such as: -// 1. Providing both 'start_block_id' and 'start_block_height' simultaneously. -// 2. Providing invalid 'tx_id' value. -// 3. Providing invalid 'start_block_id' value. -// 4. Invalid 'start_block_id' argument. -func invalidTransactionStatusesArgumentsTestCases() []testErrType { - return []testErrType{ - { - name: "provide both 'start_block_id' and 'start_block_height' arguments", - arguments: models.Arguments{ - "start_block_id": unittest.BlockFixture().ID().String(), - "start_block_height": fmt.Sprintf("%d", unittest.BlockFixture().Header.Height), - }, - expectedErrorMsg: "can only provide either 'start_block_id' or 'start_block_height'", - }, - { - name: "invalid 'tx_id' argument", - arguments: map[string]interface{}{ - "tx_id": "invalid_tx_id", - }, - expectedErrorMsg: "invalid ID format", - }, - { - name: "invalid 'start_block_id' argument", - arguments: map[string]interface{}{ - "start_block_id": "invalid_block_id", - }, - expectedErrorMsg: "invalid ID format", - }, - { - name: "invalid 'start_block_height' argument", - arguments: map[string]interface{}{ - "start_block_height": "-1", - }, - expectedErrorMsg: "value must be an unsigned 64 bit integer", - }, - } -} diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index 5ed2232b87b..855971e6d9f 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -143,16 +143,6 @@ func New(params Params) (*Backend, error) { } systemTxID := systemTx.ID() - transactionsLocalDataProvider := &TransactionsLocalDataProvider{ - state: params.State, - collections: params.Collections, - blocks: params.Blocks, - eventsIndex: params.EventsIndex, - txResultsIndex: params.TxResultsIndex, - systemTxID: systemTxID, - lastFullBlockHeight: params.LastFullBlockHeight, - } - b := &Backend{ state: params.State, BlockTracker: params.BlockTracker, @@ -231,33 +221,39 @@ func New(params Params) (*Backend, error) { } b.backendTransactions = backendTransactions{ - TransactionsLocalDataProvider: transactionsLocalDataProvider, - log: params.Log, - staticCollectionRPC: params.CollectionRPC, - chainID: params.ChainID, - transactions: params.Transactions, - txResultErrorMessages: params.TxResultErrorMessages, - transactionValidator: txValidator, - transactionMetrics: params.AccessMetrics, - retry: retry, - connFactory: params.ConnFactory, - previousAccessNodes: params.HistoricalAccessNodes, - nodeCommunicator: params.Communicator, - txResultCache: txResCache, - txResultQueryMode: params.TxResultQueryMode, - systemTx: systemTx, - systemTxID: systemTxID, - execNodeIdentitiesProvider: params.ExecNodeIdentitiesProvider, + TransactionsLocalDataProvider: &TransactionsLocalDataProvider{ + state: params.State, + collections: params.Collections, + blocks: params.Blocks, + eventsIndex: params.EventsIndex, + txResultsIndex: params.TxResultsIndex, + systemTxID: systemTxID, + lastFullBlockHeight: params.LastFullBlockHeight, + }, + log: params.Log, + staticCollectionRPC: params.CollectionRPC, + chainID: params.ChainID, + transactions: params.Transactions, + txResultErrorMessages: params.TxResultErrorMessages, + transactionValidator: txValidator, + transactionMetrics: params.AccessMetrics, + retry: retry, + connFactory: params.ConnFactory, + previousAccessNodes: params.HistoricalAccessNodes, + nodeCommunicator: params.Communicator, + txResultCache: txResCache, + txResultQueryMode: params.TxResultQueryMode, + systemTx: systemTx, + systemTxID: systemTxID, + execNodeIdentitiesProvider: params.ExecNodeIdentitiesProvider, } // TODO: The TransactionErrorMessage interface should be reorganized in future, as it is implemented in backendTransactions but used in TransactionsLocalDataProvider, and its initialization is somewhat quirky. b.backendTransactions.txErrorMessages = b b.backendSubscribeTransactions = backendSubscribeTransactions{ - txLocalDataProvider: transactionsLocalDataProvider, backendTransactions: &b.backendTransactions, log: params.Log, - executionResults: params.ExecutionResults, subscriptionHandler: params.SubscriptionHandler, blockTracker: params.BlockTracker, sendTransaction: b.SendTransaction, diff --git a/engine/access/rpc/backend/backend_stream_transactions.go b/engine/access/rpc/backend/backend_stream_transactions.go index ae6678e0a8b..a0eea742d3d 100644 --- a/engine/access/rpc/backend/backend_stream_transactions.go +++ b/engine/access/rpc/backend/backend_stream_transactions.go @@ -5,307 +5,231 @@ import ( "errors" "fmt" + "go.uber.org/atomic" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/rs/zerolog" + "github.com/onflow/flow/protobuf/go/flow/entities" + "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine/access/subscription" - "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" - "github.com/onflow/flow-go/state" - "github.com/onflow/flow-go/storage" - - "github.com/onflow/flow/protobuf/go/flow/entities" ) // sendTransaction defines a function type for sending a transaction. type sendTransaction func(ctx context.Context, tx *flow.TransactionBody) error -// backendSubscribeTransactions handles transaction subscriptions. +// backendSubscribeTransactions manages transaction subscriptions for monitoring transaction statuses. +// It provides functionalities to send transactions, subscribe to transaction status updates, +// and handle subscription lifecycles. type backendSubscribeTransactions struct { - txLocalDataProvider *TransactionsLocalDataProvider - backendTransactions *backendTransactions - executionResults storage.ExecutionResults log zerolog.Logger - + backendTransactions *backendTransactions subscriptionHandler *subscription.SubscriptionHandler blockTracker subscription.BlockTracker sendTransaction sendTransaction } -// transactionSubscriptionMetadata holds data representing the status state for each transaction subscription. -type transactionSubscriptionMetadata struct { - *access.TransactionResult - txReferenceBlockID flow.Identifier - blockWithTx *flow.Header - txExecuted bool - eventEncodingVersion entities.EventEncodingVersion - shouldTriggerPending bool -} - // SendAndSubscribeTransactionStatuses sends a transaction and subscribes to its status updates. -// It starts monitoring the status from the transaction's reference block ID. -// If the transaction cannot be sent or an error occurs during subscription creation, a failed subscription is returned. +// +// The subscription begins monitoring from the reference block specified in the transaction itself and +// streams updates until the transaction reaches a final state ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). +// Upon reaching a final state, the subscription automatically terminates. +// +// Parameters: +// - ctx: The context to manage the transaction sending and subscription lifecycle, including cancellation. +// - tx: The transaction body to be sent and monitored. +// - requiredEventEncodingVersion: The version of event encoding required for the subscription. +// +// If the transaction cannot be sent, the subscription will fail and return a failed subscription. func (b *backendSubscribeTransactions) SendAndSubscribeTransactionStatuses( ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion, ) subscription.Subscription { if err := b.sendTransaction(ctx, tx); err != nil { - b.log.Error().Err(err).Str("tx_id", tx.ID().String()).Msg("failed to send transaction") + b.log.Err(err).Str("tx_id", tx.ID().String()).Msg("failed to send transaction") return subscription.NewFailedSubscription(err, "failed to send transaction") } - return b.createSubscription(ctx, tx.ID(), tx.ReferenceBlockID, 0, tx.ReferenceBlockID, requiredEventEncodingVersion, true) + return b.createSubscription(ctx, tx.ID(), tx.ReferenceBlockID, tx.ReferenceBlockID, requiredEventEncodingVersion) } -// SubscribeTransactionStatusesFromStartHeight subscribes to the status updates of a transaction. -// Monitoring starts from the specified block height. -// If the block height cannot be determined or an error occurs during subscription creation, a failed subscription is returned. -func (b *backendSubscribeTransactions) SubscribeTransactionStatusesFromStartHeight( - ctx context.Context, - txID flow.Identifier, - startHeight uint64, - requiredEventEncodingVersion entities.EventEncodingVersion, -) subscription.Subscription { - return b.createSubscription(ctx, txID, flow.ZeroID, startHeight, flow.ZeroID, requiredEventEncodingVersion, false) -} - -// SubscribeTransactionStatusesFromStartBlockID subscribes to the status updates of a transaction. -// Monitoring starts from the specified block ID. -// If the block ID cannot be determined or an error occurs during subscription creation, a failed subscription is returned. -func (b *backendSubscribeTransactions) SubscribeTransactionStatusesFromStartBlockID( - ctx context.Context, - txID flow.Identifier, - startBlockID flow.Identifier, - requiredEventEncodingVersion entities.EventEncodingVersion, -) subscription.Subscription { - return b.createSubscription(ctx, txID, startBlockID, 0, flow.ZeroID, requiredEventEncodingVersion, false) -} - -// SubscribeTransactionStatusesFromLatest subscribes to the status updates of a transaction. -// Monitoring starts from the latest block. -// If the block cannot be retrieved or an error occurs during subscription creation, a failed subscription is returned. -func (b *backendSubscribeTransactions) SubscribeTransactionStatusesFromLatest( +// SubscribeTransactionStatuses subscribes to status updates for a given transaction ID. +// +// The subscription starts monitoring from the last sealed block. Updates are streamed +// until the transaction reaches a final state ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). +// The subscription terminates automatically once the final state is reached. +// +// Parameters: +// - ctx: The context to manage the subscription's lifecycle, including cancellation. +// - txID: The unique identifier of the transaction to monitor. +// - requiredEventEncodingVersion: The version of event encoding required for the subscription. +func (b *backendSubscribeTransactions) SubscribeTransactionStatuses( ctx context.Context, txID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion, ) subscription.Subscription { - header, err := b.txLocalDataProvider.state.Sealed().Head() + header, err := b.backendTransactions.state.Sealed().Head() if err != nil { - b.log.Error().Err(err).Msg("failed to retrieve latest block") - return subscription.NewFailedSubscription(err, "failed to retrieve latest block") + // throw the exception as the node must have the current sealed block in storage + irrecoverable.Throw(ctx, err) + return subscription.NewFailedSubscription(err, "failed to subscribe to transaction") } - return b.createSubscription(ctx, txID, header.ID(), 0, flow.ZeroID, requiredEventEncodingVersion, false) + return b.createSubscription(ctx, txID, header.ID(), flow.ZeroID, requiredEventEncodingVersion) } -// createSubscription initializes a subscription for monitoring a transaction's status. -// If the start height cannot be determined, a failed subscription is returned. +// createSubscription initializes a transaction subscription for monitoring status updates. +// +// The subscription monitors the transaction's progress starting from the specified block ID. +// It streams updates until the transaction reaches a final state or an error occurs. +// +// Parameters: +// - ctx: Context to manage the subscription lifecycle. +// - txID: The unique identifier of the transaction to monitor. +// - startBlockID: The ID of the block to start monitoring from. +// - referenceBlockID: The ID of the transaction's reference block. +// - requiredEventEncodingVersion: The required version of event encoding. +// +// Returns: +// - subscription.Subscription: A subscription for monitoring transaction status updates. +// +// If the start height cannot be determined or current transaction state cannot be determined, a failed subscription is returned. func (b *backendSubscribeTransactions) createSubscription( ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, - startBlockHeight uint64, referenceBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion, - shouldTriggerPending bool, ) subscription.Subscription { - var nextHeight uint64 - var err error - - // Get height to start subscription from - if startBlockID == flow.ZeroID { - if nextHeight, err = b.blockTracker.GetStartHeightFromHeight(startBlockHeight); err != nil { - b.log.Error().Err(err).Uint64("block_height", startBlockHeight).Msg("failed to get start height") - return subscription.NewFailedSubscription(err, "failed to get start height") - } - } else { - if nextHeight, err = b.blockTracker.GetStartHeightFromBlockID(startBlockID); err != nil { - b.log.Error().Err(err).Str("block_id", startBlockID.String()).Msg("failed to get start height") - return subscription.NewFailedSubscription(err, "failed to get start height") - } - } - - // choose initial transaction status - initialStatus := flow.TransactionStatusUnknown - if shouldTriggerPending { - // The status of the first pending transaction should be returned immediately, as the transaction has already been sent. - // This should occur only once for each subscription. - initialStatus = flow.TransactionStatusPending + // Determine the height of the block to start the subscription from. + startHeight, err := b.blockTracker.GetStartHeightFromBlockID(startBlockID) + if err != nil { + b.log.Err(err).Str("block_id", startBlockID.String()).Msg("failed to get start height") + return subscription.NewFailedSubscription(err, "failed to get start height") } - txInfo := transactionSubscriptionMetadata{ - TransactionResult: &access.TransactionResult{ - TransactionID: txID, - BlockID: flow.ZeroID, - Status: initialStatus, - }, - txReferenceBlockID: referenceBlockID, - blockWithTx: nil, - eventEncodingVersion: requiredEventEncodingVersion, - shouldTriggerPending: shouldTriggerPending, + // Retrieve the current state of the transaction. + txInfo, err := newTransactionSubscriptionMetadata(ctx, b.backendTransactions, txID, referenceBlockID, requiredEventEncodingVersion) + if err != nil { + b.log.Err(err).Str("tx_id", txID.String()).Msg("failed to get current transaction state") + return subscription.NewFailedSubscription(err, "failed to get tx reference block ID") } - return b.subscriptionHandler.Subscribe(ctx, nextHeight, b.getTransactionStatusResponse(&txInfo)) + return b.subscriptionHandler.Subscribe(ctx, startHeight, b.getTransactionStatusResponse(txInfo)) } // getTransactionStatusResponse returns a callback function that produces transaction status // subscription responses based on new blocks. -func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *transactionSubscriptionMetadata) func(context.Context, uint64) (interface{}, error) { +func (b *backendSubscribeTransactions) getTransactionStatusResponse( + txInfo *transactionSubscriptionMetadata, +) func(context.Context, uint64) (interface{}, error) { + triggerMissingStatusesOnce := atomic.NewBool(false) + return func(ctx context.Context, height uint64) (interface{}, error) { err := b.checkBlockReady(height) if err != nil { return nil, err } - if txInfo.shouldTriggerPending { - return b.handlePendingStatus(txInfo) - } - - if b.isTransactionFinalStatus(txInfo) { - return nil, fmt.Errorf("transaction final status %s already reported: %w", txInfo.Status.String(), subscription.ErrEndOfData) + if triggerMissingStatusesOnce.CompareAndSwap(false, true) { + return b.generateResultsStatuses(txInfo.txResult, flow.TransactionStatusUnknown) } - // If on this step transaction block not available, search for it. - if txInfo.blockWithTx == nil { - // Search for transaction`s block information. - txInfo.blockWithTx, - txInfo.BlockID, - txInfo.BlockHeight, - txInfo.CollectionID, - err = b.searchForTransactionBlockInfo(height, txInfo) - - if err != nil { - if errors.Is(err, storage.ErrNotFound) { - return nil, fmt.Errorf("could not find block %d in storage: %w", height, subscription.ErrBlockNotReady) - } - - if !errors.Is(err, ErrTransactionNotInBlock) { - return nil, status.Errorf(codes.Internal, "could not get block %d: %v", height, err) - } - } + if txInfo.txResult.IsFinal() { + return nil, fmt.Errorf("transaction final status %s already reported: %w", txInfo.txResult.Status.String(), subscription.ErrEndOfData) } // Get old status here, as it could be replaced by status from founded tx result - prevTxStatus := txInfo.Status + prevTxStatus := txInfo.txResult.Status - // Check, if transaction executed and transaction result already available - if txInfo.blockWithTx != nil && !txInfo.txExecuted { - txResult, err := b.searchForTransactionResult(ctx, txInfo) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to get execution result for block %s: %v", txInfo.BlockID, err) + if err = txInfo.Refresh(ctx, height); err != nil { + if errors.Is(err, subscription.ErrBlockNotReady) { + return nil, err } - // If transaction result was found, fully replace it in metadata. New transaction status already included in result. - if txResult != nil { - txInfo.TransactionResult = txResult - //Fill in execution status for future usages - txInfo.txExecuted = true - } - } - - // If block with transaction was not found, get transaction status to check if it different from last status - if txInfo.Status, err = b.getTransactionStatus(ctx, txInfo, prevTxStatus); err != nil { - return nil, err + return nil, status.Errorf(codes.Internal, "failed to refresh transaction information: %v", err) } - // If the old and new transaction statuses are still the same, the status change should not be reported, so - // return here with no response. - if prevTxStatus == txInfo.Status { - return nil, nil - } - - return b.generateResultsWithMissingStatuses(txInfo, prevTxStatus) + return b.generateResultsStatuses(txInfo.txResult, prevTxStatus) } } -// handlePendingStatus handles the initial pending status for a transaction. -func (b *backendSubscribeTransactions) handlePendingStatus(txInfo *transactionSubscriptionMetadata) (interface{}, error) { - txInfo.shouldTriggerPending = false - return b.generateResultsWithMissingStatuses(txInfo, flow.TransactionStatusUnknown) -} - -// isTransactionFinalStatus checks if a transaction has reached a final state (Sealed or Expired). -func (b *backendSubscribeTransactions) isTransactionFinalStatus(txInfo *transactionSubscriptionMetadata) bool { - return txInfo.Status == flow.TransactionStatusSealed || txInfo.Status == flow.TransactionStatusExpired -} - -// getTransactionStatus determines the current status of a transaction based on its metadata -// and previous status. It derives the transaction status by analyzing the transaction's -// execution block, if available, or its reference block. -// -// No errors expected during normal operations. -func (b *backendSubscribeTransactions) getTransactionStatus(ctx context.Context, txInfo *transactionSubscriptionMetadata, prevTxStatus flow.TransactionStatus) (flow.TransactionStatus, error) { - txStatus := txInfo.Status - var err error - - if txInfo.blockWithTx == nil { - txStatus, err = b.txLocalDataProvider.DeriveUnknownTransactionStatus(txInfo.txReferenceBlockID) - } else if txStatus == prevTxStatus { - // When a block with the transaction is available, it is possible to receive a new transaction status while - // searching for the transaction result. Otherwise, it remains unchanged. So, if the old and new transaction - // statuses are the same, the current transaction status should be retrieved. - txStatus, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.blockWithTx.Height, txInfo.txExecuted) +// checkBlockReady checks if the given block height is valid and available based on the expected block status. +// Expected errors during normal operation: +// - subscription.ErrBlockNotReady: block for the given block height is not available. +func (b *backendSubscribeTransactions) checkBlockReady(height uint64) error { + // Get the highest available finalized block height + highestHeight, err := b.blockTracker.GetHighestHeight(flow.BlockStatusFinalized) + if err != nil { + return fmt.Errorf("could not get highest height for block %d: %w", height, err) } - if err != nil { - if !errors.Is(err, state.ErrUnknownSnapshotReference) { - irrecoverable.Throw(ctx, err) - } - return flow.TransactionStatusUnknown, rpc.ConvertStorageError(err) + // Fail early if no block finalized notification has been received for the given height. + // Note: It's possible that the block is locally finalized before the notification is + // received. This ensures a consistent view is available to all streams. + if height > highestHeight { + return fmt.Errorf("block %d is not available yet: %w", height, subscription.ErrBlockNotReady) } - return txStatus, nil + return nil } -// generateResultsWithMissingStatuses checks if the current result differs from the previous result by more than one step. +// generateResultsStatuses checks if the current result differs from the previous result by more than one step. // If yes, it generates results for the missing transaction statuses. This is done because the subscription should send // responses for each of the statuses in the transaction lifecycle, and the message should be sent in the order of transaction statuses. // Possible orders of transaction statuses: // 1. pending(1) -> finalized(2) -> executed(3) -> sealed(4) // 2. pending(1) -> expired(5) // No errors expected during normal operations. -func (b *backendSubscribeTransactions) generateResultsWithMissingStatuses( - txInfo *transactionSubscriptionMetadata, +func (b *backendSubscribeTransactions) generateResultsStatuses( + txResult *access.TransactionResult, prevTxStatus flow.TransactionStatus, ) ([]*access.TransactionResult, error) { + // If the old and new transaction statuses are still the same, the status change should not be reported, so + // return here with no response. + if prevTxStatus == txResult.Status { + return nil, nil + } + // If the previous status is pending and the new status is expired, which is the last status, return its result. // If the previous status is anything other than pending, return an error, as this transition is unexpected. - if txInfo.Status == flow.TransactionStatusExpired { + if txResult.Status == flow.TransactionStatusExpired { if prevTxStatus == flow.TransactionStatusPending { return []*access.TransactionResult{ - txInfo.TransactionResult, + txResult, }, nil } else { - return nil, fmt.Errorf("unexpected transition from %s to %s transaction status", prevTxStatus.String(), txInfo.Status.String()) + return nil, fmt.Errorf("unexpected transition from %s to %s transaction status", prevTxStatus.String(), txResult.Status.String()) } } var results []*access.TransactionResult // If the difference between statuses' values is more than one step, fill in the missing results. - if (txInfo.Status - prevTxStatus) > 1 { - for missingStatus := prevTxStatus + 1; missingStatus < txInfo.Status; missingStatus++ { + if (txResult.Status - prevTxStatus) > 1 { + for missingStatus := prevTxStatus + 1; missingStatus < txResult.Status; missingStatus++ { switch missingStatus { case flow.TransactionStatusPending: results = append(results, &access.TransactionResult{ Status: missingStatus, - TransactionID: txInfo.TransactionID, + TransactionID: txResult.TransactionID, }) case flow.TransactionStatusFinalized: results = append(results, &access.TransactionResult{ Status: missingStatus, - TransactionID: txInfo.TransactionID, - BlockID: txInfo.BlockID, - BlockHeight: txInfo.BlockHeight, - CollectionID: txInfo.CollectionID, + TransactionID: txResult.TransactionID, + BlockID: txResult.BlockID, + BlockHeight: txResult.BlockHeight, + CollectionID: txResult.CollectionID, }) case flow.TransactionStatusExecuted: - missingTxResult := *txInfo.TransactionResult + missingTxResult := *txResult missingTxResult.Status = missingStatus results = append(results, &missingTxResult) default: @@ -314,88 +238,6 @@ func (b *backendSubscribeTransactions) generateResultsWithMissingStatuses( } } - results = append(results, txInfo.TransactionResult) + results = append(results, txResult) return results, nil } - -// checkBlockReady checks if the given block height is valid and available based on the expected block status. -// Expected errors during normal operation: -// - subscription.ErrBlockNotReady: block for the given block height is not available. -func (b *backendSubscribeTransactions) checkBlockReady(height uint64) error { - // Get the highest available finalized block height - highestHeight, err := b.blockTracker.GetHighestHeight(flow.BlockStatusFinalized) - if err != nil { - return fmt.Errorf("could not get highest height for block %d: %w", height, err) - } - - // Fail early if no block finalized notification has been received for the given height. - // Note: It's possible that the block is locally finalized before the notification is - // received. This ensures a consistent view is available to all streams. - if height > highestHeight { - return fmt.Errorf("block %d is not available yet: %w", height, subscription.ErrBlockNotReady) - } - - return nil -} - -// searchForTransactionBlockInfo searches for the block containing the specified transaction. -// It retrieves the block at the given height and checks if the transaction is included in that block. -// Expected errors: -// - ErrTransactionNotInBlock when unable to retrieve the collection -// - codes.Internal when other errors occur during block or collection lookup -func (b *backendSubscribeTransactions) searchForTransactionBlockInfo( - height uint64, - txInfo *transactionSubscriptionMetadata, -) (*flow.Header, flow.Identifier, uint64, flow.Identifier, error) { - block, err := b.txLocalDataProvider.blocks.ByHeight(height) - if err != nil { - return nil, flow.ZeroID, 0, flow.ZeroID, fmt.Errorf("error looking up block: %w", err) - } - - collectionID, err := b.txLocalDataProvider.LookupCollectionIDInBlock(block, txInfo.TransactionID) - if err != nil { - return nil, flow.ZeroID, 0, flow.ZeroID, fmt.Errorf("error looking up transaction in block: %w", err) - } - - if collectionID != flow.ZeroID { - return block.Header, block.ID(), height, collectionID, nil - } - - return nil, flow.ZeroID, 0, flow.ZeroID, nil -} - -// searchForTransactionResult searches for the transaction result of a block. It retrieves the execution result for the specified block ID. -// Expected errors: -// - codes.Internal if an internal error occurs while retrieving execution result. -func (b *backendSubscribeTransactions) searchForTransactionResult( - ctx context.Context, - txInfo *transactionSubscriptionMetadata, -) (*access.TransactionResult, error) { - _, err := b.executionResults.ByBlockID(txInfo.BlockID) - if err != nil { - if errors.Is(err, storage.ErrNotFound) { - return nil, nil - } - return nil, fmt.Errorf("failed to get execution result for block %s: %w", txInfo.BlockID, err) - } - - txResult, err := b.backendTransactions.GetTransactionResult( - ctx, - txInfo.TransactionID, - txInfo.BlockID, - txInfo.CollectionID, - txInfo.eventEncodingVersion, - ) - - if err != nil { - // if either the storage or execution node reported no results or there were not enough execution results - if status.Code(err) == codes.NotFound { - // No result yet, indicate that it has not been executed - return nil, nil - } - // Other Error trying to retrieve the result, return with err - return nil, err - } - - return txResult, nil -} diff --git a/engine/access/rpc/backend/backend_stream_transactions_test.go b/engine/access/rpc/backend/backend_stream_transactions_test.go index 52049a4b0ef..a81eb4edad6 100644 --- a/engine/access/rpc/backend/backend_stream_transactions_test.go +++ b/engine/access/rpc/backend/backend_stream_transactions_test.go @@ -2,11 +2,20 @@ package backend import ( "context" + "errors" "fmt" "os" "testing" "time" + "github.com/onflow/flow-go/module/irrecoverable" + protocolint "github.com/onflow/flow-go/state/protocol" + + "github.com/onflow/flow-go/storage" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/dgraph-io/badger/v2" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -24,13 +33,13 @@ import ( connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" "github.com/onflow/flow-go/engine/access/subscription" subscriptionmock "github.com/onflow/flow-go/engine/access/subscription/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/counters" "github.com/onflow/flow-go/module/metrics" syncmock "github.com/onflow/flow-go/module/state_synchronization/mock" - protocolint "github.com/onflow/flow-go/state/protocol" protocol "github.com/onflow/flow-go/state/protocol/mock" bstorage "github.com/onflow/flow-go/storage/badger" storagemock "github.com/onflow/flow-go/storage/mock" @@ -40,6 +49,7 @@ import ( "github.com/onflow/flow/protobuf/go/flow/entities" ) +// TransactionStatusSuite represents a suite for testing transaction status-related functionality in the Flow blockchain. type TransactionStatusSuite struct { suite.Suite @@ -77,8 +87,7 @@ type TransactionStatusSuite struct { sealedBlock *flow.Block finalizedBlock *flow.Block - blockMap map[uint64]*flow.Block - resultsMap map[flow.Identifier]*flow.ExecutionResult + blockMap map[uint64]*flow.Block backend *Backend @@ -91,7 +100,7 @@ func TestTransactionStatusSuite(t *testing.T) { suite.Run(t, new(TransactionStatusSuite)) } -// SetupTest initializes the test suite with required dependencies. +// SetupTest initializes the test dependencies, configurations, and mock objects for TransactionStatusSuite tests. func (s *TransactionStatusSuite) SetupTest() { s.log = zerolog.New(zerolog.NewConsoleWriter()) s.state = protocol.NewState(s.T()) @@ -100,9 +109,6 @@ func (s *TransactionStatusSuite) SetupTest() { s.tempSnapshot = &protocol.Snapshot{} s.db, s.dbDir = unittest.TempBadgerDB(s.T()) - params := protocol.NewParams(s.T()) - s.state.On("Params").Return(params) - s.blocks = storagemock.NewBlocks(s.T()) s.headers = storagemock.NewHeaders(s.T()) s.transactions = storagemock.NewTransactions(s.T()) @@ -121,7 +127,26 @@ func (s *TransactionStatusSuite) SetupTest() { s.communicator = backendmock.NewCommunicator(s.T()) s.broadcaster = engine.NewBroadcaster() s.blockTracker = subscriptionmock.NewBlockTracker(s.T()) - s.resultsMap = map[flow.Identifier]*flow.ExecutionResult{} + s.reporter = syncmock.NewIndexReporter(s.T()) + s.indexReporter = index.NewReporter() + err := s.indexReporter.Initialize(s.reporter) + require.NoError(s.T(), err) + + s.initializeBackend() +} + +// TearDownTest cleans up the db +func (s *TransactionStatusSuite) TearDownTest() { + err := os.RemoveAll(s.dbDir) + s.Require().NoError(err) +} + +// initializeBackend sets up and initializes the backend with required dependencies, mocks, and configurations for testing. +func (s *TransactionStatusSuite) initializeBackend() { + s.transactions.On("Store", mock.Anything).Return(nil).Maybe() + + s.execClient.On("GetTransactionResult", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, "not found")).Maybe() + s.connectionFactory.On("GetExecutionAPIClient", mock.Anything).Return(s.execClient, &mocks.MockCloser{}, nil).Maybe() s.colClient.On( "SendTransaction", @@ -129,12 +154,18 @@ func (s *TransactionStatusSuite) SetupTest() { mock.Anything, ).Return(&accessproto.SendTransactionResponse{}, nil).Maybe() - s.transactions.On("Store", mock.Anything).Return(nil).Maybe() - // generate blockCount consecutive blocks with associated seal, result and execution data s.rootBlock = unittest.BlockFixture() - rootResult := unittest.ExecutionResultFixture(unittest.WithBlock(&s.rootBlock)) - s.resultsMap[s.rootBlock.ID()] = rootResult + + params := protocol.NewParams(s.T()) + params.On("FinalizedRoot").Return(s.rootBlock.Header).Maybe() + s.state.On("Params").Return(params).Maybe() + + var receipts flow.ExecutionReceiptList + executionNodes := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) + receipts = unittest.ReceiptsForBlockFixture(&s.rootBlock, executionNodes.NodeIDs()) + s.receipts.On("ByBlockID", mock.AnythingOfType("flow.Identifier")).Return(receipts, nil).Maybe() + s.finalSnapshot.On("Identities", mock.Anything).Return(executionNodes, nil).Maybe() var err error s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter( @@ -145,60 +176,16 @@ func (s *TransactionStatusSuite) SetupTest() { s.sealedBlock = &s.rootBlock s.finalizedBlock = unittest.BlockWithParentFixture(s.sealedBlock.Header) - finalizedResult := unittest.ExecutionResultFixture(unittest.WithBlock(s.finalizedBlock)) - s.resultsMap[s.finalizedBlock.ID()] = finalizedResult s.blockMap = map[uint64]*flow.Block{ s.sealedBlock.Header.Height: s.sealedBlock, s.finalizedBlock.Header.Height: s.finalizedBlock, } - s.reporter = syncmock.NewIndexReporter(s.T()) - s.indexReporter = index.NewReporter() - err = s.indexReporter.Initialize(s.reporter) - require.NoError(s.T(), err) - - s.blocks.On("ByHeight", mock.AnythingOfType("uint64")).Return(mocks.StorageMapGetter(s.blockMap)) - s.state.On("Final").Return(s.finalSnapshot, nil).Maybe() - s.state.On("AtBlockID", mock.AnythingOfType("flow.Identifier")).Return(func(blockID flow.Identifier) protocolint.Snapshot { - s.tempSnapshot.On("Head").Unset() - s.tempSnapshot.On("Head").Return(func() *flow.Header { - for _, block := range s.blockMap { - if block.ID() == blockID { - return block.Header - } - } - - return nil - }, nil) - - return s.tempSnapshot - }, nil).Maybe() - - s.finalSnapshot.On("Head").Return(func() *flow.Header { - finalizedHeader := s.finalizedBlock.Header - return finalizedHeader - }, nil).Maybe() - - s.blockTracker.On("GetStartHeightFromBlockID", mock.Anything).Return(func(_ flow.Identifier) (uint64, error) { - finalizedHeader := s.finalizedBlock.Header - return finalizedHeader.Height, nil - }, nil) - s.blockTracker.On("GetHighestHeight", flow.BlockStatusFinalized).Return(func(_ flow.BlockStatus) (uint64, error) { - finalizedHeader := s.finalizedBlock.Header - return finalizedHeader.Height, nil - }, nil) - backendParams := s.backendParams() s.backend, err = New(backendParams) require.NoError(s.T(), err) } -// TearDownTest cleans up the db -func (s *TransactionStatusSuite) TearDownTest() { - err := os.RemoveAll(s.dbDir) - s.Require().NoError(err) -} - // backendParams returns the Params configuration for the backend. func (s *TransactionStatusSuite) backendParams() Params { return Params{ @@ -229,62 +216,87 @@ func (s *TransactionStatusSuite) backendParams() Params { TxResultQueryMode: IndexQueryModeLocalOnly, EventsIndex: index.NewEventsIndex(s.indexReporter, s.events), LastFullBlockHeight: s.lastFullBlockHeight, + ExecNodeIdentitiesProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + s.log, + s.state, + s.receipts, + nil, + nil, + ), + ConnFactory: s.connectionFactory, } } -func (s *TransactionStatusSuite) addNewFinalizedBlock(parent *flow.Header, notify bool, options ...func(*flow.Block)) { - s.finalizedBlock = unittest.BlockWithParentFixture(parent) - for _, option := range options { - option(s.finalizedBlock) - } +// initializeMainMockInstructions sets up the main mock behaviors for components used in TransactionStatusSuite tests. +func (s *TransactionStatusSuite) initializeMainMockInstructions() { + s.transactions.On("Store", mock.Anything).Return(nil).Maybe() - s.blockMap[s.finalizedBlock.Header.Height] = s.finalizedBlock + s.blocks.On("ByHeight", mock.AnythingOfType("uint64")).Return(mocks.StorageMapGetter(s.blockMap)).Maybe() + s.blocks.On("ByID", mock.Anything).Return( + func(blockID flow.Identifier) *flow.Block { + for _, block := range s.blockMap { + if block.ID() == blockID { + return block + } + } + return nil + }, + func(blockID flow.Identifier) error { + for _, block := range s.blockMap { + if block.ID() == blockID { + return nil + } + } + return errors.New("block not found") + }, + ).Maybe() - if notify { - s.broadcaster.Publish() - } + s.state.On("Final").Return(s.finalSnapshot, nil).Maybe() + s.state.On("AtBlockID", mock.AnythingOfType("flow.Identifier")).Return(func(blockID flow.Identifier) protocolint.Snapshot { + s.tempSnapshot.On("Head").Unset() + s.tempSnapshot.On("Head").Return(func() *flow.Header { + for _, block := range s.blockMap { + if block.ID() == blockID { + return block.Header + } + } + + return nil + }, nil) + + return s.tempSnapshot + }, nil).Maybe() + + s.finalSnapshot.On("Head").Return(func() *flow.Header { + finalizedHeader := s.finalizedBlock.Header + return finalizedHeader + }, nil).Maybe() + + s.blockTracker.On("GetStartHeightFromBlockID", mock.Anything).Return(func(_ flow.Identifier) (uint64, error) { + finalizedHeader := s.finalizedBlock.Header + return finalizedHeader.Height, nil + }, nil).Maybe() + + s.blockTracker.On("GetHighestHeight", flow.BlockStatusFinalized).Return(func(_ flow.BlockStatus) (uint64, error) { + finalizedHeader := s.finalizedBlock.Header + return finalizedHeader.Height, nil + }, nil).Maybe() } -// TestSubscribeTransactionStatusHappyCase tests the functionality of the SubscribeTransactionStatusesFromStartBlockID method in the Backend. -// It covers the emulation of transaction stages from pending to sealed, and receiving status updates. -func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +// initializeHappyCaseMockInstructions sets up mock behaviors for a happy-case scenario in transaction status testing. +func (s *TransactionStatusSuite) initializeHappyCaseMockInstructions() { + s.initializeMainMockInstructions() - s.reporter.On("LowestIndexedHeight").Return(s.rootBlock.Header.Height, nil) + s.reporter.On("LowestIndexedHeight").Return(s.rootBlock.Header.Height, nil).Maybe() s.reporter.On("HighestIndexedHeight").Return(func() (uint64, error) { finalizedHeader := s.finalizedBlock.Header return finalizedHeader.Height, nil - }, nil) - s.blocks.On("ByID", mock.AnythingOfType("flow.Identifier")).Return(func(blockID flow.Identifier) (*flow.Block, error) { - for _, block := range s.blockMap { - if block.ID() == blockID { - return block, nil - } - } + }, nil).Maybe() - return nil, nil - }, nil) s.sealedSnapshot.On("Head").Return(func() *flow.Header { return s.sealedBlock.Header - }, nil) - s.state.On("Sealed").Return(s.sealedSnapshot, nil) - s.results.On("ByBlockID", mock.AnythingOfType("flow.Identifier")).Return(mocks.StorageMapGetter(s.resultsMap)) - - // Generate sent transaction with ref block of the current finalized block - transaction := unittest.TransactionFixture() - transaction.SetReferenceBlockID(s.finalizedBlock.ID()) - s.transactions.On("ByID", mock.AnythingOfType("flow.Identifier")).Return(&transaction.TransactionBody, nil) - - col := flow.CollectionFromTransactions([]*flow.Transaction{&transaction}) - guarantee := col.Guarantee() - light := col.Light() - txId := transaction.ID() - txResult := flow.LightTransactionResult{ - TransactionID: txId, - Failed: false, - ComputationUsed: 0, - } + }, nil).Maybe() + s.state.On("Sealed").Return(s.sealedSnapshot, nil).Maybe() eventsForTx := unittest.EventsFixture(1, flow.EventAccountCreated) eventMessages := make([]*entities.Event, 1) @@ -296,102 +308,163 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { "ByBlockIDTransactionID", mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("flow.Identifier"), - ).Return(eventsForTx, nil) + ).Return(eventsForTx, nil).Maybe() +} + +// initializeTransaction generate sent transaction with ref block of the current finalized block +func (s *TransactionStatusSuite) initializeTransaction() flow.Transaction { + transaction := unittest.TransactionFixture() + transaction.SetReferenceBlockID(s.finalizedBlock.ID()) + s.transactions.On("ByID", mock.AnythingOfType("flow.Identifier")).Return(&transaction.TransactionBody, nil).Maybe() + return transaction +} +// addNewFinalizedBlock sets up a new finalized block using the provided parent header and options, and optionally notifies via broadcasting. +func (s *TransactionStatusSuite) addNewFinalizedBlock(parent *flow.Header, notify bool, options ...func(*flow.Block)) { + s.finalizedBlock = unittest.BlockWithParentFixture(parent) + for _, option := range options { + option(s.finalizedBlock) + } + + s.blockMap[s.finalizedBlock.Header.Height] = s.finalizedBlock + + if notify { + s.broadcaster.Publish() + } +} + +func (s *TransactionStatusSuite) mockTransactionResult(transactionID *flow.Identifier, hasTransactionResultInStorage *bool) { s.transactionResults.On( "ByBlockIDTransactionID", mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("flow.Identifier"), - ).Return(&txResult, nil) - - // Create a special common function to read subscription messages from the channel and check converting it to transaction info - // and check results for correctness - checkNewSubscriptionMessage := func(sub subscription.Subscription, expectedTxStatus flow.TransactionStatus) { - unittest.RequireReturnsBefore(s.T(), func() { - v, ok := <-sub.Channel() - require.True(s.T(), ok, - "channel closed while waiting for transaction info:\n\t- txID %x\n\t- blockID: %x \n\t- err: %v", - txId, s.finalizedBlock.ID(), sub.Err()) - - txResults, ok := v.([]*accessapi.TransactionResult) - require.True(s.T(), ok, "unexpected response type: %T", v) - require.Len(s.T(), txResults, 1) - - result := txResults[0] + ).Return(func(blockID, txID flow.Identifier) (*flow.LightTransactionResult, error) { + if *hasTransactionResultInStorage { + return &flow.LightTransactionResult{ + TransactionID: *transactionID, + Failed: false, + ComputationUsed: 0, + }, nil + } + return nil, storage.ErrNotFound + }) +} + +func (s *TransactionStatusSuite) addBlockWithTransaction(transaction *flow.Transaction) { + col := flow.CollectionFromTransactions([]*flow.Transaction{transaction}) + colID := col.ID() + guarantee := col.Guarantee() + light := col.Light() + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true, func(block *flow.Block) { + block.SetPayload(unittest.PayloadFixture(unittest.WithGuarantees(&guarantee))) + s.collections.On("LightByID", colID).Return(&light, nil).Maybe() + s.collections.On("LightByTransactionID", transaction.ID()).Return(&light, nil) + s.blocks.On("ByCollectionID", colID).Return(block, nil) + }) +} + +// Create a special common function to read subscription messages from the channel and check converting it to transaction info +// and check results for correctness +func (s *TransactionStatusSuite) checkNewSubscriptionMessage(sub subscription.Subscription, txId flow.Identifier, expectedTxStatuses []flow.TransactionStatus) { + unittest.RequireReturnsBefore(s.T(), func() { + v, ok := <-sub.Channel() + require.True(s.T(), ok, + "channel closed while waiting for transaction info:\n\t- txID %x\n\t- blockID: %x \n\t- err: %v", + txId, s.finalizedBlock.ID(), sub.Err()) + + txResults, ok := v.([]*accessapi.TransactionResult) + require.True(s.T(), ok, "unexpected response type: %T", v) + require.Len(s.T(), txResults, len(expectedTxStatuses)) + + for i, expectedTxStatus := range expectedTxStatuses { + result := txResults[i] assert.Equal(s.T(), txId, result.TransactionID) assert.Equal(s.T(), expectedTxStatus, result.Status) - }, time.Second, fmt.Sprintf("timed out waiting for transaction info:\n\t- txID: %x\n\t- blockID: %x", txId, s.finalizedBlock.ID())) - } + } + + }, time.Second, fmt.Sprintf("timed out waiting for transaction info:\n\t- txID: %x\n\t- blockID: %x", txId, s.finalizedBlock.ID())) +} + +// checkGracefulShutdown ensures the provided subscription shuts down gracefully within a specified timeout duration. +func (s *TransactionStatusSuite) checkGracefulShutdown(sub subscription.Subscription) { + // Ensure subscription shuts down gracefully + unittest.RequireReturnsBefore(s.T(), func() { + <-sub.Channel() + assert.NoError(s.T(), sub.Err()) + }, 100*time.Millisecond, "timed out waiting for subscription to shutdown") +} + +// TestSendAndSubscribeTransactionStatusHappyCase tests the functionality of the SubscribeTransactionStatusesFromStartBlockID method in the Backend. +// It covers the emulation of transaction stages from pending to sealed, and receiving status updates. +func (s *TransactionStatusSuite) TestSendAndSubscribeTransactionStatusHappyCase() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.initializeHappyCaseMockInstructions() + + // Generate sent transaction with ref block of the current finalized block + transaction := s.initializeTransaction() + txId := transaction.ID() + + s.collections.On("LightByTransactionID", txId).Return(nil, storage.ErrNotFound).Once() + + hasTransactionResultInStorage := false + s.mockTransactionResult(&txId, &hasTransactionResultInStorage) // 1. Subscribe to transaction status and receive the first message with pending status sub := s.backend.SendAndSubscribeTransactionStatuses(ctx, &transaction.TransactionBody, entities.EventEncodingVersion_CCF_V0) - checkNewSubscriptionMessage(sub, flow.TransactionStatusPending) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusPending}) // 2. Make transaction reference block sealed, and add a new finalized block that includes the transaction - s.sealedBlock = s.finalizedBlock - s.addNewFinalizedBlock(s.sealedBlock.Header, true, func(block *flow.Block) { - block.SetPayload(unittest.PayloadFixture(unittest.WithGuarantees(&guarantee))) - s.collections.On("LightByID", mock.AnythingOfType("flow.Identifier")).Return(&light, nil).Maybe() - }) - checkNewSubscriptionMessage(sub, flow.TransactionStatusFinalized) + s.addBlockWithTransaction(&transaction) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusFinalized}) // 3. Add one more finalized block on top of the transaction block and add execution results to storage - finalizedResult := unittest.ExecutionResultFixture(unittest.WithBlock(s.finalizedBlock)) - s.resultsMap[s.finalizedBlock.ID()] = finalizedResult + // init transaction result for storage + hasTransactionResultInStorage = true s.addNewFinalizedBlock(s.finalizedBlock.Header, true) - checkNewSubscriptionMessage(sub, flow.TransactionStatusExecuted) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusExecuted}) // 4. Make the transaction block sealed, and add a new finalized block s.sealedBlock = s.finalizedBlock s.addNewFinalizedBlock(s.sealedBlock.Header, true) - checkNewSubscriptionMessage(sub, flow.TransactionStatusSealed) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusSealed}) - //// 5. Stop subscription + // 5. Stop subscription s.sealedBlock = s.finalizedBlock s.addNewFinalizedBlock(s.sealedBlock.Header, true) - // Ensure subscription shuts down gracefully - unittest.RequireReturnsBefore(s.T(), func() { - v, ok := <-sub.Channel() - assert.Nil(s.T(), v) - assert.False(s.T(), ok) - assert.NoError(s.T(), sub.Err()) - }, 100*time.Millisecond, "timed out waiting for subscription to shutdown") + s.checkGracefulShutdown(sub) } -// TestSubscribeTransactionStatusExpired tests the functionality of the SubscribeTransactionStatusesFromStartBlockID method in the Backend +// TestSendAndSubscribeTransactionStatusExpired tests the functionality of the SubscribeTransactionStatusesFromStartBlockID method in the Backend // when transaction become expired -func (s *TransactionStatusSuite) TestSubscribeTransactionStatusExpired() { +func (s *TransactionStatusSuite) TestSendAndSubscribeTransactionStatusExpired() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Generate sent transaction with ref block of the current finalized block - transaction := unittest.TransactionFixture() - transaction.SetReferenceBlockID(s.finalizedBlock.ID()) - txId := transaction.ID() + s.initializeMainMockInstructions() - // Create a special common function to read subscription messages from the channel and check converting it to transaction info - // and check results for correctness - checkNewSubscriptionMessage := func(sub subscription.Subscription, expectedTxStatus flow.TransactionStatus) { - unittest.RequireReturnsBefore(s.T(), func() { - v, ok := <-sub.Channel() - require.True(s.T(), ok, - "channel closed while waiting for transaction info:\n\t- txID %x\n\t- blockID: %x \n\t- err: %v", - txId, s.finalizedBlock.ID(), sub.Err()) - - txResults, ok := v.([]*accessapi.TransactionResult) - require.True(s.T(), ok, "unexpected response type: %T", v) - require.Len(s.T(), txResults, 1) + s.reporter.On("LowestIndexedHeight").Return(s.rootBlock.Header.Height, nil).Maybe() + s.reporter.On("HighestIndexedHeight").Return(func() (uint64, error) { + finalizedHeader := s.finalizedBlock.Header + return finalizedHeader.Height, nil + }, nil).Maybe() + s.transactionResults.On( + "ByBlockIDTransactionID", + mock.AnythingOfType("flow.Identifier"), + mock.AnythingOfType("flow.Identifier"), + ).Return(nil, storage.ErrNotFound).Maybe() - result := txResults[0] - assert.Equal(s.T(), txId, result.TransactionID) - assert.Equal(s.T(), expectedTxStatus, result.Status) - }, time.Second, fmt.Sprintf("timed out waiting for transaction info:\n\t- txID: %x\n\t- blockID: %x", txId, s.finalizedBlock.ID())) - } + // Generate sent transaction with ref block of the current finalized block + transaction := s.initializeTransaction() + txId := transaction.ID() + s.collections.On("LightByTransactionID", txId).Return(nil, storage.ErrNotFound).Once() // Subscribe to transaction status and receive the first message with pending status sub := s.backend.SendAndSubscribeTransactionStatuses(ctx, &transaction.TransactionBody, entities.EventEncodingVersion_CCF_V0) - checkNewSubscriptionMessage(sub, flow.TransactionStatusPending) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusPending}) // Generate 600 blocks without transaction included and check, that transaction still pending startHeight := s.finalizedBlock.Header.Height + 1 @@ -404,17 +477,205 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusExpired() { // Generate final blocks and check transaction expired s.sealedBlock = s.finalizedBlock - s.addNewFinalizedBlock(s.sealedBlock.Header, true) err := s.lastFullBlockHeight.Set(s.sealedBlock.Header.Height) s.Require().NoError(err) + s.addNewFinalizedBlock(s.sealedBlock.Header, true) - checkNewSubscriptionMessage(sub, flow.TransactionStatusExpired) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusExpired}) - // Ensure subscription shuts down gracefully - unittest.RequireReturnsBefore(s.T(), func() { - v, ok := <-sub.Channel() - assert.Nil(s.T(), v) - assert.False(s.T(), ok) - assert.NoError(s.T(), sub.Err()) - }, 100*time.Millisecond, "timed out waiting for subscription to shutdown") + s.checkGracefulShutdown(sub) +} + +// TestSubscribeTransactionStatusWithCurrentPending verifies the subscription behavior for a transaction starting as pending. +func (s *TransactionStatusSuite) TestSubscribeTransactionStatusWithCurrentPending() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.initializeHappyCaseMockInstructions() + + transaction := s.initializeTransaction() + txId := transaction.ID() + s.collections.On("LightByTransactionID", txId).Return(nil, storage.ErrNotFound).Once() + + hasTransactionResultInStorage := false + s.mockTransactionResult(&txId, &hasTransactionResultInStorage) + + sub := s.backend.SubscribeTransactionStatuses(ctx, txId, entities.EventEncodingVersion_CCF_V0) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusPending}) + + s.addBlockWithTransaction(&transaction) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusFinalized}) + + hasTransactionResultInStorage = true + s.addNewFinalizedBlock(s.finalizedBlock.Header, true) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusExecuted}) + + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusSealed}) + + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true) + + s.checkGracefulShutdown(sub) +} + +// TestSubscribeTransactionStatusWithCurrentFinalized verifies the subscription behavior for a transaction starting as finalized. +func (s *TransactionStatusSuite) TestSubscribeTransactionStatusWithCurrentFinalized() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.initializeHappyCaseMockInstructions() + + transaction := s.initializeTransaction() + txId := transaction.ID() + + hasTransactionResultInStorage := false + s.mockTransactionResult(&txId, &hasTransactionResultInStorage) + + s.addBlockWithTransaction(&transaction) + + sub := s.backend.SubscribeTransactionStatuses(ctx, txId, entities.EventEncodingVersion_CCF_V0) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusPending, flow.TransactionStatusFinalized}) + + hasTransactionResultInStorage = true + s.addNewFinalizedBlock(s.finalizedBlock.Header, true) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusExecuted}) + + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusSealed}) + + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true) + + s.checkGracefulShutdown(sub) +} + +// TestSubscribeTransactionStatusWithCurrentExecuted verifies the subscription behavior for a transaction starting as executed. +func (s *TransactionStatusSuite) TestSubscribeTransactionStatusWithCurrentExecuted() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.initializeHappyCaseMockInstructions() + + transaction := s.initializeTransaction() + txId := transaction.ID() + + hasTransactionResultInStorage := false + s.mockTransactionResult(&txId, &hasTransactionResultInStorage) + + s.addBlockWithTransaction(&transaction) + + // 3. Add one more finalized block on top of the transaction block and add execution results to storage + // init transaction result for storage + hasTransactionResultInStorage = true + s.addNewFinalizedBlock(s.finalizedBlock.Header, true) + sub := s.backend.SubscribeTransactionStatuses(ctx, txId, entities.EventEncodingVersion_CCF_V0) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusPending, flow.TransactionStatusFinalized, flow.TransactionStatusExecuted}) + + // 4. Make the transaction block sealed, and add a new finalized block + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true) + s.checkNewSubscriptionMessage(sub, txId, []flow.TransactionStatus{flow.TransactionStatusSealed}) + + //// 5. Stop subscription + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true) + + s.checkGracefulShutdown(sub) +} + +// TestSubscribeTransactionStatusWithCurrentSealed verifies the subscription behavior for a transaction starting as sealed. +func (s *TransactionStatusSuite) TestSubscribeTransactionStatusWithCurrentSealed() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.initializeHappyCaseMockInstructions() + + transaction := s.initializeTransaction() + txId := transaction.ID() + + hasTransactionResultInStorage := false + s.mockTransactionResult(&txId, &hasTransactionResultInStorage) + + s.addBlockWithTransaction(&transaction) + + // init transaction result for storage + hasTransactionResultInStorage = true + s.addNewFinalizedBlock(s.finalizedBlock.Header, true) + + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true) + + sub := s.backend.SubscribeTransactionStatuses(ctx, txId, entities.EventEncodingVersion_CCF_V0) + + s.checkNewSubscriptionMessage( + sub, + txId, + []flow.TransactionStatus{ + flow.TransactionStatusPending, + flow.TransactionStatusFinalized, + flow.TransactionStatusExecuted, + flow.TransactionStatusSealed, + }, + ) + + // 5. Stop subscription + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true) + + s.checkGracefulShutdown(sub) +} + +// TestSubscribeTransactionStatusFailedSubscription verifies the behavior of subscription when transaction status fails. +// Ensures failure scenarios are handled correctly, such as missing sealed header, start height, or transaction by ID. +func (s *TransactionStatusSuite) TestSubscribeTransactionStatusFailedSubscription() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Generate sent transaction with ref block of the current finalized block + transaction := unittest.TransactionFixture() + transaction.SetReferenceBlockID(s.finalizedBlock.ID()) + txId := transaction.ID() + + s.Run("throws irrecoverable if sealed header not available", func() { + expectedError := storage.ErrNotFound + s.state.On("Sealed").Return(s.sealedSnapshot, nil).Once() + s.sealedSnapshot.On("Head").Return(nil, expectedError).Once() + + signalerCtx := irrecoverable.WithSignalerContext(ctx, + irrecoverable.NewMockSignalerContextExpectError(s.T(), ctx, expectedError)) + + sub := s.backend.SubscribeTransactionStatuses(signalerCtx, txId, entities.EventEncodingVersion_CCF_V0) + s.Assert().ErrorContains(sub.Err(), expectedError.Error()) + }) + + s.Run("if could not get start height", func() { + s.sealedSnapshot.On("Head").Return(func() *flow.Header { + return s.sealedBlock.Header + }, nil).Once() + s.state.On("Sealed").Return(s.sealedSnapshot, nil).Once() + expectedError := storage.ErrNotFound + s.blockTracker.On("GetStartHeightFromBlockID", s.sealedBlock.ID()).Return(uint64(0), expectedError).Once() + + sub := s.backend.SubscribeTransactionStatuses(ctx, txId, entities.EventEncodingVersion_CCF_V0) + s.Assert().ErrorContains(sub.Err(), expectedError.Error()) + }) + + s.Run("if could not get transaction by transaction ID", func() { + s.sealedSnapshot.On("Head").Return(func() *flow.Header { + return s.sealedBlock.Header + }, nil).Once() + s.state.On("Sealed").Return(s.sealedSnapshot, nil).Once() + s.blockTracker.On("GetStartHeightFromBlockID", mock.Anything).Return(func(_ flow.Identifier) (uint64, error) { + finalizedHeader := s.finalizedBlock.Header + return finalizedHeader.Height, nil + }, nil).Once() + expectedError := storage.ErrNotFound + s.transactions.On("ByID", txId).Return(nil, expectedError).Once() + + sub := s.backend.SubscribeTransactionStatuses(ctx, txId, entities.EventEncodingVersion_CCF_V0) + s.Assert().ErrorContains(sub.Err(), expectedError.Error()) + }) } diff --git a/engine/access/rpc/backend/backend_transactions.go b/engine/access/rpc/backend/backend_transactions.go index 9481d217f08..703465b4e9e 100644 --- a/engine/access/rpc/backend/backend_transactions.go +++ b/engine/access/rpc/backend/backend_transactions.go @@ -285,7 +285,7 @@ func (b *backendTransactions) GetTransactionResult( var txResult *access.TransactionResult // access node may not have the block if it hasn't yet been finalized, hence block can be nil at this point if block != nil { - txResult, err = b.lookupTransactionResult(ctx, txID, block, requiredEventEncodingVersion) + txResult, err = b.lookupTransactionResult(ctx, txID, block.Header, requiredEventEncodingVersion) if err != nil { return nil, rpc.ConvertError(err, "failed to retrieve result", codes.Internal) } @@ -621,7 +621,7 @@ func (b *backendTransactions) GetSystemTransactionResult(ctx context.Context, bl return nil, rpc.ConvertStorageError(err) } - return b.lookupTransactionResult(ctx, b.systemTxID, block, requiredEventEncodingVersion) + return b.lookupTransactionResult(ctx, b.systemTxID, block.Header, requiredEventEncodingVersion) } // Error returns: @@ -644,21 +644,21 @@ func (b *backendTransactions) lookupBlock(txID flow.Identifier) (*flow.Block, er func (b *backendTransactions) lookupTransactionResult( ctx context.Context, txID flow.Identifier, - block *flow.Block, + block *flow.Header, requiredEventEncodingVersion entities.EventEncodingVersion, ) (*access.TransactionResult, error) { var txResult *access.TransactionResult var err error switch b.txResultQueryMode { case IndexQueryModeExecutionNodesOnly: - txResult, err = b.getTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion) + txResult, err = b.GetTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion) case IndexQueryModeLocalOnly: txResult, err = b.GetTransactionResultFromStorage(ctx, block, txID, requiredEventEncodingVersion) case IndexQueryModeFailover: txResult, err = b.GetTransactionResultFromStorage(ctx, block, txID, requiredEventEncodingVersion) if err != nil { // If any error occurs with local storage - request transaction result from EN - txResult, err = b.getTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion) + txResult, err = b.GetTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion) } default: return nil, status.Errorf(codes.Internal, "unknown transaction result query mode: %v", b.txResultQueryMode) @@ -742,9 +742,9 @@ func (b *backendTransactions) registerTransactionForRetry(tx *flow.TransactionBo b.retry.RegisterTransaction(referenceBlock.Height, tx) } -func (b *backendTransactions) getTransactionResultFromExecutionNode( +func (b *backendTransactions) GetTransactionResultFromExecutionNode( ctx context.Context, - block *flow.Block, + block *flow.Header, transactionID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion, ) (*access.TransactionResult, error) { @@ -773,7 +773,7 @@ func (b *backendTransactions) getTransactionResultFromExecutionNode( } // tx body is irrelevant to status if it's in an executed block - txStatus, err := b.DeriveTransactionStatus(block.Header.Height, true) + txStatus, err := b.DeriveTransactionStatus(block.Height, true) if err != nil { if !errors.Is(err, state.ErrUnknownSnapshotReference) { irrecoverable.Throw(ctx, err) @@ -793,7 +793,7 @@ func (b *backendTransactions) getTransactionResultFromExecutionNode( Events: events, ErrorMessage: resp.GetErrorMessage(), BlockID: blockID, - BlockHeight: block.Header.Height, + BlockHeight: block.Height, }, nil } diff --git a/engine/access/rpc/backend/transaction_subscription_metadata.go b/engine/access/rpc/backend/transaction_subscription_metadata.go new file mode 100644 index 00000000000..e8cfb48c5e1 --- /dev/null +++ b/engine/access/rpc/backend/transaction_subscription_metadata.go @@ -0,0 +1,315 @@ +package backend + +import ( + "context" + "errors" + + "github.com/onflow/flow-go/engine/access/subscription" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/engine/common/rpc" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/state" + "github.com/onflow/flow-go/storage" + + "github.com/onflow/flow/protobuf/go/flow/entities" +) + +// transactionSubscriptionMetadata manages the state of a transaction subscription. +// +// This struct contains metadata for tracking a transaction's progress, including +// references to relevant blocks, collections, and transaction results. +type transactionSubscriptionMetadata struct { + blocks storage.Blocks + collections storage.Collections + txResult *access.TransactionResult + txReferenceBlockID flow.Identifier + blockWithTx *flow.Header + eventEncodingVersion entities.EventEncodingVersion + + backendTransactions *backendTransactions +} + +// newTransactionSubscriptionMetadata initializes a new metadata object for a transaction subscription. +// +// This function constructs a transaction metadata object used for tracking the transaction's progress +// and maintaining its state throughout execution. +// +// Parameters: +// - ctx: Context for managing the lifecycle of the operation. +// - backendTransactions: A reference to the backend transaction manager. +// - txID: The unique identifier of the transaction. +// - txReferenceBlockID: The ID of the transaction’s reference block. +// - eventEncodingVersion: The required version of event encoding. +// +// Returns: +// - *transactionSubscriptionMetadata: The initialized transaction metadata object. +// +// No errors expected during normal operations. +func newTransactionSubscriptionMetadata( + ctx context.Context, + backendTransactions *backendTransactions, + txID flow.Identifier, + txReferenceBlockID flow.Identifier, + eventEncodingVersion entities.EventEncodingVersion, +) (*transactionSubscriptionMetadata, error) { + txMetadata := &transactionSubscriptionMetadata{ + backendTransactions: backendTransactions, + txResult: &access.TransactionResult{TransactionID: txID}, + eventEncodingVersion: eventEncodingVersion, + blocks: backendTransactions.blocks, + collections: backendTransactions.collections, + } + + if err := txMetadata.initTransactionReferenceBlockID(txReferenceBlockID); err != nil { + return nil, err + } + + if err := txMetadata.initBlockInfo(); err != nil { + return nil, err + } + + if err := txMetadata.initTransactionResult(ctx); err != nil { + return nil, err + } + + return txMetadata, nil +} + +// initTransactionReferenceBlockID sets the reference block ID for the transaction. +// +// If the reference block ID is unset, it attempts to retrieve it from storage. +// +// Parameters: +// - txReferenceBlockID: The reference block ID of the transaction. +// +// No errors expected during normal operations. +func (tm *transactionSubscriptionMetadata) initTransactionReferenceBlockID(txReferenceBlockID flow.Identifier) error { + // Get referenceBlockID if it is not set + if txReferenceBlockID == flow.ZeroID { + tx, err := tm.backendTransactions.transactions.ByID(tm.txResult.TransactionID) + if err != nil { + return err + } + txReferenceBlockID = tx.ReferenceBlockID + } + + tm.txReferenceBlockID = txReferenceBlockID + + return nil +} + +// initBlockInfo determines the block that contains the transaction and updates metadata accordingly. +// +// This function searches the transaction’s collection and its corresponding block, updating +// relevant fields in the metadata object. +// +// Expected errors during normal operation: +// - `storage.ErrNotFound` if the collection or block cannot be retrieved. +func (tm *transactionSubscriptionMetadata) initBlockInfo() error { + collection, err := tm.collections.LightByTransactionID(tm.txResult.TransactionID) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + return nil + } + + return err + } + + tm.txResult.CollectionID = collection.ID() + + block, err := tm.blocks.ByCollectionID(tm.txResult.CollectionID) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + return nil + } + + return err + } + + tm.blockWithTx = block.Header + tm.txResult.BlockID = block.ID() + tm.txResult.BlockHeight = block.Header.Height + + return nil +} + +// initTransactionResult initializes the transaction result. +// +// Parameters: +// - ctx: Context for managing the operation lifecycle. +// +// No errors expected during normal operations. +func (tm *transactionSubscriptionMetadata) initTransactionResult(ctx context.Context) error { + if err := tm.refreshTransactionResult(ctx); err != nil { + return err + } + + // It is possible to receive a new transaction status while searching for the transaction result or do not find the + // transaction result at all, that is why transaction status must be filled after searching the transaction result + if err := tm.refreshStatus(ctx); err != nil { + return err + } + return nil +} + +// Refresh updates the transaction subscription metadata to reflect the latest state. +// +// Parameters: +// - ctx: Context for managing the operation lifecycle. +// - height: The block height used for searching transaction data. +// +// Expected errors during normal operation: +// - `ErrBlockNotReady` if the block at the given height is not found. +func (tm *transactionSubscriptionMetadata) Refresh(ctx context.Context, height uint64) error { + if err := tm.refreshCollection(height); err != nil { + return err + } + + if err := tm.refreshBlock(); err != nil { + if errors.Is(err, storage.ErrNotFound) { + return subscription.ErrBlockNotReady + } + } + + if err := tm.refreshTransactionResult(ctx); err != nil { + return err + } + + if err := tm.refreshStatus(ctx); err != nil { + return err + } + + return nil +} + +// refreshStatus updates the transaction's status based on its execution result. +// +// Parameters: +// - ctx: Context for managing the operation lifecycle. +// +// No errors expected during normal operations. +func (tm *transactionSubscriptionMetadata) refreshStatus(ctx context.Context) error { + var err error + + // Check, if transaction executed and transaction result already available + if tm.blockWithTx == nil { + tm.txResult.Status, err = tm.backendTransactions.DeriveUnknownTransactionStatus(tm.txReferenceBlockID) + if err != nil { + if !errors.Is(err, state.ErrUnknownSnapshotReference) { + irrecoverable.Throw(ctx, err) + } + return rpc.ConvertStorageError(err) + } + return nil + } + + // When a block with the transaction is available, it is possible to receive a new transaction status while + // searching for the transaction result. Otherwise, it remains unchanged. So, if the old and new transaction + // statuses are the same, the current transaction status should be retrieved. + tm.txResult.Status, err = tm.backendTransactions.DeriveTransactionStatus(tm.blockWithTx.Height, tm.txResult.IsExecuted()) + if err != nil { + if !errors.Is(err, state.ErrUnknownSnapshotReference) { + irrecoverable.Throw(ctx, err) + } + return rpc.ConvertStorageError(err) + } + return nil +} + +// refreshBlock updates the block metadata if the transaction has been included in a block. +// +// No errors expected during normal operations. +func (tm *transactionSubscriptionMetadata) refreshBlock() error { + if tm.txResult.CollectionID == flow.ZeroID || tm.blockWithTx != nil { + return nil + } + + block, err := tm.blocks.ByCollectionID(tm.txResult.CollectionID) + if err != nil { + return err + } + + tm.blockWithTx = block.Header + tm.txResult.BlockID = block.ID() + tm.txResult.BlockHeight = block.Header.Height + + return nil +} + +// refreshCollection updates the collection metadata if the transaction is included in a block. +// +// Parameters: +// - height: The block height at which the transaction is expected. +// +// Expected errors during normal operation: +// - `ErrTransactionNotInBlock` if the transaction is not found in the block. +func (tm *transactionSubscriptionMetadata) refreshCollection(height uint64) error { + if tm.txResult.CollectionID != flow.ZeroID { + return nil + } + + block, err := tm.blocks.ByHeight(height) + if err != nil { + return err + } + + collectionID, err := tm.backendTransactions.LookupCollectionIDInBlock(block, tm.txResult.TransactionID) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + return subscription.ErrBlockNotReady + } + + if !errors.Is(err, ErrTransactionNotInBlock) { + return err + } + } + + if collectionID != flow.ZeroID { + tm.txResult.CollectionID = collectionID + } + + return nil +} + +// refreshTransactionResult attempts to retrieve the transaction result from storage or an execution node. +// +// Parameters: +// - ctx: Context for managing the operation lifecycle. +// +// Expected errors during normal operation: +// - `codes.NotFound` if the transaction result is unavailable. +func (tm *transactionSubscriptionMetadata) refreshTransactionResult(ctx context.Context) error { + // skip check if we already have the result, or if we don't know which block it is in yet + if tm.blockWithTx == nil || tm.txResult.IsExecuted() { + return nil + } + + // Trying to get transaction result from local storage + txResult, err := tm.backendTransactions.GetTransactionResultFromStorage(ctx, tm.blockWithTx, tm.txResult.TransactionID, tm.eventEncodingVersion) + if err != nil { + // If any error occurs with local storage - request transaction result from EN + txResult, err = tm.backendTransactions.GetTransactionResultFromExecutionNode(ctx, tm.blockWithTx, tm.txResult.TransactionID, tm.eventEncodingVersion) + + if err != nil { + // if either the execution node reported no results + if status.Code(err) == codes.NotFound { + // No result yet, indicate that it has not been executed + return nil + } + + return err + } + } + + // If transaction result was found, fully replace it in metadata. New transaction status already included in result. + if txResult != nil { + tm.txResult = txResult + } + + return nil +} diff --git a/engine/access/rpc/backend/transactions_local_data_provider.go b/engine/access/rpc/backend/transactions_local_data_provider.go index 921580452ec..b056d0f2ace 100644 --- a/engine/access/rpc/backend/transactions_local_data_provider.go +++ b/engine/access/rpc/backend/transactions_local_data_provider.go @@ -69,20 +69,20 @@ type TransactionsLocalDataProvider struct { // getter or when deriving transaction status. func (t *TransactionsLocalDataProvider) GetTransactionResultFromStorage( ctx context.Context, - block *flow.Block, + block *flow.Header, transactionID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion, ) (*access.TransactionResult, error) { blockID := block.ID() - txResult, err := t.txResultsIndex.ByBlockIDTransactionID(blockID, block.Header.Height, transactionID) + txResult, err := t.txResultsIndex.ByBlockIDTransactionID(blockID, block.Height, transactionID) if err != nil { - return nil, rpc.ConvertIndexError(err, block.Header.Height, "failed to get transaction result") + return nil, rpc.ConvertIndexError(err, block.Height, "failed to get transaction result") } var txErrorMessage string var txStatusCode uint = 0 if txResult.Failed { - txErrorMessage, err = t.txErrorMessages.LookupErrorMessageByTransactionID(ctx, blockID, block.Header.Height, transactionID) + txErrorMessage, err = t.txErrorMessages.LookupErrorMessageByTransactionID(ctx, blockID, block.Height, transactionID) if err != nil { return nil, err } @@ -94,7 +94,7 @@ func (t *TransactionsLocalDataProvider) GetTransactionResultFromStorage( txStatusCode = 1 // statusCode of 1 indicates an error and 0 indicates no error, the same as on EN } - txStatus, err := t.DeriveTransactionStatus(block.Header.Height, true) + txStatus, err := t.DeriveTransactionStatus(block.Height, true) if err != nil { if !errors.Is(err, state.ErrUnknownSnapshotReference) { irrecoverable.Throw(ctx, err) @@ -102,9 +102,9 @@ func (t *TransactionsLocalDataProvider) GetTransactionResultFromStorage( return nil, rpc.ConvertStorageError(err) } - events, err := t.eventsIndex.ByBlockIDTransactionID(blockID, block.Header.Height, transactionID) + events, err := t.eventsIndex.ByBlockIDTransactionID(blockID, block.Height, transactionID) if err != nil { - return nil, rpc.ConvertIndexError(err, block.Header.Height, "failed to get events") + return nil, rpc.ConvertIndexError(err, block.Height, "failed to get events") } // events are encoded in CCF format in storage. convert to JSON-CDC if requested @@ -122,7 +122,7 @@ func (t *TransactionsLocalDataProvider) GetTransactionResultFromStorage( Events: events, ErrorMessage: txErrorMessage, BlockID: blockID, - BlockHeight: block.Header.Height, + BlockHeight: block.Height, }, nil }