refactor: move tests files to test package (#639)
mtsitrin authored Apr 21, 2024
1 parent 7290af6 commit 3573217
Showing 40 changed files with 687 additions and 884 deletions.
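Every visible change below follows one pattern: Manager fields and methods that the relocated tests depend on are promoted from unexported to exported (m.store becomes m.Store, m.executor becomes m.Executor, m.settlementClient becomes m.SLClient, and so on), because a test file moved into an external _test package can only reach exported identifiers. A minimal sketch of that visibility rule, assuming the module path github.com/dymensionxyz/dymint; the test below is illustrative, not part of this commit:

package block_test // the _test suffix compiles this as a separate package from block

import (
	"testing"

	"github.com/dymensionxyz/dymint/block" // assumed module path
)

func TestExportedFieldsVisible(t *testing.T) {
	m := &block.Manager{} // zero value, only to show what compiles; real tests build a fixture
	_ = m.Store           // compiles: Store is exported by this commit
	// _ = m.store        // would not compile: unexported fields are invisible outside package block
}

Fields the tests do not need, such as p2pClient and the internal channels, stay unexported in the struct diff below.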
54 changes: 27 additions & 27 deletions block/block.go
@@ -21,7 +21,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
// TODO (#330): allow genesis block with height > 0 to be applied.
// TODO: add switch case to have defined behavior for each case.
// validate block height
-if block.Header.Height != m.store.NextHeight() {
+if block.Header.Height != m.Store.NextHeight() {
return types.ErrInvalidBlockHeight
}

@@ -43,36 +43,36 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return nil
}
// Start applying the block assuming no inconsistency was found.
-_, err = m.store.SaveBlock(block, commit, nil)
+_, err = m.Store.SaveBlock(block, commit, nil)
if err != nil {
return fmt.Errorf("save block: %w", err)
}

-responses, err := m.executor.ExecuteBlock(m.lastState, block)
+responses, err := m.Executor.ExecuteBlock(m.LastState, block)
if err != nil {
return fmt.Errorf("execute block: %w", err)
}

-newState, err := m.executor.UpdateStateFromResponses(responses, m.lastState, block)
+newState, err := m.Executor.UpdateStateFromResponses(responses, m.LastState, block)
if err != nil {
return fmt.Errorf("update state from responses: %w", err)
}

-batch := m.store.NewBatch()
+batch := m.Store.NewBatch()

-batch, err = m.store.SaveBlockResponses(block.Header.Height, responses, batch)
+batch, err = m.Store.SaveBlockResponses(block.Header.Height, responses, batch)
if err != nil {
batch.Discard()
return fmt.Errorf("save block responses: %w", err)
}

-m.lastState = newState
-batch, err = m.store.UpdateState(m.lastState, batch)
+m.LastState = newState
+batch, err = m.Store.UpdateState(m.LastState, batch)
if err != nil {
batch.Discard()
return fmt.Errorf("update state: %w", err)
}
-batch, err = m.store.SaveValidators(block.Header.Height, m.lastState.Validators, batch)
+batch, err = m.Store.SaveValidators(block.Header.Height, m.LastState.Validators, batch)
if err != nil {
batch.Discard()
return fmt.Errorf("save validators: %w", err)
@@ -84,7 +84,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
}

// Commit block to app
-retainHeight, err := m.executor.Commit(&newState, block, responses)
+retainHeight, err := m.Executor.Commit(&newState, block, responses)
if err != nil {
return fmt.Errorf("commit block: %w", err)
}
@@ -101,17 +101,17 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

// Update the state with the new app hash, last validators and store height from the commit.
// If any of these were updated before the commit, a failure during commit would prevent us from re-executing the block.
-newState.LastValidators = m.lastState.Validators.Copy()
+newState.LastValidators = m.LastState.Validators.Copy()
newState.LastStoreHeight = block.Header.Height
-newState.BaseHeight = m.store.Base()
+newState.BaseHeight = m.Store.Base()

-_, err = m.store.UpdateState(newState, nil)
+_, err = m.Store.UpdateState(newState, nil)
if err != nil {
return fmt.Errorf("final update state: %w", err)
}
-m.lastState = newState
+m.LastState = newState

-if ok := m.store.SetHeight(block.Header.Height); !ok {
+if ok := m.Store.SetHeight(block.Header.Height); !ok {
return fmt.Errorf("store set height: %d", block.Header.Height)
}

@@ -124,7 +124,7 @@ func (m *Manager) attemptApplyCachedBlocks() error {
defer m.retrieverMutex.Unlock()

for {
-expectedHeight := m.store.NextHeight()
+expectedHeight := m.Store.NextHeight()

cachedBlock, blockExists := m.blockCache[expectedHeight]
if !blockExists {
@@ -146,7 +146,7 @@

// isHeightAlreadyApplied checks if the block height is already applied to the app.
func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
-proxyAppInfo, err := m.executor.GetAppInfo()
+proxyAppInfo, err := m.Executor.GetAppInfo()
if err != nil {
return false, errorsmod.Wrap(err, "get app info")
}
@@ -160,29 +160,29 @@ func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {

// UpdateStateFromApp is responsible for aligning the state of the store from the abci app
func (m *Manager) UpdateStateFromApp() error {
-proxyAppInfo, err := m.executor.GetAppInfo()
+proxyAppInfo, err := m.Executor.GetAppInfo()
if err != nil {
return errorsmod.Wrap(err, "get app info")
}

appHeight := uint64(proxyAppInfo.LastBlockHeight)

// update the state with the hash, last store height and last validators.
-m.lastState.AppHash = *(*[32]byte)(proxyAppInfo.LastBlockAppHash)
-m.lastState.LastStoreHeight = appHeight
-m.lastState.LastValidators = m.lastState.Validators.Copy()
+m.LastState.AppHash = *(*[32]byte)(proxyAppInfo.LastBlockAppHash)
+m.LastState.LastStoreHeight = appHeight
+m.LastState.LastValidators = m.LastState.Validators.Copy()

-resp, err := m.store.LoadBlockResponses(appHeight)
+resp, err := m.Store.LoadBlockResponses(appHeight)
if err != nil {
return errorsmod.Wrap(err, "load block responses")
}
-copy(m.lastState.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())
+copy(m.LastState.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

-_, err = m.store.UpdateState(m.lastState, nil)
+_, err = m.Store.UpdateState(m.LastState, nil)
if err != nil {
return errorsmod.Wrap(err, "update state")
}
-if ok := m.store.SetHeight(appHeight); !ok {
+if ok := m.Store.SetHeight(appHeight); !ok {
return fmt.Errorf("store set height: %d", appHeight)
}
return nil
@@ -191,9 +191,9 @@ func (m *Manager) UpdateStateFromApp() error {
func (m *Manager) validateBlock(block *types.Block, commit *types.Commit) error {
// Currently we're assuming proposer is never nil as it's a pre-condition for
// dymint to start
-proposer := m.settlementClient.GetProposer()
+proposer := m.SLClient.GetProposer()

-return types.ValidateProposedTransition(m.lastState, block, commit, proposer)
+return types.ValidateProposedTransition(m.LastState, block, commit, proposer)
}

func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit types.Commit) error {
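Beyond the renames, applyBlock's shape is visible above: the block-response, state, and validator writes share one store batch that is discarded on any error, so a failed step leaves the store unchanged. A standalone sketch of that discard-on-error batching, using hypothetical batch types rather than dymint's actual store interfaces:

package main

import (
	"errors"
	"fmt"
)

// batch mimics the shape of the store batch in applyBlock: writes accumulate
// in memory and are thrown away wholesale if any step fails.
type batch struct{ ops []string }

func (b *batch) put(op string) { b.ops = append(b.ops, op) }
func (b *batch) discard()      { b.ops = nil }
func (b *batch) commit() error { fmt.Println("committed:", b.ops); return nil }

func applyBlock(failUpdate bool) error {
	b := &batch{}
	b.put("block responses")
	if failUpdate { // stand-in for an error from UpdateState or SaveValidators
		b.discard()
		return errors.New("update state: simulated failure")
	}
	b.put("state")
	b.put("validators")
	return b.commit()
}

func main() {
	fmt.Println(applyBlock(false)) // committed: [block responses state validators], then <nil>
	fmt.Println(applyBlock(true))  // update state: simulated failure
}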
10 changes: 5 additions & 5 deletions block/initchain.go
@@ -9,24 +9,24 @@ import (

func (m *Manager) RunInitChain(ctx context.Context) error {
// get the proposer's consensus pubkey
-proposer := m.settlementClient.GetProposer()
+proposer := m.SLClient.GetProposer()
tmPubKey, err := cryptocodec.ToTmPubKeyInterface(proposer.PublicKey)
if err != nil {
return err
}
gensisValSet := []*tmtypes.Validator{tmtypes.NewValidator(tmPubKey, 1)}

// call initChain with both addresses
-res, err := m.executor.InitChain(m.genesis, gensisValSet)
+res, err := m.Executor.InitChain(m.Genesis, gensisValSet)
if err != nil {
return err
}

// update the state with only the consensus pubkey
-m.executor.UpdateStateAfterInitChain(&m.lastState, res, gensisValSet)
-m.executor.UpdateMempoolAfterInitChain(&m.lastState)
+m.Executor.UpdateStateAfterInitChain(&m.LastState, res, gensisValSet)
+m.Executor.UpdateMempoolAfterInitChain(&m.LastState)

-if _, err := m.store.UpdateState(m.lastState, nil); err != nil {
+if _, err := m.Store.UpdateState(m.LastState, nil); err != nil {
return err
}

80 changes: 40 additions & 40 deletions block/manager.go
@@ -34,28 +34,28 @@ import (
// Manager is responsible for aggregating transactions into blocks.
type Manager struct {
// Configuration
-conf config.BlockManagerConfig
-genesis *tmtypes.GenesisDoc
-proposerKey crypto.PrivKey
+Conf config.BlockManagerConfig
+Genesis *tmtypes.GenesisDoc
+ProposerKey crypto.PrivKey

// Store and execution
-store store.Store
-lastState types.State
-executor *Executor
+Store store.Store
+LastState types.State
+Executor *Executor

// Clients and servers
-pubsub *pubsub.Server
-p2pClient *p2p.Client
-dalc da.DataAvailabilityLayerClient
-settlementClient settlement.LayerI
+Pubsub *pubsub.Server
+p2pClient *p2p.Client
+DAClient da.DataAvailabilityLayerClient
+SLClient settlement.LayerI

// Data retrieval
-retriever da.BatchRetriever
+Retriever da.BatchRetriever

// Synchronization
-syncTargetDiode diodes.Diode
+SyncTargetDiode diodes.Diode

-syncTarget atomic.Uint64
+SyncTarget atomic.Uint64

// Block production
shouldProduceBlocksCh chan bool
@@ -122,19 +122,19 @@ func NewManager(
}

agg := &Manager{
-pubsub: pubsub,
-p2pClient: p2pClient,
-proposerKey: proposerKey,
-conf: conf,
-genesis: genesis,
-lastState: s,
-store: store,
-executor: exec,
-dalc: dalc,
-settlementClient: settlementClient,
-retriever: dalc.(da.BatchRetriever),
+Pubsub: pubsub,
+p2pClient: p2pClient,
+ProposerKey: proposerKey,
+Conf: conf,
+Genesis: genesis,
+LastState: s,
+Store: store,
+Executor: exec,
+DAClient: dalc,
+SLClient: settlementClient,
+Retriever: dalc.(da.BatchRetriever),
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
-syncTargetDiode: diodes.NewOneToOne(1, nil),
+SyncTargetDiode: diodes.NewOneToOne(1, nil),
shouldProduceBlocksCh: make(chan bool, 1),
produceEmptyBlockCh: make(chan bool, 1),
logger: logger,
@@ -151,16 +151,16 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
// TODO (#283): set aggregator mode by proposer addr on the hub
if isAggregator {
// make sure local signing key is the registered on the hub
-slProposerKey := m.settlementClient.GetProposer().PublicKey.Bytes()
-localProposerKey, _ := m.proposerKey.GetPublic().Raw()
+slProposerKey := m.SLClient.GetProposer().PublicKey.Bytes()
+localProposerKey, _ := m.ProposerKey.GetPublic().Raw()
if !bytes.Equal(slProposerKey, localProposerKey) {
return fmt.Errorf("proposer key mismatch: settlement proposer key: %s, block manager proposer key: %s", slProposerKey, m.proposerKey.GetPublic())
return fmt.Errorf("proposer key mismatch: settlement proposer key: %s, block manager proposer key: %s", slProposerKey, m.ProposerKey.GetPublic())
}
m.logger.Info("Starting in aggregator mode")
}

// Check if InitChain flow is needed
-if m.lastState.IsGenesis() {
+if m.LastState.IsGenesis() {
m.logger.Info("Running InitChain")

err := m.RunInitChain(ctx)
@@ -176,11 +176,11 @@
}

if isAggregator {
-go uevent.MustSubscribe(ctx, m.pubsub, "nodeHealth", events.QueryHealthStatus, m.onNodeHealthStatus, m.logger)
+go uevent.MustSubscribe(ctx, m.Pubsub, "nodeHealth", events.QueryHealthStatus, m.onNodeHealthStatus, m.logger)
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
} else {
-go uevent.MustSubscribe(ctx, m.pubsub, "applyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger, 100)
+go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger, 100)
go m.RetrieveLoop(ctx)
go m.SyncTargetLoop(ctx)
}
@@ -198,26 +198,26 @@ func (m *Manager) syncBlockManager() error {
// Since we requested the latest batch and got batch not found it means
// the SL still hasn't got any batches for this chain.
m.logger.Info("No batches for chain found in SL. Start writing first batch")
-m.syncTarget.Store(uint64(m.genesis.InitialHeight - 1))
+m.SyncTarget.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}
return err
}
-m.syncTarget.Store(resultRetrieveBatch.EndHeight)
+m.SyncTarget.Store(resultRetrieveBatch.EndHeight)
err = m.syncUntilTarget(resultRetrieveBatch.EndHeight)
if err != nil {
return err
}

m.logger.Info("Synced", "current height", m.store.Height(), "syncTarget", m.syncTarget.Load())
m.logger.Info("Synced", "current height", m.Store.Height(), "syncTarget", m.SyncTarget.Load())
return nil
}

-// updateSyncParams updates the sync target and state index if necessary
-func (m *Manager) updateSyncParams(endHeight uint64) {
+// UpdateSyncParams updates the sync target and state index if necessary
+func (m *Manager) UpdateSyncParams(endHeight uint64) {
types.RollappHubHeightGauge.Set(float64(endHeight))
m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
-m.syncTarget.Store(endHeight)
+m.SyncTarget.Store(endHeight)
m.lastSubmissionTime.Store(time.Now().UnixNano())
}

@@ -249,13 +249,13 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
return
}

-nextHeight := m.store.NextHeight()
+nextHeight := m.Store.NextHeight()
if block.Header.Height >= nextHeight {
m.blockCache[block.Header.Height] = CachedBlock{
Block: &block,
Commit: &commit,
}
m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.store.Height())
m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.Store.Height())
}

if block.Header.Height == nextHeight {
@@ -268,7 +268,7 @@

// getLatestBatchFromSL gets the latest batch from the SL
func (m *Manager) getLatestBatchFromSL() (*settlement.ResultRetrieveBatch, error) {
-return m.settlementClient.RetrieveBatch()
+return m.SLClient.RetrieveBatch()
}

// getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc.
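One of the renamed fields, SyncTarget, is an atomic.Uint64: the sync and update paths store the newest target height (m.SyncTarget.Store above) and readers load it without a mutex (m.SyncTarget.Load in the Synced log line). A standalone sketch of that pattern, independent of dymint:

package main

import (
	"fmt"
	"sync/atomic"
)

// manager mirrors just the sync-target field: one goroutine stores the latest
// target height, any other goroutine loads it lock-free.
type manager struct {
	syncTarget atomic.Uint64
}

func (m *manager) updateSyncParams(endHeight uint64) {
	m.syncTarget.Store(endHeight) // e.g. the end height of the latest settlement batch
}

func main() {
	m := &manager{}
	m.updateSyncParams(42)
	fmt.Println("syncTarget:", m.syncTarget.Load())
}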