From f3442d47a2520c072684a2becc125e4fc8e7b5d7 Mon Sep 17 00:00:00 2001
From: bingyuyap
Date: Sat, 21 Sep 2024 03:57:08 +0800
Subject: [PATCH] hum: add message index bulk mutation that was hum

hum: update MAX_CHAIN_ID

Signed-off-by: bingyuyap
---
 fly/cmd/historical_uptime/main.go | 10 +++++-----
 fly/pkg/bigtable/operations.go    | 23 +++++++++++++++++++++++
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/fly/cmd/historical_uptime/main.go b/fly/cmd/historical_uptime/main.go
index 87e999e6..697e8ef3 100644
--- a/fly/cmd/historical_uptime/main.go
+++ b/fly/cmd/historical_uptime/main.go
@@ -109,7 +109,9 @@ var (
 	)
 )
 
-const PYTHNET_CHAIN_ID = int(vaa.ChainIDPythNet)
+// [1, MAX_CHAIN_ID] is the range of chain IDs that we will track for the uptime monitor;
+// in this case it's Snaxchain since it has the largest mainnet chain ID
+const MAX_CHAIN_ID = vaa.ChainIDSnaxchain
 
 // guardianChainHeights indexes current chain height by chain id and guardian name
 var guardianChainHeights = make(common.GuardianChainHeights)
@@ -187,10 +189,7 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger, errC chan error)
 		case <-t.C:
 			recordGuardianHeightDifferences()
 
-			for i := 1; i < 36; i++ {
-				if i == PYTHNET_CHAIN_ID {
-					continue
-				}
+			for i := 1; i <= int(MAX_CHAIN_ID); i++ {
 				chainName := vaa.ChainID(i).String()
 				if strings.HasPrefix(chainName, "unknown chain ID:") {
 					continue
@@ -230,6 +229,7 @@ func initObservationScraper(db *bigtable.BigtableDB, logger *zap.Logger, errC ch
 			messageObservations := make(map[types.MessageID][]*types.Observation)
 
 			messages, err := db.GetUnprocessedMessagesBeforeCutOffTime(ctx, time.Now().Add(-common.ExpiryDuration))
+			logger.Info("Number of unprocessed messages", zap.Int("count", len(messages)))
 			if err != nil {
 				logger.Error("QueryMessagesByIndex error", zap.Error(err))
 				continue
diff --git a/fly/pkg/bigtable/operations.go b/fly/pkg/bigtable/operations.go
index fae8b9e1..3d9c7f82 100644
--- a/fly/pkg/bigtable/operations.go
+++ b/fly/pkg/bigtable/operations.go
@@ -44,6 +44,10 @@ func (db *BigtableDB) FlushCache(ctx context.Context, logger *zap.Logger, cache
 	observationMuts := make([]*bigtable.Mutation, 0)
 	observationRows := make([]string, 0)
 
+	// Prepare bulk mutations for message indexes
+	indexMuts := make([]*bigtable.Mutation, 0, len(cache.Messages))
+	indexRows := make([]string, 0, len(cache.Messages))
+
 	for messageID, message := range cache.Messages {
 		// Prepare message mutation
 		messageMut, err := createMessageMutation(message)
@@ -64,20 +68,39 @@
 			observationMuts = append(observationMuts, observationMut)
 			observationRows = append(observationRows, observationRow)
 		}
+
+		// Prepare message index mutation
+		indexMut := bigtable.NewMutation()
+		indexMut.Set("indexData", "placeholder", bigtable.Now(), nil)
+		indexMuts = append(indexMuts, indexMut)
+		indexRows = append(indexRows, string(messageID))
 	}
 
+	// Apply bulk mutations for messages
 	err := db.ApplyBulk(ctx, MessageTableName, messageRows, messageMuts)
 	if err != nil {
 		logger.Error("Failed to apply bulk mutations for messages", zap.Error(err))
 		return err
 	}
 
+	// Apply bulk mutations for observations
 	err = db.ApplyBulk(ctx, ObservationTableName, observationRows, observationMuts)
 	if err != nil {
 		logger.Error("Failed to apply bulk mutations for observations", zap.Error(err))
 		return err
 	}
 
+	// Apply bulk mutations for message indexes
+	err = db.ApplyBulk(ctx, MessageIndexTableName, indexRows, indexMuts)
+	if err != nil {
+		logger.Error("Failed to apply bulk mutations for message indexes", zap.Error(err))
+		return err
+	}
+
+	logger.Info("Successfully applied bulk mutations for messages", zap.Int("count", len(messageMuts)))
+	logger.Info("Successfully applied bulk mutations for observations", zap.Int("count", len(observationMuts)))
+	logger.Info("Successfully applied bulk mutations for message indexes", zap.Int("count", len(indexMuts)))
+
 	return nil
 }
 
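
For reference, below is a minimal sketch of the kind of ApplyBulk wrapper that FlushCache calls above as db.ApplyBulk(ctx, tableName, rows, muts). It is not the repository's actual implementation: the db.client field and the Open-by-table-name call are assumptions. It uses the public cloud.google.com/go/bigtable API, in which Table.ApplyBulk returns a per-row error slice in addition to the call-level error, so both need to be checked.

package bigtable

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigtable"
)

// ApplyBulk applies one mutation per row key in a single bulk request and
// surfaces both the call-level error and any per-row failures.
// Sketch only; the db.client field is an assumed *bigtable.Client.
func (db *BigtableDB) ApplyBulk(ctx context.Context, tableName string, rowKeys []string, muts []*bigtable.Mutation) error {
	tbl := db.client.Open(tableName)

	// rowErrs has one entry per row key; it is nil when every row succeeded.
	rowErrs, err := tbl.ApplyBulk(ctx, rowKeys, muts)
	if err != nil {
		return err
	}
	for i, rowErr := range rowErrs {
		if rowErr != nil {
			return fmt.Errorf("bulk mutation failed for row %q: %w", rowKeys[i], rowErr)
		}
	}
	return nil
}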