historical_uptime: update logic to use bigtable instead of badgerdb
Signed-off-by: bingyuyap <[email protected]>
bingyuyap committed Mar 28, 2024
1 parent ed5906d commit c99a51e
Showing 16 changed files with 888 additions and 930 deletions.
69 changes: 42 additions & 27 deletions fly/cmd/historical_uptime/main.go
@@ -20,8 +20,9 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/wormhole-foundation/wormhole-monitor/fly/common"
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/db"
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/bigtable"
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/historical_uptime"
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types"
"github.com/wormhole-foundation/wormhole-monitor/fly/utils"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
@@ -31,7 +32,6 @@ var (
rootCtx context.Context
rootCtxCancel context.CancelFunc

dataDir string
p2pNetworkID string
p2pPort uint
p2pBootstrap string
@@ -40,6 +40,12 @@ var (
ethRpcUrl string
coreBridgeAddr string
promRemoteURL string

gcpProjectId string
useBigtableEmulator bool // only use this in local development
bigTableEmulatorHost string = "" // required if using emulator
gcpCredentialsFile string = "" // required if not using emulator
bigTableInstanceId string
)

var (
@@ -96,7 +102,6 @@ func loadEnvVars() {
if err != nil {
log.Fatal("Error loading .env file")
}
dataDir = verifyEnvVar("DATA_DIR")
p2pNetworkID = verifyEnvVar("P2P_NETWORK_ID")
port, err := strconv.ParseUint(verifyEnvVar("P2P_PORT"), 10, 32)
if err != nil {
@@ -108,6 +113,18 @@ func loadEnvVars() {
ethRpcUrl = verifyEnvVar("ETH_RPC_URL")
coreBridgeAddr = verifyEnvVar("CORE_BRIDGE_ADDR")
promRemoteURL = verifyEnvVar("PROM_REMOTE_URL")

gcpProjectId = verifyEnvVar("GCP_PROJECT_ID")
bigTableInstanceId = verifyEnvVar("BIGTABLE_INSTANCE_ID")
useBigtableEmulator, err = strconv.ParseBool(verifyEnvVar("USE_BIGTABLE_EMULATOR"))
if err != nil {
log.Fatal("Error parsing USE_BIGTABLE_EMULATOR")
}
if useBigtableEmulator {
bigTableEmulatorHost = verifyEnvVar("BIGTABLE_EMULATOR_HOST")
} else {
gcpCredentialsFile = verifyEnvVar("GCP_CREDENTIALS_FILE")
}
}

func verifyEnvVar(key string) string {
@@ -180,7 +197,7 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger, errC chan error)
}
}

func initObservationScraper(db *db.Database, logger *zap.Logger, errC chan error) {
func initObservationScraper(db *bigtable.BigtableDB, logger *zap.Logger, errC chan error) {
node_common.StartRunnable(rootCtx, errC, false, "observation_scraper", func(ctx context.Context) error {
t := time.NewTicker(15 * time.Second)

@@ -189,20 +206,35 @@ func initObservationScraper(db *db.Database, logger *zap.Logger, errC chan error
case <-ctx.Done():
return nil
case <-t.C:
messages, err := db.QueryMessagesByIndex(false, common.ExpiryDuration)
messageObservations := make(map[types.MessageID][]*types.Observation)

messages, err := db.GetUnprocessedMessagesBeforeCutOffTime(ctx, time.Now().Add(-common.ExpiryDuration))
if err != nil {
logger.Error("QueryMessagesByIndex error", zap.Error(err))
continue
}

for _, message := range messages {
observations, err := db.GetObservationsByMessageID(ctx, string(message.MessageID))
if err != nil {
logger.Error("GetObservationsByMessageID error",
zap.Error(err),
zap.String("messageId", string(message.MessageID)),
)
continue
}

messageObservations[message.MessageID] = observations
}

// Tally the number of messages for each chain
messagesPerChain := historical_uptime.TallyMessagesPerChain(logger, messages)

// Initialize the missing observations count for each guardian for each chain
guardianMissingObservations := historical_uptime.InitializeMissingObservationsCount(logger, messages, messagesPerChain)

// Decrement the missing observations count for each observed message
historical_uptime.DecrementMissingObservationsCount(logger, guardianMissingObservations, messages)
historical_uptime.DecrementMissingObservationsCount(logger, guardianMissingObservations, messageObservations)

// Update the metrics with the final count of missing observations
historical_uptime.UpdateMetrics(guardianMissedObservations, guardianMissingObservations)
@@ -211,25 +243,6 @@ func initObservationScraper(db *db.Database, logger *zap.Logger, errC chan error
})
}

func initDatabaseCleanUp(db *db.Database, logger *zap.Logger, errC chan error) {
node_common.StartRunnable(rootCtx, errC, false, "db_cleanup", func(ctx context.Context) error {
t := time.NewTicker(common.DatabaseCleanUpInterval)

for {
select {
case <-ctx.Done():
return nil
case <-t.C:
err := db.RemoveObservationsByIndex(true, common.ExpiryDuration)
if err != nil {
logger.Error("RemoveObservationsByIndex error", zap.Error(err))
}
}
}
})

}

func main() {
loadEnvVars()
p2pBootstrap = "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1"
@@ -283,12 +296,14 @@ func main() {
}
gst.Set(&gs)

db := db.OpenDb(logger, &dataDir)
db, err := bigtable.NewBigtableDB(rootCtx, gcpProjectId, bigTableInstanceId, gcpCredentialsFile, bigTableEmulatorHost, useBigtableEmulator)
if err != nil {
logger.Fatal("Failed to create bigtable db", zap.Error(err))
}
promErrC := make(chan error)
// Start Prometheus scraper
initPromScraper(promRemoteURL, logger, promErrC)
initObservationScraper(db, logger, promErrC)
initDatabaseCleanUp(db, logger, promErrC)

go func() {
for {
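The main.go hunks above only show the new call site, bigtable.NewBigtableDB(rootCtx, gcpProjectId, bigTableInstanceId, gcpCredentialsFile, bigTableEmulatorHost, useBigtableEmulator); the constructor itself lives in fly/pkg/bigtable and is not part of this diff. As a rough sketch of what a constructor with that signature might look like if the package wraps the official cloud.google.com/go/bigtable client (an assumption — only the parameter list is visible here, everything else below is illustrative):

package bigtable

import (
	"context"
	"fmt"
	"os"

	"cloud.google.com/go/bigtable"
	"google.golang.org/api/option"
)

// BigtableDB is assumed here to be a thin wrapper around the official client.
type BigtableDB struct {
	client *bigtable.Client
}

func NewBigtableDB(ctx context.Context, projectID, instanceID, credentialsFile, emulatorHost string, useEmulator bool) (*BigtableDB, error) {
	var opts []option.ClientOption
	if useEmulator {
		// The Go Bigtable client connects to an emulator, without credentials,
		// whenever BIGTABLE_EMULATOR_HOST is set in the environment.
		if err := os.Setenv("BIGTABLE_EMULATOR_HOST", emulatorHost); err != nil {
			return nil, fmt.Errorf("failed to point client at emulator: %w", err)
		}
	} else {
		opts = append(opts, option.WithCredentialsFile(credentialsFile))
	}

	client, err := bigtable.NewClient(ctx, projectID, instanceID, opts...)
	if err != nil {
		return nil, fmt.Errorf("failed to create bigtable client: %w", err)
	}
	return &BigtableDB{client: client}, nil
}

For local development with USE_BIGTABLE_EMULATOR=true, the emulator can be started with gcloud beta emulators bigtable start --host-port=localhost:8086 and BIGTABLE_EMULATOR_HOST set to the same address; with the emulator disabled, GCP_CREDENTIALS_FILE must point at a service-account key file, matching the branching added to loadEnvVars above. This workflow is inferred from the configuration being read, not stated in the commit.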
2 changes: 1 addition & 1 deletion fly/common/consts.go
@@ -5,7 +5,7 @@ import (
)

const (
ExpiryDuration = 30 * time.Hour
ExpiryDuration = 30 * time.Hour
DatabaseCleanUpInterval = 48 * time.Hour

MessageUpdateBatchSize = 100
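ExpiryDuration feeds the cutoff that the scraper loop above passes to db.GetUnprocessedMessagesBeforeCutOffTime via time.Now().Add(-common.ExpiryDuration). That query is implemented in fly/pkg/bigtable and is not shown in this commit; purely as an illustration of what such a read can look like with the official Go Bigtable client, continuing the constructor sketch above (same imports plus "time"), here is a hypothetical version. The table name, column family, and qualifier are invented, and the check for an "already processed" marker is omitted:

// Hypothetical sketch only: the real schema and method name in fly/pkg/bigtable differ.
func (db *BigtableDB) unprocessedMessagesBefore(ctx context.Context, cutoff time.Time) ([]string, error) {
	tbl := db.client.Open("historicalUptimeMessages") // invented table name

	var messageIDs []string
	err := tbl.ReadRows(ctx, bigtable.InfiniteRange(""), func(row bigtable.Row) bool {
		for _, item := range row["messageData"] { // invented column family
			// Skip messages last observed after the cutoff; they are still
			// inside the ExpiryDuration (30h) window.
			if item.Column == "messageData:lastObservedAt" && item.Timestamp.Time().After(cutoff) {
				return true
			}
		}
		messageIDs = append(messageIDs, row.Key())
		return true // keep iterating over the remaining rows
	}, bigtable.RowFilter(bigtable.LatestNFilter(1)))

	return messageIDs, err
}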