Skip to content

Commit

Permalink
Merge pull request #599 from oasisprotocol/pro-wh/feature/roothash2
Browse files Browse the repository at this point in the history
consensus roothash messages
  • Loading branch information
pro-wh authored Apr 10, 2024
2 parents 075641c + c655ca2 commit 8024fbd
Show file tree
Hide file tree
Showing 31 changed files with 573 additions and 29 deletions.
1 change: 1 addition & 0 deletions .changelog/599.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
consensus: roothash messages
118 changes: 116 additions & 2 deletions analyzer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
sdkConfig "github.com/oasisprotocol/oasis-sdk/client-sdk/go/config"

"github.com/oasisprotocol/nexus/analyzer/util/addresses"
"github.com/oasisprotocol/nexus/coreapi/v22.2.11/consensus/api/transaction"
genesis "github.com/oasisprotocol/nexus/coreapi/v22.2.11/genesis/api"
staking "github.com/oasisprotocol/nexus/coreapi/v22.2.11/staking/api"
Expand Down Expand Up @@ -286,8 +287,13 @@ func (m *processor) queueDbUpdates(batch *storage.QueryBatch, data allData) erro
}
}

if err := m.queueRootHashEventInserts(batch, data.RootHashData); err != nil {
return err
for _, f := range []func(*storage.QueryBatch, *rootHashData) error{
m.queueRootHashMessageUpserts,
m.queueRootHashEventInserts,
} {
if err := f(batch, data.RootHashData); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -648,6 +654,114 @@ func (m *processor) queueRegistryEventInserts(batch *storage.QueryBatch, data *r
return nil
}

func (m *processor) queueRootHashMessageUpserts(batch *storage.QueryBatch, data *rootHashData) error {
// Collect (I) roothash messages being scheduled and (II) roothash
// messages being finalized. They're always scheduled in the first
// ExecutorCommitedEvent (i.e. the proposal). They're finalized in (a)
// MessageEvent in Cobalt and in (b) the last round results in Damask and
// later.
finalized := map[coreCommon.Namespace]uint64{}
var roothashMessageEvents []nodeapi.Event
for _, event := range data.Events {
switch {
case event.RoothashMisc != nil:
switch event.Type { //nolint:gocritic,exhaustive // singleCaseSwitch, no special handling for other types
case apiTypes.ConsensusEventTypeRoothashFinalized:
// (II.a) MessageEvent does not have its own Round field, so
// use the value from the FinalizedEvent that happens at the
// same time.
finalized[event.RoothashMisc.RuntimeID] = *event.RoothashMisc.Round
}
case event.RoothashExecutorCommitted != nil:
runtime := RuntimeFromID(event.RoothashExecutorCommitted.RuntimeID, m.network)
if runtime == nil {
break
}
round := event.RoothashExecutorCommitted.Round
// (I) Extract roothash messages from the ExecutorCommittedEvent.
// Only the proposal has the messages, so the other commits will
// harmlessly skip over this part.
for i, message := range event.RoothashExecutorCommitted.Messages {
logger := m.logger.With(
"height", data.Height,
"runtime", runtime,
"round", round,
"message_index", i,
)
messageData := extractMessageData(logger, message)
// The runtime has its own staking account, which is what
// performs these actions, e.g. when sending or receiving the
// consensus token. Register that as related to the message.
if _, err := addresses.RegisterRelatedRuntimeAddress(messageData.addressPreimages, messageData.relatedAddresses, event.RoothashExecutorCommitted.RuntimeID); err != nil {
logger.Info("register runtime address failed",
"runtime_id", event.RoothashExecutorCommitted.RuntimeID,
"err", err,
)
}
for addr, preimageData := range messageData.addressPreimages {
batch.Queue(queries.AddressPreimageInsert,
addr,
preimageData.ContextIdentifier,
preimageData.ContextVersion,
preimageData.Data,
)
}
batch.Queue(queries.ConsensusRoothashMessageScheduleUpsert,
runtime,
round,
i,
messageData.messageType,
messageData.body,
addresses.SliceFromSet(messageData.relatedAddresses),
)
}
case event.RoothashMessage != nil:
// (II.a) Extract message results from the MessageEvents.
// Save these for after we collect all roothash finalized events.
roothashMessageEvents = append(roothashMessageEvents, event)
}
}
for _, event := range roothashMessageEvents {
runtime := RuntimeFromID(event.RoothashMessage.RuntimeID, m.network)
if runtime == nil {
continue
}
batch.Queue(queries.ConsensusRoothashMessageFinalizeUpsert,
runtime,
finalized[event.RoothashMessage.RuntimeID],
event.RoothashMessage.Index,
event.RoothashMessage.Module,
event.RoothashMessage.Code,
nil,
)
}
for rtid, results := range data.LastRoundResults {
runtime := RuntimeFromID(rtid, m.network)
if runtime == nil {
// We shouldn't even have gathered last round results for unknown
// runtimes. But prevent nil-runtime inserts anyway.
continue
}
round, ok := finalized[rtid]
if !ok {
continue
}
// (II.b) Extract message results from the last round results.
for _, message := range results.Messages {
batch.Queue(queries.ConsensusRoothashMessageFinalizeUpsert,
runtime,
round,
message.Index,
message.Module,
message.Code,
cbor.Marshal(message.Result),
)
}
}

return nil
}

func (m *processor) queueRootHashEventInserts(batch *storage.QueryBatch, data *rootHashData) error {
for _, event := range data.Events {
hash := util.SanitizeTxHash(event.TxHash.Hex())
Expand Down
30 changes: 25 additions & 5 deletions analyzer/consensus/data_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

beacon "github.com/oasisprotocol/nexus/coreapi/v22.2.11/beacon/api"
consensus "github.com/oasisprotocol/nexus/coreapi/v22.2.11/consensus/api"
roothash "github.com/oasisprotocol/nexus/coreapi/v22.2.11/roothash/api"
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"
)

Expand Down Expand Up @@ -50,7 +51,7 @@ func fetchAllData(ctx context.Context, cc nodeapi.ConsensusApiLite, network sdkC
return nil, err
}

rootHashData, err := fetchRootHashData(ctx, cc, height)
rootHashData, err := fetchRootHashData(ctx, cc, network, height)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -276,16 +277,33 @@ func fetchGovernanceData(ctx context.Context, cc nodeapi.ConsensusApiLite, heigh
}, nil
}

// fetchRootHashData retrieves roothash events at the provided block height.
func fetchRootHashData(ctx context.Context, cc nodeapi.ConsensusApiLite, height int64) (*rootHashData, error) {
// fetchRootHashData retrieves roothash events and last round results at the
// provided block height.
func fetchRootHashData(ctx context.Context, cc nodeapi.ConsensusApiLite, network sdkConfig.Network, height int64) (*rootHashData, error) {
events, err := cc.RoothashEvents(ctx, height)
if err != nil {
return nil, err
}

lastRoundResults := make(map[coreCommon.Namespace]*roothash.RoundResults, len(network.ParaTimes.All))

for name := range network.ParaTimes.All {
var runtimeID coreCommon.Namespace
if err1 := runtimeID.UnmarshalHex(network.ParaTimes.All[name].ID); err1 != nil {
return nil, err1
}

res, err1 := cc.RoothashLastRoundResults(ctx, height, runtimeID)
if err1 != nil {
return nil, err1
}
lastRoundResults[runtimeID] = res
}

return &rootHashData{
Height: height,
Events: events,
Height: height,
Events: events,
LastRoundResults: lastRoundResults,
}, nil
}

Expand Down Expand Up @@ -352,6 +370,8 @@ type rootHashData struct {
Height int64

Events []nodeapi.Event

LastRoundResults map[coreCommon.Namespace]*roothash.RoundResults
}

// schedulerData represents data for elected committees and validators at a given height.
Expand Down
149 changes: 149 additions & 0 deletions analyzer/consensus/messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package consensus

import (
"encoding/json"

"github.com/oasisprotocol/nexus/analyzer/util/addresses"
apiTypes "github.com/oasisprotocol/nexus/api/v1/types"
"github.com/oasisprotocol/nexus/coreapi/v22.2.11/roothash/api/message"
"github.com/oasisprotocol/nexus/log"
)

type MessageData struct {
messageType string
body json.RawMessage
addressPreimages map[apiTypes.Address]*addresses.PreimageData
relatedAddresses map[apiTypes.Address]struct{}
}

func extractMessageData(logger *log.Logger, m message.Message) MessageData {
messageData := MessageData{
addressPreimages: map[apiTypes.Address]*addresses.PreimageData{},
relatedAddresses: map[apiTypes.Address]struct{}{},
}
switch {
case m.Staking != nil:
switch {
case m.Staking.Transfer != nil:
messageData.messageType = "staking.transfer"
body, err := json.Marshal(m.Staking.Transfer)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
_, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.Transfer.To)
if err != nil {
logger.Info("register related address 'to' failed",
"message_type", messageData.messageType,
"err", err,
)
}
case m.Staking.Withdraw != nil:
messageData.messageType = "staking.withdraw"
body, err := json.Marshal(m.Staking.Withdraw)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
_, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.Withdraw.From)
if err != nil {
logger.Info("register related address 'from' failed",
"message_type", messageData.messageType,
"err", err,
)
}
case m.Staking.AddEscrow != nil:
messageData.messageType = "staking.add_escrow"
body, err := json.Marshal(m.Staking.AddEscrow)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
_, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.AddEscrow.Account)
if err != nil {
logger.Info("register related address 'account' failed",
"message_type", messageData.messageType,
"err", err,
)
}
case m.Staking.ReclaimEscrow != nil:
messageData.messageType = "staking.reclaim_escrow"
body, err := json.Marshal(m.Staking.ReclaimEscrow)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
_, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.ReclaimEscrow.Account)
if err != nil {
logger.Info("register related address 'account' failed",
"message_type", messageData.messageType,
"err", err,
)
}
default:
logger.Info("unhandled staking message",
"staking_message", m.Staking,
)
}
case m.Registry != nil:
switch {
case m.Registry.UpdateRuntime != nil:
messageData.messageType = "registry.update_runtime"
body, err := json.Marshal(m.Registry.UpdateRuntime)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
}
case m.Governance != nil:
switch {
case m.Governance.CastVote != nil:
messageData.messageType = "governance.cast_vote"
body, err := json.Marshal(m.Governance.CastVote)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
case m.Governance.SubmitProposal != nil:
messageData.messageType = "governance.submit_proposal"
body, err := json.Marshal(m.Governance.SubmitProposal)
if err != nil {
logger.Info("marshal message body failed",
"message_type", messageData.messageType,
"err", err,
)
break
}
messageData.body = body
}
default:
logger.Info("unhandled message",
"message", m,
)
}
return messageData
}
22 changes: 22 additions & 0 deletions analyzer/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,28 @@ var (
INSERT INTO chain.events (type, body, tx_block, tx_hash, tx_index, related_accounts, roothash_runtime_id, roothash_runtime, roothash_runtime_round)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`

ConsensusRoothashMessageScheduleUpsert = `
INSERT INTO chain.roothash_messages
(runtime, round, message_index, type, body, related_accounts)
VALUES
($1, $2, $3, $4, $5, $6)
ON CONFLICT (runtime, round, message_index) DO UPDATE
SET
type = excluded.type,
body = excluded.body,
related_accounts = excluded.related_accounts`

ConsensusRoothashMessageFinalizeUpsert = `
INSERT INTO chain.roothash_messages
(runtime, round, message_index, error_module, error_code, result)
VALUES
($1, $2, $3, $4, $5, $6)
ON CONFLICT (runtime, round, message_index) DO UPDATE
SET
error_module = excluded.error_module,
error_code = excluded.error_code,
result = excluded.result`

ConsensusAccountRelatedTransactionInsert = `
INSERT INTO chain.accounts_related_transactions (account_address, tx_block, tx_index)
VALUES ($1, $2, $3)`
Expand Down
Loading

0 comments on commit 8024fbd

Please sign in to comment.