From c5f8f07c96e5d2ffe3cfa092e572bcead2168d6a Mon Sep 17 00:00:00 2001 From: zale144 Date: Sun, 28 Jul 2024 10:10:10 +0200 Subject: [PATCH] feat(rpc): Add sync info metrics (#979) --- block/block.go | 16 +++++++++------- block/block_cache.go | 34 ++++++++++++++++++++++++++++++++++ block/gossip.go | 18 +++++++++--------- block/manager.go | 8 +++++--- block/produce.go | 2 +- block/retriever.go | 12 +++++++++--- block/types.go | 37 ------------------------------------- types/block_source.go | 40 ++++++++++++++++++++++++++++++++++++++++ types/metrics.go | 34 ++++++++++++++++++++++++++++++++++ 9 files changed, 141 insertions(+), 60 deletions(-) create mode 100644 block/block_cache.go delete mode 100644 block/types.go create mode 100644 types/block_source.go diff --git a/block/block.go b/block/block.go index e72c6b813..9256b4110 100644 --- a/block/block.go +++ b/block/block.go @@ -14,14 +14,16 @@ import ( // As the entire process can't be atomic we need to make sure the following condition apply before // - block height is the expected block height on the store (height + 1). // - block height is the expected block height on the app (last block height + 1). -func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error { +func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMetaData types.BlockMetaData) error { // TODO: add switch case to have defined behavior for each case. // validate block height if block.Header.Height != m.State.NextHeight() { return types.ErrInvalidBlockHeight } - m.logger.Debug("Applying block", "height", block.Header.Height, "source", blockMetaData.source) + types.SetLastAppliedBlockSource(blockMetaData.Source.String()) + + m.logger.Debug("Applying block", "height", block.Header.Height, "source", blockMetaData.Source.String()) // Check if the app's last block height is the same as the currently produced block height isBlockAlreadyApplied, err := m.isHeightAlreadyApplied(block.Header.Height) @@ -120,23 +122,23 @@ func (m *Manager) attemptApplyCachedBlocks() error { for { expectedHeight := m.State.NextHeight() - cachedBlock, blockExists := m.blockCache[expectedHeight] + cachedBlock, blockExists := m.blockCache.GetBlockFromCache(expectedHeight) if !blockExists { break } if err := m.validateBlock(cachedBlock.Block, cachedBlock.Commit); err != nil { - delete(m.blockCache, cachedBlock.Block.Header.Height) - /// TODO: can we take an action here such as dropping the peer / reducing their reputation? + m.blockCache.DeleteBlockFromCache(cachedBlock.Block.Header.Height) + // TODO: can we take an action here such as dropping the peer / reducing their reputation? return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err) } - err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, blockMetaData{source: gossipedBlock}) + err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: types.GossipedBlock}) if err != nil { return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err) } m.logger.Info("Block applied", "height", expectedHeight) - delete(m.blockCache, cachedBlock.Block.Header.Height) + m.blockCache.DeleteBlockFromCache(cachedBlock.Block.Header.Height) } return nil diff --git a/block/block_cache.go b/block/block_cache.go new file mode 100644 index 000000000..b8e1a296c --- /dev/null +++ b/block/block_cache.go @@ -0,0 +1,34 @@ +package block + +import ( + "github.com/dymensionxyz/dymint/types" +) + +type Cache struct { + // concurrency managed by Manager.retrieverMu mutex + cache map[uint64]types.CachedBlock +} + +func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit) { + m.cache[h] = types.CachedBlock{Block: b, Commit: c} + types.BlockCacheSizeGauge.Set(float64(m.Size())) +} + +func (m *Cache) DeleteBlockFromCache(h uint64) { + delete(m.cache, h) + types.BlockCacheSizeGauge.Set(float64(m.Size())) +} + +func (m *Cache) GetBlockFromCache(h uint64) (types.CachedBlock, bool) { + ret, found := m.cache[h] + return ret, found +} + +func (m *Cache) HasBlockInCache(h uint64) bool { + _, found := m.GetBlockFromCache(h) + return found +} + +func (m *Cache) Size() int { + return len(m.cache) +} diff --git a/block/gossip.go b/block/gossip.go index d8c5a71ae..8b26a084a 100644 --- a/block/gossip.go +++ b/block/gossip.go @@ -4,9 +4,10 @@ import ( "context" "fmt" + "github.com/tendermint/tendermint/libs/pubsub" + "github.com/dymensionxyz/dymint/p2p" "github.com/dymensionxyz/dymint/types" - "github.com/tendermint/tendermint/libs/pubsub" ) // onNewGossipedBlock will take a block and apply it @@ -15,21 +16,20 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) { block := eventData.Block commit := eventData.Commit m.retrieverMu.Lock() // needed to protect blockCache access - _, found := m.blockCache[block.Header.Height] + height := block.Header.Height // It is not strictly necessary to return early, for correctness, but doing so helps us avoid mutex pressure and unnecessary repeated attempts to apply cached blocks - if found { + if m.blockCache.HasBlockInCache(height) { m.retrieverMu.Unlock() return } - m.logger.Debug("Received new block via gossip.", "block height", block.Header.Height, "store height", m.State.Height(), "n cachedBlocks", len(m.blockCache)) + types.LastReceivedP2PHeightGauge.Set(float64(height)) + + m.logger.Debug("Received new block via gossip.", "block height", height, "store height", m.State.Height(), "n cachedBlocks", m.blockCache.Size()) nextHeight := m.State.NextHeight() - if block.Header.Height >= nextHeight { - m.blockCache[block.Header.Height] = CachedBlock{ - Block: &block, - Commit: &commit, - } + if height >= nextHeight { + m.blockCache.AddBlockToCache(height, &block, &commit) } m.retrieverMu.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant diff --git a/block/manager.go b/block/manager.go index 78d5ce885..5bd4c3746 100644 --- a/block/manager.go +++ b/block/manager.go @@ -70,7 +70,7 @@ type Manager struct { targetSyncHeight diodes.Diode // Cached blocks and commits for applying at future heights. The blocks may not be valid, because // we can only do full validation in sequential order. - blockCache map[uint64]CachedBlock + blockCache *Cache } // NewManager creates new block Manager. @@ -88,7 +88,7 @@ func NewManager( p2pClient *p2p.Client, logger types.Logger, ) (*Manager, error) { - localAddress, err := getAddress(localKey) + localAddress, err := types.GetAddress(localKey) if err != nil { return nil, err } @@ -110,7 +110,9 @@ func NewManager( Retriever: dalc.(da.BatchRetriever), targetSyncHeight: diodes.NewOneToOne(1, nil), logger: logger, - blockCache: make(map[uint64]CachedBlock), + blockCache: &Cache{ + cache: make(map[uint64]types.CachedBlock), + }, } err = m.LoadStateOnInit(store, genesis, logger) diff --git a/block/produce.go b/block/produce.go index b83ba0584..8fe960c03 100644 --- a/block/produce.go +++ b/block/produce.go @@ -181,7 +181,7 @@ func (m *Manager) produceBlock(allowEmpty bool) (*types.Block, *types.Commit, er } } - if err := m.applyBlock(block, commit, blockMetaData{source: producedBlock}); err != nil { + if err := m.applyBlock(block, commit, types.BlockMetaData{Source: types.ProducedBlock}); err != nil { return nil, nil, fmt.Errorf("apply block: %w: %w", err, ErrNonRecoverable) } diff --git a/block/retriever.go b/block/retriever.go index c9b7774d7..213d6e963 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -9,6 +9,7 @@ import ( "github.com/dymensionxyz/gerr-cosmos/gerrc" "github.com/dymensionxyz/dymint/da" + "github.com/dymensionxyz/dymint/types" ) // RetrieveLoop listens for new target sync heights and then syncs the chain by @@ -99,7 +100,7 @@ func (m *Manager) applyLocalBlock(height uint64) error { } m.retrieverMu.Lock() - err = m.applyBlock(block, commit, blockMetaData{source: localDbBlock}) + err = m.applyBlock(block, commit, types.BlockMetaData{Source: types.LocalDbBlock}) if err != nil { return fmt.Errorf("apply block from local store: height: %d: %w", height, err) } @@ -121,6 +122,7 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error { m.retrieverMu.Lock() defer m.retrieverMu.Unlock() + var lastAppliedHeight float64 for _, batch := range batchResp.Batches { for i, block := range batch.Blocks { if block.Header.Height != m.State.NextHeight() { @@ -130,14 +132,18 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error { m.logger.Error("validate block from DA", "height", block.Header.Height, "err", err) continue } - err := m.applyBlock(block, batch.Commits[i], blockMetaData{source: daBlock, daHeight: daMetaData.Height}) + err := m.applyBlock(block, batch.Commits[i], types.BlockMetaData{Source: types.DABlock, DAHeight: daMetaData.Height}) if err != nil { return fmt.Errorf("apply block: height: %d: %w", block.Header.Height, err) } - delete(m.blockCache, block.Header.Height) + lastAppliedHeight = float64(block.Header.Height) + + m.blockCache.DeleteBlockFromCache(block.Header.Height) } } + types.LastReceivedDAHeightGauge.Set(lastAppliedHeight) + return nil } diff --git a/block/types.go b/block/types.go deleted file mode 100644 index ef55d271d..000000000 --- a/block/types.go +++ /dev/null @@ -1,37 +0,0 @@ -package block - -import ( - "github.com/dymensionxyz/dymint/types" - - "github.com/libp2p/go-libp2p/core/crypto" - tmcrypto "github.com/tendermint/tendermint/crypto" -) - -// TODO: move to types package - -type blockSource string - -const ( - producedBlock blockSource = "produced" - gossipedBlock blockSource = "gossip" - daBlock blockSource = "da" - localDbBlock blockSource = "local_db" -) - -type blockMetaData struct { - source blockSource - daHeight uint64 -} - -type CachedBlock struct { - Block *types.Block - Commit *types.Commit -} - -func getAddress(key crypto.PrivKey) ([]byte, error) { - rawKey, err := key.GetPublic().Raw() - if err != nil { - return nil, err - } - return tmcrypto.AddressHash(rawKey), nil -} diff --git a/types/block_source.go b/types/block_source.go new file mode 100644 index 000000000..343346d60 --- /dev/null +++ b/types/block_source.go @@ -0,0 +1,40 @@ +package types + +import ( + "github.com/libp2p/go-libp2p/core/crypto" + tmcrypto "github.com/tendermint/tendermint/crypto" +) + +type BlockSource uint64 + +const ( + _ BlockSource = iota + ProducedBlock + GossipedBlock + DABlock + LocalDbBlock +) + +func (s BlockSource) String() string { + return AllSources[s] +} + +var AllSources = []string{"none", "produced", "gossip", "da", "local_db"} + +type BlockMetaData struct { + Source BlockSource + DAHeight uint64 +} + +type CachedBlock struct { + Block *Block + Commit *Commit +} + +func GetAddress(key crypto.PrivKey) ([]byte, error) { + rawKey, err := key.GetPublic().Raw() + if err != nil { + return nil, err + } + return tmcrypto.AddressHash(rawKey), nil +} diff --git a/types/metrics.go b/types/metrics.go index d4232086d..f8f27d7cf 100644 --- a/types/metrics.go +++ b/types/metrics.go @@ -35,6 +35,40 @@ var RollappPendingSubmissionsSkewNumBytes = promauto.NewGauge(prometheus.GaugeOp Help: "The number of bytes (of blocks and commits) which have been accumulated but not yet submitted.", }) +var LastReceivedP2PHeightGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "last_received_p2p_height", + Help: "The height of the last block received from P2P.", +}) + +var LastReceivedDAHeightGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "last_received_da_height", + Help: "The height of the last block received from DA.", +}) + +const SourceLabel = "source" + +func init() { + LastAppliedBlockSource.With(prometheus.Labels{SourceLabel: "none"}).Set(0) +} + +var LastAppliedBlockSource = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "last_applied_block_source", + Help: "The source of the last applied block", + }, + []string{SourceLabel}, +) + +func SetLastAppliedBlockSource(source string) { + LastAppliedBlockSource.Reset() + LastAppliedBlockSource.With(prometheus.Labels{SourceLabel: source}).Set(0) +} + +var BlockCacheSizeGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "block_cache_size", + Help: "The number of blocks in the cache.", +}) + var RollappConsecutiveFailedDASubmission = promauto.NewGauge(prometheus.GaugeOpts{ Name: "rollapp_consecutive_failed_da_submissions", Help: "The number of consecutive times the da fails to submit.",