Skip to content

Commit

Permalink
historical_uptime: track missing observations by guardians
Browse files Browse the repository at this point in the history
Signed-off-by: bingyuyap <[email protected]>
  • Loading branch information
bingyuyap committed Feb 7, 2024
1 parent 43998b8 commit b2f23e2
Show file tree
Hide file tree
Showing 9 changed files with 797 additions and 19 deletions.
66 changes: 47 additions & 19 deletions fly/cmd/historical_uptime/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
promremotew "github.com/certusone/wormhole/node/pkg/telemetry/prom_remote_write"
eth_common "github.com/ethereum/go-ethereum/common"
ipfslog "github.com/ipfs/go-log/v2"
"github.com/joho/godotenv"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/wormhole-foundation/wormhole-monitor/fly/common"
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/db"
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/historical_uptime"
"github.com/wormhole-foundation/wormhole-monitor/fly/utils"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
Expand Down Expand Up @@ -56,6 +57,14 @@ var (
},
[]string{"guardian", "chain"},
)

// guardianMissedObservations is a Prometheus counter vector, labelled by
// guardian and chain, exported as "guardian_missed_observations_total".
// It is passed to historical_uptime.UpdateMetrics by the observation scraper.
guardianMissedObservations = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "guardian_missed_observations_total",
Help: "Total number of observations missed by each guardian on each chain",
},
[]string{"guardian", "chain"},
)
)

// PYTHNET_CHAIN_ID is vaa.ChainIDPythNet converted to a plain int.
const PYTHNET_CHAIN_ID = int(vaa.ChainIDPythNet)
Expand Down Expand Up @@ -120,6 +129,7 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) {
// adding this will make sure chain labels are present regardless
for _, guardianName := range common.GetGuardianIndexToNameMap() {
guardianObservations.WithLabelValues(guardianName, chainName).Add(0)
guardianMissedObservations.WithLabelValues(guardianName, chainName).Add(0)
}
}
err := promremotew.ScrapeAndSendLocalMetrics(ctx, info, promLogger)
Expand All @@ -134,6 +144,37 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger) {
}
}

// initObservationScraper starts the "observation_scraper" runnable, which
// refreshes the guardianMissedObservations metric every 15 seconds until the
// runnable's context is cancelled.
//
// On each tick it:
//  1. queries messages with db.QueryMessagesByIndex(false, common.ExpiryDuration)
//     (NOTE(review): presumably "messages whose metrics have not been checked
//     yet, within the expiry window" — confirm against the db package),
//  2. tallies the messages per chain,
//  3. initializes a per-guardian, per-chain missing-observation count,
//  4. decrements that count for every observation that was seen, and
//  5. publishes the remaining counts to Prometheus.
//
// A failed query is logged and the tick is skipped; the loop keeps running.
func initObservationScraper(db *db.Database, logger *zap.Logger) {
	node_common.StartRunnable(rootCtx, nil, false, "observation_scraper", func(ctx context.Context) error {
		t := time.NewTicker(15 * time.Second)
		// Release the ticker's resources once the runnable exits.
		defer t.Stop()

		for {
			select {
			case <-ctx.Done():
				return nil
			case <-t.C:
				messages, err := db.QueryMessagesByIndex(false, common.ExpiryDuration)
				if err != nil {
					logger.Error("QueryMessagesByIndex error", zap.Error(err))
					continue
				}

				// Tally the number of messages for each chain
				messagesPerChain := historical_uptime.TallyMessagesPerChain(logger, messages)

				// Initialize the missing observations count for each guardian for each chain
				guardianMissingObservations := historical_uptime.InitializeMissingObservationsCount(logger, messages, messagesPerChain)

				// Decrement the missing observations count for each observed message
				historical_uptime.DecrementMissingObservationsCount(logger, guardianMissingObservations, messages)

				// Update the metrics with the final count of missing observations
				historical_uptime.UpdateMetrics(guardianMissedObservations, guardianMissingObservations)
			}
		}
	})
}

func main() {
loadEnvVars()
p2pBootstrap = "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1"
Expand Down Expand Up @@ -180,15 +221,18 @@ func main() {
if err != nil {
logger.Fatal("Failed to fetch guardian set", zap.Error(err))
}
logger.Info("guardian set", zap.Uint32("index", idx), zap.Any("gs", sgs))

gs := node_common.GuardianSet{
Keys: sgs.Keys,
Index: idx,
}
gst.Set(&gs)

db := db.OpenDb(logger, nil)

// Start Prometheus scraper
initPromScraper(promRemoteURL, logger)
initObservationScraper(db, logger)

// WIP(bing): add metrics for guardian observations
go func() {
Expand All @@ -197,23 +241,7 @@ func main() {
case <-rootCtx.Done():
return
case o := <-obsvC:
// Ignore observations from Pythnet
// Pythnet sends too many observations that could deteriorate the performance of the fly node
if o.Msg.MessageId[:3] != strconv.Itoa(PYTHNET_CHAIN_ID) + "/" {
ga := eth_common.BytesToAddress(o.Msg.Addr).String()
chainID := strings.Split(o.Msg.MessageId, "/")[0]
ui64, err := strconv.ParseUint(chainID, 10, 16)
if err != nil {
panic(err)
}
chainName := vaa.ChainID(ui64).String()
guardianName, ok := common.GetGuardianName(ga)
if !ok {
logger.Error("guardian name not found", zap.String("guardian", ga))
continue // Skip setting the metric if guardianName is not found
}
guardianObservations.WithLabelValues(guardianName, chainName).Inc()
}
historical_uptime.ProcessObservation(*db, logger, *o)
}
}
}()
Expand Down
9 changes: 9 additions & 0 deletions fly/common/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package common

import (
"time"
)

// ExpiryDuration is a fixed 30-hour window.
// NOTE(review): presumably the age after which a stored message is treated as
// expired — confirm against the code that consumes it.
const ExpiryDuration = 30 * time.Hour
64 changes: 64 additions & 0 deletions fly/pkg/db/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package db

import (
"fmt"
"os"
"path"

"github.com/dgraph-io/badger/v3"
"go.uber.org/zap"
)

// badgerZapLogger embeds a *zap.Logger and exposes the printf-style
// Errorf/Warningf/Infof/Debugf methods so it can be handed to Badger via
// Options.WithLogger.
type badgerZapLogger struct {
*zap.Logger
}

// Errorf renders the printf-style message and logs it at zap's error level.
func (l badgerZapLogger) Errorf(f string, v ...interface{}) {
	msg := fmt.Sprintf(f, v...)
	l.Logger.Error(msg)
}

// Warningf renders the printf-style message and logs it at zap's warn level.
func (l badgerZapLogger) Warningf(f string, v ...interface{}) {
	msg := fmt.Sprintf(f, v...)
	l.Logger.Warn(msg)
}

// Infof renders the printf-style message and logs it at zap's info level.
func (l badgerZapLogger) Infof(f string, v ...interface{}) {
	msg := fmt.Sprintf(f, v...)
	l.Logger.Info(msg)
}

// Debugf renders the printf-style message and logs it at zap's debug level.
func (l badgerZapLogger) Debugf(f string, v ...interface{}) {
	msg := fmt.Sprintf(f, v...)
	l.Logger.Debug(msg)
}

// Database wraps the underlying Badger database handle.
type Database struct {
// db is the Badger handle opened by OpenDb and released by Close.
db *badger.DB
}

// OpenDb opens a Badger store and returns it wrapped in a Database.
//
// When dataDir is nil the store is in-memory only. Otherwise the data lives in
// the "db" subdirectory of *dataDir, which is created with mode 0700 if it
// does not exist. Badger's own log output is routed through logger.
//
// Failure to create the directory or to open the store is reported through
// logger.Fatal.
func OpenDb(logger *zap.Logger, dataDir *string) *Database {
	var opts badger.Options

	if dataDir == nil {
		opts = badger.DefaultOptions("").WithInMemory(true)
	} else {
		dir := path.Join(*dataDir, "db")
		if mkErr := os.MkdirAll(dir, 0700); mkErr != nil {
			logger.Fatal("failed to create database directory", zap.Error(mkErr))
		}

		opts = badger.DefaultOptions(dir)
	}

	handle, err := badger.Open(opts.WithLogger(badgerZapLogger{logger}))
	if err != nil {
		logger.Fatal("failed to open database", zap.Error(err))
	}

	return &Database{db: handle}
}

// Close closes the underlying Badger handle and returns its error, if any.
func (d *Database) Close() error {
	return d.db.Close()
}
46 changes: 46 additions & 0 deletions fly/pkg/db/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package db

import (
"fmt"
"strconv"
"strings"

"github.com/dgraph-io/badger/v3"
)

// metricsCheckedIndexKeyFmt is the fmt layout of a metricsChecked index key:
// metricsChecked|<bool>|<messageID>.
const metricsCheckedIndexKeyFmt = "metricsChecked|%t|%s"

// CreateOrUpdateIndex writes the metricsChecked index entry for message inside
// txn. The entry's key is built from the message's MetricsChecked flag and
// MessageID; its value is the MessageID.
//
// NOTE(review): an entry previously written for the opposite MetricsChecked
// value is not removed here — confirm that callers delete it when the flag flips.
func CreateOrUpdateIndex(txn *badger.Txn, message *Message) error {
	indexKey := []byte(fmt.Sprintf(metricsCheckedIndexKeyFmt, message.MetricsChecked, message.MessageID))
	indexValue := []byte(message.MessageID)

	err := txn.Set(indexKey, indexValue)
	if err != nil {
		return fmt.Errorf("failed to set metricsChecked index: %w", err)
	}

	return nil
}

// parseMetricsCheckedIndexKey splits an index key of the form
// metricsChecked|<bool>|<messageID> into its flag and message ID.
//
// Only the first two '|' separators are treated as delimiters, so a message ID
// that itself contains '|' is returned intact rather than rejected.
//
// It returns an error when the key has fewer than three parts, when the first
// part is not "metricsChecked", or when the flag cannot be parsed as a bool.
func parseMetricsCheckedIndexKey(key []byte) (bool, string, error) {
	parts := strings.SplitN(string(key), "|", 3)
	if len(parts) != 3 || parts[0] != "metricsChecked" {
		return false, "", fmt.Errorf("invalid key format")
	}

	// Parse the metricsChecked value from the string to a bool
	metricsChecked, err := strconv.ParseBool(parts[1])
	if err != nil {
		return false, "", fmt.Errorf("error parsing metricsChecked value from key: %w", err)
	}

	// Everything after the second separator is the MessageID.
	return metricsChecked, parts[2], nil
}
Loading

0 comments on commit b2f23e2

Please sign in to comment.