Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consensus roothash messages #599

Merged
merged 15 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/599.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
consensus: roothash messages
118 changes: 116 additions & 2 deletions analyzer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
sdkConfig "github.com/oasisprotocol/oasis-sdk/client-sdk/go/config"

"github.com/oasisprotocol/nexus/analyzer/util/addresses"
"github.com/oasisprotocol/nexus/coreapi/v22.2.11/consensus/api/transaction"
genesis "github.com/oasisprotocol/nexus/coreapi/v22.2.11/genesis/api"
staking "github.com/oasisprotocol/nexus/coreapi/v22.2.11/staking/api"
Expand Down Expand Up @@ -286,8 +287,13 @@ func (m *processor) queueDbUpdates(batch *storage.QueryBatch, data allData) erro
}
}

if err := m.queueRootHashEventInserts(batch, data.RootHashData); err != nil {
return err
for _, f := range []func(*storage.QueryBatch, *rootHashData) error{
m.queueRootHashMessageUpserts,
m.queueRootHashEventInserts,
} {
if err := f(batch, data.RootHashData); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -648,6 +654,114 @@ func (m *processor) queueRegistryEventInserts(batch *storage.QueryBatch, data *r
return nil
}

func (m *processor) queueRootHashMessageUpserts(batch *storage.QueryBatch, data *rootHashData) error {
// Collect (I) roothash messages being scheduled and (II) roothash
// messages being finalized. They're always scheduled in the first
// ExecutorCommitedEvent (i.e. the proposal). They're finalized in (a)
// MessageEvent in Cobalt and in (b) the last round results in Damask and
// later.
finalized := map[coreCommon.Namespace]uint64{}
var roothashMessageEvents []nodeapi.Event
for _, event := range data.Events {
switch {
case event.RoothashMisc != nil:
switch event.Type { //nolint:gocritic,exhaustive // singleCaseSwitch, no special handling for other types
case apiTypes.ConsensusEventTypeRoothashFinalized:
// (II.a) MessageEvent does not have its own Round field, so
// use the value from the FinalizedEvent that happens at the
// same time.
finalized[event.RoothashMisc.RuntimeID] = *event.RoothashMisc.Round
}
case event.RoothashExecutorCommitted != nil:
runtime := RuntimeFromID(event.RoothashExecutorCommitted.RuntimeID, m.network)
if runtime == nil {
break
}
round := event.RoothashExecutorCommitted.Round
// (I) Extract roothash messages from the ExecutorCommittedEvent.
// Only the proposal has the messages, so the other commits will
// harmlessly skip over this part.
for i, message := range event.RoothashExecutorCommitted.Messages {
logger := m.logger.With(
"height", data.Height,
"runtime", runtime,
"round", round,
"message_index", i,
)
messageData := extractMessageData(logger, message)
// The runtime has its own staking account, which is what
// performs these actions, e.g. when sending or receiving the
// consensus token. Register that as related to the message.
if _, err := addresses.RegisterRelatedRuntimeAddress(messageData.addressPreimages, messageData.relatedAddresses, event.RoothashExecutorCommitted.RuntimeID); err != nil {
logger.Info("register runtime address failed",
"runtime_id", event.RoothashExecutorCommitted.RuntimeID,
"err", err,
)
}
for addr, preimageData := range messageData.addressPreimages {
batch.Queue(queries.AddressPreimageInsert,
addr,
preimageData.ContextIdentifier,
preimageData.ContextVersion,
preimageData.Data,
)
}
batch.Queue(queries.ConsensusRoothashMessageScheduleUpsert,
runtime,
round,
i,
messageData.messageType,
messageData.body,
addresses.SliceFromSet(messageData.relatedAddresses),
)
}
case event.RoothashMessage != nil:
// (II.a) Extract message results from the MessageEvents.
// Save these for after we collect all roothash finalized events.
roothashMessageEvents = append(roothashMessageEvents, event)
}
}
for _, event := range roothashMessageEvents {
pro-wh marked this conversation as resolved.
Show resolved Hide resolved
runtime := RuntimeFromID(event.RoothashMessage.RuntimeID, m.network)
if runtime == nil {
continue
}
batch.Queue(queries.ConsensusRoothashMessageFinalizeUpsert,
runtime,
finalized[event.RoothashMessage.RuntimeID],
event.RoothashMessage.Index,
event.RoothashMessage.Module,
event.RoothashMessage.Code,
nil,
)
}
for rtid, results := range data.LastRoundResults {
runtime := RuntimeFromID(rtid, m.network)
if runtime == nil {
// We shouldn't even have gathered last round results for unknown
// runtimes. But prevent nil-runtime inserts anyway.
continue
}
round, ok := finalized[rtid]
if !ok {
continue
}
// (II.b) Extract message results from the last round results.
for _, message := range results.Messages {
batch.Queue(queries.ConsensusRoothashMessageFinalizeUpsert,
runtime,
round,
message.Index,
message.Module,
message.Code,
cbor.Marshal(message.Result),
)
}
}

return nil
}

func (m *processor) queueRootHashEventInserts(batch *storage.QueryBatch, data *rootHashData) error {
for _, event := range data.Events {
hash := util.SanitizeTxHash(event.TxHash.Hex())
Expand Down
30 changes: 25 additions & 5 deletions analyzer/consensus/data_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

beacon "github.com/oasisprotocol/nexus/coreapi/v22.2.11/beacon/api"
consensus "github.com/oasisprotocol/nexus/coreapi/v22.2.11/consensus/api"
roothash "github.com/oasisprotocol/nexus/coreapi/v22.2.11/roothash/api"
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"
)

Expand Down Expand Up @@ -50,7 +51,7 @@ func fetchAllData(ctx context.Context, cc nodeapi.ConsensusApiLite, network sdkC
return nil, err
}

rootHashData, err := fetchRootHashData(ctx, cc, height)
rootHashData, err := fetchRootHashData(ctx, cc, network, height)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -276,16 +277,33 @@ func fetchGovernanceData(ctx context.Context, cc nodeapi.ConsensusApiLite, heigh
}, nil
}

// fetchRootHashData retrieves roothash events at the provided block height.
func fetchRootHashData(ctx context.Context, cc nodeapi.ConsensusApiLite, height int64) (*rootHashData, error) {
// fetchRootHashData retrieves roothash events and last round results at the
// provided block height.
func fetchRootHashData(ctx context.Context, cc nodeapi.ConsensusApiLite, network sdkConfig.Network, height int64) (*rootHashData, error) {
events, err := cc.RoothashEvents(ctx, height)
if err != nil {
return nil, err
}

lastRoundResults := make(map[coreCommon.Namespace]*roothash.RoundResults, len(network.ParaTimes.All))

for name := range network.ParaTimes.All {
var runtimeID coreCommon.Namespace
if err1 := runtimeID.UnmarshalHex(network.ParaTimes.All[name].ID); err1 != nil {
return nil, err1
}

res, err1 := cc.RoothashLastRoundResults(ctx, height, runtimeID)
if err1 != nil {
return nil, err1
}
lastRoundResults[runtimeID] = res
}

return &rootHashData{
Height: height,
Events: events,
Height: height,
Events: events,
LastRoundResults: lastRoundResults,
}, nil
}

Expand Down Expand Up @@ -352,6 +370,8 @@ type rootHashData struct {
Height int64

Events []nodeapi.Event

LastRoundResults map[coreCommon.Namespace]*roothash.RoundResults
}

// schedulerData represents data for elected committees and validators at a given height.
Expand Down
149 changes: 149 additions & 0 deletions analyzer/consensus/messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package consensus

import (
"encoding/json"

"github.com/oasisprotocol/nexus/analyzer/util/addresses"
apiTypes "github.com/oasisprotocol/nexus/api/v1/types"
"github.com/oasisprotocol/nexus/coreapi/v22.2.11/roothash/api/message"
"github.com/oasisprotocol/nexus/log"
)

type MessageData struct {
messageType string
body json.RawMessage
addressPreimages map[apiTypes.Address]*addresses.PreimageData
relatedAddresses map[apiTypes.Address]struct{}
}

func extractMessageData(logger *log.Logger, m message.Message) MessageData {
messageData := MessageData{
addressPreimages: map[apiTypes.Address]*addresses.PreimageData{},
relatedAddresses: map[apiTypes.Address]struct{}{},
}
switch {
case m.Staking != nil:
switch {
case m.Staking.Transfer != nil:
messageData.messageType = "staking.transfer"
body, err := json.Marshal(m.Staking.Transfer)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
_, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.Transfer.To)
if err != nil {
logger.Info("register related address 'to' failed",
"message_type", messageData.messageType,
"err", err,
)
}
case m.Staking.Withdraw != nil:
messageData.messageType = "staking.withdraw"
body, err := json.Marshal(m.Staking.Withdraw)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
_, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.Withdraw.From)
if err != nil {
logger.Info("register related address 'from' failed",
"message_type", messageData.messageType,
"err", err,
)
}
case m.Staking.AddEscrow != nil:
messageData.messageType = "staking.add_escrow"
body, err := json.Marshal(m.Staking.AddEscrow)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
_, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.AddEscrow.Account)
if err != nil {
logger.Info("register related address 'account' failed",
"message_type", messageData.messageType,
"err", err,
)
}
case m.Staking.ReclaimEscrow != nil:
messageData.messageType = "staking.reclaim_escrow"
body, err := json.Marshal(m.Staking.ReclaimEscrow)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
_, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.ReclaimEscrow.Account)
if err != nil {
logger.Info("register related address 'account' failed",
"message_type", messageData.messageType,
"err", err,
)
}
default:
logger.Info("unhandled staking message",
"staking_message", m.Staking,
)
}
case m.Registry != nil:
switch {
case m.Registry.UpdateRuntime != nil:
messageData.messageType = "registry.update_runtime"
body, err := json.Marshal(m.Registry.UpdateRuntime)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
}
case m.Governance != nil:
switch {
case m.Governance.CastVote != nil:
messageData.messageType = "governance.cast_vote"
body, err := json.Marshal(m.Governance.CastVote)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
case m.Governance.SubmitProposal != nil:
messageData.messageType = "governance.submit_proposal"
body, err := json.Marshal(m.Governance.SubmitProposal)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
}
default:
logger.Info("unhandled message",
"message", m,
)
}
return messageData
}
22 changes: 22 additions & 0 deletions analyzer/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,28 @@ var (
INSERT INTO chain.events (type, body, tx_block, tx_hash, tx_index, related_accounts, roothash_runtime_id, roothash_runtime, roothash_runtime_round)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`

ConsensusRoothashMessageScheduleUpsert = `
INSERT INTO chain.roothash_messages
(runtime, round, message_index, type, body, related_accounts)
VALUES
($1, $2, $3, $4, $5, $6)
ON CONFLICT (runtime, round, message_index) DO UPDATE
SET
type = excluded.type,
body = excluded.body,
related_accounts = excluded.related_accounts`

ConsensusRoothashMessageFinalizeUpsert = `
INSERT INTO chain.roothash_messages
(runtime, round, message_index, error_module, error_code, result)
VALUES
($1, $2, $3, $4, $5, $6)
ON CONFLICT (runtime, round, message_index) DO UPDATE
SET
error_module = excluded.error_module,
error_code = excluded.error_code,
result = excluded.result`

ConsensusAccountRelatedTransactionInsert = `
INSERT INTO chain.accounts_related_transactions (account_address, tx_block, tx_index)
VALUES ($1, $2, $3)`
Expand Down
Loading
Loading