Skip to content

Commit

Permalink
Use the new p2p.Run API
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley authored and evan-gray committed Jul 1, 2024
1 parent 4943c13 commit 2e94bba
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 447 deletions.
75 changes: 16 additions & 59 deletions fly/cmd/fly/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,6 @@ func main() {
rootCtx, rootCtxCancel = context.WithCancel(context.Background())
defer rootCtxCancel()

// Outbound gossip message queue
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)

// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)

Expand Down Expand Up @@ -224,29 +215,6 @@ func main() {
mu := sync.Mutex{}
pythNetSeqs := map[string]map[uint64]time.Time{}

// Ignore observations
go func() {
for {
select {
case <-rootCtx.Done():
return
case <-obsvC:
}
}
}()

// Ignore observation requests
// Note: without this, the whole program hangs on observation requests
go func() {
for {
select {
case <-rootCtx.Done():
return
case <-obsvReqC:
}
}
}()

// Write signed VAAs to bigtable periodically
go func() {
signedVAAs := map[string][]byte{}
Expand Down Expand Up @@ -517,36 +485,25 @@ func main() {
components.GossipParams.Dlo = 1 // default: 5
components.GossipParams.Dhi = 2 // default: 12
components.GossipParams.Dout = 1 // default: 2

params, err := p2p.NewRunParams(
p2pBootstrap,
p2pNetworkID,
priv,
gst,
rootCtxCancel,
p2p.WithSignedVAAListener(signedInC),
p2p.WithChainGovernorConfigListener(govConfigC),
p2p.WithChainGovernorStatusListener(govStatusC),
)
if err != nil {
logger.Fatal("Failed to create RunParams", zap.Error(err))
}

supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
obsvReqC,
nil,
sendC,
signedInC,
priv,
nil,
gst,
p2pNetworkID,
p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
components,
nil,
false,
false,
nil,
nil,
"",
0,
"",
)); err != nil {
p2p.Run(params)); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions fly/cmd/healthcheck/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func main() {
flag.StringVar(&pubKey, "pubKey", "", "A guardian public key")
flag.StringVar(&url, "url", "", "The public web url of a guardian")
flag.DurationVar(&timeout, "timeout", 15*time.Second, "The duration to wait for a heartbeat and observations")
flag.StringVar(&p2pNetworkID, "network", "/wormhole/mainnet/2", "P2P network identifier")
flag.StringVar(&p2pBootstrap, "bootstrap", "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1", "The list of bootstrap peers (comma-separate) to connect to for gossip network tests. This can be useful to test a particular bootstrap peer.")
flag.StringVar(&p2pNetworkID, "network", p2p.MainnetNetworkId, "P2P network identifier")
flag.StringVar(&p2pBootstrap, "bootstrap", p2p.MainnetBootstrapPeers, "The list of bootstrap peers (comma-separate) to connect to for gossip network tests. This can be useful to test a particular bootstrap peer.")
flag.UintVar(&p2pPort, "port", p2p.DefaultPort, "P2P UDP listener port")
flag.StringVar(&nodeKeyPath, "nodeKeyPath", "/tmp/health_check.key", "A libp2p node key. Will be created if it does not exist.")
flag.StringVar(&logLevel, "logLevel", "error", "The logging level. Valid values are error, warn, info, and debug.")
Expand Down
48 changes: 18 additions & 30 deletions fly/cmd/heartbeats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,6 @@ func main() {
GSM_maxTypeVal
)

// Outbound gossip message queue
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)

Expand Down Expand Up @@ -616,36 +613,27 @@ func main() {
// Run supervisor.
components := p2p.DefaultComponents()
components.Port = *p2pPort

params, err := p2p.NewRunParams(
*p2pBootstrap,
*p2pNetworkID,
priv,
gst,
rootCtxCancel,
p2p.WithSignedObservationListener(obsvC),
p2p.WithSignedVAAListener(signedInC),
p2p.WithObservationRequestListener(obsvReqC),
p2p.WithChainGovernorConfigListener(govConfigC),
p2p.WithChainGovernorStatusListener(govStatusC),
)
if err != nil {
logger.Fatal("Failed to create RunParams", zap.Error(err))
}

supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
obsvReqC,
nil,
sendC,
signedInC,
priv,
nil,
gst,
*p2pNetworkID,
*p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
components,
nil,
false,
false,
nil,
nil,
"",
0,
"",
)); err != nil {
p2p.Run(params)); err != nil {
return err
}

Expand Down
100 changes: 14 additions & 86 deletions fly/cmd/historical_uptime/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,29 +261,15 @@ func main() {
rootCtx, rootCtxCancel = context.WithCancel(context.Background())
defer rootCtxCancel()

// Outbound gossip message queue
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)

// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)

// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)

// Heartbeat updates
heartbeatC := make(chan *gossipv1.Heartbeat, 50)

// Guardian set state managed by processor
gst := node_common.NewGuardianSetState(heartbeatC)

// Governor cfg
govConfigC := make(chan *gossipv1.SignedChainGovernorConfig, 50)

// Governor status
govStatusC := make(chan *gossipv1.SignedChainGovernorStatus, 50)
// Bootstrap guardian set, otherwise heartbeats would be skipped
idx, sgs, err := utils.FetchCurrentGuardianSet(ethRpcUrl, coreBridgeAddr)
if err != nil {
Expand Down Expand Up @@ -328,29 +314,6 @@ func main() {
}
}()

// Ignore observation requests
// Note: without this, the whole program hangs on observation requests
go func() {
for {
select {
case <-rootCtx.Done():
return
case <-obsvReqC:
}
}
}()

// Ignore signed VAAs
go func() {
for {
select {
case <-rootCtx.Done():
return
case <-signedInC:
}
}
}()

// Handle heartbeats
go func() {
for {
Expand Down Expand Up @@ -388,28 +351,6 @@ func main() {
}
}()

// Handle govConfigs
go func() {
for {
select {
case <-rootCtx.Done():
return
case <-govConfigC:
}
}
}()

// Handle govStatus
go func() {
for {
select {
case <-rootCtx.Done():
return
case <-govStatusC:
}
}
}()

// Load p2p private key
var priv crypto.PrivKey
priv, err = node_common.GetOrCreateNodeKey(logger, nodeKeyPath)
Expand All @@ -420,36 +361,23 @@ func main() {
// Run supervisor.
components := p2p.DefaultComponents()
components.Port = p2pPort

params, err := p2p.NewRunParams(
p2pBootstrap,
p2pNetworkID,
priv,
gst,
rootCtxCancel,
p2p.WithSignedObservationListener(obsvC),
)
if err != nil {
logger.Fatal("Failed to create RunParams", zap.Error(err))
}

supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(obsvC,
obsvReqC,
nil,
sendC,
signedInC,
priv,
nil,
gst,
p2pNetworkID,
p2pBootstrap,
"",
false,
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
components,
nil,
false,
false,
nil,
nil,
"",
0,
"",
)); err != nil {
p2p.Run(params)); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 2e94bba

Please sign in to comment.