Skip to content

Commit

Permalink
Log receiving both ping and pong signals. Write to ping pong alive ch…
Browse files Browse the repository at this point in the history
…annel
  • Loading branch information
boecklim committed Jan 23, 2024
1 parent 8e255ec commit 0277577
Showing 1 changed file with 42 additions and 0 deletions.
42 changes: 42 additions & 0 deletions Peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (

retryReadWriteMessageInterval = 10 * time.Second
retryReadWriteMessageAttempts = 5

connectionHealthTickerDuration = time.Second * 60
)

type Block struct {
Expand All @@ -62,13 +64,15 @@ type Peer struct {
peerHandler PeerHandlerI
writeChan chan wire.Message
quit chan struct{}
pingPongAlive chan struct{}
logger *slog.Logger
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
}

// NewPeer returns a new bitcoin peer for the provided address and configuration.
Expand All @@ -86,6 +90,7 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
network: network,
address: address,
writeChan: writeChan,
pingPongAlive: make(chan struct{}, 1),
peerHandler: peerHandler,
logger: peerLogger,
dial: net.Dial,
Expand All @@ -97,6 +102,8 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
option(p)
}

go p.monitorConnectionHealth()

p.initialize()

return p, nil
Expand Down Expand Up @@ -311,6 +318,9 @@ func (p *Peer) readHandler() {
p.sentVerAck.Store(true)

case wire.CmdPing:
commandLogger.Debug(receivedMsg, slog.String(commandKey, strings.ToUpper(wire.CmdPing)))
p.pingPongAlive <- struct{}{}

pingMsg, ok := msg.(*wire.MsgPing)
if !ok {
continue
Expand Down Expand Up @@ -404,7 +414,12 @@ func (p *Peer) readHandler() {
commandLogger.Debug(receivedMsg)
p.receivedVerAck.Store(true)

case wire.CmdPong:
commandLogger.Debug(receivedMsg, slog.String(commandKey, strings.ToUpper(wire.CmdPong)))
p.pingPongAlive <- struct{}{}

default:

commandLogger.Debug("command ignored")
}
}
Expand Down Expand Up @@ -626,3 +641,30 @@ out:
}
}
}

func (p *Peer) monitorConnectionHealth() {
// if no ping/pong signal is received for certain amount of time, mark peer as unhealthy
ticker := time.NewTicker(connectionHealthTickerDuration)
for {
select {
case <-p.pingPongAlive:
p.mu.Lock()
p.isHealthy = true
p.mu.Unlock()

// if ping/pong is received signal reset the ticker
ticker.Reset(connectionHealthTickerDuration)
case <-ticker.C:
p.mu.Lock()
p.isHealthy = false
p.mu.Unlock()
}
}
}

func (p *Peer) IsHealthy() bool {
p.mu.Lock()
defer p.mu.Unlock()

return p.isHealthy
}

0 comments on commit 0277577

Please sign in to comment.