Skip to content

Commit

Permalink
Peer health monitoring routine is notified about unhealthy peer. That…
Browse files Browse the repository at this point in the history
… triggers restarting of the peer.
  • Loading branch information
boecklim committed Jun 18, 2024
1 parent 9e32c96 commit ca12176
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 27 deletions.
1 change: 1 addition & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type PeerI interface {
RequestBlock(blockHash *chainhash.Hash)
Network() wire.BitcoinNet
IsHealthy() bool
IsUnhealthyCh() <-chan struct{}
Shutdown()
Restart()
}
Expand Down
43 changes: 32 additions & 11 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ const (
retryReadWriteMessageAttempts = 5
reconnectInterval = 10 * time.Second

pingInterval = 2 * time.Minute
connectionHealthTickerDuration = 3 * time.Minute
pingInterval = 30 * time.Second
connectionHealthTickerDuration = 1 * time.Minute
)

type Block struct {
Expand Down Expand Up @@ -76,6 +76,7 @@ type Peer struct {
userAgentVersion *string
retryReadWriteMessageInterval time.Duration
nrWriteHandlers int
isUnhealthyCh chan struct{}

ctx context.Context

Expand Down Expand Up @@ -105,6 +106,7 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
address: address,
writeChan: writeChan,
pingPongAlive: make(chan struct{}, 1),
isUnhealthyCh: make(chan struct{}),
peerHandler: peerHandler,
logger: peerLogger,
dial: net.Dial,
Expand Down Expand Up @@ -784,11 +786,11 @@ func (p *Peer) startMonitorPingPong() {

go func() {
// if no ping/pong signal is received for certain amount of time, mark peer as unhealthy
checkConnectionHealthTicker := time.NewTicker(connectionHealthTickerDuration)
monitorConnectionTicker := time.NewTicker(connectionHealthTickerDuration)

defer func() {
p.healthMonitorWg.Done()
checkConnectionHealthTicker.Stop()
monitorConnectionTicker.Stop()
}()

for {
Expand All @@ -801,16 +803,19 @@ func (p *Peer) startMonitorPingPong() {
}
p.writeChan <- wire.NewMsgPing(nonce)
case <-p.pingPongAlive:
p.mu.Lock()
p.isHealthy = true
p.mu.Unlock()

// if ping/pong is received signal reset the ticker
checkConnectionHealthTicker.Reset(connectionHealthTickerDuration)
case <-checkConnectionHealthTicker.C:
// if ping/pong signal is received reset the ticker
monitorConnectionTicker.Reset(connectionHealthTickerDuration)
p.setHealthy()
case <-monitorConnectionTicker.C:

p.mu.Lock()
p.isHealthy = false

select {
case p.isUnhealthyCh <- struct{}{}:
default:
}

p.logger.Warn("peer unhealthy")
p.mu.Unlock()
case <-p.ctx.Done():
Expand All @@ -820,6 +825,22 @@ func (p *Peer) startMonitorPingPong() {
}()
}

// IsUnhealthyCh returns the peer's isUnhealthyCh as a receive-only channel.
// The ping/pong monitor attempts a non-blocking send on it each time the
// connection health ticker fires; because the channel is created unbuffered,
// that signal is dropped when no receiver is waiting at that moment.
func (p *Peer) IsUnhealthyCh() <-chan struct{} {
return p.isUnhealthyCh
}

// setHealthy flips the peer's health flag to true while holding p.mu.
// When the peer is already marked healthy it is a no-op: nothing is logged
// and the flag is left untouched. On an unhealthy -> healthy transition it
// logs "peer healthy" once and then sets the flag.
//
// NOTE(review): the recovery message is emitted at Warn level — confirm that
// this is intentional rather than Info.
func (p *Peer) setHealthy() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if !p.isHealthy {
		// Log before updating the flag, still under the lock.
		p.logger.Warn("peer healthy")
		p.isHealthy = true
	}
}

func (p *Peer) IsHealthy() bool {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
32 changes: 16 additions & 16 deletions peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,24 @@ func (pm *PeerManager) Shutdown() {
}

func (pm *PeerManager) StartMonitorPeerHealth() {
ticker := time.NewTicker(pm.monitorPeersInterval)
pm.waitGroup.Add(1)
go func() {
defer pm.waitGroup.Done()
for {
select {
case <-pm.ctx.Done():
return
case <-ticker.C:
for _, peer := range pm.GetPeers() {
if !peer.IsHealthy() {
pm.logger.Warn("peer unhealthy - restarting", slog.String("address", peer.String()), slog.Bool("connected", peer.Connected()))
peer.Restart()
}

for _, peer := range pm.peers {
pm.waitGroup.Add(1)
go func(p PeerI) {
defer func() {
pm.waitGroup.Done()
}()
for {
select {
case <-pm.ctx.Done():
return
case <-p.IsUnhealthyCh():
pm.logger.Warn("peer unhealthy - restarting", slog.String("address", p.String()), slog.Bool("connected", p.Connected()))
p.Restart()
}
}
}
}()
}(peer)
}
}

// AnnounceTransaction will send an INV message to the provided peers or to selected peers if peers is nil
Expand Down
4 changes: 4 additions & 0 deletions peer_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (p *PeerMock) IsHealthy() bool {
return true
}

// IsUnhealthyCh hands back a brand-new unbuffered channel on every call.
// Nothing in this method sends on or closes that channel, so a receive on the
// returned value blocks; the mock itself never signals an unhealthy state.
func (p *PeerMock) IsUnhealthyCh() <-chan struct{} {
	idle := make(chan struct{})
	// Returned as receive-only, so callers cannot send on or close it.
	return idle
}

// Connected always returns true for the mock.
func (p *PeerMock) Connected() bool {
return true
}
Expand Down

0 comments on commit ca12176

Please sign in to comment.