diff --git a/Makefile b/Makefile index ce511b5..e06dadc 100644 --- a/Makefile +++ b/Makefile @@ -19,3 +19,7 @@ lint: .PHONY: install install: go install honnef.co/go/tools/cmd/staticcheck@latest + +.PHONY: gen_go +gen_go: + go generate ./... diff --git a/interface.go b/interface.go index 4221807..0b268b6 100644 --- a/interface.go +++ b/interface.go @@ -29,6 +29,7 @@ type PeerI interface { RequestBlock(blockHash *chainhash.Hash) Network() wire.BitcoinNet IsHealthy() bool + IsUnhealthyCh() <-chan struct{} Shutdown() Restart() } diff --git a/peer.go b/peer.go index 2a63bdd..550b174 100644 --- a/peer.go +++ b/peer.go @@ -40,8 +40,8 @@ const ( retryReadWriteMessageAttempts = 5 reconnectInterval = 10 * time.Second - pingInterval = 2 * time.Minute - connectionHealthTickerDuration = 3 * time.Minute + pingIntervalDefault = 2 * time.Minute + connectionHealthTickerDurationDefault = 3 * time.Minute ) type Block struct { @@ -71,13 +71,15 @@ type Peer struct { invBatcher *batcher.Batcher[chainhash.Hash] dataBatcher *batcher.Batcher[chainhash.Hash] maximumMessageSize int64 - isHealthy bool + isHealthy atomic.Bool userAgentName *string userAgentVersion *string retryReadWriteMessageInterval time.Duration nrWriteHandlers int - - ctx context.Context + isUnhealthyCh chan struct{} + pingInterval time.Duration + connectionHealthThreshold time.Duration + ctx context.Context cancelReadHandler context.CancelFunc cancelWriteHandler context.CancelFunc @@ -105,6 +107,7 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw address: address, writeChan: writeChan, pingPongAlive: make(chan struct{}, 1), + isUnhealthyCh: make(chan struct{}), peerHandler: peerHandler, logger: peerLogger, dial: net.Dial, @@ -112,6 +115,8 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw maximumMessageSize: defaultMaximumMessageSize, batchDelay: defaultBatchDelayMilliseconds * time.Millisecond, retryReadWriteMessageInterval: retryReadWriteMessageIntervalDefault, + pingInterval: pingIntervalDefault, + connectionHealthThreshold: connectionHealthTickerDurationDefault, writerWg: &sync.WaitGroup{}, readerWg: &sync.WaitGroup{}, reconnectingWg: &sync.WaitGroup{}, @@ -132,6 +137,9 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw } func (p *Peer) start() { + + p.logger.Info("Starting peer") + ctx, cancelAll := context.WithCancel(context.Background()) p.cancelAll = cancelAll p.ctx = ctx @@ -788,15 +796,15 @@ func (p *Peer) versionMessage(address string) *wire.MsgVersion { func (p *Peer) startMonitorPingPong() { p.healthMonitorWg.Add(1) - pingTicker := time.NewTicker(pingInterval) + pingTicker := time.NewTicker(p.pingInterval) go func() { // if no ping/pong signal is received for certain amount of time, mark peer as unhealthy - checkConnectionHealthTicker := time.NewTicker(connectionHealthTickerDuration) + monitorConnectionTicker := time.NewTicker(p.connectionHealthThreshold) defer func() { p.healthMonitorWg.Done() - checkConnectionHealthTicker.Stop() + monitorConnectionTicker.Stop() }() for { @@ -809,18 +817,19 @@ func (p *Peer) startMonitorPingPong() { } p.writeChan <- wire.NewMsgPing(nonce) case <-p.pingPongAlive: - p.mu.Lock() - p.isHealthy = true - p.mu.Unlock() + // if ping/pong signal is received reset the ticker + monitorConnectionTicker.Reset(p.connectionHealthThreshold) + p.setHealthy() + case <-monitorConnectionTicker.C: - // if ping/pong is received signal reset the ticker - checkConnectionHealthTicker.Reset(connectionHealthTickerDuration) - case <-checkConnectionHealthTicker.C: + p.isHealthy.Store(false) + + select { + case p.isUnhealthyCh <- struct{}{}: + default: // Do not block if nothing is ready from channel + } - p.mu.Lock() - p.isHealthy = false p.logger.Warn("peer unhealthy") - p.mu.Unlock() case <-p.ctx.Done(): return } @@ -828,31 +837,22 @@ func (p *Peer) startMonitorPingPong() { }() } -func (p *Peer) IsHealthy() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.isHealthy +func (p *Peer) IsUnhealthyCh() <-chan struct{} { + return p.isUnhealthyCh } -func (p *Peer) stopReadHandler() { - if p.cancelReadHandler == nil { +func (p *Peer) setHealthy() { + + if p.isHealthy.Load() { return } - p.logger.Debug("Cancelling read handlers") - p.cancelReadHandler() - p.logger.Debug("Waiting for read handlers to stop") - p.readerWg.Wait() + + p.logger.Info("peer healthy") + p.isHealthy.Store(true) } -func (p *Peer) stopWriteHandler() { - if p.cancelWriteHandler == nil { - return - } - p.logger.Debug("Cancelling write handlers") - p.cancelWriteHandler() - p.logger.Debug("Waiting for writer handlers to stop") - p.writerWg.Wait() +func (p *Peer) IsHealthy() bool { + return p.isHealthy.Load() } func (p *Peer) Restart() { @@ -862,10 +862,14 @@ func (p *Peer) Restart() { } func (p *Peer) Shutdown() { + p.logger.Info("Shutting down") + p.cancelAll() p.reconnectingWg.Wait() p.healthMonitorWg.Wait() p.writerWg.Wait() p.readerWg.Wait() + + p.logger.Info("Shutdown complete") } diff --git a/peer_manager.go b/peer_manager.go index 309b8cc..aeaf134 100644 --- a/peer_manager.go +++ b/peer_manager.go @@ -21,7 +21,6 @@ type PeerManager struct { logger *slog.Logger ebs int64 restartUnhealthyPeers bool - monitorPeersInterval time.Duration waitGroup sync.WaitGroup cancelAll context.CancelFunc ctx context.Context @@ -84,6 +83,7 @@ func (pm *PeerManager) GetPeers() []PeerI { } func (pm *PeerManager) Shutdown() { + pm.logger.Info("Shutting down peer manager") if pm.cancelAll != nil { pm.cancelAll() @@ -96,24 +96,22 @@ func (pm *PeerManager) Shutdown() { } func (pm *PeerManager) StartMonitorPeerHealth() { - ticker := time.NewTicker(pm.monitorPeersInterval) - pm.waitGroup.Add(1) - go func() { - defer pm.waitGroup.Done() - for { - select { - case <-pm.ctx.Done(): - return - case <-ticker.C: - for _, peer := range pm.GetPeers() { - if !peer.IsHealthy() { - pm.logger.Warn("peer unhealthy - restarting", slog.String("address", peer.String()), slog.Bool("connected", peer.Connected())) - peer.Restart() - } + + for _, peer := range pm.peers { + pm.waitGroup.Add(1) + go func(p PeerI) { + defer pm.waitGroup.Done() + for { + select { + case <-pm.ctx.Done(): + return + case <-p.IsUnhealthyCh(): + pm.logger.Warn("peer unhealthy - restarting", slog.String("address", p.String()), slog.Bool("connected", p.Connected())) + p.Restart() } } - } - }() + }(peer) + } } // AnnounceTransaction will send an INV message to the provided peers or to selected peers if peers is nil diff --git a/peer_manager_options.go b/peer_manager_options.go index 53eb54b..9700599 100644 --- a/peer_manager_options.go +++ b/peer_manager_options.go @@ -16,9 +16,8 @@ func WithExcessiveBlockSize(ebs int64) PeerManagerOptions { } } -func WithRestartUnhealthyPeers(monitorPeersInterval time.Duration) PeerManagerOptions { +func WithRestartUnhealthyPeers() PeerManagerOptions { return func(p *PeerManager) { p.restartUnhealthyPeers = true - p.monitorPeersInterval = monitorPeersInterval } } diff --git a/peer_manager_test.go b/peer_manager_test.go index 6062b22..67b371f 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -39,7 +39,7 @@ func TestNewPeerManager(t *testing.T) { err = pm.AddPeer(peer) require.NoError(t, err) assert.Len(t, pm.GetPeers(), 1) - peer.Shutdown() + pm.Shutdown() }) t.Run("1 peer - de dup", func(t *testing.T) { @@ -64,9 +64,7 @@ func TestNewPeerManager(t *testing.T) { assert.Len(t, pm.GetPeers(), 4) - for _, peer := range peers { - peer.Shutdown() - } + pm.Shutdown() }) } diff --git a/peer_mock.go b/peer_mock.go index f60f51f..4d581d1 100644 --- a/peer_mock.go +++ b/peer_mock.go @@ -47,6 +47,10 @@ func (p *PeerMock) IsHealthy() bool { return true } +func (p *PeerMock) IsUnhealthyCh() <-chan struct{} { + return make(<-chan struct{}) +} + func (p *PeerMock) Connected() bool { return true } diff --git a/peer_options.go b/peer_options.go index 432ebeb..6b8d739 100644 --- a/peer_options.go +++ b/peer_options.go @@ -61,3 +61,14 @@ func WithNrOfWriteHandlers(NrWriteHandlers int) PeerOptions { return nil } } + +// WithPingInterval sets the optional time duration ping interval and connection health threshold +// ping interval is the time interval in which the peer sends a ping +// connection health threshold is the time duration after which the connection is marked unhealthy if no signal is received +func WithPingInterval(pingInterval time.Duration, connectionHealthThreshold time.Duration) PeerOptions { + return func(p *Peer) error { + p.pingInterval = pingInterval + p.connectionHealthThreshold = connectionHealthThreshold + return nil + } +} diff --git a/peer_test.go b/peer_test.go index a5f9c2a..0075be9 100644 --- a/peer_test.go +++ b/peer_test.go @@ -333,10 +333,10 @@ func TestReconnect(t *testing.T) { if tc.cancelRead { // cancel reader so that writer will disconnect - peer.stopReadHandler() + stopReadHandler(peer) } else { // cancel writer so that reader will disconnect - peer.stopWriteHandler() + stopWriteHandler(peer) } // break connection @@ -637,3 +637,23 @@ func doHandshake(t *testing.T, p *Peer, myConn net.Conn) { assert.NoError(t, err) assert.Equal(t, wire.CmdVerAck, msg.Command()) } + +func stopReadHandler(p *Peer) { + if p.cancelReadHandler == nil { + return + } + p.logger.Debug("Cancelling read handlers") + p.cancelReadHandler() + p.logger.Debug("Waiting for read handlers to stop") + p.readerWg.Wait() +} + +func stopWriteHandler(p *Peer) { + if p.cancelWriteHandler == nil { + return + } + p.logger.Debug("Cancelling write handlers") + p.cancelWriteHandler() + p.logger.Debug("Waiting for writer handlers to stop") + p.writerWg.Wait() +} diff --git a/test/peer_integration_test.go b/test/peer_integration_test.go index d443af3..2db87c8 100644 --- a/test/peer_integration_test.go +++ b/test/peer_integration_test.go @@ -148,22 +148,12 @@ func TestNewPeer(t *testing.T) { t.Log("shutdown finished") }) - t.Run("announce transaction", func(t *testing.T) { + t.Run("restart", func(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) - pm := p2p.NewPeerManager(logger, wire.TestNet) - require.NotNil(t, pm) - - peerHandler := &p2p.PeerHandlerIMock{ - HandleTransactionsGetFunc: func(msgs []*wire.InvVect, peer p2p.PeerI) ([][]byte, error) { - return [][]byte{TX1RawBytes}, nil - }, - } - - peer, err := p2p.NewPeer(logger, "localhost:"+p2pPortBinding, peerHandler, wire.TestNet) - require.NoError(t, err) + peerHandler := p2p.NewMockPeerHandler() - err = pm.AddPeer(peer) + peer, err := p2p.NewPeer(logger, "localhost:"+p2pPortBinding, peerHandler, wire.TestNet, p2p.WithUserAgent("agent", "0.0.1")) require.NoError(t, err) t.Log("expect that peer has connected") @@ -175,14 +165,30 @@ func TestNewPeer(t *testing.T) { break connectLoop } case <-time.NewTimer(5 * time.Second).C: - t.Fatal("peer did not disconnect") + t.Fatal("peer did not connect") } } - pm.AnnounceTransaction(TX1Hash, []p2p.PeerI{peer}) + t.Log("restart peer") + peer.Restart() - time.Sleep(100 * time.Millisecond) + t.Log("expect that peer has re-established connection") + reconnectLoop: + for { + select { + case <-time.NewTicker(200 * time.Millisecond).C: + if peer.Connected() { + break reconnectLoop + } + case <-time.NewTimer(2 * time.Second).C: + t.Fatal("peer did not reconnect") + } + } + + require.NoError(t, err) + t.Log("shutdown") peer.Shutdown() + t.Log("shutdown finished") }) } diff --git a/test/peer_manager_integration_test.go b/test/peer_manager_integration_test.go new file mode 100644 index 0000000..a26f85c --- /dev/null +++ b/test/peer_manager_integration_test.go @@ -0,0 +1,58 @@ +package test + +import ( + "log/slog" + "os" + "testing" + "time" + + "github.com/libsv/go-p2p" + + "github.com/libsv/go-p2p/wire" + "github.com/stretchr/testify/require" +) + +func TestNewPeerManager(t *testing.T) { + t.Helper() + if testing.Short() { + t.Skip("skipping integration test") + } + + t.Run("announce transaction", func(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + pm := p2p.NewPeerManager(logger, wire.TestNet) + require.NotNil(t, pm) + + peerHandler := &p2p.PeerHandlerIMock{ + HandleTransactionsGetFunc: func(msgs []*wire.InvVect, peer p2p.PeerI) ([][]byte, error) { + return [][]byte{TX1RawBytes}, nil + }, + } + + peer, err := p2p.NewPeer(logger, "localhost:"+p2pPortBinding, peerHandler, wire.TestNet) + require.NoError(t, err) + + err = pm.AddPeer(peer) + require.NoError(t, err) + + t.Log("expect that peer has connected") + connectLoop: + for { + select { + case <-time.NewTicker(200 * time.Millisecond).C: + if peer.Connected() { + break connectLoop + } + case <-time.NewTimer(5 * time.Second).C: + t.Fatal("peer did not disconnect") + } + } + + pm.AnnounceTransaction(TX1Hash, []p2p.PeerI{peer}) + + time.Sleep(100 * time.Millisecond) + + peer.Shutdown() + }) +}