Skip to content

Commit

Permalink
Log receiving both ping and pong signals. Write to ping pong alive ch…
Browse files Browse the repository at this point in the history
…annel
  • Loading branch information
boecklim committed Jan 23, 2024
1 parent 8e255ec commit c3e7102
Showing 1 changed file with 48 additions and 7 deletions.
55 changes: 48 additions & 7 deletions Peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ import (
"github.com/ordishs/go-utils/batcher"
)

var (
pingInterval = 2 * time.Minute
)

const (
defaultMaximumMessageSize = 32 * 1024 * 1024
defaultBatchDelayMilliseconds = 200
Expand All @@ -40,6 +36,9 @@ const (

retryReadWriteMessageInterval = 10 * time.Second
retryReadWriteMessageAttempts = 5

pingInterval = 2 * time.Minute
connectionHealthTickerDuration = 3 * time.Minute
)

type Block struct {
Expand All @@ -62,13 +61,15 @@ type Peer struct {
peerHandler PeerHandlerI
writeChan chan wire.Message
quit chan struct{}
pingPongAlive chan struct{}
logger *slog.Logger
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
}

// NewPeer returns a new bitcoin peer for the provided address and configuration.
Expand All @@ -86,6 +87,7 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
network: network,
address: address,
writeChan: writeChan,
pingPongAlive: make(chan struct{}, 1),
peerHandler: peerHandler,
logger: peerLogger,
dial: net.Dial,
Expand All @@ -104,6 +106,7 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw

func (p *Peer) initialize() {

go p.monitorConnectionHealth()
go p.pingHandler()
for i := 0; i < 10; i++ {
// start 10 workers that will write to the peer
Expand Down Expand Up @@ -311,6 +314,9 @@ func (p *Peer) readHandler() {
p.sentVerAck.Store(true)

case wire.CmdPing:
commandLogger.Debug(receivedMsg, slog.String(commandKey, strings.ToUpper(wire.CmdPing)))
p.pingPongAlive <- struct{}{}

pingMsg, ok := msg.(*wire.MsgPing)
if !ok {
continue
Expand Down Expand Up @@ -404,7 +410,12 @@ func (p *Peer) readHandler() {
commandLogger.Debug(receivedMsg)
p.receivedVerAck.Store(true)

case wire.CmdPong:
commandLogger.Debug(receivedMsg, slog.String(commandKey, strings.ToUpper(wire.CmdPong)))
p.pingPongAlive <- struct{}{}

default:

commandLogger.Debug("command ignored")
}
}
Expand Down Expand Up @@ -605,12 +616,11 @@ func (p *Peer) versionMessage(address string) *wire.MsgVersion {
return msg
}

// pingHandler periodically pings the peer. It must be run as a goroutine.
// pingHandler periodically pings the peer. It must be run as a goroutine.
func (p *Peer) pingHandler() {
pingTicker := time.NewTicker(pingInterval)
defer pingTicker.Stop()

out:
for {
select {
case <-pingTicker.C:
Expand All @@ -622,7 +632,38 @@ out:
p.writeChan <- wire.NewMsgPing(nonce)

case <-p.quit:
break out
return
}
}
}

func (p *Peer) monitorConnectionHealth() {
// if no ping/pong signal is received for certain amount of time, mark peer as unhealthy
checkConnectionHealthTicker := time.NewTicker(connectionHealthTickerDuration)
defer checkConnectionHealthTicker.Stop()

for {
select {
case <-p.pingPongAlive:
p.mu.Lock()
p.isHealthy = true
p.mu.Unlock()

// if ping/pong is received signal reset the ticker
checkConnectionHealthTicker.Reset(connectionHealthTickerDuration)
case <-checkConnectionHealthTicker.C:
p.mu.Lock()
p.isHealthy = false
p.mu.Unlock()
case <-p.quit:
return
}
}
}

func (p *Peer) IsHealthy() bool {
p.mu.Lock()
defer p.mu.Unlock()

return p.isHealthy
}

0 comments on commit c3e7102

Please sign in to comment.