Skip to content

Commit

Permalink
feat(rpc): Add sync info metrics (#979)
Browse files Browse the repository at this point in the history
  • Loading branch information
zale144 authored Jul 28, 2024
1 parent e922dea commit c5f8f07
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 60 deletions.
16 changes: 9 additions & 7 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ import (
// As the entire process can't be atomic, we need to make sure the following conditions hold beforehand:
// - block height is the expected block height on the store (height + 1).
// - block height is the expected block height on the app (last block height + 1).
func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error {
func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMetaData types.BlockMetaData) error {
// TODO: add switch case to have defined behavior for each case.
// validate block height
if block.Header.Height != m.State.NextHeight() {
return types.ErrInvalidBlockHeight
}

m.logger.Debug("Applying block", "height", block.Header.Height, "source", blockMetaData.source)
types.SetLastAppliedBlockSource(blockMetaData.Source.String())

m.logger.Debug("Applying block", "height", block.Header.Height, "source", blockMetaData.Source.String())

// Check if the app's last block height is the same as the currently produced block height
isBlockAlreadyApplied, err := m.isHeightAlreadyApplied(block.Header.Height)
Expand Down Expand Up @@ -120,23 +122,23 @@ func (m *Manager) attemptApplyCachedBlocks() error {
for {
expectedHeight := m.State.NextHeight()

cachedBlock, blockExists := m.blockCache[expectedHeight]
cachedBlock, blockExists := m.blockCache.GetBlockFromCache(expectedHeight)
if !blockExists {
break
}
if err := m.validateBlock(cachedBlock.Block, cachedBlock.Commit); err != nil {
delete(m.blockCache, cachedBlock.Block.Header.Height)
/// TODO: can we take an action here such as dropping the peer / reducing their reputation?
m.blockCache.DeleteBlockFromCache(cachedBlock.Block.Header.Height)
// TODO: can we take an action here such as dropping the peer / reducing their reputation?
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err)
}

err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, blockMetaData{source: gossipedBlock})
err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: types.GossipedBlock})
if err != nil {
return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
}
m.logger.Info("Block applied", "height", expectedHeight)

delete(m.blockCache, cachedBlock.Block.Header.Height)
m.blockCache.DeleteBlockFromCache(cachedBlock.Block.Header.Height)
}

return nil
Expand Down
34 changes: 34 additions & 0 deletions block/block_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package block

import (
"github.com/dymensionxyz/dymint/types"
)

// Cache stores blocks together with their commits, keyed by block height.
// It does no locking of its own; see the note on the cache field.
type Cache struct {
// concurrency managed by Manager.retrieverMu mutex
// cache maps a block height to the block and commit cached for that height.
cache map[uint64]types.CachedBlock
}

// AddBlockToCache stores block b and commit c under height h, replacing any
// entry already stored for that height, and then publishes the new number of
// cached entries to the block cache size gauge.
func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit) {
	entry := types.CachedBlock{
		Block:  b,
		Commit: c,
	}
	m.cache[h] = entry

	entries := len(m.cache)
	types.BlockCacheSizeGauge.Set(float64(entries))
}

// DeleteBlockFromCache removes the entry stored under height h (a no-op when
// there is none) and then publishes the remaining number of cached entries
// to the block cache size gauge.
func (m *Cache) DeleteBlockFromCache(h uint64) {
	delete(m.cache, h)

	remaining := float64(len(m.cache))
	types.BlockCacheSizeGauge.Set(remaining)
}

// GetBlockFromCache looks up the entry stored under height h. The boolean
// reports whether an entry exists; when it is false the returned CachedBlock
// is the zero value.
func (m *Cache) GetBlockFromCache(h uint64) (types.CachedBlock, bool) {
	if entry, ok := m.cache[h]; ok {
		return entry, true
	}
	return types.CachedBlock{}, false
}

// HasBlockInCache reports whether an entry is stored under height h.
func (m *Cache) HasBlockInCache(h uint64) bool {
	_, ok := m.cache[h]
	return ok
}

// Size returns the number of entries currently held in the cache.
func (m *Cache) Size() int {
	count := len(m.cache)
	return count
}
18 changes: 9 additions & 9 deletions block/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"fmt"

"github.com/tendermint/tendermint/libs/pubsub"

"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/types"
"github.com/tendermint/tendermint/libs/pubsub"
)

// onNewGossipedBlock will take a block and apply it
Expand All @@ -15,21 +16,20 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
block := eventData.Block
commit := eventData.Commit
m.retrieverMu.Lock() // needed to protect blockCache access
_, found := m.blockCache[block.Header.Height]
height := block.Header.Height
// It is not strictly necessary to return early, for correctness, but doing so helps us avoid mutex pressure and unnecessary repeated attempts to apply cached blocks
if found {
if m.blockCache.HasBlockInCache(height) {
m.retrieverMu.Unlock()
return
}

m.logger.Debug("Received new block via gossip.", "block height", block.Header.Height, "store height", m.State.Height(), "n cachedBlocks", len(m.blockCache))
types.LastReceivedP2PHeightGauge.Set(float64(height))

m.logger.Debug("Received new block via gossip.", "block height", height, "store height", m.State.Height(), "n cachedBlocks", m.blockCache.Size())

nextHeight := m.State.NextHeight()
if block.Header.Height >= nextHeight {
m.blockCache[block.Header.Height] = CachedBlock{
Block: &block,
Commit: &commit,
}
if height >= nextHeight {
m.blockCache.AddBlockToCache(height, &block, &commit)
}
m.retrieverMu.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant

Expand Down
8 changes: 5 additions & 3 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Manager struct {
targetSyncHeight diodes.Diode
// Cached blocks and commits for applying at future heights. The blocks may not be valid, because
// we can only do full validation in sequential order.
blockCache map[uint64]CachedBlock
blockCache *Cache
}

// NewManager creates new block Manager.
Expand All @@ -88,7 +88,7 @@ func NewManager(
p2pClient *p2p.Client,
logger types.Logger,
) (*Manager, error) {
localAddress, err := getAddress(localKey)
localAddress, err := types.GetAddress(localKey)
if err != nil {
return nil, err
}
Expand All @@ -110,7 +110,9 @@ func NewManager(
Retriever: dalc.(da.BatchRetriever),
targetSyncHeight: diodes.NewOneToOne(1, nil),
logger: logger,
blockCache: make(map[uint64]CachedBlock),
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
},
}

err = m.LoadStateOnInit(store, genesis, logger)
Expand Down
2 changes: 1 addition & 1 deletion block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (m *Manager) produceBlock(allowEmpty bool) (*types.Block, *types.Commit, er
}
}

if err := m.applyBlock(block, commit, blockMetaData{source: producedBlock}); err != nil {
if err := m.applyBlock(block, commit, types.BlockMetaData{Source: types.ProducedBlock}); err != nil {
return nil, nil, fmt.Errorf("apply block: %w: %w", err, ErrNonRecoverable)
}

Expand Down
12 changes: 9 additions & 3 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/dymensionxyz/gerr-cosmos/gerrc"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/types"
)

// RetrieveLoop listens for new target sync heights and then syncs the chain by
Expand Down Expand Up @@ -99,7 +100,7 @@ func (m *Manager) applyLocalBlock(height uint64) error {
}

m.retrieverMu.Lock()
err = m.applyBlock(block, commit, blockMetaData{source: localDbBlock})
err = m.applyBlock(block, commit, types.BlockMetaData{Source: types.LocalDbBlock})
if err != nil {
return fmt.Errorf("apply block from local store: height: %d: %w", height, err)
}
Expand All @@ -121,6 +122,7 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error {
m.retrieverMu.Lock()
defer m.retrieverMu.Unlock()

var lastAppliedHeight float64
for _, batch := range batchResp.Batches {
for i, block := range batch.Blocks {
if block.Header.Height != m.State.NextHeight() {
Expand All @@ -130,14 +132,18 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error {
m.logger.Error("validate block from DA", "height", block.Header.Height, "err", err)
continue
}
err := m.applyBlock(block, batch.Commits[i], blockMetaData{source: daBlock, daHeight: daMetaData.Height})
err := m.applyBlock(block, batch.Commits[i], types.BlockMetaData{Source: types.DABlock, DAHeight: daMetaData.Height})
if err != nil {
return fmt.Errorf("apply block: height: %d: %w", block.Header.Height, err)
}

delete(m.blockCache, block.Header.Height)
lastAppliedHeight = float64(block.Header.Height)

m.blockCache.DeleteBlockFromCache(block.Header.Height)
}
}
types.LastReceivedDAHeightGauge.Set(lastAppliedHeight)

return nil
}

Expand Down
37 changes: 0 additions & 37 deletions block/types.go

This file was deleted.

40 changes: 40 additions & 0 deletions types/block_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package types

import (
"github.com/libp2p/go-libp2p/core/crypto"
tmcrypto "github.com/tendermint/tendermint/crypto"
)

// BlockSource identifies where a block that is being applied came from.
type BlockSource uint64

// Known block sources. The zero value is skipped on purpose, so an unset
// BlockSource maps to the "none" entry of AllSources.
const (
	_ BlockSource = iota
	ProducedBlock
	GossipedBlock
	DABlock
	LocalDbBlock
)

// String returns the name of the source as listed in AllSources.
//
// BlockSource is an exported integer type, so callers can construct values
// that have no entry in AllSources. Such values yield "unknown" instead of
// panicking with an index-out-of-range error, which keeps String safe to use
// in log statements and metric labels.
func (s BlockSource) String() string {
	if s >= BlockSource(len(AllSources)) {
		return "unknown"
	}
	return AllSources[s]
}

// AllSources holds the source names, indexed by BlockSource value.
var AllSources = []string{"none", "produced", "gossip", "da", "local_db"}

// BlockMetaData carries information about a block's origin alongside the
// block itself when it is passed to be applied.
type BlockMetaData struct {
// Source tells where the block came from (produced, gossip, DA or local DB).
Source BlockSource
// DAHeight is the DA height associated with the block.
// NOTE(review): in the visible callers it is only populated for DABlock; confirm.
DAHeight uint64
}

// CachedBlock pairs a block with its commit so both can be kept together in
// a cache until the block can be applied.
type CachedBlock struct {
// Block is the cached block.
Block *Block
// Commit is the commit that accompanies Block.
Commit *Commit
}

// GetAddress returns the tendermint address hash of the raw public key that
// belongs to the given private key. Any error from extracting the raw public
// key bytes is returned unchanged.
func GetAddress(key crypto.PrivKey) ([]byte, error) {
	pubKey := key.GetPublic()

	pubKeyBytes, err := pubKey.Raw()
	if err != nil {
		return nil, err
	}

	address := tmcrypto.AddressHash(pubKeyBytes)
	return address, nil
}
34 changes: 34 additions & 0 deletions types/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,40 @@ var RollappPendingSubmissionsSkewNumBytes = promauto.NewGauge(prometheus.GaugeOp
Help: "The number of bytes (of blocks and commits) which have been accumulated but not yet submitted.",
})

// LastReceivedP2PHeightGauge exposes the height of the most recent block
// received over P2P gossip as the "last_received_p2p_height" gauge.
var LastReceivedP2PHeightGauge = promauto.NewGauge(prometheus.GaugeOpts{
Name: "last_received_p2p_height",
Help: "The height of the last block received from P2P.",
})

// LastReceivedDAHeightGauge exposes the height of the most recent block
// received from the DA layer as the "last_received_da_height" gauge.
// NOTE(review): the visible caller sets it once per processed DA batch and
// starts from 0, so a batch that applies no block publishes 0 — confirm this
// is intended.
var LastReceivedDAHeightGauge = promauto.NewGauge(prometheus.GaugeOpts{
Name: "last_received_da_height",
Help: "The height of the last block received from DA.",
})

// SourceLabel is the name of the label under which LastAppliedBlockSource
// records the block source.
const SourceLabel = "source"

func init() {
LastAppliedBlockSource.With(prometheus.Labels{SourceLabel: "none"}).Set(0)
}

// LastAppliedBlockSource is a gauge vector with a single SourceLabel label.
// The information is carried by the label value rather than the gauge value:
// SetLastAppliedBlockSource keeps exactly one series, labelled with the
// source of the last applied block, and always sets its value to 0.
var LastAppliedBlockSource = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "last_applied_block_source",
Help: "The source of the last applied block",
},
[]string{SourceLabel},
)

// SetLastAppliedBlockSource records source as the origin of the last applied
// block. All existing series of the metric are dropped first, so only the
// series labelled with source remains; its value is set to 0.
func SetLastAppliedBlockSource(source string) {
	LastAppliedBlockSource.Reset()

	labels := prometheus.Labels{SourceLabel: source}
	series := LastAppliedBlockSource.With(labels)
	series.Set(0)
}

// BlockCacheSizeGauge exposes the number of blocks currently held in the
// block cache as the "block_cache_size" gauge. It is refreshed by the block
// cache whenever an entry is added or deleted.
var BlockCacheSizeGauge = promauto.NewGauge(prometheus.GaugeOpts{
Name: "block_cache_size",
Help: "The number of blocks in the cache.",
})

var RollappConsecutiveFailedDASubmission = promauto.NewGauge(prometheus.GaugeOpts{
Name: "rollapp_consecutive_failed_da_submissions",
Help: "The number of consecutive times the da fails to submit.",
Expand Down

0 comments on commit c5f8f07

Please sign in to comment.