Skip to content

Commit

Permalink
Use the new p2p.Run API
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Aug 22, 2024
1 parent d63b975 commit 84e9354
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 512 deletions.
76 changes: 17 additions & 59 deletions fly/cmd/fly/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,6 @@ func main() {
rootCtx, rootCtxCancel = context.WithCancel(context.Background())
defer rootCtxCancel()

// Outbound gossip message queue
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)

// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)

Expand Down Expand Up @@ -241,29 +232,6 @@ func main() {
mu := sync.Mutex{}
pythNetSeqs := map[string]map[uint64]time.Time{}

// Ignore observations
go func() {
for {
select {
case <-rootCtx.Done():
return
case <-obsvC:
}
}
}()

// Ignore observation requests
// Note: without this, the whole program hangs on observation requests
go func() {
for {
select {
case <-rootCtx.Done():
return
case <-obsvReqC:
}
}
}()

// Write signed VAAs to bigtable periodically
go func() {
signedVAAs := map[string][]byte{}
Expand Down Expand Up @@ -534,36 +502,26 @@ func main() {
components.GossipParams.Dlo = 1 // default: 5
components.GossipParams.Dhi = 2 // default: 12
components.GossipParams.Dout = 1 // default: 2

params, err := p2p.NewRunParams(
p2pBootstrap,
p2pNetworkID,
priv,
gst,
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedVAAListener(signedInC),
p2p.WithChainGovernorConfigListener(govConfigC),
p2p.WithChainGovernorStatusListener(govStatusC),
)
if err != nil {
logger.Fatal("Failed to create RunParams", zap.Error(err))
}

supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
obsvReqC,
nil,
sendC,
signedInC,
priv,
nil,
gst,
p2pNetworkID,
p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
components,
nil,
false,
false,
nil,
nil,
"",
0,
"",
)); err != nil {
p2p.Run(params)); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions fly/cmd/healthcheck/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func main() {
flag.StringVar(&pubKey, "pubKey", "", "A guardian public key")
flag.StringVar(&url, "url", "", "The public web url of a guardian")
flag.DurationVar(&timeout, "timeout", 15*time.Second, "The duration to wait for a heartbeat and observations")
flag.StringVar(&p2pNetworkID, "network", "/wormhole/mainnet/2", "P2P network identifier")
flag.StringVar(&p2pBootstrap, "bootstrap", "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1", "The list of bootstrap peers (comma-separate) to connect to for gossip network tests. This can be useful to test a particular bootstrap peer.")
flag.StringVar(&p2pNetworkID, "network", p2p.MainnetNetworkId, "P2P network identifier")
flag.StringVar(&p2pBootstrap, "bootstrap", p2p.MainnetBootstrapPeers, "The list of bootstrap peers (comma-separate) to connect to for gossip network tests. This can be useful to test a particular bootstrap peer.")
flag.UintVar(&p2pPort, "port", p2p.DefaultPort, "P2P UDP listener port")
flag.StringVar(&nodeKeyPath, "nodeKeyPath", "/tmp/health_check.key", "A libp2p node key. Will be created if it does not exist.")
flag.StringVar(&logLevel, "logLevel", "error", "The logging level. Valid values are error, warn, info, and debug.")
Expand Down
107 changes: 69 additions & 38 deletions fly/cmd/heartbeats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ func main() {

const (
GSM_signedObservation GossipMsgType = iota
GSM_signedObservationInBatch
GSM_signedObservationBatch
GSM_tbObservation
GSM_signedHeartbeat
GSM_signedVaaWithQuorum
Expand All @@ -193,11 +195,9 @@ func main() {
GSM_maxTypeVal
)

// Outbound gossip message queue
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 20000)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 20000)
Expand Down Expand Up @@ -266,7 +266,7 @@ func main() {

gossipMsgTable := table.NewWriter()
gossipMsgTable.SetOutputMirror(os.Stdout)
gossipMsgTable.AppendHeader(table.Row{"#", "Guardian", "Obsv", "TB_OBsv", "HB", "VAA", "Obsv_Req", "Chain_Gov_Cfg", "Chain_Gov_Status"})
gossipMsgTable.AppendHeader(table.Row{"#", "Guardian", "Obsv", "ObsvInB", "ObsvB", "TB_OBsv", "HB", "VAA", "Obsv_Req", "Chain_Gov_Cfg", "Chain_Gov_Status"})
gossipMsgTable.SetStyle(table.StyleColoredDark)

obsvRateTable := table.NewWriter()
Expand Down Expand Up @@ -316,9 +316,11 @@ func main() {
guardianTable.Render()
prompt()
case "m":
gossipLock.Lock()
activeTable = 2
resetTerm(true)
gossipMsgTable.Render()
gossipLock.Unlock()
prompt()
case "o":
activeTable = 3
Expand All @@ -337,11 +339,12 @@ func main() {
// Just count observations
go func() {
uniqueObs := map[string]struct{}{}
uniqueObsInBatch := map[string]struct{}{}
for {
select {
case <-rootCtx.Done():
return
case o := <-obsvC:
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
spl := strings.Split(o.Msg.MessageId, "/")
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(o.Msg.Addr))
Expand All @@ -367,9 +370,44 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
spl := strings.Split(o.MessageId, "/")
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(batch.Msg.Addr))
idx := guardianIndexMap[strings.ToLower(addr)]
if knownEmitters[emitter] {
gossipCounter[idx][GSM_tbObservation]++
gossipCounter[totalsRow][GSM_tbObservation]++
}
if handleObsv(uint(idx)) {
obsvRateTable.ResetRows()
for i := 0; i < numGuardians; i++ {
obsvRateTable.AppendRow(table.Row{i, obsvRateRows[int(i)].guardianName, obsvRateRows[int(i)].obsvCount,
obsvRateRows[uint(i)].percents[0], obsvRateRows[uint(i)].percents[1], obsvRateRows[uint(i)].percents[2],
obsvRateRows[uint(i)].percents[3], obsvRateRows[uint(i)].percents[4], obsvRateRows[uint(i)].percents[5],
obsvRateRows[uint(i)].percents[6], obsvRateRows[uint(i)].percents[7], obsvRateRows[uint(i)].percents[8],
obsvRateRows[uint(i)].percents[9]})
}
}
gossipCounter[idx][GSM_signedObservationInBatch]++
gossipCounter[totalsRow][GSM_signedObservationInBatch]++

if *loadTesting {
uniqueObsInBatch[hex.EncodeToString(o.Hash)] = struct{}{}
gossipCounter[uniqueRow][GSM_signedObservationInBatch] = len(uniqueObsInBatch)
}

gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
}
}
}()
Expand All @@ -388,7 +426,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand Down Expand Up @@ -420,7 +458,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand Down Expand Up @@ -489,7 +527,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
if activeTable == 0 {
Expand Down Expand Up @@ -524,7 +562,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand All @@ -545,7 +583,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand All @@ -562,36 +600,29 @@ func main() {
// Run supervisor.
components := p2p.DefaultComponents()
components.Port = *p2pPort

params, err := p2p.NewRunParams(
*p2pBootstrap,
*p2pNetworkID,
priv,
gst,
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
p2p.WithObservationRequestListener(obsvReqC),
p2p.WithChainGovernorConfigListener(govConfigC),
p2p.WithChainGovernorStatusListener(govStatusC),
)
if err != nil {
logger.Fatal("Failed to create RunParams", zap.Error(err))
}

supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
obsvReqC,
nil,
sendC,
signedInC,
priv,
nil,
gst,
*p2pNetworkID,
*p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
components,
nil,
false,
false,
nil,
nil,
"",
0,
"",
)); err != nil {
p2p.Run(params)); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 84e9354

Please sign in to comment.