Skip to content

Commit

Permalink
Update historical_uptime support batching
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Aug 21, 2024
1 parent 785fe58 commit 48e40ed
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
16 changes: 14 additions & 2 deletions fly/cmd/historical_uptime/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func main() {

// Inbound observations
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)

// Heartbeat updates
heartbeatC := make(chan *gossipv1.Heartbeat, 50)
Expand Down Expand Up @@ -308,8 +309,18 @@ func main() {
select {
case <-rootCtx.Done():
return
case o := <-obsvC:
historical_uptime.ProcessObservation(*db, logger, *o)
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
obs := &gossipv1.Observation{
Hash: o.Msg.Hash,
Signature: o.Msg.Signature,
TxHash: o.Msg.TxHash,
MessageId: o.Msg.MessageId,
}
historical_uptime.ProcessObservation(*db, logger, o.Timestamp, o.Msg.Addr, obs)
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
historical_uptime.ProcessObservation(*db, logger, batch.Timestamp, batch.Msg.Addr, o)
}
}
}
}()
Expand Down Expand Up @@ -370,6 +381,7 @@ func main() {
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
)
if err != nil {
logger.Fatal("Failed to create RunParams", zap.Error(err))
Expand Down
18 changes: 9 additions & 9 deletions fly/pkg/historical_uptime/process_observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"encoding/hex"
"fmt"
"time"

node_common "github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
eth_common "github.com/ethereum/go-ethereum/common"
"github.com/wormhole-foundation/wormhole-monitor/fly/common"
Expand All @@ -15,13 +15,13 @@ import (
)

// createNewObservation creates a new observation from the given observation
func createNewObservation(o node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) types.Observation {
ga := eth_common.BytesToAddress(o.Msg.Addr).String()
func createNewObservation(timestamp time.Time, addr []byte, o *gossipv1.Observation) types.Observation {
ga := eth_common.BytesToAddress(addr).String()
return types.Observation{
MessageID: types.MessageID(o.Msg.MessageId),
MessageID: types.MessageID(o.MessageId),
GuardianAddr: ga,
Signature: hex.EncodeToString(o.Msg.Signature),
ObservedAt: o.Timestamp,
Signature: hex.EncodeToString(o.Signature),
ObservedAt: timestamp,
Status: types.OnTime,
}
}
Expand All @@ -39,10 +39,10 @@ func checkObservationTime(message *types.Message, newObservation types.Observati
// If the message does not exist in the database, it will be created.
// If the message exists, the observation will be appended to the message.
// If the observation is late, observation status will be set to Late.
func ProcessObservation(db bigtable.BigtableDB, logger *zap.Logger, o node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) error {
newObservation := createNewObservation(o)
func ProcessObservation(db bigtable.BigtableDB, logger *zap.Logger, timestamp time.Time, addr []byte, o *gossipv1.Observation) error {
newObservation := createNewObservation(timestamp, addr, o)

message, err := db.GetMessage(context.TODO(), types.MessageID(o.Msg.MessageId))
message, err := db.GetMessage(context.TODO(), types.MessageID(o.MessageId))
if err != nil {
fmt.Printf("failed to get message: %v", err)
return err
Expand Down

0 comments on commit 48e40ed

Please sign in to comment.