Skip to content

Commit

Permalink
Peer manager has option to monitor peer health and restart peer
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Jun 13, 2024
1 parent 3abd312 commit 4ac2ee5
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 10 deletions.
63 changes: 53 additions & 10 deletions peer_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package p2p

import (
"context"
"log/slog"
"sort"
"sync"
Expand All @@ -13,12 +14,17 @@ import (
const defaultExcessiveBlockSize = 4000000000

type PeerManager struct {
mu sync.RWMutex
peers []PeerI
network wire.BitcoinNet
batchDelay time.Duration
logger *slog.Logger
ebs int64
mu sync.RWMutex
peers []PeerI
network wire.BitcoinNet
batchDelay time.Duration
logger *slog.Logger
ebs int64
restartUnhealthyPeers bool
monitorPeersInterval time.Duration
waitGroup sync.WaitGroup
cancelAll context.CancelFunc
ctx context.Context
}

// NewPeerManager creates a new PeerManager
Expand All @@ -28,19 +34,29 @@ type PeerManager struct {
func NewPeerManager(logger *slog.Logger, network wire.BitcoinNet, options ...PeerManagerOptions) PeerManagerI {

pm := &PeerManager{
peers: make([]PeerI, 0),
network: network,
logger: logger,
ebs: defaultExcessiveBlockSize,
peers: make([]PeerI, 0),
network: network,
logger: logger,
ebs: defaultExcessiveBlockSize,
restartUnhealthyPeers: false,
waitGroup: sync.WaitGroup{},
}

for _, option := range options {
option(pm)
}

ctx, cancel := context.WithCancel(context.Background())
pm.ctx = ctx
pm.cancelAll = cancel

logger.Info("Excessive block size set to", slog.Int64("block size", pm.ebs))
wire.SetLimits(uint64(pm.ebs))

if pm.restartUnhealthyPeers {
pm.StartMonitorPeerHealth()
}

return pm
}

Expand Down Expand Up @@ -68,11 +84,38 @@ func (pm *PeerManager) GetPeers() []PeerI {
}

func (pm *PeerManager) Shutdown() {

if pm.cancelAll != nil {
pm.cancelAll()
pm.waitGroup.Wait()
}

for _, peer := range pm.peers {
peer.Shutdown()
}
}

func (pm *PeerManager) StartMonitorPeerHealth() {
ticker := time.NewTicker(pm.monitorPeersInterval)
pm.waitGroup.Add(1)
go func() {
defer pm.waitGroup.Done()
for {
select {
case <-pm.ctx.Done():
return
case <-ticker.C:
for _, peer := range pm.GetPeers() {
if !peer.IsHealthy() {
pm.logger.Warn("peer unhealthy - restarting", slog.String("address", peer.String()), slog.Bool("connected", peer.Connected()))
peer.Restart()
}
}
}
}
}()
}

// AnnounceTransaction will send an INV message to the provided peers or to selected peers if peers is nil
// it will return the peers that the transaction was actually announced to
func (pm *PeerManager) AnnounceTransaction(txHash *chainhash.Hash, peers []PeerI) []PeerI {
Expand Down
7 changes: 7 additions & 0 deletions peer_manager_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@ func WithExcessiveBlockSize(ebs int64) PeerManagerOptions {
p.ebs = ebs
}
}

func WithRestartUnhealthyPeers(restartUnhealthyPeers bool, monitorPeersInterval time.Duration) PeerManagerOptions {
return func(p *PeerManager) {
p.restartUnhealthyPeers = restartUnhealthyPeers
p.monitorPeersInterval = monitorPeersInterval
}
}

0 comments on commit 4ac2ee5

Please sign in to comment.