From 922042ba8dc95c2f1daab89193b3258bf938206a Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Mon, 17 Jun 2024 21:15:03 +0200 Subject: [PATCH 01/10] ARCO-105: Peer health monitoring routine is notified about unhealthy peer. That triggers restarting of the peer. --- interface.go | 1 + peer.go | 43 ++++++++++++++++++++++++++++++++----------- peer_manager.go | 32 ++++++++++++++++---------------- peer_mock.go | 4 ++++ 4 files changed, 53 insertions(+), 27 deletions(-) diff --git a/interface.go b/interface.go index 4221807..0b268b6 100644 --- a/interface.go +++ b/interface.go @@ -29,6 +29,7 @@ type PeerI interface { RequestBlock(blockHash *chainhash.Hash) Network() wire.BitcoinNet IsHealthy() bool + IsUnhealthyCh() <-chan struct{} Shutdown() Restart() } diff --git a/peer.go b/peer.go index 2a63bdd..e74999b 100644 --- a/peer.go +++ b/peer.go @@ -40,8 +40,8 @@ const ( retryReadWriteMessageAttempts = 5 reconnectInterval = 10 * time.Second - pingInterval = 2 * time.Minute - connectionHealthTickerDuration = 3 * time.Minute + pingInterval = 30 * time.Second + connectionHealthTickerDuration = 1 * time.Minute ) type Block struct { @@ -76,6 +76,7 @@ type Peer struct { userAgentVersion *string retryReadWriteMessageInterval time.Duration nrWriteHandlers int + isUnhealthyCh chan struct{} ctx context.Context @@ -105,6 +106,7 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw address: address, writeChan: writeChan, pingPongAlive: make(chan struct{}, 1), + isUnhealthyCh: make(chan struct{}), peerHandler: peerHandler, logger: peerLogger, dial: net.Dial, @@ -792,11 +794,11 @@ func (p *Peer) startMonitorPingPong() { go func() { // if no ping/pong signal is received for certain amount of time, mark peer as unhealthy - checkConnectionHealthTicker := time.NewTicker(connectionHealthTickerDuration) + monitorConnectionTicker := time.NewTicker(connectionHealthTickerDuration) defer func() { p.healthMonitorWg.Done() - checkConnectionHealthTicker.Stop() + monitorConnectionTicker.Stop() }() for { @@ -809,16 +811,19 @@ func (p *Peer) startMonitorPingPong() { } p.writeChan <- wire.NewMsgPing(nonce) case <-p.pingPongAlive: - p.mu.Lock() - p.isHealthy = true - p.mu.Unlock() - - // if ping/pong is received signal reset the ticker - checkConnectionHealthTicker.Reset(connectionHealthTickerDuration) - case <-checkConnectionHealthTicker.C: + // if ping/pong signal is received reset the ticker + monitorConnectionTicker.Reset(connectionHealthTickerDuration) + p.setHealthy() + case <-monitorConnectionTicker.C: p.mu.Lock() p.isHealthy = false + + select { + case p.isUnhealthyCh <- struct{}{}: + default: // Do not block if nothing is ready from channel + } + p.logger.Warn("peer unhealthy") p.mu.Unlock() case <-p.ctx.Done(): @@ -828,6 +833,22 @@ func (p *Peer) startMonitorPingPong() { }() } +func (p *Peer) IsUnhealthyCh() <-chan struct{} { + return p.isUnhealthyCh +} + +func (p *Peer) setHealthy() { + + p.mu.Lock() + if p.isHealthy { + p.mu.Unlock() + return + } + p.logger.Warn("peer healthy") + p.isHealthy = true + p.mu.Unlock() +} + func (p *Peer) IsHealthy() bool { p.mu.Lock() defer p.mu.Unlock() diff --git a/peer_manager.go b/peer_manager.go index 309b8cc..a06f300 100644 --- a/peer_manager.go +++ b/peer_manager.go @@ -96,24 +96,24 @@ func (pm *PeerManager) Shutdown() { } func (pm *PeerManager) StartMonitorPeerHealth() { - ticker := time.NewTicker(pm.monitorPeersInterval) - pm.waitGroup.Add(1) - go func() { - defer pm.waitGroup.Done() - for { - select { - case <-pm.ctx.Done(): - return - case <-ticker.C: - for _, peer := range pm.GetPeers() { - if !peer.IsHealthy() { - pm.logger.Warn("peer unhealthy - restarting", slog.String("address", peer.String()), slog.Bool("connected", peer.Connected())) - peer.Restart() - } + + for _, peer := range pm.peers { + pm.waitGroup.Add(1) + go func(p PeerI) { + defer func() { + pm.waitGroup.Done() + }() + for { + select { + case <-pm.ctx.Done(): + return + case <-p.IsUnhealthyCh(): + pm.logger.Warn("peer unhealthy - restarting", slog.String("address", p.String()), slog.Bool("connected", p.Connected())) + p.Restart() } } - } - }() + }(peer) + } } // AnnounceTransaction will send an INV message to the provided peers or to selected peers if peers is nil diff --git a/peer_mock.go b/peer_mock.go index f60f51f..4d581d1 100644 --- a/peer_mock.go +++ b/peer_mock.go @@ -47,6 +47,10 @@ func (p *PeerMock) IsHealthy() bool { return true } +func (p *PeerMock) IsUnhealthyCh() <-chan struct{} { + return make(<-chan struct{}) +} + func (p *PeerMock) Connected() bool { return true } From 8733372ef3bbd9c0e8ebeefb74f3682622bb6d85 Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Fri, 28 Jun 2024 13:52:21 +0200 Subject: [PATCH 02/10] ARCO-105: Option to set ping interval and connection health threshold --- peer.go | 17 ++++++++++------- peer_manager.go | 1 - peer_manager_options.go | 3 +-- peer_options.go | 11 +++++++++++ 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/peer.go b/peer.go index e74999b..e6aa468 100644 --- a/peer.go +++ b/peer.go @@ -40,8 +40,8 @@ const ( retryReadWriteMessageAttempts = 5 reconnectInterval = 10 * time.Second - pingInterval = 30 * time.Second - connectionHealthTickerDuration = 1 * time.Minute + pingIntervalDefault = 30 * time.Second + connectionHealthTickerDurationDefault = 1 * time.Minute ) type Block struct { @@ -77,8 +77,9 @@ type Peer struct { retryReadWriteMessageInterval time.Duration nrWriteHandlers int isUnhealthyCh chan struct{} - - ctx context.Context + pingInterval time.Duration + connectionHealthThreshold time.Duration + ctx context.Context cancelReadHandler context.CancelFunc cancelWriteHandler context.CancelFunc @@ -114,6 +115,8 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw maximumMessageSize: defaultMaximumMessageSize, batchDelay: defaultBatchDelayMilliseconds * time.Millisecond, retryReadWriteMessageInterval: retryReadWriteMessageIntervalDefault, + pingInterval: pingIntervalDefault, + connectionHealthThreshold: connectionHealthTickerDurationDefault, writerWg: &sync.WaitGroup{}, readerWg: &sync.WaitGroup{}, reconnectingWg: &sync.WaitGroup{}, @@ -790,11 +793,11 @@ func (p *Peer) versionMessage(address string) *wire.MsgVersion { func (p *Peer) startMonitorPingPong() { p.healthMonitorWg.Add(1) - pingTicker := time.NewTicker(pingInterval) + pingTicker := time.NewTicker(p.pingInterval) go func() { // if no ping/pong signal is received for certain amount of time, mark peer as unhealthy - monitorConnectionTicker := time.NewTicker(connectionHealthTickerDuration) + monitorConnectionTicker := time.NewTicker(p.connectionHealthThreshold) defer func() { p.healthMonitorWg.Done() @@ -812,7 +815,7 @@ func (p *Peer) startMonitorPingPong() { p.writeChan <- wire.NewMsgPing(nonce) case <-p.pingPongAlive: // if ping/pong signal is received reset the ticker - monitorConnectionTicker.Reset(connectionHealthTickerDuration) + monitorConnectionTicker.Reset(p.connectionHealthThreshold) p.setHealthy() case <-monitorConnectionTicker.C: diff --git a/peer_manager.go b/peer_manager.go index a06f300..f50be8c 100644 --- a/peer_manager.go +++ b/peer_manager.go @@ -21,7 +21,6 @@ type PeerManager struct { logger *slog.Logger ebs int64 restartUnhealthyPeers bool - monitorPeersInterval time.Duration waitGroup sync.WaitGroup cancelAll context.CancelFunc ctx context.Context diff --git a/peer_manager_options.go b/peer_manager_options.go index 53eb54b..9700599 100644 --- a/peer_manager_options.go +++ b/peer_manager_options.go @@ -16,9 +16,8 @@ func WithExcessiveBlockSize(ebs int64) PeerManagerOptions { } } -func WithRestartUnhealthyPeers(monitorPeersInterval time.Duration) PeerManagerOptions { +func WithRestartUnhealthyPeers() PeerManagerOptions { return func(p *PeerManager) { p.restartUnhealthyPeers = true - p.monitorPeersInterval = monitorPeersInterval } } diff --git a/peer_options.go b/peer_options.go index 432ebeb..6b8d739 100644 --- a/peer_options.go +++ b/peer_options.go @@ -61,3 +61,14 @@ func WithNrOfWriteHandlers(NrWriteHandlers int) PeerOptions { return nil } } + +// WithPingInterval sets the optional time duration ping interval and connection health threshold +// ping interval is the time interval in which the peer sends a ping +// connection health threshold is the time duration after which the connection is marked unhealthy if no signal is received +func WithPingInterval(pingInterval time.Duration, connectionHealthThreshold time.Duration) PeerOptions { + return func(p *Peer) error { + p.pingInterval = pingInterval + p.connectionHealthThreshold = connectionHealthThreshold + return nil + } +} From aa06c4a62f864257fc1a09d712c50c549f2c38b5 Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Fri, 19 Jul 2024 14:45:14 +0200 Subject: [PATCH 03/10] ARCO-105: Logs --- peer.go | 7 +++++++ peer_manager.go | 1 + 2 files changed, 8 insertions(+) diff --git a/peer.go b/peer.go index e6aa468..e1546f2 100644 --- a/peer.go +++ b/peer.go @@ -137,6 +137,9 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw } func (p *Peer) start() { + + p.logger.Info("Starting peer") + ctx, cancelAll := context.WithCancel(context.Background()) p.cancelAll = cancelAll p.ctx = ctx @@ -886,10 +889,14 @@ func (p *Peer) Restart() { } func (p *Peer) Shutdown() { + p.logger.Info("Shutting down") + p.cancelAll() p.reconnectingWg.Wait() p.healthMonitorWg.Wait() p.writerWg.Wait() p.readerWg.Wait() + + p.logger.Info("Shutdown complete") } diff --git a/peer_manager.go b/peer_manager.go index f50be8c..03d3a7e 100644 --- a/peer_manager.go +++ b/peer_manager.go @@ -83,6 +83,7 @@ func (pm *PeerManager) GetPeers() []PeerI { } func (pm *PeerManager) Shutdown() { + pm.logger.Info("Shutting down peer manager") if pm.cancelAll != nil { pm.cancelAll() From e1ace6ab867b759bbfa7e7a62044b79948653ea9 Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Fri, 19 Jul 2024 15:09:48 +0200 Subject: [PATCH 04/10] ARCO-105: Makefile generate go target --- Makefile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Makefile b/Makefile index ce511b5..e06dadc 100644 --- a/Makefile +++ b/Makefile @@ -19,3 +19,7 @@ lint: .PHONY: install install: go install honnef.co/go/tools/cmd/staticcheck@latest + +.PHONY: gen_go +gen_go: + go generate ./... From 59f9ee8ce9d2316543567dd5e1db908f6b19f938 Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Fri, 19 Jul 2024 16:13:09 +0200 Subject: [PATCH 05/10] ARCO-105: Change isHealthy variable type to atomic --- peer.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/peer.go b/peer.go index e1546f2..a09be37 100644 --- a/peer.go +++ b/peer.go @@ -71,7 +71,7 @@ type Peer struct { invBatcher *batcher.Batcher[chainhash.Hash] dataBatcher *batcher.Batcher[chainhash.Hash] maximumMessageSize int64 - isHealthy bool + isHealthy atomic.Bool userAgentName *string userAgentVersion *string retryReadWriteMessageInterval time.Duration @@ -822,8 +822,7 @@ func (p *Peer) startMonitorPingPong() { p.setHealthy() case <-monitorConnectionTicker.C: - p.mu.Lock() - p.isHealthy = false + p.isHealthy.Store(false) select { case p.isUnhealthyCh <- struct{}{}: @@ -831,7 +830,6 @@ func (p *Peer) startMonitorPingPong() { } p.logger.Warn("peer unhealthy") - p.mu.Unlock() case <-p.ctx.Done(): return } @@ -845,21 +843,16 @@ func (p *Peer) IsUnhealthyCh() <-chan struct{} { func (p *Peer) setHealthy() { - p.mu.Lock() - if p.isHealthy { - p.mu.Unlock() + if p.isHealthy.Load() { return } - p.logger.Warn("peer healthy") - p.isHealthy = true - p.mu.Unlock() + + p.logger.Info("peer healthy") + p.isHealthy.Store(true) } func (p *Peer) IsHealthy() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.isHealthy + return p.isHealthy.Load() } func (p *Peer) stopReadHandler() { From 0a53a740ad4a3a2a5fb5840543697493beb310dd Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Fri, 19 Jul 2024 16:13:28 +0200 Subject: [PATCH 06/10] ARCO-105: Move functions --- peer.go | 20 -------------------- peer_test.go | 24 ++++++++++++++++++++++-- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/peer.go b/peer.go index a09be37..f259c11 100644 --- a/peer.go +++ b/peer.go @@ -855,26 +855,6 @@ func (p *Peer) IsHealthy() bool { return p.isHealthy.Load() } -func (p *Peer) stopReadHandler() { - if p.cancelReadHandler == nil { - return - } - p.logger.Debug("Cancelling read handlers") - p.cancelReadHandler() - p.logger.Debug("Waiting for read handlers to stop") - p.readerWg.Wait() -} - -func (p *Peer) stopWriteHandler() { - if p.cancelWriteHandler == nil { - return - } - p.logger.Debug("Cancelling write handlers") - p.cancelWriteHandler() - p.logger.Debug("Waiting for writer handlers to stop") - p.writerWg.Wait() -} - func (p *Peer) Restart() { p.Shutdown() diff --git a/peer_test.go b/peer_test.go index a5f9c2a..0075be9 100644 --- a/peer_test.go +++ b/peer_test.go @@ -333,10 +333,10 @@ func TestReconnect(t *testing.T) { if tc.cancelRead { // cancel reader so that writer will disconnect - peer.stopReadHandler() + stopReadHandler(peer) } else { // cancel writer so that reader will disconnect - peer.stopWriteHandler() + stopWriteHandler(peer) } // break connection @@ -637,3 +637,23 @@ func doHandshake(t *testing.T, p *Peer, myConn net.Conn) { assert.NoError(t, err) assert.Equal(t, wire.CmdVerAck, msg.Command()) } + +func stopReadHandler(p *Peer) { + if p.cancelReadHandler == nil { + return + } + p.logger.Debug("Cancelling read handlers") + p.cancelReadHandler() + p.logger.Debug("Waiting for read handlers to stop") + p.readerWg.Wait() +} + +func stopWriteHandler(p *Peer) { + if p.cancelWriteHandler == nil { + return + } + p.logger.Debug("Cancelling write handlers") + p.cancelWriteHandler() + p.logger.Debug("Waiting for writer handlers to stop") + p.writerWg.Wait() +} From 0a94394db04c94f3ad328df6ab3bf2164db0aeee Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Fri, 19 Jul 2024 18:02:34 +0200 Subject: [PATCH 07/10] ARCO-105: Move peer handler integration test --- test/peer_integration_test.go | 37 ----------------- test/peer_manager_integration_test.go | 58 +++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 37 deletions(-) create mode 100644 test/peer_manager_integration_test.go diff --git a/test/peer_integration_test.go b/test/peer_integration_test.go index d443af3..c65bc59 100644 --- a/test/peer_integration_test.go +++ b/test/peer_integration_test.go @@ -148,41 +148,4 @@ func TestNewPeer(t *testing.T) { t.Log("shutdown finished") }) - t.Run("announce transaction", func(t *testing.T) { - logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) - - pm := p2p.NewPeerManager(logger, wire.TestNet) - require.NotNil(t, pm) - - peerHandler := &p2p.PeerHandlerIMock{ - HandleTransactionsGetFunc: func(msgs []*wire.InvVect, peer p2p.PeerI) ([][]byte, error) { - return [][]byte{TX1RawBytes}, nil - }, - } - - peer, err := p2p.NewPeer(logger, "localhost:"+p2pPortBinding, peerHandler, wire.TestNet) - require.NoError(t, err) - - err = pm.AddPeer(peer) - require.NoError(t, err) - - t.Log("expect that peer has connected") - connectLoop: - for { - select { - case <-time.NewTicker(200 * time.Millisecond).C: - if peer.Connected() { - break connectLoop - } - case <-time.NewTimer(5 * time.Second).C: - t.Fatal("peer did not disconnect") - } - } - - pm.AnnounceTransaction(TX1Hash, []p2p.PeerI{peer}) - - time.Sleep(100 * time.Millisecond) - - peer.Shutdown() - }) } diff --git a/test/peer_manager_integration_test.go b/test/peer_manager_integration_test.go new file mode 100644 index 0000000..a26f85c --- /dev/null +++ b/test/peer_manager_integration_test.go @@ -0,0 +1,58 @@ +package test + +import ( + "log/slog" + "os" + "testing" + "time" + + "github.com/libsv/go-p2p" + + "github.com/libsv/go-p2p/wire" + "github.com/stretchr/testify/require" +) + +func TestNewPeerManager(t *testing.T) { + t.Helper() + if testing.Short() { + t.Skip("skipping integration test") + } + + t.Run("announce transaction", func(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + pm := p2p.NewPeerManager(logger, wire.TestNet) + require.NotNil(t, pm) + + peerHandler := &p2p.PeerHandlerIMock{ + HandleTransactionsGetFunc: func(msgs []*wire.InvVect, peer p2p.PeerI) ([][]byte, error) { + return [][]byte{TX1RawBytes}, nil + }, + } + + peer, err := p2p.NewPeer(logger, "localhost:"+p2pPortBinding, peerHandler, wire.TestNet) + require.NoError(t, err) + + err = pm.AddPeer(peer) + require.NoError(t, err) + + t.Log("expect that peer has connected") + connectLoop: + for { + select { + case <-time.NewTicker(200 * time.Millisecond).C: + if peer.Connected() { + break connectLoop + } + case <-time.NewTimer(5 * time.Second).C: + t.Fatal("peer did not disconnect") + } + } + + pm.AnnounceTransaction(TX1Hash, []p2p.PeerI{peer}) + + time.Sleep(100 * time.Millisecond) + + peer.Shutdown() + }) +} From 97323c35f09d1ef13d9559f00f4b2a526624e06e Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Fri, 19 Jul 2024 18:04:29 +0200 Subject: [PATCH 08/10] ARCO-105: Shutdown peer manager --- peer_manager_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/peer_manager_test.go b/peer_manager_test.go index 6062b22..67b371f 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -39,7 +39,7 @@ func TestNewPeerManager(t *testing.T) { err = pm.AddPeer(peer) require.NoError(t, err) assert.Len(t, pm.GetPeers(), 1) - peer.Shutdown() + pm.Shutdown() }) t.Run("1 peer - de dup", func(t *testing.T) { @@ -64,9 +64,7 @@ func TestNewPeerManager(t *testing.T) { assert.Len(t, pm.GetPeers(), 4) - for _, peer := range peers { - peer.Shutdown() - } + pm.Shutdown() }) } From 4fe767d72a6fe0c26707bc8fb925383a463e5418 Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Fri, 19 Jul 2024 18:06:16 +0200 Subject: [PATCH 09/10] ARCO-105: Integration test --- test/peer_integration_test.go | 43 +++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/test/peer_integration_test.go b/test/peer_integration_test.go index c65bc59..2db87c8 100644 --- a/test/peer_integration_test.go +++ b/test/peer_integration_test.go @@ -148,4 +148,47 @@ func TestNewPeer(t *testing.T) { t.Log("shutdown finished") }) + t.Run("restart", func(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + peerHandler := p2p.NewMockPeerHandler() + + peer, err := p2p.NewPeer(logger, "localhost:"+p2pPortBinding, peerHandler, wire.TestNet, p2p.WithUserAgent("agent", "0.0.1")) + require.NoError(t, err) + + t.Log("expect that peer has connected") + connectLoop: + for { + select { + case <-time.NewTicker(200 * time.Millisecond).C: + if peer.Connected() { + break connectLoop + } + case <-time.NewTimer(5 * time.Second).C: + t.Fatal("peer did not connect") + } + } + + t.Log("restart peer") + peer.Restart() + + t.Log("expect that peer has re-established connection") + reconnectLoop: + for { + select { + case <-time.NewTicker(200 * time.Millisecond).C: + if peer.Connected() { + break reconnectLoop + } + case <-time.NewTimer(2 * time.Second).C: + t.Fatal("peer did not reconnect") + } + } + + require.NoError(t, err) + + t.Log("shutdown") + peer.Shutdown() + t.Log("shutdown finished") + }) } From ba6e38b676e65cd78c2293446ee9a141392a0c19 Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Mon, 22 Jul 2024 13:58:01 +0200 Subject: [PATCH 10/10] ARCO-105: Feedback --- peer.go | 4 ++-- peer_manager.go | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/peer.go b/peer.go index f259c11..550b174 100644 --- a/peer.go +++ b/peer.go @@ -40,8 +40,8 @@ const ( retryReadWriteMessageAttempts = 5 reconnectInterval = 10 * time.Second - pingIntervalDefault = 30 * time.Second - connectionHealthTickerDurationDefault = 1 * time.Minute + pingIntervalDefault = 2 * time.Minute + connectionHealthTickerDurationDefault = 3 * time.Minute ) type Block struct { diff --git a/peer_manager.go b/peer_manager.go index 03d3a7e..aeaf134 100644 --- a/peer_manager.go +++ b/peer_manager.go @@ -100,9 +100,7 @@ func (pm *PeerManager) StartMonitorPeerHealth() { for _, peer := range pm.peers { pm.waitGroup.Add(1) go func(p PeerI) { - defer func() { - pm.waitGroup.Done() - }() + defer pm.waitGroup.Done() for { select { case <-pm.ctx.Done():