Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the new p2p.Run API #329

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 17 additions & 59 deletions fly/cmd/fly/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,6 @@ func main() {
rootCtx, rootCtxCancel = context.WithCancel(context.Background())
defer rootCtxCancel()

// Outbound gossip message queue
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)

// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)

Expand Down Expand Up @@ -241,29 +232,6 @@ func main() {
mu := sync.Mutex{}
pythNetSeqs := map[string]map[uint64]time.Time{}

// Ignore observations
go func() {
for {
select {
case <-rootCtx.Done():
return
case <-obsvC:
}
}
}()

// Ignore observation requests
// Note: without this, the whole program hangs on observation requests
go func() {
for {
select {
case <-rootCtx.Done():
return
case <-obsvReqC:
}
}
}()

// Write signed VAAs to bigtable periodically
go func() {
signedVAAs := map[string][]byte{}
Expand Down Expand Up @@ -534,36 +502,26 @@ func main() {
components.GossipParams.Dlo = 1 // default: 5
components.GossipParams.Dhi = 2 // default: 12
components.GossipParams.Dout = 1 // default: 2
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved

params, err := p2p.NewRunParams(
p2pBootstrap,
p2pNetworkID,
priv,
gst,
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedVAAListener(signedInC),
p2p.WithChainGovernorConfigListener(govConfigC),
p2p.WithChainGovernorStatusListener(govStatusC),
)
if err != nil {
logger.Fatal("Failed to create RunParams", zap.Error(err))
}

supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
obsvReqC,
nil,
sendC,
signedInC,
priv,
nil,
gst,
p2pNetworkID,
p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
components,
nil,
false,
false,
nil,
nil,
"",
0,
"",
)); err != nil {
p2p.Run(params)); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions fly/cmd/healthcheck/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func main() {
flag.StringVar(&pubKey, "pubKey", "", "A guardian public key")
flag.StringVar(&url, "url", "", "The public web url of a guardian")
flag.DurationVar(&timeout, "timeout", 15*time.Second, "The duration to wait for a heartbeat and observations")
flag.StringVar(&p2pNetworkID, "network", "/wormhole/mainnet/2", "P2P network identifier")
flag.StringVar(&p2pBootstrap, "bootstrap", "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1", "The list of bootstrap peers (comma-separate) to connect to for gossip network tests. This can be useful to test a particular bootstrap peer.")
flag.StringVar(&p2pNetworkID, "network", p2p.MainnetNetworkId, "P2P network identifier")
flag.StringVar(&p2pBootstrap, "bootstrap", p2p.MainnetBootstrapPeers, "The list of bootstrap peers (comma-separate) to connect to for gossip network tests. This can be useful to test a particular bootstrap peer.")
flag.UintVar(&p2pPort, "port", p2p.DefaultPort, "P2P UDP listener port")
flag.StringVar(&nodeKeyPath, "nodeKeyPath", "/tmp/health_check.key", "A libp2p node key. Will be created if it does not exist.")
flag.StringVar(&logLevel, "logLevel", "error", "The logging level. Valid values are error, warn, info, and debug.")
Expand Down
107 changes: 69 additions & 38 deletions fly/cmd/heartbeats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ func main() {

const (
GSM_signedObservation GossipMsgType = iota
GSM_signedObservationInBatch
GSM_signedObservationBatch
GSM_tbObservation
GSM_signedHeartbeat
GSM_signedVaaWithQuorum
Expand All @@ -193,11 +195,9 @@ func main() {
GSM_maxTypeVal
)

// Outbound gossip message queue
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 20000)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 20000)
Expand Down Expand Up @@ -266,7 +266,7 @@ func main() {

gossipMsgTable := table.NewWriter()
gossipMsgTable.SetOutputMirror(os.Stdout)
gossipMsgTable.AppendHeader(table.Row{"#", "Guardian", "Obsv", "TB_OBsv", "HB", "VAA", "Obsv_Req", "Chain_Gov_Cfg", "Chain_Gov_Status"})
gossipMsgTable.AppendHeader(table.Row{"#", "Guardian", "Obsv", "ObsvInB", "ObsvB", "TB_OBsv", "HB", "VAA", "Obsv_Req", "Chain_Gov_Cfg", "Chain_Gov_Status"})
gossipMsgTable.SetStyle(table.StyleColoredDark)

obsvRateTable := table.NewWriter()
Expand Down Expand Up @@ -316,9 +316,11 @@ func main() {
guardianTable.Render()
prompt()
case "m":
gossipLock.Lock()
activeTable = 2
resetTerm(true)
gossipMsgTable.Render()
gossipLock.Unlock()
prompt()
case "o":
activeTable = 3
Expand All @@ -337,11 +339,12 @@ func main() {
// Just count observations
go func() {
uniqueObs := map[string]struct{}{}
uniqueObsInBatch := map[string]struct{}{}
for {
select {
case <-rootCtx.Done():
return
case o := <-obsvC:
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
bingyuyap marked this conversation as resolved.
Show resolved Hide resolved
spl := strings.Split(o.Msg.MessageId, "/")
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(o.Msg.Addr))
Expand All @@ -367,9 +370,44 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
case batch := <-batchObsvC:
for _, o := range batch.Msg.Observations {
spl := strings.Split(o.MessageId, "/")
emitter := strings.ToLower(spl[1])
addr := "0x" + string(hex.EncodeToString(batch.Msg.Addr))
idx := guardianIndexMap[strings.ToLower(addr)]
if knownEmitters[emitter] {
gossipCounter[idx][GSM_tbObservation]++
gossipCounter[totalsRow][GSM_tbObservation]++
}
if handleObsv(uint(idx)) {
obsvRateTable.ResetRows()
for i := 0; i < numGuardians; i++ {
obsvRateTable.AppendRow(table.Row{i, obsvRateRows[int(i)].guardianName, obsvRateRows[int(i)].obsvCount,
obsvRateRows[uint(i)].percents[0], obsvRateRows[uint(i)].percents[1], obsvRateRows[uint(i)].percents[2],
obsvRateRows[uint(i)].percents[3], obsvRateRows[uint(i)].percents[4], obsvRateRows[uint(i)].percents[5],
obsvRateRows[uint(i)].percents[6], obsvRateRows[uint(i)].percents[7], obsvRateRows[uint(i)].percents[8],
obsvRateRows[uint(i)].percents[9]})
}
}
gossipCounter[idx][GSM_signedObservationInBatch]++
gossipCounter[totalsRow][GSM_signedObservationInBatch]++

if *loadTesting {
uniqueObsInBatch[hex.EncodeToString(o.Hash)] = struct{}{}
gossipCounter[uniqueRow][GSM_signedObservationInBatch] = len(uniqueObsInBatch)
}

gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
}
}
}()
Expand All @@ -388,7 +426,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand Down Expand Up @@ -420,7 +458,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand Down Expand Up @@ -489,7 +527,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
if activeTable == 0 {
Expand Down Expand Up @@ -524,7 +562,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand All @@ -545,7 +583,7 @@ func main() {
gossipLock.Lock()
gossipMsgTable.ResetRows()
for idx, r := range gossipCounter {
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6]})
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
}
gossipLock.Unlock()
}
Expand All @@ -562,36 +600,29 @@ func main() {
// Run supervisor.
components := p2p.DefaultComponents()
components.Port = *p2pPort
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved

params, err := p2p.NewRunParams(
*p2pBootstrap,
*p2pNetworkID,
priv,
gst,
rootCtxCancel,
p2p.WithComponents(components),
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedObservationBatchListener(batchObsvC),
p2p.WithSignedVAAListener(signedInC),
p2p.WithObservationRequestListener(obsvReqC),
p2p.WithChainGovernorConfigListener(govConfigC),
p2p.WithChainGovernorStatusListener(govStatusC),
)
if err != nil {
logger.Fatal("Failed to create RunParams", zap.Error(err))
}

supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
obsvReqC,
nil,
sendC,
signedInC,
priv,
nil,
gst,
*p2pNetworkID,
*p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
components,
nil,
false,
false,
nil,
nil,
"",
0,
"",
)); err != nil {
p2p.Run(params)); err != nil {
return err
}

Expand Down
Loading
Loading