Skip to content

Commit

Permalink
Peer health monitoring routine is notified about an unhealthy peer. That…
Browse files Browse the repository at this point in the history
… triggers restarting of the peer.
  • Loading branch information
boecklim committed Jun 18, 2024
1 parent 9e32c96 commit 424e6ee
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 28 deletions.
1 change: 1 addition & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type PeerI interface {
RequestBlock(blockHash *chainhash.Hash)
Network() wire.BitcoinNet
IsHealthy() bool
IsUnhealthyCh() <-chan struct{}
Shutdown()
Restart()
}
Expand Down
49 changes: 37 additions & 12 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ const (
retryReadWriteMessageAttempts = 5
reconnectInterval = 10 * time.Second

pingInterval = 2 * time.Minute
connectionHealthTickerDuration = 3 * time.Minute
pingInterval = 30 * time.Second
connectionHealthTickerDuration = 1 * time.Minute
)

type Block struct {
Expand Down Expand Up @@ -76,6 +76,7 @@ type Peer struct {
userAgentVersion *string
retryReadWriteMessageInterval time.Duration
nrWriteHandlers int
isUnhealthyCh chan struct{}

ctx context.Context

Expand Down Expand Up @@ -105,6 +106,7 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
address: address,
writeChan: writeChan,
pingPongAlive: make(chan struct{}, 1),
isUnhealthyCh: make(chan struct{}),
peerHandler: peerHandler,
logger: peerLogger,
dial: net.Dial,
Expand Down Expand Up @@ -421,6 +423,7 @@ func (p *Peer) startReadHandler(ctx context.Context) {
return
}

//p.anySignalReceived <- struct{}{}
commandLogger := p.logger.With(slog.String(commandKey, strings.ToUpper(msg.Command())))

// we could check this based on type (switch msg.(type)) but that would not allow
Expand Down Expand Up @@ -668,7 +671,10 @@ func (p *Peer) writeRetry(ctx context.Context, msg wire.Message) error {
}

func (p *Peer) startWriteChannelHandler(ctx context.Context, instance int) {
p.writerWg.Add(1)
defer func() {
p.logger.Debug("Shutting down write handler", slog.Int("instance", instance))
p.writerWg.Done()
}()
go func() {
p.logger.Debug("Starting write handler", slog.Int("instance", instance))

Expand Down Expand Up @@ -784,11 +790,11 @@ func (p *Peer) startMonitorPingPong() {

go func() {
// if no ping/pong signal is received for certain amount of time, mark peer as unhealthy
checkConnectionHealthTicker := time.NewTicker(connectionHealthTickerDuration)
monitorConnectionTicker := time.NewTicker(connectionHealthTickerDuration)

defer func() {
p.healthMonitorWg.Done()
checkConnectionHealthTicker.Stop()
monitorConnectionTicker.Stop()
}()

for {
Expand All @@ -801,16 +807,19 @@ func (p *Peer) startMonitorPingPong() {
}
p.writeChan <- wire.NewMsgPing(nonce)
case <-p.pingPongAlive:
p.mu.Lock()
p.isHealthy = true
p.mu.Unlock()

// if ping/pong is received signal reset the ticker
checkConnectionHealthTicker.Reset(connectionHealthTickerDuration)
case <-checkConnectionHealthTicker.C:
// if ping/pong signal is received reset the ticker
monitorConnectionTicker.Reset(connectionHealthTickerDuration)
p.setHealthy()
case <-monitorConnectionTicker.C:

p.mu.Lock()
p.isHealthy = false

select {
case p.isUnhealthyCh <- struct{}{}:
default:
}

p.logger.Warn("peer unhealthy")
p.mu.Unlock()
case <-p.ctx.Done():
Expand All @@ -820,6 +829,22 @@ func (p *Peer) startMonitorPingPong() {
}()
}

// IsUnhealthyCh returns a receive-only view of the peer's isUnhealthyCh
// channel. The ping/pong monitor goroutine performs a non-blocking send on
// this channel each time its connection-health ticker fires and it marks the
// peer unhealthy; because that send falls through to a default case, a signal
// is dropped when no receiver is waiting at that moment.
func (p *Peer) IsUnhealthyCh() <-chan struct{} {
return p.isUnhealthyCh
}

// setHealthy flags the peer as healthy. The check and the update both happen
// while holding p.mu. When the peer is already healthy nothing is logged or
// written; otherwise the transition is logged (at Warn level) and isHealthy is
// set to true.
func (p *Peer) setHealthy() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if !p.isHealthy {
		// Only the unhealthy -> healthy transition is reported.
		p.logger.Warn("peer healthy")
		p.isHealthy = true
	}
}

func (p *Peer) IsHealthy() bool {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
32 changes: 16 additions & 16 deletions peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,24 @@ func (pm *PeerManager) Shutdown() {
}

func (pm *PeerManager) StartMonitorPeerHealth() {
ticker := time.NewTicker(pm.monitorPeersInterval)
pm.waitGroup.Add(1)
go func() {
defer pm.waitGroup.Done()
for {
select {
case <-pm.ctx.Done():
return
case <-ticker.C:
for _, peer := range pm.GetPeers() {
if !peer.IsHealthy() {
pm.logger.Warn("peer unhealthy - restarting", slog.String("address", peer.String()), slog.Bool("connected", peer.Connected()))
peer.Restart()
}

for _, peer := range pm.peers {
pm.waitGroup.Add(1)
go func(p PeerI) {
defer func() {
pm.waitGroup.Done()
}()
for {
select {
case <-pm.ctx.Done():
return
case <-p.IsUnhealthyCh():
pm.logger.Warn("peer unhealthy - restarting", slog.String("address", p.String()), slog.Bool("connected", p.Connected()))
p.Restart()
}
}
}
}()
}(peer)
}
}

// AnnounceTransaction will send an INV message to the provided peers or to selected peers if peers is nil
Expand Down

0 comments on commit 424e6ee

Please sign in to comment.