Skip to content

Commit

Permalink
fix: nodes keep out of sync when missing gossiped block (#540)
Browse files Browse the repository at this point in the history
  • Loading branch information
srene authored Dec 28, 2023
1 parent d2b2cdf commit 14ae6fd
Show file tree
Hide file tree
Showing 21 changed files with 359 additions and 329 deletions.
27 changes: 27 additions & 0 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error {
if block.Header.Height != m.store.Height()+1 {
// We crashed after the commit and before updating the store height.
m.logger.Error("Block not applied. Wrong height", "block height", block.Header.Height, "store height", m.store.Height())
return nil
}

Expand Down Expand Up @@ -112,6 +113,32 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty
return nil
}

func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {
m.applyCachedBlockMutex.Lock()
defer m.applyCachedBlockMutex.Unlock()

prevCachedBlock, exists := m.prevBlock[m.store.Height()+1]

for exists {
m.logger.Debug("Applying cached block", "height", m.store.Height()+1)

err := m.applyBlock(ctx, prevCachedBlock, m.prevCommit[m.store.Height()+1], blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("Failed to apply previously cached block", "err", err)
return err
}
prevCachedBlock, exists = m.prevBlock[m.store.Height()+1]
}

for k := range m.prevBlock {
if k <= m.store.Height() {
delete(m.prevBlock, k)
delete(m.prevCommit, k)
}
}
return nil
}

// alignStoreWithApp is responsible for aligning the state of the store and the abci app if necessary.
func (m *Manager) alignStoreWithApp(ctx context.Context, block *types.Block) (bool, error) {
isRequired := false
Expand Down
35 changes: 27 additions & 8 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,19 @@ type Manager struct {
shouldProduceBlocksCh chan bool
produceEmptyBlockCh chan bool

syncTarget uint64
lastSubmissionTime int64
batchInProcess atomic.Value
isSyncedCond sync.Cond
produceBlockMutex sync.Mutex
syncTarget uint64
lastSubmissionTime int64
batchInProcess atomic.Value
isSyncedCond sync.Cond
produceBlockMutex sync.Mutex
applyCachedBlockMutex sync.Mutex

syncCache map[uint64]*types.Block

logger log.Logger

prevBlock map[uint64]*types.Block
prevCommit map[uint64]*types.Commit
}

// getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc.
Expand Down Expand Up @@ -168,6 +172,8 @@ func NewManager(
shouldProduceBlocksCh: make(chan bool, 1),
produceEmptyBlockCh: make(chan bool, 1),
logger: logger,
prevBlock: make(map[uint64]*types.Block),
prevCommit: make(map[uint64]*types.Commit),
}

return agg, nil
Expand Down Expand Up @@ -212,13 +218,26 @@ func (m *Manager) healthStatusEventCallback(event pubsub.Message) {
}

func (m *Manager) applyBlockCallback(event pubsub.Message) {
m.logger.Debug("Received new block event", "eventData", event.Data())
m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.prevBlock))
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
err := m.applyBlock(context.Background(), &block, &commit, blockMetaData{source: gossipedBlock})

if block.Header.Height != m.store.Height()+1 {
if block.Header.Height > m.store.Height() {
m.prevBlock[block.Header.Height] = &block
m.prevCommit[block.Header.Height] = &commit
m.logger.Debug("Caching block", "block height", block.Header.Height, "store height", m.store.Height())
}
} else {
err := m.applyBlock(context.Background(), &block, &commit, blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("Failed to apply block", "err", err)
}
}
err := m.attemptApplyCachedBlocks(context.Background())
if err != nil {
m.logger.Debug("Failed to apply block", "err", err)
m.logger.Debug("Failed to apply previous cached blocks", "err", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestInitialState(t *testing.T) {

// Init p2p client
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, privKey, "TestChain", logger)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, privKey, "TestChain", 50, logger)
assert.NoError(err)
assert.NotNil(p2pClient)

Expand Down
11 changes: 6 additions & 5 deletions block/production_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,12 @@ func TestBatchSubmissionAfterTimeout(t *testing.T) {

// Init manager with empty blocks feature enabled
managerConfig := config.BlockManagerConfig{
BlockTime: blockTime,
EmptyBlocksMaxTime: 0,
BatchSubmitMaxTime: submitTimeout,
BlockBatchSize: batchSize,
BlockBatchMaxSizeBytes: 1000,
BlockTime: blockTime,
EmptyBlocksMaxTime: 0,
BatchSubmitMaxTime: submitTimeout,
BlockBatchSize: batchSize,
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
}

manager, err := getManager(managerConfig, nil, nil, 1, 1, 0, proxyApp, nil)
Expand Down
11 changes: 4 additions & 7 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,12 @@ func (m *Manager) syncUntilTarget(ctx context.Context, syncTarget uint64) error
return err
}

if settlementBatch.StartHeight != currentHeight+1 {
return fmt.Errorf("settlement batch start height (%d) on index (%d) is not the expected", settlementBatch.StartHeight, currStateIdx)
}

err = m.processNextDABatch(ctx, settlementBatch.MetaData.DA.Height)
if err != nil {
return err
}

currentHeight = m.store.Height()
if currentHeight != settlementBatch.EndHeight {
return fmt.Errorf("after applying state index (%d), the height (%d) is not as expected (%d)", currStateIdx, currentHeight, settlementBatch.EndHeight)
}

err = m.updateStateIndex(settlementBatch.StateIndex)
if err != nil {
Expand Down Expand Up @@ -99,6 +92,10 @@ func (m *Manager) processNextDABatch(ctx context.Context, daHeight uint64) error
}
}
}
err = m.attemptApplyCachedBlocks(ctx)
if err != nil {
m.logger.Debug("Error applying previous cached blocks", "err", err)
}
return nil
}

Expand Down
13 changes: 7 additions & 6 deletions block/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func getManager(conf config.BlockManagerConfig, settlementlc settlement.LayerI,

// Init p2p client and validator
p2pKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, p2pKey, "TestChain", logger)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, p2pKey, "TestChain", 50, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -143,10 +143,11 @@ func initSettlementLayerMock(settlementlc settlement.LayerI, proposer string, pu

func getManagerConfig() config.BlockManagerConfig {
return config.BlockManagerConfig{
BlockTime: 100 * time.Millisecond,
BlockBatchSize: defaultBatchSize,
BlockBatchMaxSizeBytes: 1000,
BatchSubmitMaxTime: 30 * time.Minute,
NamespaceID: "0102030405060708",
BlockTime: 100 * time.Millisecond,
BlockBatchSize: defaultBatchSize,
BlockBatchMaxSizeBytes: 1000,
BatchSubmitMaxTime: 30 * time.Minute,
NamespaceID: "0102030405060708",
GossipedBlocksCacheSize: 50,
}
}
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type BlockManagerConfig struct {
BlockBatchSize uint64 `mapstructure:"block_batch_size"`
// The size of the batch in Bytes. Every batch we'll write to the DA and the settlement layer.
BlockBatchMaxSizeBytes uint64 `mapstructure:"block_batch_max_size_bytes"`
// The number of messages cached by gossipsub protocol
GossipedBlocksCacheSize int `mapstructure:"gossiped_blocks_cache_size"`
}

// GetViperConfig reads configuration parameters from Viper instance.
Expand Down Expand Up @@ -114,6 +116,10 @@ func (c BlockManagerConfig) Validate() error {
return fmt.Errorf("block_batch_size_bytes must be positive")
}

if c.GossipedBlocksCacheSize <= 0 {
return fmt.Errorf("gossiped_blocks_cache_size must be positive")
}

return nil
}

Expand Down
13 changes: 7 additions & 6 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ func DefaultConfig(home, chainId string) *NodeConfig {
Seeds: ""},
Aggregator: true,
BlockManagerConfig: BlockManagerConfig{
BlockTime: 200 * time.Millisecond,
EmptyBlocksMaxTime: 3 * time.Second,
BatchSubmitMaxTime: 30 * time.Second,
NamespaceID: "000000000000ffff",
BlockBatchSize: 500,
BlockBatchMaxSizeBytes: 500000},
BlockTime: 200 * time.Millisecond,
EmptyBlocksMaxTime: 3 * time.Second,
BatchSubmitMaxTime: 30 * time.Second,
NamespaceID: "000000000000ffff",
BlockBatchSize: 500,
BlockBatchMaxSizeBytes: 500000,
GossipedBlocksCacheSize: 50},
DALayer: "mock",
SettlementLayer: "mock",
Instrumentation: &InstrumentationConfig{
Expand Down
3 changes: 3 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ da_config = "{{ .DAConfig }}"
# max size of batch in bytes that can be accepted by DA
block_batch_max_size_bytes = {{ .BlockManagerConfig.BlockBatchMaxSizeBytes }}
# max number of cached messages by gossipsub protocol
gossiped_blocks_cache_size = {{ .BlockManagerConfig.GossipedBlocksCacheSize }}
#celestia config example:
# da_config = "{\"base_url\": \"http://127.0.0.1:26659\", \"timeout\": 60000000000, \"gas_prices\":0.1, \"gas_adjustment\": 1.3, \"namespace_id\":\"000000000000ffff\"}"
# Avail config example:
Expand Down
Loading

0 comments on commit 14ae6fd

Please sign in to comment.