Skip to content

Commit

Permalink
Merge pull request #28 from libsv/fix/restart-unhealthy
Browse files Browse the repository at this point in the history
Fix/restart unhealthy
  • Loading branch information
boecklim authored Jul 25, 2024
2 parents b5ce3c9 + c7d4fc8 commit 08a38f5
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 21 deletions.
2 changes: 1 addition & 1 deletion peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ func (p *Peer) startMonitorPingPong() {

select {
case p.isUnhealthyCh <- struct{}{}:
default: // Do not block if nothing is ready from channel
default: // Do not block if nothing is reading from channel
}

p.logger.Warn("peer unhealthy")
Expand Down
39 changes: 19 additions & 20 deletions peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ func NewPeerManager(logger *slog.Logger, network wire.BitcoinNet, options ...Pee
logger.Info("Excessive block size set to", slog.Int64("block size", pm.ebs))
wire.SetLimits(uint64(pm.ebs))

if pm.restartUnhealthyPeers {
pm.StartMonitorPeerHealth()
}

return pm
}

Expand All @@ -69,6 +65,10 @@ func (pm *PeerManager) AddPeer(peer PeerI) error {

pm.peers = append(pm.peers, peer)

if pm.restartUnhealthyPeers {
pm.startMonitorPeerHealth(peer)
}

return nil
}

Expand All @@ -95,23 +95,22 @@ func (pm *PeerManager) Shutdown() {
}
}

func (pm *PeerManager) StartMonitorPeerHealth() {

for _, peer := range pm.peers {
pm.waitGroup.Add(1)
go func(p PeerI) {
defer pm.waitGroup.Done()
for {
select {
case <-pm.ctx.Done():
return
case <-p.IsUnhealthyCh():
pm.logger.Warn("peer unhealthy - restarting", slog.String("address", p.String()), slog.Bool("connected", p.Connected()))
p.Restart()
}
func (pm *PeerManager) startMonitorPeerHealth(peer PeerI) {
pm.logger.Info("Starting peer health monitoring")

pm.waitGroup.Add(1)
go func(p PeerI) {
defer pm.waitGroup.Done()
for {
select {
case <-pm.ctx.Done():
return
case <-p.IsUnhealthyCh():
pm.logger.Warn("peer unhealthy - restarting", slog.String("address", p.String()), slog.Bool("connected", p.Connected()))
p.Restart()
}
}(peer)
}
}
}(peer)
}

// AnnounceTransaction will send an INV message to the provided peers or to selected peers if peers is nil
Expand Down
42 changes: 42 additions & 0 deletions peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,45 @@ func TestAnnounceNewTransaction(t *testing.T) {
assert.GreaterOrEqual(t, peersMessaged, len(peers)/2)
})
}

func TestMonitorPeerHealth(t *testing.T) {

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
tt := []struct {
name string
restartUnhealthyPeers bool
}{
{
name: "restart unhealthy peers",
restartUnhealthyPeers: true,
},
{
name: "do not restart unhealthy peers",
restartUnhealthyPeers: false,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
var opts []PeerManagerOptions

if tc.restartUnhealthyPeers {
opts = append(opts, WithRestartUnhealthyPeers())
}

pm := NewPeerManager(logger, wire.TestNet, opts...)
require.NotNil(t, pm)

peerHandler := NewMockPeerHandler()

peer, err := NewPeer(logger, "localhost:18333", peerHandler, wire.TestNet, WithPingInterval(100*time.Millisecond, 200*time.Millisecond))
require.NoError(t, err)

err = pm.AddPeer(peer)
require.NoError(t, err)
assert.Len(t, pm.GetPeers(), 1)
time.Sleep(1 * time.Second)
pm.Shutdown()
})
}
}

0 comments on commit 08a38f5

Please sign in to comment.