Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move tests files to test package #639

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4270665
adding retries on hub client
mtsitrin Apr 4, 2024
2eb737c
handling queries for non existing data
mtsitrin Apr 4, 2024
b81f064
fixed UT
mtsitrin Apr 4, 2024
50fbf2b
added additional check as reponse is ptr
mtsitrin Apr 4, 2024
4eb4983
moved log interface to types. moved test logger to tetutil
mtsitrin Apr 4, 2024
820f234
cleaning block manager
mtsitrin Apr 6, 2024
9efb82c
pr comments
mtsitrin Apr 6, 2024
62e50fe
Merge remote-tracking branch 'origin/main' into mtsitrin/551-retries-…
mtsitrin Apr 6, 2024
9b93160
Merge branch 'main' into cleanup
mtsitrin Apr 6, 2024
a4fda42
Merge branch 'mtsitrin/551-retries-for-hub-client' into mtsitrin/632-…
mtsitrin Apr 6, 2024
a990b38
comments
mtsitrin Apr 6, 2024
b0b568a
adding m.store.NextHeight()
mtsitrin Apr 6, 2024
204e133
moved executer to block package
mtsitrin Apr 6, 2024
9ad450b
seperate executor
mtsitrin Apr 6, 2024
26adb00
moved block testutil to testutil package
mtsitrin Apr 7, 2024
96780ec
updated block manager with exported methods and fields
mtsitrin Apr 7, 2024
9c52ff9
moved config tests
mtsitrin Apr 7, 2024
5f9e78d
changed da and sl tests
mtsitrin Apr 7, 2024
b4d880a
changed more tests
mtsitrin Apr 7, 2024
ea19483
node package
mtsitrin Apr 7, 2024
4dec981
p2p tests
mtsitrin Apr 7, 2024
6ebe6fc
moved testutils to testutil package
mtsitrin Apr 7, 2024
72ba6ad
Merge branch 'main' into mtsitirn/636-refactor-move-tests-files-to-_t…
mtsitrin Apr 20, 2024
db13e03
fixed tests
mtsitrin Apr 20, 2024
f69e237
Merge branch 'main' into mtsitirn/636-refactor-move-tests-files-to-_t…
mtsitrin Apr 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 27 additions & 27 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
// TODO (#330): allow genesis block with height > 0 to be applied.
// TODO: add switch case to have defined behavior for each case.
// validate block height
if block.Header.Height != m.store.NextHeight() {
if block.Header.Height != m.Store.NextHeight() {
return types.ErrInvalidBlockHeight
}

Expand All @@ -43,36 +43,36 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return nil
}
// Start applying the block assuming no inconsistency was found.
_, err = m.store.SaveBlock(block, commit, nil)
_, err = m.Store.SaveBlock(block, commit, nil)
if err != nil {
return fmt.Errorf("save block: %w", err)
}

responses, err := m.executor.ExecuteBlock(m.lastState, block)
responses, err := m.Executor.ExecuteBlock(m.LastState, block)
if err != nil {
return fmt.Errorf("execute block: %w", err)
}

newState, err := m.executor.UpdateStateFromResponses(responses, m.lastState, block)
newState, err := m.Executor.UpdateStateFromResponses(responses, m.LastState, block)
if err != nil {
return fmt.Errorf("update state from responses: %w", err)
}

batch := m.store.NewBatch()
batch := m.Store.NewBatch()

batch, err = m.store.SaveBlockResponses(block.Header.Height, responses, batch)
batch, err = m.Store.SaveBlockResponses(block.Header.Height, responses, batch)
if err != nil {
batch.Discard()
return fmt.Errorf("save block responses: %w", err)
}

m.lastState = newState
batch, err = m.store.UpdateState(m.lastState, batch)
m.LastState = newState
batch, err = m.Store.UpdateState(m.LastState, batch)
if err != nil {
batch.Discard()
return fmt.Errorf("update state: %w", err)
}
batch, err = m.store.SaveValidators(block.Header.Height, m.lastState.Validators, batch)
batch, err = m.Store.SaveValidators(block.Header.Height, m.LastState.Validators, batch)
if err != nil {
batch.Discard()
return fmt.Errorf("save validators: %w", err)
Expand All @@ -84,7 +84,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
}

// Commit block to app
retainHeight, err := m.executor.Commit(&newState, block, responses)
retainHeight, err := m.Executor.Commit(&newState, block, responses)
if err != nil {
return fmt.Errorf("commit block: %w", err)
}
Expand All @@ -101,17 +101,17 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

// Update the state with the new app hash, last validators and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
newState.LastValidators = m.lastState.Validators.Copy()
newState.LastValidators = m.LastState.Validators.Copy()
newState.LastStoreHeight = block.Header.Height
newState.BaseHeight = m.store.Base()
newState.BaseHeight = m.Store.Base()

_, err = m.store.UpdateState(newState, nil)
_, err = m.Store.UpdateState(newState, nil)
if err != nil {
return fmt.Errorf("final update state: %w", err)
}
m.lastState = newState
m.LastState = newState

if ok := m.store.SetHeight(block.Header.Height); !ok {
if ok := m.Store.SetHeight(block.Header.Height); !ok {
return fmt.Errorf("store set height: %d", block.Header.Height)
}

Expand All @@ -124,7 +124,7 @@ func (m *Manager) attemptApplyCachedBlocks() error {
defer m.retrieverMutex.Unlock()

for {
expectedHeight := m.store.NextHeight()
expectedHeight := m.Store.NextHeight()

cachedBlock, blockExists := m.blockCache[expectedHeight]
if !blockExists {
Expand All @@ -146,7 +146,7 @@ func (m *Manager) attemptApplyCachedBlocks() error {

// isHeightAlreadyApplied checks if the block height is already applied to the app.
func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
proxyAppInfo, err := m.executor.GetAppInfo()
proxyAppInfo, err := m.Executor.GetAppInfo()
if err != nil {
return false, errorsmod.Wrap(err, "get app info")
}
Expand All @@ -160,29 +160,29 @@ func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {

// UpdateStateFromApp is responsible for aligning the state of the store from the abci app
func (m *Manager) UpdateStateFromApp() error {
proxyAppInfo, err := m.executor.GetAppInfo()
proxyAppInfo, err := m.Executor.GetAppInfo()
if err != nil {
return errorsmod.Wrap(err, "get app info")
}

appHeight := uint64(proxyAppInfo.LastBlockHeight)

// update the state with the hash, last store height and last validators.
m.lastState.AppHash = *(*[32]byte)(proxyAppInfo.LastBlockAppHash)
m.lastState.LastStoreHeight = appHeight
m.lastState.LastValidators = m.lastState.Validators.Copy()
m.LastState.AppHash = *(*[32]byte)(proxyAppInfo.LastBlockAppHash)
m.LastState.LastStoreHeight = appHeight
m.LastState.LastValidators = m.LastState.Validators.Copy()

resp, err := m.store.LoadBlockResponses(appHeight)
resp, err := m.Store.LoadBlockResponses(appHeight)
if err != nil {
return errorsmod.Wrap(err, "load block responses")
}
copy(m.lastState.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())
copy(m.LastState.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

_, err = m.store.UpdateState(m.lastState, nil)
_, err = m.Store.UpdateState(m.LastState, nil)
if err != nil {
return errorsmod.Wrap(err, "update state")
}
if ok := m.store.SetHeight(appHeight); !ok {
if ok := m.Store.SetHeight(appHeight); !ok {
return fmt.Errorf("store set height: %d", appHeight)
}
return nil
Expand All @@ -191,9 +191,9 @@ func (m *Manager) UpdateStateFromApp() error {
func (m *Manager) validateBlock(block *types.Block, commit *types.Commit) error {
// Currently we're assuming proposer is never nil as it's a pre-condition for
// dymint to start
proposer := m.settlementClient.GetProposer()
proposer := m.SLClient.GetProposer()

return types.ValidateProposedTransition(m.lastState, block, commit, proposer)
return types.ValidateProposedTransition(m.LastState, block, commit, proposer)
}

func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit types.Commit) error {
Expand Down
10 changes: 5 additions & 5 deletions block/initchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@ import (

func (m *Manager) RunInitChain(ctx context.Context) error {
// get the proposer's consensus pubkey
proposer := m.settlementClient.GetProposer()
proposer := m.SLClient.GetProposer()
tmPubKey, err := cryptocodec.ToTmPubKeyInterface(proposer.PublicKey)
if err != nil {
return err
}
gensisValSet := []*tmtypes.Validator{tmtypes.NewValidator(tmPubKey, 1)}

// call initChain with both addresses
res, err := m.executor.InitChain(m.genesis, gensisValSet)
res, err := m.Executor.InitChain(m.Genesis, gensisValSet)
if err != nil {
return err
}

// update the state with only the consensus pubkey
m.executor.UpdateStateAfterInitChain(&m.lastState, res, gensisValSet)
m.executor.UpdateMempoolAfterInitChain(&m.lastState)
m.Executor.UpdateStateAfterInitChain(&m.LastState, res, gensisValSet)
m.Executor.UpdateMempoolAfterInitChain(&m.LastState)

if _, err := m.store.UpdateState(m.lastState, nil); err != nil {
if _, err := m.Store.UpdateState(m.LastState, nil); err != nil {
return err
}

Expand Down
80 changes: 40 additions & 40 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,28 @@ import (
// Manager is responsible for aggregating transactions into blocks.
type Manager struct {
// Configuration
conf config.BlockManagerConfig
genesis *tmtypes.GenesisDoc
proposerKey crypto.PrivKey
Conf config.BlockManagerConfig
Genesis *tmtypes.GenesisDoc
ProposerKey crypto.PrivKey

// Store and execution
store store.Store
lastState types.State
executor *Executor
Store store.Store
LastState types.State
Executor *Executor

// Clients and servers
pubsub *pubsub.Server
p2pClient *p2p.Client
dalc da.DataAvailabilityLayerClient
settlementClient settlement.LayerI
Pubsub *pubsub.Server
p2pClient *p2p.Client
DAClient da.DataAvailabilityLayerClient
SLClient settlement.LayerI

// Data retrieval
retriever da.BatchRetriever
Retriever da.BatchRetriever

// Synchronization
syncTargetDiode diodes.Diode
SyncTargetDiode diodes.Diode

syncTarget atomic.Uint64
SyncTarget atomic.Uint64

// Block production
shouldProduceBlocksCh chan bool
Expand Down Expand Up @@ -122,19 +122,19 @@ func NewManager(
}

agg := &Manager{
pubsub: pubsub,
p2pClient: p2pClient,
proposerKey: proposerKey,
conf: conf,
genesis: genesis,
lastState: s,
store: store,
executor: exec,
dalc: dalc,
settlementClient: settlementClient,
retriever: dalc.(da.BatchRetriever),
Pubsub: pubsub,
p2pClient: p2pClient,
ProposerKey: proposerKey,
Conf: conf,
Genesis: genesis,
LastState: s,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
syncTargetDiode: diodes.NewOneToOne(1, nil),
SyncTargetDiode: diodes.NewOneToOne(1, nil),
shouldProduceBlocksCh: make(chan bool, 1),
produceEmptyBlockCh: make(chan bool, 1),
logger: logger,
Expand All @@ -151,16 +151,16 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
// TODO (#283): set aggregator mode by proposer addr on the hub
if isAggregator {
// make sure local signing key is the registered on the hub
slProposerKey := m.settlementClient.GetProposer().PublicKey.Bytes()
localProposerKey, _ := m.proposerKey.GetPublic().Raw()
slProposerKey := m.SLClient.GetProposer().PublicKey.Bytes()
localProposerKey, _ := m.ProposerKey.GetPublic().Raw()
if !bytes.Equal(slProposerKey, localProposerKey) {
return fmt.Errorf("proposer key mismatch: settlement proposer key: %s, block manager proposer key: %s", slProposerKey, m.proposerKey.GetPublic())
return fmt.Errorf("proposer key mismatch: settlement proposer key: %s, block manager proposer key: %s", slProposerKey, m.ProposerKey.GetPublic())
}
m.logger.Info("Starting in aggregator mode")
}

// Check if InitChain flow is needed
if m.lastState.IsGenesis() {
if m.LastState.IsGenesis() {
m.logger.Info("Running InitChain")

err := m.RunInitChain(ctx)
Expand All @@ -176,11 +176,11 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
}

if isAggregator {
go uevent.MustSubscribe(ctx, m.pubsub, "nodeHealth", events.QueryHealthStatus, m.onNodeHealthStatus, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "nodeHealth", events.QueryHealthStatus, m.onNodeHealthStatus, m.logger)
Dismissed Show dismissed Hide dismissed
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
} else {
go uevent.MustSubscribe(ctx, m.pubsub, "applyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger, 100)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger, 100)
Dismissed Show dismissed Hide dismissed
go m.RetrieveLoop(ctx)
go m.SyncTargetLoop(ctx)
}
Expand All @@ -198,26 +198,26 @@ func (m *Manager) syncBlockManager() error {
// Since we requested the latest batch and got batch not found it means
// the SL still hasn't got any batches for this chain.
m.logger.Info("No batches for chain found in SL. Start writing first batch")
m.syncTarget.Store(uint64(m.genesis.InitialHeight - 1))
m.SyncTarget.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}
return err
}
m.syncTarget.Store(resultRetrieveBatch.EndHeight)
m.SyncTarget.Store(resultRetrieveBatch.EndHeight)
err = m.syncUntilTarget(resultRetrieveBatch.EndHeight)
if err != nil {
return err
}

m.logger.Info("Synced", "current height", m.store.Height(), "syncTarget", m.syncTarget.Load())
m.logger.Info("Synced", "current height", m.Store.Height(), "syncTarget", m.SyncTarget.Load())
return nil
}

// updateSyncParams updates the sync target and state index if necessary
func (m *Manager) updateSyncParams(endHeight uint64) {
// UpdateSyncParams updates the sync target and state index if necessary
func (m *Manager) UpdateSyncParams(endHeight uint64) {
types.RollappHubHeightGauge.Set(float64(endHeight))
m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
m.syncTarget.Store(endHeight)
m.SyncTarget.Store(endHeight)
m.lastSubmissionTime.Store(time.Now().UnixNano())
}

Expand Down Expand Up @@ -249,13 +249,13 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
return
}

nextHeight := m.store.NextHeight()
nextHeight := m.Store.NextHeight()
if block.Header.Height >= nextHeight {
m.blockCache[block.Header.Height] = CachedBlock{
Block: &block,
Commit: &commit,
}
m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.store.Height())
m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.Store.Height())
}

if block.Header.Height == nextHeight {
Expand All @@ -268,7 +268,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {

// getLatestBatchFromSL gets the latest batch from the SL
func (m *Manager) getLatestBatchFromSL() (*settlement.ResultRetrieveBatch, error) {
return m.settlementClient.RetrieveBatch()
return m.SLClient.RetrieveBatch()
}

// getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc.
Expand Down
Loading
Loading