Skip to content

Commit

Permalink
BAARC-136: With retry read write message interval option
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Apr 4, 2024
1 parent 423c9e1 commit 77dd5f5
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 37 deletions.
76 changes: 39 additions & 37 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ const (
sentMsg = "Sent"
receivedMsg = "Recv"

retryReadWriteMessageInterval = 1 * time.Second
retryReadWriteMessageAttempts = 5
reconnectInterval = 10 * time.Second
retryReadWriteMessageIntervalDefault = 1 * time.Second
retryReadWriteMessageAttempts = 5
reconnectInterval = 10 * time.Second

pingInterval = 2 * time.Minute
connectionHealthTickerDuration = 3 * time.Minute
Expand All @@ -53,29 +53,30 @@ type Block struct {
}

type Peer struct {
address string
network wire.BitcoinNet
mu sync.RWMutex
readConn net.Conn
writeConn net.Conn
incomingConn net.Conn
dial func(network, address string) (net.Conn, error)
peerHandler PeerHandlerI
writeChan chan wire.Message
quit chan struct{}
pingPongAlive chan struct{}
logger *slog.Logger
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
cancelReadHandler context.CancelFunc
cancelWriteHandler context.CancelFunc
userAgentName *string
userAgentVersion *string
address string
network wire.BitcoinNet
mu sync.RWMutex
readConn net.Conn
writeConn net.Conn
incomingConn net.Conn
dial func(network, address string) (net.Conn, error)
peerHandler PeerHandlerI
writeChan chan wire.Message
quit chan struct{}
pingPongAlive chan struct{}
logger *slog.Logger
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
cancelReadHandler context.CancelFunc
cancelWriteHandler context.CancelFunc
userAgentName *string
userAgentVersion *string
retryReadWriteMessageInterval time.Duration
}

// NewPeer returns a new bitcoin peer for the provided address and configuration.
Expand All @@ -90,15 +91,16 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
)

p := &Peer{
network: network,
address: address,
writeChan: writeChan,
pingPongAlive: make(chan struct{}, 1),
peerHandler: peerHandler,
logger: peerLogger,
dial: net.Dial,
maximumMessageSize: defaultMaximumMessageSize,
batchDelay: defaultBatchDelayMilliseconds * time.Millisecond,
network: network,
address: address,
writeChan: writeChan,
pingPongAlive: make(chan struct{}, 1),
peerHandler: peerHandler,
logger: peerLogger,
dial: net.Dial,
maximumMessageSize: defaultMaximumMessageSize,
batchDelay: defaultBatchDelayMilliseconds * time.Millisecond,
retryReadWriteMessageInterval: retryReadWriteMessageIntervalDefault,
}

var err error
Expand Down Expand Up @@ -273,7 +275,7 @@ func (p *Peer) String() string {
}

func (p *Peer) readRetry(ctx context.Context, r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire.Message, error) {
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts)
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(p.retryReadWriteMessageInterval), retryReadWriteMessageAttempts)

policyContext := backoff.WithContext(policy, ctx)

Expand Down Expand Up @@ -572,7 +574,7 @@ func (p *Peer) sendDataBatch(batch []*chainhash.Hash) {
}

func (p *Peer) writeRetry(ctx context.Context, msg wire.Message) error {
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts)
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(p.retryReadWriteMessageInterval), retryReadWriteMessageAttempts)

policyContext := backoff.WithContext(policy, ctx)

Expand Down
7 changes: 7 additions & 0 deletions peer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,10 @@ func WithUserAgent(userAgentName string, userAgentVersion string) PeerOptions {
return nil
}
}

func WithRetryReadWriteMessageInterval(d time.Duration) PeerOptions {
return func(p *Peer) error {
p.retryReadWriteMessageInterval = d
return nil
}
}
1 change: 1 addition & 0 deletions peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func TestReconnect(t *testing.T) {
WithDialer(func(network, address string) (net.Conn, error) {
return peerConn, nil
}),
WithRetryReadWriteMessageInterval(200*time.Millisecond),
)
require.NoError(t, err)

Expand Down

0 comments on commit 77dd5f5

Please sign in to comment.