diff --git a/Peer.go b/Peer.go index 274c1a4..970154d 100644 --- a/Peer.go +++ b/Peer.go @@ -22,10 +22,6 @@ import ( "github.com/ordishs/go-utils/batcher" ) -var ( - pingInterval = 2 * time.Minute -) - const ( defaultMaximumMessageSize = 32 * 1024 * 1024 defaultBatchDelayMilliseconds = 200 @@ -40,6 +36,9 @@ const ( retryReadWriteMessageInterval = 10 * time.Second retryReadWriteMessageAttempts = 5 + + pingInterval = 2 * time.Minute + connectionHealthTickerDuration = 3 * time.Minute ) type Block struct { @@ -62,6 +61,7 @@ type Peer struct { peerHandler PeerHandlerI writeChan chan wire.Message quit chan struct{} + pingPongAlive chan struct{} logger *slog.Logger sentVerAck atomic.Bool receivedVerAck atomic.Bool @@ -69,6 +69,7 @@ type Peer struct { invBatcher *batcher.Batcher[chainhash.Hash] dataBatcher *batcher.Batcher[chainhash.Hash] maximumMessageSize int64 + isHealthy bool } // NewPeer returns a new bitcoin peer for the provided address and configuration. @@ -86,6 +87,7 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw network: network, address: address, writeChan: writeChan, + pingPongAlive: make(chan struct{}, 1), peerHandler: peerHandler, logger: peerLogger, dial: net.Dial, @@ -104,6 +106,7 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw func (p *Peer) initialize() { + go p.monitorConnectionHealth() go p.pingHandler() for i := 0; i < 10; i++ { // start 10 workers that will write to the peer @@ -311,6 +314,9 @@ func (p *Peer) readHandler() { p.sentVerAck.Store(true) case wire.CmdPing: + commandLogger.Debug(receivedMsg, slog.String(commandKey, strings.ToUpper(wire.CmdPing))) + p.pingPongAlive <- struct{}{} + pingMsg, ok := msg.(*wire.MsgPing) if !ok { continue @@ -404,7 +410,12 @@ func (p *Peer) readHandler() { commandLogger.Debug(receivedMsg) p.receivedVerAck.Store(true) + case wire.CmdPong: + commandLogger.Debug(receivedMsg, slog.String(commandKey, strings.ToUpper(wire.CmdPong))) + p.pingPongAlive <- struct{}{} + default: + commandLogger.Debug("command ignored") } } @@ -605,12 +616,11 @@ func (p *Peer) versionMessage(address string) *wire.MsgVersion { return msg } -// pingHandler periodically pings the peer. It must be run as a goroutine. +// pingHandler periodically pings the peer. It must be run as a goroutine. func (p *Peer) pingHandler() { pingTicker := time.NewTicker(pingInterval) defer pingTicker.Stop() -out: for { select { case <-pingTicker.C: @@ -622,7 +632,38 @@ out: p.writeChan <- wire.NewMsgPing(nonce) case <-p.quit: - break out + return } } } + +func (p *Peer) monitorConnectionHealth() { + // if no ping/pong signal is received for certain amount of time, mark peer as unhealthy + checkConnectionHealthTicker := time.NewTicker(connectionHealthTickerDuration) + defer checkConnectionHealthTicker.Stop() + + for { + select { + case <-p.pingPongAlive: + p.mu.Lock() + p.isHealthy = true + p.mu.Unlock() + + // if ping/pong is received signal reset the ticker + checkConnectionHealthTicker.Reset(connectionHealthTickerDuration) + case <-checkConnectionHealthTicker.C: + p.mu.Lock() + p.isHealthy = false + p.mu.Unlock() + case <-p.quit: + return + } + } +} + +func (p *Peer) IsHealthy() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.isHealthy +}