From be7cc48ec05b755178acec6988b8b81cb7e7bf9d Mon Sep 17 00:00:00 2001 From: ptrus Date: Wed, 27 Nov 2024 21:10:59 +0100 Subject: [PATCH] storage: split runtime events related accounts into a separate table --- analyzer/consensus/consensus.go | 5 +- analyzer/consensus/messages.go | 12 +- analyzer/evmabibackfill/evm_abi_backfill.go | 18 +- analyzer/queries/queries.go | 18 +- analyzer/runtime/extract.go | 158 ++++++++++-------- analyzer/runtime/runtime.go | 15 +- analyzer/runtime/runtime_test.go | 4 +- analyzer/runtime/visitors.go | 23 +-- analyzer/util/addresses/addresses.go | 2 +- analyzer/util/addresses/registration.go | 72 +------- storage/client/queries/queries.go | 10 +- storage/migrations/01_runtimes.up.sql | 28 +++- .../12_runtime_events_related_accounts.up.sql | 28 ++++ storage/oasis/nodeapi/api.go | 6 +- storage/oasis/nodeapi/file/runtime.go | 32 +++- storage/oasis/nodeapi/universal_runtime.go | 5 +- 16 files changed, 257 insertions(+), 179 deletions(-) create mode 100644 storage/migrations/12_runtime_events_related_accounts.up.sql diff --git a/analyzer/consensus/consensus.go b/analyzer/consensus/consensus.go index 5b3abb4a5..ecbc52ccb 100644 --- a/analyzer/consensus/consensus.go +++ b/analyzer/consensus/consensus.go @@ -797,12 +797,15 @@ func (m *processor) queueRootHashMessageUpserts(batch *storage.QueryBatch, data // The runtime has its own staking account, which is what // performs these actions, e.g. when sending or receiving the // consensus token. Register that as related to the message. - if _, err := addresses.RegisterRelatedRuntimeAddress(messageData.addressPreimages, messageData.relatedAddresses, event.RoothashExecutorCommitted.RuntimeID); err != nil { + if runtimeAddr, err := addresses.RegisterRuntimeAddress(messageData.addressPreimages, event.RoothashExecutorCommitted.RuntimeID); err != nil { logger.Info("register runtime address failed", "runtime_id", event.RoothashExecutorCommitted.RuntimeID, "err", err, ) + } else { + messageData.relatedAddresses[runtimeAddr] = struct{}{} } + for addr, preimageData := range messageData.addressPreimages { batch.Queue(queries.AddressPreimageInsert, addr, diff --git a/analyzer/consensus/messages.go b/analyzer/consensus/messages.go index 5a9c43d39..4c1e0cfa8 100644 --- a/analyzer/consensus/messages.go +++ b/analyzer/consensus/messages.go @@ -35,13 +35,14 @@ func extractMessageData(logger *log.Logger, m message.Message) MessageData { break } messageData.body = body - _, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.Transfer.To) + to, err := addresses.FromOCSAddress(m.Staking.Transfer.To) if err != nil { logger.Info("register related address 'to' failed", "message_type", messageData.messageType, "err", err, ) } + messageData.relatedAddresses[to] = struct{}{} case m.Staking.Withdraw != nil: messageData.messageType = apiTypes.RoothashMessageTypeStakingWithdraw body, err := json.Marshal(m.Staking.Withdraw) @@ -53,13 +54,14 @@ func extractMessageData(logger *log.Logger, m message.Message) MessageData { break } messageData.body = body - _, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.Withdraw.From) + from, err := addresses.FromOCSAddress(m.Staking.Withdraw.From) if err != nil { logger.Info("register related address 'from' failed", "message_type", messageData.messageType, "err", err, ) } + messageData.relatedAddresses[from] = struct{}{} case m.Staking.AddEscrow != nil: messageData.messageType = apiTypes.RoothashMessageTypeStakingAddEscrow body, err := json.Marshal(m.Staking.AddEscrow) @@ -71,13 +73,14 @@ func extractMessageData(logger *log.Logger, m message.Message) MessageData { break } messageData.body = body - _, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.AddEscrow.Account) + account, err := addresses.FromOCSAddress(m.Staking.AddEscrow.Account) if err != nil { logger.Info("register related address 'account' failed", "message_type", messageData.messageType, "err", err, ) } + messageData.relatedAddresses[account] = struct{}{} case m.Staking.ReclaimEscrow != nil: messageData.messageType = apiTypes.RoothashMessageTypeStakingReclaimEscrow body, err := json.Marshal(m.Staking.ReclaimEscrow) @@ -89,13 +92,14 @@ func extractMessageData(logger *log.Logger, m message.Message) MessageData { break } messageData.body = body - _, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.ReclaimEscrow.Account) + account, err := addresses.FromOCSAddress(m.Staking.ReclaimEscrow.Account) if err != nil { logger.Info("register related address 'account' failed", "message_type", messageData.messageType, "err", err, ) } + messageData.relatedAddresses[account] = struct{}{} default: logger.Info("unhandled staking message", "staking_message", m.Staking, diff --git a/analyzer/evmabibackfill/evm_abi_backfill.go b/analyzer/evmabibackfill/evm_abi_backfill.go index 1fcf307a5..a5166f406 100644 --- a/analyzer/evmabibackfill/evm_abi_backfill.go +++ b/analyzer/evmabibackfill/evm_abi_backfill.go @@ -42,8 +42,13 @@ type abiEncodedTx struct { } type abiEncodedEvent struct { + // Event primary key. + Runtime common.Runtime Round uint64 TxIndex *int + Type string + TypeIndex int + EventBody sdkEVM.Event } @@ -122,8 +127,11 @@ func (p *processor) GetItems(ctx context.Context, limit uint64) ([]*abiEncodedIt if err = eventRows.Scan( &item.ContractAddr, &item.Abi, + &ev.Runtime, &ev.Round, &ev.TxIndex, + &ev.Type, + &ev.TypeIndex, &ev.EventBody, ); err != nil { return nil, fmt.Errorf("scanning verified contract event: %w", err) @@ -236,10 +244,11 @@ func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, if item.Event != nil { batch.Queue( queries.RuntimeEventEvmParsedFieldsUpdate, - p.runtime, + item.Event.Runtime, item.Event.Round, item.Event.TxIndex, - item.Event.EventBody, + item.Event.Type, + item.Event.TypeIndex, nil, nil, nil, @@ -267,10 +276,11 @@ func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, } batch.Queue( queries.RuntimeEventEvmParsedFieldsUpdate, - p.runtime, + item.Event.Runtime, item.Event.Round, item.Event.TxIndex, - item.Event.EventBody, + item.Event.Type, + item.Event.TypeIndex, eventName, eventArgs, eventSig, diff --git a/analyzer/queries/queries.go b/analyzer/queries/queries.go index c999aec46..e94424c17 100644 --- a/analyzer/queries/queries.go +++ b/analyzer/queries/queries.go @@ -636,22 +636,27 @@ var ( tx_hash = $2` RuntimeEventInsert = ` - INSERT INTO chain.runtime_events (runtime, round, tx_index, tx_hash, tx_eth_hash, timestamp, type, body, related_accounts, evm_log_name, evm_log_params, evm_log_signature) + INSERT INTO chain.runtime_events (runtime, round, tx_index, type, type_index, tx_hash, tx_eth_hash, timestamp, body, evm_log_name, evm_log_params, evm_log_signature) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)` + RuntimeEventRelatedAccountsInsert = ` + INSERT INTO chain.runtime_events_related_accounts (runtime, round, tx_index, type, type_index, account_address) + SELECT $1, $2, $3, $4, $5, unnest($6::text[])` + // We use COALESCE here to avoid overwriting existing data with null values. RuntimeEventEvmParsedFieldsUpdate = ` UPDATE chain.runtime_events SET - evm_log_name = COALESCE($5, evm_log_name), - evm_log_params = COALESCE($6, evm_log_params), - evm_log_signature = COALESCE($7, evm_log_signature), + evm_log_name = COALESCE($6, evm_log_name), + evm_log_params = COALESCE($7, evm_log_params), + evm_log_signature = COALESCE($8, evm_log_signature), abi_parsed_at = CURRENT_TIMESTAMP WHERE runtime = $1 AND round = $2 AND tx_index = $3 AND - body = $4` + type = $4 AND + type_index = $5` RuntimeMintInsert = ` INSERT INTO chain.runtime_transfers (runtime, round, sender, receiver, symbol, amount) @@ -1134,8 +1139,11 @@ var ( SELECT abi_contracts.addr, abi_contracts.abi, + evs.runtime, evs.round, evs.tx_index, + evs.type, + evs.type_index, evs.body FROM abi_contracts JOIN chain.address_preimages as preimages ON diff --git a/analyzer/runtime/extract.go b/analyzer/runtime/extract.go index 38441ae2a..8a74789c1 100644 --- a/analyzer/runtime/extract.go +++ b/analyzer/runtime/extract.go @@ -30,7 +30,6 @@ import ( "github.com/oasisprotocol/nexus/analyzer/runtime/encryption" evm "github.com/oasisprotocol/nexus/analyzer/runtime/evm" uncategorized "github.com/oasisprotocol/nexus/analyzer/uncategorized" - "github.com/oasisprotocol/nexus/analyzer/util" "github.com/oasisprotocol/nexus/analyzer/util/addresses" "github.com/oasisprotocol/nexus/analyzer/util/eth" apiTypes "github.com/oasisprotocol/nexus/api/v1/types" @@ -96,6 +95,7 @@ type EventData struct { TxHash *string // nil for non-tx events TxEthHash *string // nil for non-evm-tx events Type apiTypes.RuntimeEventType + TypeIndex int // Events of the same type within the block are ordered by index. Body EventBody WithScope ScopedSdkEvent EvmLogName *string @@ -232,25 +232,23 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime SwapSyncs: map[apiTypes.Address]*PossibleSwapSync{}, } - // Extract info from non-tx events. - rawNonTxEvents := []nodeapi.RuntimeEvent{} - for _, e := range rawEvents { - if e.TxHash.String() == util.ZeroTxHash { - rawNonTxEvents = append(rawNonTxEvents, e) - } - } - nonTxEvents, err := extractEvents(&blockData, map[apiTypes.Address]struct{}{}, rawNonTxEvents) + extractedEvents, err := extractEvents(&blockData, rawEvents) if err != nil { return nil, fmt.Errorf("extract non-tx events: %w", err) } - blockData.EventData = nonTxEvents + for _, event := range extractedEvents { + if event.TxIndex == nil { + blockData.EventData = append(blockData.EventData, event) + } + } // Extract info from transactions. for txIndex, txr := range txrs { txr := txr // For safe usage of `&txr` inside this long loop. var blockTransactionData BlockTransactionData blockTransactionData.Index = txIndex - blockTransactionData.Hash = txr.Tx.Hash().Hex() + txHash := txr.Tx.Hash() + blockTransactionData.Hash = txHash.Hex() if len(txr.Tx.AuthProofs) == 1 && txr.Tx.AuthProofs[0].Module == "evm.ethereum.v0" { ethHash := hex.EncodeToString(eth.Keccak256(txr.Tx.Body)) blockTransactionData.EthHash = ðHash @@ -277,10 +275,11 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime si := si // we have no dangerous uses of &si, but capture the variable just in case (and to make the linter happy) var blockTransactionSignerData BlockTransactionSignerData blockTransactionSignerData.Index = j - addr, err1 := addresses.RegisterRelatedAddressSpec(blockData.AddressPreimages, blockTransactionData.RelatedAccountAddresses, &si.AddressSpec) + addr, err1 := addresses.RegisterAddressSpec(blockData.AddressPreimages, &si.AddressSpec) if err1 != nil { return nil, fmt.Errorf("tx %d signer %d visit address spec: %w", txIndex, j, err1) } + blockTransactionData.RelatedAccountAddresses[addr] = struct{}{} blockTransactionSignerData.Address = addr blockTransactionSignerData.Nonce = int(si.Nonce) blockTransactionData.SignerData = append(blockTransactionData.SignerData, &blockTransactionSignerData) @@ -323,9 +322,10 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime blockTransactionData.Body = body amount = body.Amount.Amount blockTransactionData.AmountSymbol = common.Ptr(stringifyDenomination(sdkPT, body.Amount.Denomination)) - if to, err = addresses.RegisterRelatedSdkAddress(blockTransactionData.RelatedAccountAddresses, &body.To); err != nil { + if to, err = addresses.FromSdkAddress(&body.To); err != nil { return fmt.Errorf("to: %w", err) } + blockTransactionData.RelatedAccountAddresses[to] = struct{}{} return nil }, ConsensusAccountsDeposit: func(body *consensusaccounts.Deposit) error { @@ -333,7 +333,7 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime amount = body.Amount.Amount blockTransactionData.AmountSymbol = common.Ptr(stringifyDenomination(sdkPT, body.Amount.Denomination)) if body.To != nil { - if to, err = addresses.RegisterRelatedSdkAddress(blockTransactionData.RelatedAccountAddresses, body.To); err != nil { + if to, err = addresses.FromSdkAddress(body.To); err != nil { return fmt.Errorf("to: %w", err) } } else { @@ -344,7 +344,7 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime } // Set the 'Success' field to 'Pending' for deposits. This is because the outcome of the Deposit tx is only known in the next block. blockTransactionData.Success = nil - + blockTransactionData.RelatedAccountAddresses[to] = struct{}{} return nil }, ConsensusAccountsWithdraw: func(body *consensusaccounts.Withdraw) error { @@ -409,9 +409,10 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime // In Undelegate semantics, the inexistent `body.To` is implicitly the account that created this tx, i.e. the delegator R. // Ref: https://github.com/oasisprotocol/oasis-sdk/blob/eb97a8162f84ae81d11d805e6dceeeb016841c27/runtime-sdk/src/modules/consensus_accounts/mod.rs#L465-L465 // However, we instead expose `body.From` as the DB/API `to` for consistency with `Delegate`, and because it is more useful: the delegator R is already indexed in the tx sender field. - if to, err = addresses.RegisterRelatedSdkAddress(blockTransactionData.RelatedAccountAddresses, &body.From); err != nil { + if to, err = addresses.FromSdkAddress(&body.From); err != nil { return fmt.Errorf("from: %w", err) } + blockTransactionData.RelatedAccountAddresses[to] = struct{}{} // The `amount` (of tokens) is not contained in the body, only `shares` is. There isn't sufficient information // to convert `shares` to `amount` until the undelegation actually happens (= UndelegateDone event); in the meantime, // the validator's token pool might change, e.g. because of slashing. @@ -428,9 +429,10 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime if !txr.Result.IsUnknown() && txr.Result.IsSuccess() && len(*ok) == 20 { // Decode address of newly-created contract // todo: is this rigorous enough? - if to, err = addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, blockTransactionData.RelatedAccountAddresses, *ok); err != nil { + if to, err = addresses.RegisterEthAddress(blockData.AddressPreimages, *ok); err != nil { return fmt.Errorf("created contract: %w", err) } + blockTransactionData.RelatedAccountAddresses[to] = struct{}{} blockTransactionData.EVMContract = &evm.EVMContractData{ Address: to, CreationBytecode: body.InitCode, @@ -471,9 +473,10 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime EVMCall: func(body *sdkEVM.Call, ok *[]byte) error { blockTransactionData.Body = body amount = uncategorized.QuantityFromBytes(body.Value) - if to, err = addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, blockTransactionData.RelatedAccountAddresses, body.Address); err != nil { + if to, err = addresses.RegisterEthAddress(blockData.AddressPreimages, body.Address); err != nil { return fmt.Errorf("address: %w", err) } + blockTransactionData.RelatedAccountAddresses[to] = struct{}{} if evmEncrypted, failedCallResult, err2 := evm.EVMMaybeUnmarshalEncryptedData(body.Data, ok); err2 == nil { blockTransactionData.EVMEncrypted = evmEncrypted // For non-evm txs as well as older Sapphire txs, the outer CallResult may @@ -524,9 +527,11 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime }, RoflUpdate: func(body *rofl.Update) error { blockTransactionData.Body = body - if _, err = addresses.RegisterRelatedSdkAddress(blockTransactionData.RelatedAccountAddresses, body.Admin); err != nil { - return fmt.Errorf("rofl.Update admin address: %w", err) + admin, err := addresses.FromSdkAddress(body.Admin) + if err != nil { + return fmt.Errorf("to: %w", err) } + blockTransactionData.RelatedAccountAddresses[admin] = struct{}{} return nil }, RoflRemove: func(body *rofl.Remove) error { @@ -549,13 +554,17 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime } blockTransactionData.Amount = common.Ptr(common.BigIntFromQuantity(amount)) } - txEvents := make([]nodeapi.RuntimeEvent, len(txr.Events)) - for i, e := range txr.Events { - txEvents[i] = (nodeapi.RuntimeEvent)(*e) - } - extractedTxEvents, err := extractEvents(&blockData, blockTransactionData.RelatedAccountAddresses, txEvents) - if err != nil { - return nil, fmt.Errorf("tx %d: %w", txIndex, err) + + // Find extracted events for this tx. + var extractedTxEvents []*EventData + for _, event := range extractedEvents { + if event.TxIndex != nil && *event.TxIndex == txIndex { + extractedTxEvents = append(extractedTxEvents, event) + // Register related addresses found in the event for the transaction as well. + for addr := range event.RelatedAddresses { + blockTransactionData.RelatedAccountAddresses[addr] = struct{}{} + } + } } txGasUsed, foundGasUsedEvent := sumGasUsed(extractedTxEvents) // Populate eventData with tx-specific data. @@ -694,13 +703,14 @@ func tryParseErrorMessage(errorModule string, errorCode uint32, msg string) *str return &sanitizedMsg } -func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Address]struct{}, eventsRaw []nodeapi.RuntimeEvent) ([]*EventData, error) { //nolint:gocyclo +func extractEvents(blockData *BlockData, eventsRaw []nodeapi.RuntimeEvent) ([]*EventData, error) { //nolint:gocyclo extractedEvents := []*EventData{} if err := VisitSdkEvents(eventsRaw, &SdkEventHandler{ - Core: func(event *core.Event) error { + Core: func(event *core.Event, eventIdx int) error { if event.GasUsed != nil { eventData := EventData{ Type: apiTypes.RuntimeEventTypeCoreGasUsed, + TypeIndex: eventIdx, Body: event.GasUsed, WithScope: ScopedSdkEvent{Core: event}, } @@ -708,18 +718,19 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad } return nil }, - Accounts: func(event *accounts.Event) error { + Accounts: func(event *accounts.Event, eventIdx int) error { if event.Transfer != nil { - fromAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Transfer.From) + fromAddr, err1 := addresses.FromSdkAddress(&event.Transfer.From) if err1 != nil { return fmt.Errorf("from: %w", err1) } - toAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Transfer.To) + toAddr, err1 := addresses.FromSdkAddress(&event.Transfer.To) if err1 != nil { return fmt.Errorf("to: %w", err1) } eventData := EventData{ Type: apiTypes.RuntimeEventTypeAccountsTransfer, + TypeIndex: eventIdx, Body: event.Transfer, WithScope: ScopedSdkEvent{Accounts: event}, RelatedAddresses: map[apiTypes.Address]struct{}{fromAddr: {}, toAddr: {}}, @@ -727,12 +738,13 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad extractedEvents = append(extractedEvents, &eventData) } if event.Burn != nil { - ownerAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Burn.Owner) + ownerAddr, err1 := addresses.FromSdkAddress(&event.Burn.Owner) if err1 != nil { return fmt.Errorf("owner: %w", err1) } eventData := EventData{ Type: apiTypes.RuntimeEventTypeAccountsBurn, + TypeIndex: eventIdx, Body: event.Burn, WithScope: ScopedSdkEvent{Accounts: event}, RelatedAddresses: map[apiTypes.Address]struct{}{ownerAddr: {}}, @@ -740,12 +752,13 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad extractedEvents = append(extractedEvents, &eventData) } if event.Mint != nil { - ownerAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Mint.Owner) + ownerAddr, err1 := addresses.FromSdkAddress(&event.Mint.Owner) if err1 != nil { return fmt.Errorf("owner: %w", err1) } eventData := EventData{ Type: apiTypes.RuntimeEventTypeAccountsMint, + TypeIndex: eventIdx, Body: event.Mint, WithScope: ScopedSdkEvent{Accounts: event}, RelatedAddresses: map[apiTypes.Address]struct{}{ownerAddr: {}}, @@ -754,19 +767,20 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad } return nil }, - ConsensusAccounts: func(event *consensusaccounts.Event) error { + ConsensusAccounts: func(event *consensusaccounts.Event, eventIndex int) error { if event.Deposit != nil { // NOTE: .From is a _consensus_ addr (not runtime). It's still related though. - fromAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Deposit.From) + fromAddr, err1 := addresses.FromSdkAddress(&event.Deposit.From) if err1 != nil { return fmt.Errorf("from: %w", err1) } - toAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Deposit.To) + toAddr, err1 := addresses.FromSdkAddress(&event.Deposit.To) if err1 != nil { return fmt.Errorf("to: %w", err1) } eventData := EventData{ Type: apiTypes.RuntimeEventTypeConsensusAccountsDeposit, + TypeIndex: eventIndex, Body: event.Deposit, WithScope: ScopedSdkEvent{ConsensusAccounts: event}, RelatedAddresses: map[apiTypes.Address]struct{}{fromAddr: {}, toAddr: {}}, @@ -774,17 +788,18 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad extractedEvents = append(extractedEvents, &eventData) } if event.Withdraw != nil { - fromAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Withdraw.From) + fromAddr, err1 := addresses.FromSdkAddress(&event.Withdraw.From) if err1 != nil { return fmt.Errorf("from: %w", err1) } // NOTE: .To is a _consensus_ addr (not runtime). It's still related though. - toAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Withdraw.To) + toAddr, err1 := addresses.FromSdkAddress(&event.Withdraw.To) if err1 != nil { return fmt.Errorf("to: %w", err1) } eventData := EventData{ Type: apiTypes.RuntimeEventTypeConsensusAccountsWithdraw, + TypeIndex: eventIndex, Body: event.Withdraw, WithScope: ScopedSdkEvent{ConsensusAccounts: event}, RelatedAddresses: map[apiTypes.Address]struct{}{fromAddr: {}, toAddr: {}}, @@ -794,16 +809,17 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad if event.Delegate != nil { // No dead reckoning needed; balance changes are signalled by other, co-emitted events. // See "LESSON" comment in the code that handles the Delegate tx. - fromAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Delegate.From) + fromAddr, err1 := addresses.FromSdkAddress(&event.Delegate.From) if err1 != nil { return fmt.Errorf("from: %w", err1) } - toAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Delegate.To) + toAddr, err1 := addresses.FromSdkAddress(&event.Delegate.To) if err1 != nil { return fmt.Errorf("to: %w", err1) } eventData := EventData{ Type: apiTypes.RuntimeEventTypeConsensusAccountsDelegate, + TypeIndex: eventIndex, Body: event.Delegate, WithScope: ScopedSdkEvent{ConsensusAccounts: event}, RelatedAddresses: map[apiTypes.Address]struct{}{fromAddr: {}, toAddr: {}}, @@ -811,16 +827,17 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad extractedEvents = append(extractedEvents, &eventData) } if event.UndelegateStart != nil { - fromAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.UndelegateStart.From) + fromAddr, err1 := addresses.FromSdkAddress(&event.UndelegateStart.From) if err1 != nil { return fmt.Errorf("from: %w", err1) } - toAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.UndelegateStart.To) + toAddr, err1 := addresses.FromSdkAddress(&event.UndelegateStart.To) if err1 != nil { return fmt.Errorf("to: %w", err1) } eventData := EventData{ Type: apiTypes.RuntimeEventTypeConsensusAccountsUndelegateStart, + TypeIndex: eventIndex, Body: event.UndelegateStart, WithScope: ScopedSdkEvent{ConsensusAccounts: event}, RelatedAddresses: map[apiTypes.Address]struct{}{fromAddr: {}, toAddr: {}}, @@ -829,16 +846,17 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad extractedEvents = append(extractedEvents, &eventData) } if event.UndelegateDone != nil { - fromAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.UndelegateDone.From) + fromAddr, err1 := addresses.FromSdkAddress(&event.UndelegateDone.From) if err1 != nil { return fmt.Errorf("from: %w", err1) } - toAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.UndelegateDone.To) + toAddr, err1 := addresses.FromSdkAddress(&event.UndelegateDone.To) if err1 != nil { return fmt.Errorf("to: %w", err1) } eventData := EventData{ Type: apiTypes.RuntimeEventTypeConsensusAccountsUndelegateDone, + TypeIndex: eventIndex, Body: event.UndelegateDone, WithScope: ScopedSdkEvent{ConsensusAccounts: event}, RelatedAddresses: map[apiTypes.Address]struct{}{fromAddr: {}, toAddr: {}}, @@ -847,13 +865,14 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad } return nil }, - EVM: func(event *sdkEVM.Event) error { - eventAddr, err1 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, event.Address) + EVM: func(event *sdkEVM.Event, eventIndex int) error { + eventAddr, err1 := addresses.RegisterEthAddress(blockData.AddressPreimages, event.Address) if err1 != nil { return fmt.Errorf("event address: %w", err1) } eventData := EventData{ Type: apiTypes.RuntimeEventTypeEvmLog, + TypeIndex: eventIndex, Body: event, WithScope: ScopedSdkEvent{EVM: event}, RelatedAddresses: map[apiTypes.Address]struct{}{eventAddr: {}}, @@ -863,7 +882,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad fromZero := bytes.Equal(fromECAddr.Bytes(), eth.ZeroEthAddr) toZero := bytes.Equal(toECAddr.Bytes(), eth.ZeroEthAddr) if !fromZero { - fromAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, fromECAddr.Bytes()) + fromAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, fromECAddr.Bytes()) if err2 != nil { return fmt.Errorf("from: %w", err2) } @@ -871,7 +890,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad registerTokenDecrease(blockData.TokenBalanceChanges, eventAddr, fromAddr, value) } if !toZero { - toAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, toECAddr.Bytes()) + toAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, toECAddr.Bytes()) if err2 != nil { return fmt.Errorf("to: %w", err2) } @@ -915,14 +934,14 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad }, ERC20Approval: func(ownerECAddr ethCommon.Address, spenderECAddr ethCommon.Address, value *big.Int) error { if !bytes.Equal(ownerECAddr.Bytes(), eth.ZeroEthAddr) { - ownerAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, ownerECAddr.Bytes()) + ownerAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, ownerECAddr.Bytes()) if err2 != nil { return fmt.Errorf("owner: %w", err2) } eventData.RelatedAddresses[ownerAddr] = struct{}{} } if !bytes.Equal(spenderECAddr.Bytes(), eth.ZeroEthAddr) { - spenderAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, spenderECAddr.Bytes()) + spenderAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, spenderECAddr.Bytes()) if err2 != nil { return fmt.Errorf("spender: %w", err2) } @@ -960,7 +979,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad var fromAddr, toAddr apiTypes.Address if !fromZero { var err2 error - fromAddr, err2 = addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, fromECAddr.Bytes()) + fromAddr, err2 = addresses.RegisterEthAddress(blockData.AddressPreimages, fromECAddr.Bytes()) if err2 != nil { return fmt.Errorf("from: %w", err2) } @@ -969,7 +988,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad } if !toZero { var err2 error - toAddr, err2 = addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, toECAddr.Bytes()) + toAddr, err2 = addresses.RegisterEthAddress(blockData.AddressPreimages, toECAddr.Bytes()) if err2 != nil { return fmt.Errorf("to: %w", err2) } @@ -1023,14 +1042,14 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad }, ERC721Approval: func(ownerECAddr ethCommon.Address, approvedECAddr ethCommon.Address, tokenID *big.Int) error { if !bytes.Equal(ownerECAddr.Bytes(), eth.ZeroEthAddr) { - ownerAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, ownerECAddr.Bytes()) + ownerAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, ownerECAddr.Bytes()) if err2 != nil { return fmt.Errorf("owner: %w", err2) } eventData.RelatedAddresses[ownerAddr] = struct{}{} } if !bytes.Equal(approvedECAddr.Bytes(), eth.ZeroEthAddr) { - approvedAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, approvedECAddr.Bytes()) + approvedAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, approvedECAddr.Bytes()) if err2 != nil { return fmt.Errorf("approved: %w", err2) } @@ -1065,14 +1084,14 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad }, ERC721ApprovalForAll: func(ownerECAddr ethCommon.Address, operatorECAddr ethCommon.Address, approved bool) error { if !bytes.Equal(ownerECAddr.Bytes(), eth.ZeroEthAddr) { - ownerAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, ownerECAddr.Bytes()) + ownerAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, ownerECAddr.Bytes()) if err2 != nil { return fmt.Errorf("owner: %w", err2) } eventData.RelatedAddresses[ownerAddr] = struct{}{} } if !bytes.Equal(operatorECAddr.Bytes(), eth.ZeroEthAddr) { - operatorAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, operatorECAddr.Bytes()) + operatorAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, operatorECAddr.Bytes()) if err2 != nil { return fmt.Errorf("operator: %w", err2) } @@ -1103,17 +1122,17 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad return nil }, IUniswapV2FactoryPairCreated: func(token0ECAddr ethCommon.Address, token1ECAddr ethCommon.Address, pairECAddr ethCommon.Address, allPairsLength *big.Int) error { - token0Addr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, token0ECAddr.Bytes()) + token0Addr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, token0ECAddr.Bytes()) if err != nil { return fmt.Errorf("token0: %w", err) } eventData.RelatedAddresses[token0Addr] = struct{}{} - token1Addr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, token1ECAddr.Bytes()) + token1Addr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, token1ECAddr.Bytes()) if err != nil { return fmt.Errorf("token1: %w", err) } eventData.RelatedAddresses[token1Addr] = struct{}{} - pairAddr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, pairECAddr.Bytes()) + pairAddr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, pairECAddr.Bytes()) if err != nil { return fmt.Errorf("pair: %w", err) } @@ -1154,7 +1173,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad return nil }, IUniswapV2PairMint: func(senderECAddr ethCommon.Address, amount0 *big.Int, amount1 *big.Int) error { - senderAddr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, senderECAddr.Bytes()) + senderAddr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, senderECAddr.Bytes()) if err != nil { return fmt.Errorf("sender: %w", err) } @@ -1185,12 +1204,12 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad return nil }, IUniswapV2PairBurn: func(senderECAddr ethCommon.Address, amount0 *big.Int, amount1 *big.Int, toECAddr ethCommon.Address) error { - senderAddr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, senderECAddr.Bytes()) + senderAddr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, senderECAddr.Bytes()) if err != nil { return fmt.Errorf("sender: %w", err) } eventData.RelatedAddresses[senderAddr] = struct{}{} - toAddr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, toECAddr.Bytes()) + toAddr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, toECAddr.Bytes()) if err != nil { return fmt.Errorf("to: %w", err) } @@ -1226,12 +1245,12 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad return nil }, IUniswapV2PairSwap: func(senderECAddr ethCommon.Address, amount0In *big.Int, amount1In *big.Int, amount0Out *big.Int, amount1Out *big.Int, toECAddr ethCommon.Address) error { - senderAddr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, senderECAddr.Bytes()) + senderAddr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, senderECAddr.Bytes()) if err != nil { return fmt.Errorf("sender: %w", err) } eventData.RelatedAddresses[senderAddr] = struct{}{} - toAddr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, toECAddr.Bytes()) + toAddr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, toECAddr.Bytes()) if err != nil { return fmt.Errorf("to: %w", err) } @@ -1308,7 +1327,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad WROSEDeposit: func(ownerECAddr ethCommon.Address, amount *big.Int) error { wrapperAddr := eventAddr // the WROSE wrapper contract is implicitly the address that emitted the contract - ownerAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, ownerECAddr.Bytes()) + ownerAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, ownerECAddr.Bytes()) if err2 != nil { return fmt.Errorf("owner: %w", err2) } @@ -1355,7 +1374,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad WROSEWithdrawal: func(ownerECAddr ethCommon.Address, amount *big.Int) error { wrapperAddr := eventAddr // the WROSE wrapper contract is implicitly the address that emitted the contract - ownerAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, ownerECAddr.Bytes()) + ownerAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, ownerECAddr.Bytes()) if err2 != nil { return fmt.Errorf("owner: %w", err2) } @@ -1392,10 +1411,11 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad extractedEvents = append(extractedEvents, &eventData) return nil }, - Rofl: func(event *rofl.Event) error { + Rofl: func(event *rofl.Event, eventIndex int) error { if event.AppCreated != nil { eventData := EventData{ Type: apiTypes.RuntimeEventTypeRoflAppCreated, + TypeIndex: eventIndex, Body: event.AppCreated, WithScope: ScopedSdkEvent{Rofl: event}, } @@ -1404,6 +1424,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad if event.AppRemoved != nil { eventData := EventData{ Type: apiTypes.RuntimeEventTypeRoflAppRemoved, + TypeIndex: eventIndex, Body: event.AppRemoved, WithScope: ScopedSdkEvent{Rofl: event}, } @@ -1412,6 +1433,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad if event.AppUpdated != nil { eventData := EventData{ Type: apiTypes.RuntimeEventTypeRoflAppUpdated, + TypeIndex: eventIndex, Body: event.AppUpdated, WithScope: ScopedSdkEvent{Rofl: event}, } diff --git a/analyzer/runtime/runtime.go b/analyzer/runtime/runtime.go index 62511ccfc..10fb521d2 100644 --- a/analyzer/runtime/runtime.go +++ b/analyzer/runtime/runtime.go @@ -406,22 +406,31 @@ func (m *processor) queueDbUpdates(batch *storage.QueryBatch, data *BlockData) { // Insert events. for _, eventData := range data.EventData { - eventRelatedAddresses := addresses.SliceFromSet(eventData.RelatedAddresses) batch.Queue( queries.RuntimeEventInsert, m.runtime, data.Header.Round, eventData.TxIndex, + eventData.Type, + eventData.TypeIndex, eventData.TxHash, eventData.TxEthHash, data.Header.Timestamp, - eventData.Type, eventData.Body, - eventRelatedAddresses, eventData.EvmLogName, eventData.EvmLogParams, eventData.EvmLogSignature, ) + + batch.Queue( + queries.RuntimeEventRelatedAccountsInsert, + m.runtime, + data.Header.Round, + eventData.TxIndex, + eventData.Type, + eventData.TypeIndex, + addresses.SliceFromSet(eventData.RelatedAddresses), + ) } // Insert address preimages. diff --git a/analyzer/runtime/runtime_test.go b/analyzer/runtime/runtime_test.go index ae201d42e..cccdd4d64 100644 --- a/analyzer/runtime/runtime_test.go +++ b/analyzer/runtime/runtime_test.go @@ -97,9 +97,9 @@ func (mock *mockNode) GetEventsRaw(ctx context.Context, round uint64) ([]nodeapi // Include events that were part of transactions. txrs := mock.Txs[round] - for _, tx := range txrs { + for i, tx := range txrs { for _, ev := range tx.Events { - events = append(events, nodeapi.RuntimeEvent(*ev)) + events = append(events, nodeapi.RuntimeEvent{Event: ev, Index: uint64(i)}) } } diff --git a/analyzer/runtime/visitors.go b/analyzer/runtime/visitors.go index 29bdb28c5..176d0fa81 100644 --- a/analyzer/runtime/visitors.go +++ b/analyzer/runtime/visitors.go @@ -175,11 +175,11 @@ func VisitCall(call *sdkTypes.Call, result *sdkTypes.CallResult, handler *CallHa } type SdkEventHandler struct { - Core func(event *core.Event) error - Accounts func(event *accounts.Event) error - ConsensusAccounts func(event *consensusaccounts.Event) error - EVM func(event *evm.Event) error - Rofl func(event *rofl.Event) error + Core func(event *core.Event, eventIdx int) error + Accounts func(event *accounts.Event, eventIdx int) error + ConsensusAccounts func(event *consensusaccounts.Event, eventIdx int) error + EVM func(event *evm.Event, eventIdx int) error + Rofl func(event *rofl.Event, eventIdx int) error } func VisitSdkEvent(event *nodeapi.RuntimeEvent, handler *SdkEventHandler) error { @@ -189,7 +189,7 @@ func VisitSdkEvent(event *nodeapi.RuntimeEvent, handler *SdkEventHandler) error return fmt.Errorf("decode core: %w", err) } for i := range coreEvents { - if err = handler.Core(&coreEvents[i]); err != nil { + if err = handler.Core(&coreEvents[i], i); err != nil { return fmt.Errorf("decoded event %d core: %w", i, err) } } @@ -200,7 +200,7 @@ func VisitSdkEvent(event *nodeapi.RuntimeEvent, handler *SdkEventHandler) error return fmt.Errorf("decode accounts: %w", err) } for i := range accountEvents { - if err = handler.Accounts(&accountEvents[i]); err != nil { + if err = handler.Accounts(&accountEvents[i], i); err != nil { return fmt.Errorf("decoded event %d accounts: %w", i, err) } } @@ -211,7 +211,7 @@ func VisitSdkEvent(event *nodeapi.RuntimeEvent, handler *SdkEventHandler) error return fmt.Errorf("decode consensus accounts: %w", err) } for i := range consensusAccountsEvents { - if err = handler.ConsensusAccounts(&consensusAccountsEvents[i]); err != nil { + if err = handler.ConsensusAccounts(&consensusAccountsEvents[i], i); err != nil { return fmt.Errorf("decoded event %d consensus accounts: %w", i, err) } } @@ -222,7 +222,7 @@ func VisitSdkEvent(event *nodeapi.RuntimeEvent, handler *SdkEventHandler) error return fmt.Errorf("decode evm: %w", err) } for i := range evmEvents { - if err = handler.EVM(&evmEvents[i]); err != nil { + if err = handler.EVM(&evmEvents[i], i); err != nil { return fmt.Errorf("decoded event %d evm: %w", i, err) } } @@ -233,7 +233,7 @@ func VisitSdkEvent(event *nodeapi.RuntimeEvent, handler *SdkEventHandler) error return fmt.Errorf("decode rofl: %w", err) } for i := range roflEvents { - if err = handler.Rofl(&roflEvents[i]); err != nil { + if err = handler.Rofl(&roflEvents[i], i); err != nil { return fmt.Errorf("decoded event %d rofl: %w", i, err) } } @@ -242,8 +242,9 @@ func VisitSdkEvent(event *nodeapi.RuntimeEvent, handler *SdkEventHandler) error } func VisitSdkEvents(events []nodeapi.RuntimeEvent, handler *SdkEventHandler) error { + var err error for i := range events { - if err := VisitSdkEvent(&events[i], handler); err != nil { + if err = VisitSdkEvent(&events[i], handler); err != nil { return fmt.Errorf("event %d: %w; raw event: %+v", i, err, events[i]) } } diff --git a/analyzer/util/addresses/addresses.go b/analyzer/util/addresses/addresses.go index 6d19ff654..026ecaea6 100644 --- a/analyzer/util/addresses/addresses.go +++ b/analyzer/util/addresses/addresses.go @@ -60,7 +60,7 @@ func FromRuntimeID(id coreCommon.Namespace) (apiTypes.Address, error) { return FromOCSAddress(ocsAddr) } -func SliceFromSet(accounts map[apiTypes.Address]struct{}) []string { +func SliceFromSet(accounts map[apiTypes.Address]struct{}) []apiTypes.Address { addrs := make([]string, len(accounts)) i := 0 for a := range accounts { diff --git a/analyzer/util/addresses/registration.go b/analyzer/util/addresses/registration.go index 979aedefa..2575d86ec 100644 --- a/analyzer/util/addresses/registration.go +++ b/analyzer/util/addresses/registration.go @@ -58,7 +58,7 @@ func extractAddressPreimage(as *sdkTypes.AddressSpec) (*PreimageData, error) { }, nil } -func registerAddressSpec(addressPreimages map[apiTypes.Address]*PreimageData, as *sdkTypes.AddressSpec) (apiTypes.Address, error) { +func RegisterAddressSpec(addressPreimages map[apiTypes.Address]*PreimageData, as *sdkTypes.AddressSpec) (apiTypes.Address, error) { addr, err := FromAddressSpec(as) if err != nil { return "", err @@ -75,7 +75,7 @@ func registerAddressSpec(addressPreimages map[apiTypes.Address]*PreimageData, as return addr, nil } -func registerEthAddress(addressPreimages map[apiTypes.Address]*PreimageData, ethAddr []byte) (apiTypes.Address, error) { +func RegisterEthAddress(addressPreimages map[apiTypes.Address]*PreimageData, ethAddr []byte) (apiTypes.Address, error) { addr, err := FromEthAddress(ethAddr) if err != nil { return "", err @@ -92,7 +92,7 @@ func registerEthAddress(addressPreimages map[apiTypes.Address]*PreimageData, eth return addr, nil } -func registerRuntimeAddress(addressPreimages map[apiTypes.Address]*PreimageData, id coreCommon.Namespace) (apiTypes.Address, error) { +func RegisterRuntimeAddress(addressPreimages map[apiTypes.Address]*PreimageData, id coreCommon.Namespace) (apiTypes.Address, error) { addr, err := FromRuntimeID(id) if err != nil { return "", err @@ -112,69 +112,3 @@ func registerRuntimeAddress(addressPreimages map[apiTypes.Address]*PreimageData, return addr, nil } - -func RegisterRelatedSdkAddress(relatedAddresses map[apiTypes.Address]struct{}, sdkAddr *sdkTypes.Address) (apiTypes.Address, error) { - addr, err := FromSdkAddress(sdkAddr) - if err != nil { - return "", err - } - - relatedAddresses[addr] = struct{}{} - - return addr, nil -} - -func RegisterRelatedAddressSpec(addressPreimages map[apiTypes.Address]*PreimageData, relatedAddresses map[apiTypes.Address]struct{}, as *sdkTypes.AddressSpec) (apiTypes.Address, error) { - addr, err := registerAddressSpec(addressPreimages, as) - if err != nil { - return "", err - } - - relatedAddresses[addr] = struct{}{} - - return addr, nil -} - -func RegisterRelatedOCAddress(relatedAddresses map[apiTypes.Address]struct{}, ocAddr address.Address) (apiTypes.Address, error) { - addr, err := FromOCAddress(ocAddr) - if err != nil { - return "", err - } - - relatedAddresses[addr] = struct{}{} - - return addr, nil -} - -func RegisterRelatedOCSAddress(relatedAddresses map[apiTypes.Address]struct{}, ocsAddr staking.Address) (apiTypes.Address, error) { - addr, err := FromOCSAddress(ocsAddr) - if err != nil { - return "", err - } - - relatedAddresses[addr] = struct{}{} - - return addr, nil -} - -func RegisterRelatedEthAddress(addressPreimages map[apiTypes.Address]*PreimageData, relatedAddresses map[apiTypes.Address]struct{}, ethAddr []byte) (apiTypes.Address, error) { - addr, err := registerEthAddress(addressPreimages, ethAddr) - if err != nil { - return "", err - } - - relatedAddresses[addr] = struct{}{} - - return addr, nil -} - -func RegisterRelatedRuntimeAddress(addressPreimages map[apiTypes.Address]*PreimageData, relatedAddresses map[apiTypes.Address]struct{}, id coreCommon.Namespace) (apiTypes.Address, error) { - addr, err := registerRuntimeAddress(addressPreimages, id) - if err != nil { - return "", err - } - - relatedAddresses[addr] = struct{}{} - - return addr, nil -} diff --git a/storage/client/queries/queries.go b/storage/client/queries/queries.go index 19c542819..c2449011d 100644 --- a/storage/client/queries/queries.go +++ b/storage/client/queries/queries.go @@ -590,6 +590,14 @@ const ( (evs.runtime=tokens.runtime) AND (preimages.address=tokens.token_address) AND (tokens.token_type IS NOT NULL) -- exclude token _candidates_ that we haven't inspected yet; we have no info about them (name, decimals, etc) + LEFT JOIN chain.runtime_events_related_accounts as rel ON + evs.runtime = rel.runtime AND + evs.round = rel.round AND + evs.tx_index = rel.tx_index AND + evs.type = rel.type AND + evs.type_index = rel.type_index AND + -- When related_address ($7) is NULL and hence we do no filtering on it, avoid the join altogether. + ($7::text IS NOT NULL) WHERE (evs.runtime = $1) AND ($2::bigint IS NULL OR evs.round = $2::bigint) AND @@ -597,7 +605,7 @@ const ( ($4::text IS NULL OR evs.tx_hash = $4::text OR evs.tx_eth_hash = $4::text) AND ($5::text IS NULL OR evs.type = $5::text) AND ($6::bytea IS NULL OR evs.evm_log_signature = $6::bytea) AND - ($7::text IS NULL OR evs.related_accounts @> ARRAY[$7::text]) AND + ($7::text IS NULL OR rel.account_address = $7::text) AND ($8::text IS NULL OR ( -- Currently this only supports EVM smart contracts. evs.type = 'evm.log' AND diff --git a/storage/migrations/01_runtimes.up.sql b/storage/migrations/01_runtimes.up.sql index dc9763702..113239991 100644 --- a/storage/migrations/01_runtimes.up.sql +++ b/storage/migrations/01_runtimes.up.sql @@ -136,18 +136,21 @@ CREATE TABLE chain.runtime_events ( runtime runtime NOT NULL, round UINT63 NOT NULL, + tx_index UINT31, - FOREIGN KEY (runtime, round, tx_index) REFERENCES chain.runtime_transactions(runtime, round, tx_index) DEFERRABLE INITIALLY DEFERRED, + type TEXT NOT NULL, + type_index UINT31 NOT NULL, -- The index within events of the same type emitted within the block. + + PRIMARY KEY (runtime, round, tx_index, type, type_index), + tx_hash HEX64, tx_eth_hash HEX64, timestamp TIMESTAMP WITH TIME ZONE NOT NULL, - -- TODO: add link to openapi spec section with runtime event types. - type TEXT NOT NULL, + -- The raw event, as returned by the oasis-sdk runtime client. body JSONB NOT NULL, - related_accounts TEXT[], -- The events of type `evm.log` are further parsed into known event types, e.g. (ERC20) Transfer, -- to populate the `evm_log_name`, `evm_log_params`, and `evm_log_signature` fields. @@ -159,12 +162,14 @@ CREATE TABLE chain.runtime_events -- Internal tracking for parsing evm.Call events using the contract -- abi when available. - abi_parsed_at TIMESTAMP WITH TIME ZONE + abi_parsed_at TIMESTAMP WITH TIME ZONE, + + related_accounts TEXT[] -- Removed in 07_runtime_events_related_accounts.up.sql. ); CREATE INDEX ix_runtime_events_round ON chain.runtime_events(runtime, round); -- for sorting by round, when there are no filters applied CREATE INDEX ix_runtime_events_tx_hash ON chain.runtime_events USING hash (tx_hash); CREATE INDEX ix_runtime_events_tx_eth_hash ON chain.runtime_events USING hash (tx_eth_hash); -CREATE INDEX ix_runtime_events_related_accounts ON chain.runtime_events USING gin(related_accounts); -- for fetching account activity for a given account +CREATE INDEX ix_runtime_events_related_accounts ON chain.runtime_events USING gin(related_accounts); -- for fetching account activity for a given account -- Deleted in 08_runtime_events_related_accounts.up.sql. CREATE INDEX ix_runtime_events_evm_log_signature ON chain.runtime_events(runtime, evm_log_signature, round); -- for fetching a certain event type, eg Transfers CREATE INDEX ix_runtime_events_evm_log_params ON chain.runtime_events USING gin(evm_log_params); CREATE INDEX ix_runtime_events_type ON chain.runtime_events (runtime, type); @@ -173,12 +178,23 @@ CREATE INDEX ix_runtime_events_nft_transfers ON chain.runtime_events (runtime, ( type = 'evm.log' AND evm_log_signature = '\xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef' AND jsonb_array_length(body -> 'topics') = 4; + -- Added in 07_runtime_events_evm_contracts_events.up.sql -- Index used for fetching events emitted by a specific contract. -- CREATE INDEX ix_runtime_events_evm_contract_events ON chain.runtime_events (runtime, (body ->> 'address'), evm_log_signature, round) -- WHERE -- type = 'evm.log'; +-- Added in 12_runtime_events_related_accounts.up.sql. +-- CREATE TABLE chain.runtime_events_related_accounts +-- ( +-- runtime runtime NOT NULL, +-- round UINT63 NOT NULL, +-- event_index UINT31 NOT NULL, +-- related_account TEXT NOT NULL +-- PRIMARY KEY (runtime, round, event_index, related_account), +-- ); + -- Roothash messages are small structures that a runtime can send to -- communicate with the consensus layer. They are agreed upon for each runtime -- block. We'll see the messages themselves in the proposal for that block, diff --git a/storage/migrations/12_runtime_events_related_accounts.up.sql b/storage/migrations/12_runtime_events_related_accounts.up.sql new file mode 100644 index 000000000..e0001715f --- /dev/null +++ b/storage/migrations/12_runtime_events_related_accounts.up.sql @@ -0,0 +1,28 @@ +BEGIN; + +CREATE TABLE chain.runtime_events_related_accounts +( + runtime runtime NOT NULL, + round UINT63 NOT NULL, + tx_index UINT31 NOT NULL, + type TEXT NOT NULL, + type_index UINT31 NOT NULL, + + account_address oasis_addr NOT NULL, + FOREIGN KEY (runtime, round, tx_index, type, type_index) REFERENCES chain.runtime_events(runtime, round, tx_index, type, type_index) DEFERRABLE INITIALLY DEFERRED +); + +-- Used for fetching all events related to an account (sorted by round). +CREATE INDEX ix_runtime_events_related_accounts_related_account_round ON chain.runtime_events_related_accounts(runtime, account_address, round); + +DROP INDEX IF EXISTS chain.ix_runtime_events_related_accounts; + +-- TODO: we need a more high-level ("go") migration for this. Basically do a re-index but just of the runtime_events table. +-- This is not something our migration framework currently supports. +-- If we plan to avoid the need for many reindexing in the future, we should consider implementing support for this. +-- Alternatively, we plan on doing more reindexing in future, then we can just wait with this change until we do the next reindexing. + +ALTER TABLE chain.runtime_events DROP COLUMN related_accounts; + + +COMMIT; diff --git a/storage/oasis/nodeapi/api.go b/storage/oasis/nodeapi/api.go index 49f72da77..4b78429c2 100644 --- a/storage/oasis/nodeapi/api.go +++ b/storage/oasis/nodeapi/api.go @@ -284,10 +284,14 @@ type RuntimeApiLite interface { } type ( - RuntimeEvent sdkTypes.Event RuntimeTransactionWithResults sdkClient.TransactionWithResults ) +type RuntimeEvent struct { + *sdkTypes.Event + Index uint64 +} + // Derived from oasis-core: roothash/api/block/header.go // Expanded to include the precomputed hash of the header; // we mustn't compute it on the fly because depending on the diff --git a/storage/oasis/nodeapi/file/runtime.go b/storage/oasis/nodeapi/file/runtime.go index a5f7e3fc1..664b6d89a 100644 --- a/storage/oasis/nodeapi/file/runtime.go +++ b/storage/oasis/nodeapi/file/runtime.go @@ -70,12 +70,40 @@ func (r *FileRuntimeApiLite) GetTransactionsWithResults(ctx context.Context, rou ) } +// nodeapi.RuntimeEvent changed from being an alias of sdkTypes.Event, to a struct which additionally contains the index of the event. +// To avoid invalidating the cache (causing the need to re-index all events), we keep using the old type in the caching backend and convert between +// the types on the fly. This is possible since the index of the event is the order in which the event is returned by the GetEventsRaw method. func (r *FileRuntimeApiLite) GetEventsRaw(ctx context.Context, round uint64) ([]nodeapi.RuntimeEvent, error) { - return kvstore.GetSliceFromCacheOrCall( + type CachedRuntimeEvent = sdkTypes.Event + + cachedEvents, err := kvstore.GetSliceFromCacheOrCall( r.db, round == roothash.RoundLatest, kvstore.GenerateCacheKey("GetEventsRaw", r.runtime, round), - func() ([]nodeapi.RuntimeEvent, error) { return r.runtimeApi.GetEventsRaw(ctx, round) }, + func() ([]CachedRuntimeEvent, error) { + rawEvents, err := r.runtimeApi.GetEventsRaw(ctx, round) + if err != nil { + return nil, err + } + cachedEvents := make([]CachedRuntimeEvent, len(rawEvents)) + for i, ev := range rawEvents { + cachedEvents[i] = *ev.Event + } + return cachedEvents, nil + }, ) + if err != nil { + return nil, err + } + + // Convert to nexus-internal type. + evs := make([]nodeapi.RuntimeEvent, len(cachedEvents)) + for i, ev := range cachedEvents { + evs[i] = nodeapi.RuntimeEvent{ + Event: &ev, + Index: uint64(i), + } + } + return evs, nil } func (r *FileRuntimeApiLite) GetBalances(ctx context.Context, round uint64, addr nodeapi.Address) (map[sdkTypes.Denomination]common.BigInt, error) { diff --git a/storage/oasis/nodeapi/universal_runtime.go b/storage/oasis/nodeapi/universal_runtime.go index 56e0c017e..fb66986ea 100644 --- a/storage/oasis/nodeapi/universal_runtime.go +++ b/storage/oasis/nodeapi/universal_runtime.go @@ -130,7 +130,10 @@ func (rc *UniversalRuntimeApiLite) GetEventsRaw(ctx context.Context, round uint6 // Convert to nexus-internal type. evs := make([]RuntimeEvent, len(rsp)) for i, ev := range rsp { - evs[i] = (RuntimeEvent)(*ev) + evs[i] = RuntimeEvent{ + Event: ev, + Index: uint64(i), + } } return evs, nil