Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor peer health monitor #23

Merged
merged 10 commits into from
Jul 22, 2024
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ lint:
.PHONY: install
install:
go install honnef.co/go/tools/cmd/staticcheck@latest

.PHONY: gen_go
gen_go:
go generate ./...
1 change: 1 addition & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type PeerI interface {
RequestBlock(blockHash *chainhash.Hash)
Network() wire.BitcoinNet
IsHealthy() bool
IsUnhealthyCh() <-chan struct{}
Shutdown()
Restart()
}
Expand Down
76 changes: 40 additions & 36 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ const (
retryReadWriteMessageAttempts = 5
reconnectInterval = 10 * time.Second

pingInterval = 2 * time.Minute
connectionHealthTickerDuration = 3 * time.Minute
pingIntervalDefault = 2 * time.Minute
connectionHealthTickerDurationDefault = 3 * time.Minute
)

type Block struct {
Expand Down Expand Up @@ -71,13 +71,15 @@ type Peer struct {
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
isHealthy atomic.Bool
userAgentName *string
userAgentVersion *string
retryReadWriteMessageInterval time.Duration
nrWriteHandlers int

ctx context.Context
isUnhealthyCh chan struct{}
pingInterval time.Duration
connectionHealthThreshold time.Duration
ctx context.Context

cancelReadHandler context.CancelFunc
cancelWriteHandler context.CancelFunc
Expand Down Expand Up @@ -105,13 +107,16 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
address: address,
writeChan: writeChan,
pingPongAlive: make(chan struct{}, 1),
isUnhealthyCh: make(chan struct{}),
peerHandler: peerHandler,
logger: peerLogger,
dial: net.Dial,
nrWriteHandlers: nrWriteHandlersDefault,
maximumMessageSize: defaultMaximumMessageSize,
batchDelay: defaultBatchDelayMilliseconds * time.Millisecond,
retryReadWriteMessageInterval: retryReadWriteMessageIntervalDefault,
pingInterval: pingIntervalDefault,
connectionHealthThreshold: connectionHealthTickerDurationDefault,
writerWg: &sync.WaitGroup{},
readerWg: &sync.WaitGroup{},
reconnectingWg: &sync.WaitGroup{},
Expand All @@ -132,6 +137,9 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
}

func (p *Peer) start() {

p.logger.Info("Starting peer")

ctx, cancelAll := context.WithCancel(context.Background())
p.cancelAll = cancelAll
p.ctx = ctx
Expand Down Expand Up @@ -788,15 +796,15 @@ func (p *Peer) versionMessage(address string) *wire.MsgVersion {
func (p *Peer) startMonitorPingPong() {
p.healthMonitorWg.Add(1)

pingTicker := time.NewTicker(pingInterval)
pingTicker := time.NewTicker(p.pingInterval)

go func() {
// if no ping/pong signal is received for certain amount of time, mark peer as unhealthy
checkConnectionHealthTicker := time.NewTicker(connectionHealthTickerDuration)
monitorConnectionTicker := time.NewTicker(p.connectionHealthThreshold)

defer func() {
p.healthMonitorWg.Done()
checkConnectionHealthTicker.Stop()
monitorConnectionTicker.Stop()
}()

for {
Expand All @@ -809,50 +817,42 @@ func (p *Peer) startMonitorPingPong() {
}
p.writeChan <- wire.NewMsgPing(nonce)
case <-p.pingPongAlive:
p.mu.Lock()
p.isHealthy = true
p.mu.Unlock()
// if ping/pong signal is received reset the ticker
monitorConnectionTicker.Reset(p.connectionHealthThreshold)
p.setHealthy()
case <-monitorConnectionTicker.C:

// if ping/pong is received signal reset the ticker
checkConnectionHealthTicker.Reset(connectionHealthTickerDuration)
case <-checkConnectionHealthTicker.C:
p.isHealthy.Store(false)

select {
case p.isUnhealthyCh <- struct{}{}:
default: // Do not block if nothing is ready from channel
}

p.mu.Lock()
p.isHealthy = false
p.logger.Warn("peer unhealthy")
p.mu.Unlock()
case <-p.ctx.Done():
return
}
}
}()
}

func (p *Peer) IsHealthy() bool {
p.mu.Lock()
defer p.mu.Unlock()

return p.isHealthy
func (p *Peer) IsUnhealthyCh() <-chan struct{} {
return p.isUnhealthyCh
}

func (p *Peer) stopReadHandler() {
if p.cancelReadHandler == nil {
func (p *Peer) setHealthy() {

if p.isHealthy.Load() {
return
}
p.logger.Debug("Cancelling read handlers")
p.cancelReadHandler()
p.logger.Debug("Waiting for read handlers to stop")
p.readerWg.Wait()

p.logger.Info("peer healthy")
p.isHealthy.Store(true)
}

func (p *Peer) stopWriteHandler() {
if p.cancelWriteHandler == nil {
return
}
p.logger.Debug("Cancelling write handlers")
p.cancelWriteHandler()
p.logger.Debug("Waiting for writer handlers to stop")
p.writerWg.Wait()
func (p *Peer) IsHealthy() bool {
return p.isHealthy.Load()
}

func (p *Peer) Restart() {
Expand All @@ -862,10 +862,14 @@ func (p *Peer) Restart() {
}

func (p *Peer) Shutdown() {
p.logger.Info("Shutting down")

p.cancelAll()

p.reconnectingWg.Wait()
p.healthMonitorWg.Wait()
p.writerWg.Wait()
p.readerWg.Wait()

p.logger.Info("Shutdown complete")
}
32 changes: 15 additions & 17 deletions peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type PeerManager struct {
logger *slog.Logger
ebs int64
restartUnhealthyPeers bool
monitorPeersInterval time.Duration
waitGroup sync.WaitGroup
cancelAll context.CancelFunc
ctx context.Context
Expand Down Expand Up @@ -84,6 +83,7 @@ func (pm *PeerManager) GetPeers() []PeerI {
}

func (pm *PeerManager) Shutdown() {
pm.logger.Info("Shutting down peer manager")

if pm.cancelAll != nil {
pm.cancelAll()
Expand All @@ -96,24 +96,22 @@ func (pm *PeerManager) Shutdown() {
}

func (pm *PeerManager) StartMonitorPeerHealth() {
ticker := time.NewTicker(pm.monitorPeersInterval)
pm.waitGroup.Add(1)
go func() {
defer pm.waitGroup.Done()
for {
select {
case <-pm.ctx.Done():
return
case <-ticker.C:
for _, peer := range pm.GetPeers() {
if !peer.IsHealthy() {
pm.logger.Warn("peer unhealthy - restarting", slog.String("address", peer.String()), slog.Bool("connected", peer.Connected()))
peer.Restart()
}

for _, peer := range pm.peers {
pm.waitGroup.Add(1)
go func(p PeerI) {
defer pm.waitGroup.Done()
for {
select {
case <-pm.ctx.Done():
return
case <-p.IsUnhealthyCh():
pm.logger.Warn("peer unhealthy - restarting", slog.String("address", p.String()), slog.Bool("connected", p.Connected()))
p.Restart()
}
}
}
}()
}(peer)
}
}

// AnnounceTransaction will send an INV message to the provided peers or to selected peers if peers is nil
Expand Down
3 changes: 1 addition & 2 deletions peer_manager_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ func WithExcessiveBlockSize(ebs int64) PeerManagerOptions {
}
}

func WithRestartUnhealthyPeers(monitorPeersInterval time.Duration) PeerManagerOptions {
func WithRestartUnhealthyPeers() PeerManagerOptions {
return func(p *PeerManager) {
p.restartUnhealthyPeers = true
p.monitorPeersInterval = monitorPeersInterval
}
}
6 changes: 2 additions & 4 deletions peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestNewPeerManager(t *testing.T) {
err = pm.AddPeer(peer)
require.NoError(t, err)
assert.Len(t, pm.GetPeers(), 1)
peer.Shutdown()
pm.Shutdown()
})

t.Run("1 peer - de dup", func(t *testing.T) {
Expand All @@ -64,9 +64,7 @@ func TestNewPeerManager(t *testing.T) {

assert.Len(t, pm.GetPeers(), 4)

for _, peer := range peers {
peer.Shutdown()
}
pm.Shutdown()
})
}

Expand Down
4 changes: 4 additions & 0 deletions peer_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (p *PeerMock) IsHealthy() bool {
return true
}

func (p *PeerMock) IsUnhealthyCh() <-chan struct{} {
return make(<-chan struct{})
}

func (p *PeerMock) Connected() bool {
return true
}
Expand Down
11 changes: 11 additions & 0 deletions peer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,14 @@ func WithNrOfWriteHandlers(NrWriteHandlers int) PeerOptions {
return nil
}
}

// WithPingInterval sets the optional time duration ping interval and connection health threshold
// ping interval is the time interval in which the peer sends a ping
// connection health threshold is the time duration after which the connection is marked unhealthy if no signal is received
func WithPingInterval(pingInterval time.Duration, connectionHealthThreshold time.Duration) PeerOptions {
return func(p *Peer) error {
p.pingInterval = pingInterval
p.connectionHealthThreshold = connectionHealthThreshold
return nil
}
}
24 changes: 22 additions & 2 deletions peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,10 @@ func TestReconnect(t *testing.T) {

if tc.cancelRead {
// cancel reader so that writer will disconnect
peer.stopReadHandler()
stopReadHandler(peer)
} else {
// cancel writer so that reader will disconnect
peer.stopWriteHandler()
stopWriteHandler(peer)
}

// break connection
Expand Down Expand Up @@ -637,3 +637,23 @@ func doHandshake(t *testing.T, p *Peer, myConn net.Conn) {
assert.NoError(t, err)
assert.Equal(t, wire.CmdVerAck, msg.Command())
}

func stopReadHandler(p *Peer) {
if p.cancelReadHandler == nil {
return
}
p.logger.Debug("Cancelling read handlers")
p.cancelReadHandler()
p.logger.Debug("Waiting for read handlers to stop")
p.readerWg.Wait()
}

func stopWriteHandler(p *Peer) {
if p.cancelWriteHandler == nil {
return
}
p.logger.Debug("Cancelling write handlers")
p.cancelWriteHandler()
p.logger.Debug("Waiting for writer handlers to stop")
p.writerWg.Wait()
}
38 changes: 22 additions & 16 deletions test/peer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,22 +148,12 @@ func TestNewPeer(t *testing.T) {
t.Log("shutdown finished")
})

t.Run("announce transaction", func(t *testing.T) {
t.Run("restart", func(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))

pm := p2p.NewPeerManager(logger, wire.TestNet)
require.NotNil(t, pm)

peerHandler := &p2p.PeerHandlerIMock{
HandleTransactionsGetFunc: func(msgs []*wire.InvVect, peer p2p.PeerI) ([][]byte, error) {
return [][]byte{TX1RawBytes}, nil
},
}

peer, err := p2p.NewPeer(logger, "localhost:"+p2pPortBinding, peerHandler, wire.TestNet)
require.NoError(t, err)
peerHandler := p2p.NewMockPeerHandler()

err = pm.AddPeer(peer)
peer, err := p2p.NewPeer(logger, "localhost:"+p2pPortBinding, peerHandler, wire.TestNet, p2p.WithUserAgent("agent", "0.0.1"))
require.NoError(t, err)

t.Log("expect that peer has connected")
Expand All @@ -175,14 +165,30 @@ func TestNewPeer(t *testing.T) {
break connectLoop
}
case <-time.NewTimer(5 * time.Second).C:
t.Fatal("peer did not disconnect")
t.Fatal("peer did not connect")
}
}

pm.AnnounceTransaction(TX1Hash, []p2p.PeerI{peer})
t.Log("restart peer")
peer.Restart()

time.Sleep(100 * time.Millisecond)
t.Log("expect that peer has re-established connection")
reconnectLoop:
for {
select {
case <-time.NewTicker(200 * time.Millisecond).C:
if peer.Connected() {
break reconnectLoop
}
case <-time.NewTimer(2 * time.Second).C:
t.Fatal("peer did not reconnect")
}
}

require.NoError(t, err)

t.Log("shutdown")
peer.Shutdown()
t.Log("shutdown finished")
})
}
Loading
Loading