Skip to content

Commit

Permalink
refactor: sample impl with new p2p
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Nov 13, 2024
1 parent 120be56 commit 4f9b01c
Show file tree
Hide file tree
Showing 44 changed files with 1,739 additions and 3,594 deletions.
38 changes: 20 additions & 18 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log/slog"
"time"

"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/better_p2p"

"github.com/bitcoin-sv/arc/internal/grpc_opts"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_core"
Expand All @@ -15,6 +15,7 @@ import (

"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/internal/blocktx"
blocktx_p2p "github.com/bitcoin-sv/arc/internal/blocktx/p2p"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql"
"github.com/bitcoin-sv/arc/internal/version"
Expand All @@ -35,7 +36,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
blockStore store.BlocktxStore
mqClient blocktx.MessageQueueClient
processor *blocktx.Processor
pm p2p.PeerManagerI
pm *better_p2p.PeerManager
server *blocktx.Server
healthServer *grpc_opts.GrpcServer
workers *blocktx.BackgroundWorkers
Expand Down Expand Up @@ -111,8 +112,8 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
processorOpts = append(processorOpts, blocktx.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}

blockRequestCh := make(chan blocktx.BlockRequest, blockProcessingBuffer)
blockProcessCh := make(chan *p2p.BlockMessage, blockProcessingBuffer)
blockRequestCh := make(chan blocktx_p2p.BlockRequest, blockProcessingBuffer)
blockProcessCh := make(chan *blocktx_p2p.BlockMessage, blockProcessingBuffer)

processor, err = blocktx.NewProcessor(logger, blockStore, blockRequestCh, blockProcessCh, processorOpts...)
if err != nil {
Expand All @@ -126,25 +127,25 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("failed to start peer handler: %v", err)
}

peerOpts := []p2p.PeerOptions{
p2p.WithMaximumMessageSize(maximumBlockSize),
p2p.WithRetryReadWriteMessageInterval(5 * time.Second),
p2p.WithPingInterval(30*time.Second, 1*time.Minute),
peerOpts := []better_p2p.PeerOptions{
better_p2p.WithMaximumMessageSize(maximumBlockSize),
better_p2p.WithPingInterval(30*time.Second, 1*time.Minute),
}

if version.Version != "" {
peerOpts = append(peerOpts, p2p.WithUserAgent("ARC", version.Version))
peerOpts = append(peerOpts, better_p2p.WithUserAgent("ARC", version.Version))
}

pmOpts := []p2p.PeerManagerOptions{p2p.WithExcessiveBlockSize(maximumBlockSize)}
better_p2p.SetExcessiveBlockSize(maximumBlockSize)
pmOpts := []better_p2p.PeerManagerOptions{}
if arcConfig.Metamorph.MonitorPeers {
pmOpts = append(pmOpts, p2p.WithRestartUnhealthyPeers())
pmOpts = append(pmOpts, better_p2p.WithRestartUnhealthyPeers())
}

pm = p2p.NewPeerManager(logger.With(slog.String("module", "peer-mng")), network, pmOpts...)
peers := make([]p2p.PeerI, len(arcConfig.Broadcasting.Unicast.Peers))
pm = better_p2p.NewBetterPeerManager(logger.With(slog.String("module", "peer-mng")), network, pmOpts...)
peers := make([]better_p2p.PeerI, len(arcConfig.Broadcasting.Unicast.Peers))

peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh)
peerHandler := blocktx_p2p.NewPeerMsgHandler(logger, blockRequestCh, blockProcessCh)

for i, peerSetting := range arcConfig.Broadcasting.Unicast.Peers {
peerURL, err := peerSetting.GetP2PUrl()
Expand All @@ -153,10 +154,11 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("error getting peer url: %v", err)
}

peer, err := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerURL, peerHandler, network, peerOpts...)
if err != nil {
peer := better_p2p.NewBetterPeer(logger.With(slog.String("module", "peer")), peerHandler, peerURL, network, peerOpts...)
ok := peer.Connect()
if !ok {
stopFn()
return nil, fmt.Errorf("error creating peer %s: %v", peerURL, err)
return nil, fmt.Errorf("error creating peer %s", peerURL)
}
err = pm.AddPeer(peer)
if err != nil {
Expand Down Expand Up @@ -224,7 +226,7 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf
}

func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.Processor,
pm p2p.PeerManagerI, mqClient blocktx.MessageQueueClient,
pm *better_p2p.PeerManager, mqClient blocktx.MessageQueueClient,
store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers,
shutdownFns []func(),
) {
Expand Down
62 changes: 31 additions & 31 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/bitcoin-sv/arc/internal/tracing"
"github.com/bitcoin-sv/arc/pkg/callbacker"

"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/better_p2p"
"github.com/ordishs/go-bitcoin"
"google.golang.org/grpc"

Expand All @@ -25,6 +25,7 @@ import (
"github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection"
"github.com/bitcoin-sv/arc/internal/metamorph"
"github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api"
metamorph_p2p "github.com/bitcoin-sv/arc/internal/metamorph/p2p"
"github.com/bitcoin-sv/arc/internal/metamorph/store"
"github.com/bitcoin-sv/arc/internal/metamorph/store/postgresql"
"github.com/bitcoin-sv/arc/internal/version"
Expand All @@ -43,9 +44,8 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore

var (
metamorphStore store.MetamorphStore
peerHandler *metamorph.PeerHandler
pm metamorph.PeerManager
statusMessageCh chan *metamorph.PeerTxMessage
pm *better_p2p.PeerManager
statusMessageCh chan *metamorph_p2p.PeerTxMessage
mqClient metamorph.MessageQueueClient
processor *metamorph.Processor
server *metamorph.Server
Expand Down Expand Up @@ -75,7 +75,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore

stopFn := func() {
logger.Info("Shutting down metamorph")
disposeMtm(logger, server, processor, peerHandler, mqClient, metamorphStore, healthServer, shutdownFns)
disposeMtm(logger, server, processor, pm, mqClient, metamorphStore, healthServer, shutdownFns)
logger.Info("Shutdown complete")
}

Expand All @@ -84,15 +84,15 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
return nil, fmt.Errorf("failed to create metamorph store: %v", err)
}

pm, peerHandler, statusMessageCh, err = initPeerManager(logger, metamorphStore, arcConfig)
pm, statusMessageCh, err = initPeerManager(logger, metamorphStore, arcConfig)
if err != nil {
stopFn()
return nil, err
}

// maximum amount of messages that could be coming from a single block
minedTxsChan := make(chan *blocktx_api.TransactionBlock, chanBufferSize)
submittedTxsChan := make(chan *metamorph_api.TransactionRequest, chanBufferSize)
minedTxsChan := make(chan *blocktx_api.TransactionBlock, 400)
submittedTxsChan := make(chan *metamorph_api.TransactionRequest, 400)

natsClient, err := nats_connection.New(arcConfig.MessageQueue.URL, logger)
if err != nil {
Expand Down Expand Up @@ -145,7 +145,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
processor, err = metamorph.NewProcessor(
metamorphStore,
cacheStore,
pm,
better_p2p.NewHerald(pm),
statusMessageCh,
processorOpts...,
)
Expand Down Expand Up @@ -256,50 +256,50 @@ func NewMetamorphStore(dbConfig *config.DbConfig, tracingConfig *config.TracingC
return s, err
}

func initPeerManager(logger *slog.Logger, s store.MetamorphStore, arcConfig *config.ArcConfig) (p2p.PeerManagerI, *metamorph.PeerHandler, chan *metamorph.PeerTxMessage, error) {
func initPeerManager(logger *slog.Logger, s store.MetamorphStore, arcConfig *config.ArcConfig) (*better_p2p.PeerManager, chan *metamorph_p2p.PeerTxMessage, error) {
network, err := config.GetNetwork(arcConfig.Network)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to get network: %v", err)
return nil, nil, fmt.Errorf("failed to get network: %v", err)
}

logger.Info("Assuming bitcoin network", "network", network)

messageCh := make(chan *metamorph.PeerTxMessage, 10000)
var pmOpts []p2p.PeerManagerOptions
messageCh := make(chan *metamorph_p2p.PeerTxMessage, 10000)
var pmOpts []better_p2p.PeerManagerOptions
if arcConfig.Metamorph.MonitorPeers {
pmOpts = append(pmOpts, p2p.WithRestartUnhealthyPeers())
pmOpts = append(pmOpts, better_p2p.WithRestartUnhealthyPeers())
}

pm := p2p.NewPeerManager(logger.With(slog.String("module", "peer-handler")), network, pmOpts...)
pm := better_p2p.NewBetterPeerManager(logger.With(slog.String("module", "peer-handler")), network, pmOpts...)

peerHandler := metamorph.NewPeerHandler(s, messageCh)
msgHandler := metamorph_p2p.NewPeerMsgHandler(logger.With(slog.String("module", "peer-msg-handler")), s, messageCh)

peerOpts := []p2p.PeerOptions{
p2p.WithRetryReadWriteMessageInterval(5 * time.Second),
p2p.WithPingInterval(30*time.Second, 1*time.Minute),
peerOpts := []better_p2p.PeerOptions{
better_p2p.WithPingInterval(30*time.Second, 1*time.Minute),
}
if version.Version != "" {
peerOpts = append(peerOpts, p2p.WithUserAgent("ARC", version.Version))
peerOpts = append(peerOpts, better_p2p.WithUserAgent("ARC", version.Version))
}

for _, peerSetting := range arcConfig.Broadcasting.Unicast.Peers {
peerURL, err := peerSetting.GetP2PUrl()
if err != nil {
return nil, nil, nil, fmt.Errorf("error getting peer url: %v", err)
return nil, nil, fmt.Errorf("error getting peer url: %v", err)
}

var peer *p2p.Peer
peer, err = p2p.NewPeer(logger.With(slog.String("module", "peer")), peerURL, peerHandler, network, peerOpts...)
if err != nil {
return nil, nil, nil, fmt.Errorf("error creating peer %s: %v", peerURL, err)
// TODO: rethink peer connection here since now Connect wait for handshake wich can take awhile
peer := better_p2p.NewBetterPeer(logger.With(slog.String("module", "peer")), msgHandler, peerURL, network, peerOpts...)
peer.Connect()
if !peer.Connect() {
return nil, nil, fmt.Errorf("error creating peer %s: %v", peerURL, err)
}

if err = pm.AddPeer(peer); err != nil {
return nil, nil, nil, fmt.Errorf("error adding peer %s: %v", peerURL, err)
return nil, nil, fmt.Errorf("error adding peer %s: %v", peerURL, err)
}
}

return pm, peerHandler, messageCh, nil
return pm, messageCh, nil
}

func initGrpcCallbackerConn(address, prometheusEndpoint string, grpcMsgSize int, tracingConfig *config.TracingConfig) (callbacker_api.CallbackerAPIClient, error) {
Expand All @@ -316,14 +316,14 @@ func initGrpcCallbackerConn(address, prometheusEndpoint string, grpcMsgSize int,
}

func disposeMtm(l *slog.Logger, server *metamorph.Server, processor *metamorph.Processor,
peerHandler *metamorph.PeerHandler, mqClient metamorph.MessageQueueClient,
pm *better_p2p.PeerManager, mqClient metamorph.MessageQueueClient,
metamorphStore store.MetamorphStore, healthServer *grpc_opts.GrpcServer,
shutdownFns []func(),
) {
// dispose the dependencies in the correct order:
// 1. server - ensure no new request will be received
// 2. processor - ensure all started job are complete
// 3. peerHandler
// 3. peer manager - close p2p connections
// 4. mqClient
// 5. store
// 6. healthServer
Expand All @@ -335,8 +335,8 @@ func disposeMtm(l *slog.Logger, server *metamorph.Server, processor *metamorph.P
if processor != nil {
processor.Shutdown()
}
if peerHandler != nil {
peerHandler.Shutdown()
if pm != nil {
pm.Shutdown()
}
if mqClient != nil {
mqClient.Shutdown()
Expand Down
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down Expand Up @@ -119,8 +118,6 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.1.14 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/ordishs/go-utils v1.0.51 // indirect
github.com/paulmach/orb v0.9.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/perimeterx/marshmallow v1.1.5 // indirect
Expand All @@ -139,8 +136,6 @@ require (
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
Expand Down Expand Up @@ -172,3 +167,5 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/libsv/go-p2p => github.com/arkadiuszos4chain/go-p2p v0.0.0-20241108223608-ded470665895
Loading

0 comments on commit 4f9b01c

Please sign in to comment.