From 96fe69c9ab20a6872867dd5034e98e333bb2b03c Mon Sep 17 00:00:00 2001 From: Warren He Date: Fri, 5 Jan 2024 16:43:51 -0800 Subject: [PATCH] consensus: store finalized roothash messages --- analyzer/consensus/consensus.go | 46 +++++++++++++++++++++++++++++++++ analyzer/queries/queries.go | 11 ++++++++ 2 files changed, 57 insertions(+) diff --git a/analyzer/consensus/consensus.go b/analyzer/consensus/consensus.go index 03dd48a9f..b13ca6218 100644 --- a/analyzer/consensus/consensus.go +++ b/analyzer/consensus/consensus.go @@ -627,8 +627,15 @@ func (m *processor) queueRegistryEventInserts(batch *storage.QueryBatch, data *r } func (m *processor) queueRootHashMessageUpserts(batch *storage.QueryBatch, data *rootHashData) error { + finalized := map[coreCommon.Namespace]uint64{} + var roothashMessageEvents []nodeapi.Event for _, event := range data.Events { switch { + case event.RoothashMisc != nil: + switch event.Type { //nolint:gocritic,exhaustive // singleCaseSwitch, no special handling for other types + case apiTypes.ConsensusEventTypeRoothashFinalized: + finalized[event.RoothashMisc.RuntimeID] = *event.RoothashMisc.Round + } case event.RoothashExecutorCommitted != nil: runtime := RuntimeFromID(event.RoothashExecutorCommitted.RuntimeID, m.network) if runtime == nil { @@ -655,6 +662,45 @@ func (m *processor) queueRootHashMessageUpserts(batch *storage.QueryBatch, data relatedAddresses, ) } + case event.RoothashMessage != nil: + // Save these for after we collect all roothash finalized events. + roothashMessageEvents = append(roothashMessageEvents, event) + } + } + for _, event := range roothashMessageEvents { + runtime := RuntimeFromID(event.RoothashMessage.RuntimeID, m.network) + if runtime == nil { + continue + } + batch.Queue(queries.ConsensusRoothashMessageFinalizeUpsert, + runtime, + finalized[event.RoothashMessage.RuntimeID], + event.RoothashMessage.Index, + event.RoothashMessage.Module, + event.RoothashMessage.Code, + nil, + ) + } + for rtid, results := range data.LastRoundResults { + runtime := RuntimeFromID(rtid, m.network) + if runtime == nil { + // We shouldn't even have gathered last round results for unknown + // runtimes. But prevent nil-runtime inserts anyway. + continue + } + round, ok := finalized[rtid] + if !ok { + continue + } + for _, message := range results.Messages { + batch.Queue(queries.ConsensusRoothashMessageFinalizeUpsert, + runtime, + round, + message.Index, + message.Module, + message.Code, + cbor.Marshal(message.Result), + ) } } diff --git a/analyzer/queries/queries.go b/analyzer/queries/queries.go index 11985c8d9..7b090cbfe 100644 --- a/analyzer/queries/queries.go +++ b/analyzer/queries/queries.go @@ -225,6 +225,17 @@ var ( body = excluded.body, related_accounts = excluded.related_accounts` + ConsensusRoothashMessageFinalizeUpsert = ` + INSERT INTO chain.roothash_messages + (runtime, round, message_index, error_module, error_code, result) + VALUES + ($1, $2, $3, $4, $5, $6) + ON CONFLICT (runtime, round, message_index) DO UPDATE + SET + error_module = excluded.error_module, + error_code = excluded.error_code, + result = excluded.result` + ConsensusAccountRelatedTransactionInsert = ` INSERT INTO chain.accounts_related_transactions (account_address, tx_block, tx_index) VALUES ($1, $2, $3)`