Skip to content

Commit

Permalink
Merge pull request #554 from oasisprotocol/mitjat/remove-consensus-cl…
Browse files Browse the repository at this point in the history
…ient

refactor: Simplify node client stack (remove `ConsensusSourceStorage`)
  • Loading branch information
mitjat authored Nov 7, 2023
2 parents 182d8f1 + bf33bf7 commit 02d919c
Show file tree
Hide file tree
Showing 9 changed files with 426 additions and 718 deletions.
70 changes: 36 additions & 34 deletions analyzer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
sdkConfig "github.com/oasisprotocol/oasis-sdk/client-sdk/go/config"

"github.com/oasisprotocol/nexus/coreapi/v22.2.11/consensus/api/transaction"
genesis "github.com/oasisprotocol/nexus/coreapi/v22.2.11/genesis/api"
Expand All @@ -26,7 +27,6 @@ import (
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/metrics"
"github.com/oasisprotocol/nexus/storage"
source "github.com/oasisprotocol/nexus/storage/oasis"
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"
)

Expand Down Expand Up @@ -61,7 +61,8 @@ func OpenSignedTxNoVerify(signedTx *transaction.SignedTransaction) (*transaction
type processor struct {
mode analyzer.BlockAnalysisMode
history config.History
source storage.ConsensusSourceStorage
source nodeapi.ConsensusApiLite
network sdkConfig.Network
target storage.TargetStorage
logger *log.Logger
metrics metrics.StorageMetrics
Expand All @@ -70,11 +71,12 @@ type processor struct {
var _ block.BlockProcessor = (*processor)(nil)

// NewAnalyzer returns a new analyzer for the consensus layer.
func NewAnalyzer(blockRange config.BlockRange, batchSize uint64, mode analyzer.BlockAnalysisMode, history config.History, sourceClient *source.ConsensusClient, target storage.TargetStorage, logger *log.Logger) (analyzer.Analyzer, error) {
func NewAnalyzer(blockRange config.BlockRange, batchSize uint64, mode analyzer.BlockAnalysisMode, history config.History, source nodeapi.ConsensusApiLite, network sdkConfig.Network, target storage.TargetStorage, logger *log.Logger) (analyzer.Analyzer, error) {
processor := &processor{
mode: mode,
history: history,
source: sourceClient,
source: source,
network: network,
target: target,
logger: logger.With("analyzer", consensusAnalyzerName),
metrics: metrics.NewDefaultStorageMetrics(consensusAnalyzerName),
Expand Down Expand Up @@ -124,7 +126,7 @@ func (m *processor) FinalizeFastSync(ctx context.Context, lastFastSyncHeight int
var nodes []nodeapi.Node
if r.GenesisHeight == firstSlowSyncHeight {
m.logger.Info("fetching genesis document before starting with the first block of a chain", "chain_context", r.ChainContext, "genesis_height", r.GenesisHeight)
genesisDoc, err = m.source.GenesisDocument(ctx, r.ChainContext)
genesisDoc, err = m.source.GetGenesisDocument(ctx, r.ChainContext)
if err != nil {
return err
}
Expand Down Expand Up @@ -205,7 +207,7 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
height := int64(uheight)

// Fetch all data.
data, err := m.source.AllData(ctx, height, m.mode == analyzer.FastSyncMode)
data, err := fetchAllData(ctx, m.source, m.network, height, m.mode == analyzer.FastSyncMode)
if err != nil {
if strings.Contains(err.Error(), fmt.Sprintf("%d must be less than or equal to the current blockchain height", height)) {
return analyzer.ErrOutOfRange
Expand All @@ -215,7 +217,7 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {

// Process data, prepare updates.
batch := &storage.QueryBatch{}
for _, f := range []func(*storage.QueryBatch, *storage.ConsensusBlockData) error{
for _, f := range []func(*storage.QueryBatch, *consensusBlockData) error{
m.queueBlockInserts,
m.queueEpochInserts,
m.queueTransactionInserts,
Expand All @@ -226,7 +228,7 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
}
}

for _, f := range []func(*storage.QueryBatch, *storage.RegistryData) error{
for _, f := range []func(*storage.QueryBatch, *registryData) error{
m.queueEntityEvents,
m.queueRuntimeRegistrations,
m.queueRegistryEventInserts,
Expand All @@ -239,7 +241,7 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
return err
}

for _, f := range []func(*storage.QueryBatch, *storage.StakingData) error{
for _, f := range []func(*storage.QueryBatch, *stakingData) error{
m.queueRegularTransfers,
m.queueBurns,
m.queueEscrows,
Expand All @@ -252,7 +254,7 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
}
}

for _, f := range []func(*storage.QueryBatch, *storage.SchedulerData) error{
for _, f := range []func(*storage.QueryBatch, *schedulerData) error{
m.queueValidatorUpdates,
m.queueCommitteeUpdates,
} {
Expand All @@ -261,7 +263,7 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
}
}

for _, f := range []func(*storage.QueryBatch, *storage.GovernanceData) error{
for _, f := range []func(*storage.QueryBatch, *governanceData) error{
m.queueSubmissions,
m.queueExecutions,
m.queueFinalizations,
Expand Down Expand Up @@ -298,7 +300,7 @@ func (m *processor) ProcessBlock(ctx context.Context, uheight uint64) error {
return nil
}

func (m *processor) queueBlockInserts(batch *storage.QueryBatch, data *storage.ConsensusBlockData) error {
func (m *processor) queueBlockInserts(batch *storage.QueryBatch, data *consensusBlockData) error {
batch.Queue(
queries.ConsensusBlockInsert,
data.BlockHeader.Height,
Expand All @@ -314,7 +316,7 @@ func (m *processor) queueBlockInserts(batch *storage.QueryBatch, data *storage.C
return nil
}

func (m *processor) queueEpochInserts(batch *storage.QueryBatch, data *storage.ConsensusBlockData) error {
func (m *processor) queueEpochInserts(batch *storage.QueryBatch, data *consensusBlockData) error {
batch.Queue(
queries.ConsensusEpochUpsert,
data.Epoch,
Expand All @@ -324,7 +326,7 @@ func (m *processor) queueEpochInserts(batch *storage.QueryBatch, data *storage.C
return nil
}

func (m *processor) queueTransactionInserts(batch *storage.QueryBatch, data *storage.ConsensusBlockData) error {
func (m *processor) queueTransactionInserts(batch *storage.QueryBatch, data *consensusBlockData) error {
for i, txr := range data.TransactionsWithResults {
signedTx := txr.Transaction
result := txr.Result
Expand Down Expand Up @@ -413,7 +415,7 @@ func (m *processor) queueTransactionInserts(batch *storage.QueryBatch, data *sto
}

// Enqueue DB statements to store events that were generated as the result of a TX execution.
func (m *processor) queueTxEventInserts(batch *storage.QueryBatch, data *storage.ConsensusBlockData) error {
func (m *processor) queueTxEventInserts(batch *storage.QueryBatch, data *consensusBlockData) error {
for i, txr := range data.TransactionsWithResults {
var txAccounts []staking.Address
for _, event := range txr.Result.Events {
Expand Down Expand Up @@ -447,7 +449,7 @@ func (m *processor) queueTxEventInserts(batch *storage.QueryBatch, data *storage
return nil
}

func (m *processor) queueRuntimeRegistrations(batch *storage.QueryBatch, data *storage.RegistryData) error {
func (m *processor) queueRuntimeRegistrations(batch *storage.QueryBatch, data *registryData) error {
// Runtime registered or (re)started.
for _, runtimeEvent := range data.RuntimeStartedEvents {
var keyManager *string
Expand Down Expand Up @@ -482,7 +484,7 @@ func (m *processor) queueRuntimeRegistrations(batch *storage.QueryBatch, data *s
return nil
}

func (m *processor) queueEntityEvents(batch *storage.QueryBatch, data *storage.RegistryData) error {
func (m *processor) queueEntityEvents(batch *storage.QueryBatch, data *registryData) error {
for _, entityEvent := range data.EntityEvents {
entityID := entityEvent.Entity.ID.String()

Expand All @@ -502,7 +504,7 @@ func (m *processor) queueEntityEvents(batch *storage.QueryBatch, data *storage.R
}

// Performs bookkeeping related to node (de)registrations, ignoring registrations that are already expired.
func (m *processor) queueNodeEvents(batch *storage.QueryBatch, data *storage.RegistryData, currentEpoch uint64) error {
func (m *processor) queueNodeEvents(batch *storage.QueryBatch, data *registryData, currentEpoch uint64) error {
if m.mode == analyzer.FastSyncMode {
// Skip node updates during fast sync; this function only modifies chain.nodes and chain.runtime_nodes,
// which are both recreated from scratch by the genesis.
Expand Down Expand Up @@ -549,7 +551,7 @@ func (m *processor) queueNodeEvents(batch *storage.QueryBatch, data *storage.Reg
return nil
}

func (m *processor) queueRegistryEventInserts(batch *storage.QueryBatch, data *storage.RegistryData) error {
func (m *processor) queueRegistryEventInserts(batch *storage.QueryBatch, data *registryData) error {
for _, event := range data.Events {
hash := util.SanitizeTxHash(event.TxHash.Hex())
if hash != nil {
Expand All @@ -566,7 +568,7 @@ func (m *processor) queueRegistryEventInserts(batch *storage.QueryBatch, data *s
return nil
}

func (m *processor) queueRootHashEventInserts(batch *storage.QueryBatch, data *storage.RootHashData) error {
func (m *processor) queueRootHashEventInserts(batch *storage.QueryBatch, data *rootHashData) error {
for _, event := range data.Events {
hash := util.SanitizeTxHash(event.TxHash.Hex())
if hash != nil {
Expand Down Expand Up @@ -602,15 +604,15 @@ const (
TransferTypeOther TransferType = "Other"
)

func (m *processor) queueRegularTransfers(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *processor) queueRegularTransfers(batch *storage.QueryBatch, data *stakingData) error {
return m.queueTransfers(batch, data, TransferTypeOther)
}

func (m *processor) queueDisbursementTransfers(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *processor) queueDisbursementTransfers(batch *storage.QueryBatch, data *stakingData) error {
return m.queueTransfers(batch, data, TransferTypeAccumulatorDisbursement)
}

func (m *processor) queueTransfers(batch *storage.QueryBatch, data *storage.StakingData, targetType TransferType) error {
func (m *processor) queueTransfers(batch *storage.QueryBatch, data *stakingData, targetType TransferType) error {
if m.mode == analyzer.FastSyncMode {
// Skip dead reckoning of balances during fast sync. Genesis contains consensus balances.
return nil
Expand Down Expand Up @@ -639,7 +641,7 @@ func (m *processor) queueTransfers(batch *storage.QueryBatch, data *storage.Stak
return nil
}

func (m *processor) queueBurns(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *processor) queueBurns(batch *storage.QueryBatch, data *stakingData) error {
if m.mode == analyzer.FastSyncMode {
// Skip dead reckoning of balances during fast sync. Genesis contains consensus balances.
return nil
Expand All @@ -655,7 +657,7 @@ func (m *processor) queueBurns(batch *storage.QueryBatch, data *storage.StakingD
return nil
}

func (m *processor) queueEscrows(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *processor) queueEscrows(batch *storage.QueryBatch, data *stakingData) error {
if m.mode == analyzer.FastSyncMode {
// Skip dead reckoning of escrows during fast sync.
// Genesis contains all info on escrow balances (active, debonding) and delegations.
Expand Down Expand Up @@ -744,7 +746,7 @@ func (m *processor) queueEscrows(batch *storage.QueryBatch, data *storage.Stakin
return nil
}

func (m *processor) queueAllowanceChanges(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *processor) queueAllowanceChanges(batch *storage.QueryBatch, data *stakingData) error {
if m.mode == analyzer.FastSyncMode {
// Skip tracking of allowances during fast sync.
// Genesis contains all info on current allowances, and we don't track the history of allowances.
Expand Down Expand Up @@ -772,7 +774,7 @@ func (m *processor) queueAllowanceChanges(batch *storage.QueryBatch, data *stora
return nil
}

func (m *processor) queueStakingEventInserts(batch *storage.QueryBatch, data *storage.StakingData) error {
func (m *processor) queueStakingEventInserts(batch *storage.QueryBatch, data *stakingData) error {
for _, event := range data.Events {
hash := util.SanitizeTxHash(event.TxHash.Hex())
if hash != nil {
Expand All @@ -789,7 +791,7 @@ func (m *processor) queueStakingEventInserts(batch *storage.QueryBatch, data *st
return nil
}

func (m *processor) queueValidatorUpdates(batch *storage.QueryBatch, data *storage.SchedulerData) error {
func (m *processor) queueValidatorUpdates(batch *storage.QueryBatch, data *schedulerData) error {
if m.mode == analyzer.FastSyncMode {
// Skip validator updates during fast sync.
// The state of validators is pulled from the node at every height, and is
Expand All @@ -810,7 +812,7 @@ func (m *processor) queueValidatorUpdates(batch *storage.QueryBatch, data *stora
return nil
}

func (m *processor) queueCommitteeUpdates(batch *storage.QueryBatch, data *storage.SchedulerData) error {
func (m *processor) queueCommitteeUpdates(batch *storage.QueryBatch, data *schedulerData) error {
if m.mode == analyzer.FastSyncMode {
// Skip committee updates during fast sync.
// The state of committees is pulled from the node at every height, and is
Expand Down Expand Up @@ -842,7 +844,7 @@ func (m *processor) queueCommitteeUpdates(batch *storage.QueryBatch, data *stora
return nil
}

func (m *processor) queueSubmissions(batch *storage.QueryBatch, data *storage.GovernanceData) error {
func (m *processor) queueSubmissions(batch *storage.QueryBatch, data *governanceData) error {
if m.mode == analyzer.FastSyncMode {
// Skip proposal tracking during fast sync.
// The full state of proposals is present in the genesis.
Expand Down Expand Up @@ -880,7 +882,7 @@ func (m *processor) queueSubmissions(batch *storage.QueryBatch, data *storage.Go
return nil
}

func (m *processor) queueExecutions(batch *storage.QueryBatch, data *storage.GovernanceData) error {
func (m *processor) queueExecutions(batch *storage.QueryBatch, data *governanceData) error {
if m.mode == analyzer.FastSyncMode {
// Skip proposal tracking during fast sync.
// The full state of proposals is present in the genesis.
Expand All @@ -896,7 +898,7 @@ func (m *processor) queueExecutions(batch *storage.QueryBatch, data *storage.Gov
return nil
}

func (m *processor) queueFinalizations(batch *storage.QueryBatch, data *storage.GovernanceData) error {
func (m *processor) queueFinalizations(batch *storage.QueryBatch, data *governanceData) error {
if m.mode == analyzer.FastSyncMode {
// Skip proposal tracking during fast sync.
// The full state of proposals is present in the genesis.
Expand All @@ -917,7 +919,7 @@ func (m *processor) queueFinalizations(batch *storage.QueryBatch, data *storage.
return nil
}

func (m *processor) queueVotes(batch *storage.QueryBatch, data *storage.GovernanceData) error {
func (m *processor) queueVotes(batch *storage.QueryBatch, data *governanceData) error {
if m.mode == analyzer.FastSyncMode {
// Skip proposal tracking during fast sync.
// The full state of proposals is present in the genesis.
Expand All @@ -935,7 +937,7 @@ func (m *processor) queueVotes(batch *storage.QueryBatch, data *storage.Governan
return nil
}

func (m *processor) queueGovernanceEventInserts(batch *storage.QueryBatch, data *storage.GovernanceData) error {
func (m *processor) queueGovernanceEventInserts(batch *storage.QueryBatch, data *governanceData) error {
for _, event := range data.Events {
hash := util.SanitizeTxHash(event.TxHash.Hex())
if hash != nil {
Expand Down
Loading

0 comments on commit 02d919c

Please sign in to comment.