From 77dd5f5b5e15f5e0bf3632431310f0f10ec6b4dd Mon Sep 17 00:00:00 2001 From: Michael Boeckli Date: Wed, 3 Apr 2024 17:05:07 +0200 Subject: [PATCH] BAARC-136: With retry read write message interval option --- peer.go | 76 +++++++++++++++++++++++++------------------------ peer_options.go | 7 +++++ peer_test.go | 1 + 3 files changed, 47 insertions(+), 37 deletions(-) diff --git a/peer.go b/peer.go index c9d01bb..a7c4dc0 100644 --- a/peer.go +++ b/peer.go @@ -35,9 +35,9 @@ const ( sentMsg = "Sent" receivedMsg = "Recv" - retryReadWriteMessageInterval = 1 * time.Second - retryReadWriteMessageAttempts = 5 - reconnectInterval = 10 * time.Second + retryReadWriteMessageIntervalDefault = 1 * time.Second + retryReadWriteMessageAttempts = 5 + reconnectInterval = 10 * time.Second pingInterval = 2 * time.Minute connectionHealthTickerDuration = 3 * time.Minute @@ -53,29 +53,30 @@ type Block struct { } type Peer struct { - address string - network wire.BitcoinNet - mu sync.RWMutex - readConn net.Conn - writeConn net.Conn - incomingConn net.Conn - dial func(network, address string) (net.Conn, error) - peerHandler PeerHandlerI - writeChan chan wire.Message - quit chan struct{} - pingPongAlive chan struct{} - logger *slog.Logger - sentVerAck atomic.Bool - receivedVerAck atomic.Bool - batchDelay time.Duration - invBatcher *batcher.Batcher[chainhash.Hash] - dataBatcher *batcher.Batcher[chainhash.Hash] - maximumMessageSize int64 - isHealthy bool - cancelReadHandler context.CancelFunc - cancelWriteHandler context.CancelFunc - userAgentName *string - userAgentVersion *string + address string + network wire.BitcoinNet + mu sync.RWMutex + readConn net.Conn + writeConn net.Conn + incomingConn net.Conn + dial func(network, address string) (net.Conn, error) + peerHandler PeerHandlerI + writeChan chan wire.Message + quit chan struct{} + pingPongAlive chan struct{} + logger *slog.Logger + sentVerAck atomic.Bool + receivedVerAck atomic.Bool + batchDelay time.Duration + invBatcher *batcher.Batcher[chainhash.Hash] + dataBatcher *batcher.Batcher[chainhash.Hash] + maximumMessageSize int64 + isHealthy bool + cancelReadHandler context.CancelFunc + cancelWriteHandler context.CancelFunc + userAgentName *string + userAgentVersion *string + retryReadWriteMessageInterval time.Duration } // NewPeer returns a new bitcoin peer for the provided address and configuration. @@ -90,15 +91,16 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw ) p := &Peer{ - network: network, - address: address, - writeChan: writeChan, - pingPongAlive: make(chan struct{}, 1), - peerHandler: peerHandler, - logger: peerLogger, - dial: net.Dial, - maximumMessageSize: defaultMaximumMessageSize, - batchDelay: defaultBatchDelayMilliseconds * time.Millisecond, + network: network, + address: address, + writeChan: writeChan, + pingPongAlive: make(chan struct{}, 1), + peerHandler: peerHandler, + logger: peerLogger, + dial: net.Dial, + maximumMessageSize: defaultMaximumMessageSize, + batchDelay: defaultBatchDelayMilliseconds * time.Millisecond, + retryReadWriteMessageInterval: retryReadWriteMessageIntervalDefault, } var err error @@ -273,7 +275,7 @@ func (p *Peer) String() string { } func (p *Peer) readRetry(ctx context.Context, r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire.Message, error) { - policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts) + policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(p.retryReadWriteMessageInterval), retryReadWriteMessageAttempts) policyContext := backoff.WithContext(policy, ctx) @@ -572,7 +574,7 @@ func (p *Peer) sendDataBatch(batch []*chainhash.Hash) { } func (p *Peer) writeRetry(ctx context.Context, msg wire.Message) error { - policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts) + policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(p.retryReadWriteMessageInterval), retryReadWriteMessageAttempts) policyContext := backoff.WithContext(policy, ctx) diff --git a/peer_options.go b/peer_options.go index 3889fbd..b492e7f 100644 --- a/peer_options.go +++ b/peer_options.go @@ -47,3 +47,10 @@ func WithUserAgent(userAgentName string, userAgentVersion string) PeerOptions { return nil } } + +func WithRetryReadWriteMessageInterval(d time.Duration) PeerOptions { + return func(p *Peer) error { + p.retryReadWriteMessageInterval = d + return nil + } +} diff --git a/peer_test.go b/peer_test.go index a099c70..0741610 100644 --- a/peer_test.go +++ b/peer_test.go @@ -126,6 +126,7 @@ func TestReconnect(t *testing.T) { WithDialer(func(network, address string) (net.Conn, error) { return peerConn, nil }), + WithRetryReadWriteMessageInterval(200*time.Millisecond), ) require.NoError(t, err)