Skip to content

Commit

Permalink
Make other tools use batches
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Aug 21, 2024
1 parent 48e40ed commit eabbccc
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 11 deletions.
35 changes: 34 additions & 1 deletion fly/cmd/heartbeats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func main() {

// Inbound observations
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 20000)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 20000)
Expand Down Expand Up @@ -338,7 +339,7 @@ func main() {
select {
case <-rootCtx.Done():
return
case o := <-obsvC:
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
spl := strings.Split(o.Msg.MessageId, "/")
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(o.Msg.Addr))
Expand Down Expand Up @@ -367,6 +368,37 @@ func main() {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
}
gossipLock.Unlock()
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
spl := strings.Split(o.MessageId, "/")
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(batch.Msg.Addr))
idx := guardianIndexMap[strings.ToLower(addr)]
if knownEmitters[emitter] {
gossipCounter[idx][GSM_tbObservation]++
gossipCounter[totalsRow][GSM_tbObservation]++
}
if handleObsv(uint(idx)) {
obsvRateTable.ResetRows()
for i := 0; i < numGuardians; i++ {
obsvRateTable.AppendRow(table.Row{i, obsvRateRows[int(i)].guardianName, obsvRateRows[int(i)].obsvCount, obsvRateRows[uint(i)].percents[0], obsvRateRows[uint(i)].percents[1], obsvRateRows[uint(i)].percents[2], obsvRateRows[uint(i)].percents[3], obsvRateRows[uint(i)].percents[4], obsvRateRows[uint(i)].percents[5], obsvRateRows[uint(i)].percents[6], obsvRateRows[uint(i)].percents[7], obsvRateRows[uint(i)].percents[8], obsvRateRows[uint(i)].percents[9]})
}
}
gossipCounter[idx][GSM_signedObservation]++
gossipCounter[totalsRow][GSM_signedObservation]++

if *loadTesting {
uniqueObs[hex.EncodeToString(o.Hash)] = struct{}{}
gossipCounter[uniqueRow][GSM_signedObservation] = len(uniqueObs)
}

gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
}
gossipLock.Unlock()
}
}
}
}()
Expand Down Expand Up @@ -568,6 +600,7 @@ func main() {
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
p2p.WithObservationRequestListener(obsvReqC),
p2p.WithChainGovernorConfigListener(govConfigC),
Expand Down
18 changes: 17 additions & 1 deletion fly/cmd/observation_stats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func main() {

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)

// Guardian set state managed by processor
gst := common.NewGuardianSetState(nil)
Expand All @@ -92,7 +93,7 @@ func main() {
select {
case <-rootCtx.Done():
return
case o := <-obsvC:
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
if o.Msg.MessageId[:3] != "26/" && o.Msg.MessageId[:2] != "7/" {
ga := eth_common.BytesToAddress(o.Msg.Addr).String()
// logger.Warn("observation", zap.String("id",o.MessageId), zap.String("addr",ga))
Expand All @@ -104,6 +105,20 @@ func main() {
}
logger.Warn("status", zap.String("id", o.Msg.MessageId), zap.Any("msg", obsvByHash[o.Msg.MessageId]))
}
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
if o.MessageId[:3] != "26/" && o.MessageId[:2] != "7/" {
ga := eth_common.BytesToAddress(batch.Msg.Addr).String()
// logger.Warn("observation", zap.String("id",o.MessageId), zap.String("addr",ga))
if _, ok := obsvByHash[o.MessageId]; !ok {
obsvByHash[o.MessageId] = map[string]time.Time{}
}
if _, ok := obsvByHash[o.MessageId][ga]; !ok {
obsvByHash[o.MessageId][ga] = time.Now()
}
logger.Warn("status", zap.String("id", o.MessageId), zap.Any("msg", obsvByHash[o.MessageId]))
}
}
}
}
}()
Expand All @@ -127,6 +142,7 @@ func main() {
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
)
if err != nil {
logger.Fatal("Failed to create RunParams", zap.Error(err))
Expand Down
26 changes: 25 additions & 1 deletion fly/cmd/prom_gossip/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func main() {

// Inbound observations
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 20000)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 20000)
Expand Down Expand Up @@ -247,7 +248,7 @@ func main() {
afterCount := len(uniqueObs)
logger.Info("Cleaned up unique observations cache", zap.Int("beforeCount", beforeCount), zap.Int("afterCount", afterCount), zap.Int("cleanedUpCount", beforeCount-afterCount))
timer.Reset(delay)
case o := <-obsvC:
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
gossipByType.WithLabelValues("observation").Inc()
spl := strings.Split(o.Msg.MessageId, "/")
chain, err := parseChainID(spl[0])
Expand All @@ -270,6 +271,28 @@ func main() {
uniqueObservationsCounter.Inc()
}
uniqueObs[hash] = time.Now()
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
gossipByType.WithLabelValues("observation").Inc()
spl := strings.Split(o.MessageId, "/")
chain, err := parseChainID(spl[0])
if err != nil {
chain = vaa.ChainIDUnset
}
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(batch.Msg.Addr))
idx := guardianIndexMap[strings.ToLower(addr)]
name := guardianIndexToNameMap[idx]
observationsByGuardianPerChain.WithLabelValues(name, chain.String()).Inc()
if knownEmitters[emitter] {
tbObservationsByGuardianPerChain.WithLabelValues(name, chain.String()).Inc()
}
hash := hex.EncodeToString(o.Hash)
if _, exists := uniqueObs[hash]; exists {
uniqueObservationsCounter.Inc()
}
uniqueObs[hash] = time.Now()
}
}
}
}()
Expand Down Expand Up @@ -406,6 +429,7 @@ func main() {
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
p2p.WithObservationRequestListener(obsvReqC),
p2p.WithChainGovernorConfigListener(govConfigC),
Expand Down
6 changes: 5 additions & 1 deletion fly/cmd/track_gossip/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func main() {

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
Expand Down Expand Up @@ -127,8 +128,10 @@ func main() {
select {
case <-rootCtx.Done():
return
case <-obsvC:
case <-obsvC: // TODO: Rip out this code once we cut over to batching.
numObs++
case batch := <-batchObsvC:
numObs += len(batch.Msg.Observations)
case <-signedInC:
numSigned++
case <-obsvReqC:
Expand Down Expand Up @@ -191,6 +194,7 @@ func main() {
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
p2p.WithObservationRequestListener(obsvReqC),
p2p.WithChainGovernorConfigListener(govConfigC),
Expand Down
26 changes: 19 additions & 7 deletions fly/cmd/track_pyth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func main() {

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)

// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
Expand Down Expand Up @@ -159,8 +160,18 @@ func main() {
select {
case <-rootCtx.Done():
return
case m := <-obsvC:
handleObservation(logger, gs, m.Msg)
case m := <-obsvC: // TODO: Rip out this code once we cut over to batching.
obs := &gossipv1.Observation{
Hash: m.Msg.Hash,
Signature: m.Msg.Signature,
TxHash: m.Msg.TxHash,
MessageId: m.Msg.MessageId,
}
handleObservation(logger, gs, m.Msg.Addr, obs)
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
handleObservation(logger, gs, batch.Msg.Addr, o)
}
}
}
}()
Expand Down Expand Up @@ -225,6 +236,7 @@ func main() {
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
)
if err != nil {
Expand Down Expand Up @@ -350,7 +362,7 @@ func handleSignedVAAWithQuorum(logger *zap.Logger, gs common.GuardianSet, m *gos
}
}

func handleObservation(logger *zap.Logger, gs common.GuardianSet, m *gossipv1.SignedObservation) {
func handleObservation(logger *zap.Logger, gs common.GuardianSet, addr []byte, m *gossipv1.Observation) {
hash := hex.EncodeToString(m.Hash)

// Verify the Guardian's signature. This verifies that m.Signature matches m.Hash and recovers
Expand All @@ -360,20 +372,20 @@ func handleObservation(logger *zap.Logger, gs common.GuardianSet, m *gossipv1.Si
logger.Warn("failed to verify signature on observation",
zap.String("digest", hash),
zap.String("signature", hex.EncodeToString(m.Signature)),
zap.String("addr", hex.EncodeToString(m.Addr)),
zap.String("addr", hex.EncodeToString(addr)),
zap.Error(err))
return
}

// Verify that m.Addr matches the public key that signed m.Hash.
their_addr := eth_common.BytesToAddress(m.Addr)
their_addr := eth_common.BytesToAddress(addr)
signer_pk := eth_common.BytesToAddress(eth_crypto.Keccak256(pk[1:])[12:])

if their_addr != signer_pk {
logger.Warn("invalid observation - address does not match pubkey",
zap.String("digest", hash),
zap.String("signature", hex.EncodeToString(m.Signature)),
zap.String("addr", hex.EncodeToString(m.Addr)),
zap.String("addr", hex.EncodeToString(addr)),
zap.String("pk", signer_pk.Hex()))
return
}
Expand All @@ -398,7 +410,7 @@ func handleObservation(logger *zap.Logger, gs common.GuardianSet, m *gossipv1.Si
logger.Debug("received observation",
zap.String("digest", hash),
zap.String("signature", hex.EncodeToString(m.Signature)),
zap.String("addr", hex.EncodeToString(m.Addr)),
zap.String("addr", hex.EncodeToString(addr)),
zap.String("txhash", hex.EncodeToString(m.TxHash)),
zap.String("txhash_b58", base58.Encode(m.TxHash)),
zap.String("message_id", m.MessageId),
Expand Down

0 comments on commit eabbccc

Please sign in to comment.