Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/write reconnect #18

Merged
merged 5 commits into from
Apr 4, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
BAARC-136: With retry read write message interval option
boecklim committed Apr 4, 2024
commit 77dd5f5b5e15f5e0bf3632431310f0f10ec6b4dd
76 changes: 39 additions & 37 deletions peer.go
Original file line number Diff line number Diff line change
@@ -35,9 +35,9 @@ const (
sentMsg = "Sent"
receivedMsg = "Recv"

retryReadWriteMessageInterval = 1 * time.Second
retryReadWriteMessageAttempts = 5
reconnectInterval = 10 * time.Second
retryReadWriteMessageIntervalDefault = 1 * time.Second
retryReadWriteMessageAttempts = 5
reconnectInterval = 10 * time.Second

pingInterval = 2 * time.Minute
connectionHealthTickerDuration = 3 * time.Minute
@@ -53,29 +53,30 @@ type Block struct {
}

type Peer struct {
address string
network wire.BitcoinNet
mu sync.RWMutex
readConn net.Conn
writeConn net.Conn
incomingConn net.Conn
dial func(network, address string) (net.Conn, error)
peerHandler PeerHandlerI
writeChan chan wire.Message
quit chan struct{}
pingPongAlive chan struct{}
logger *slog.Logger
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
cancelReadHandler context.CancelFunc
cancelWriteHandler context.CancelFunc
userAgentName *string
userAgentVersion *string
address string
network wire.BitcoinNet
mu sync.RWMutex
readConn net.Conn
writeConn net.Conn
incomingConn net.Conn
dial func(network, address string) (net.Conn, error)
peerHandler PeerHandlerI
writeChan chan wire.Message
quit chan struct{}
pingPongAlive chan struct{}
logger *slog.Logger
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
cancelReadHandler context.CancelFunc
cancelWriteHandler context.CancelFunc
userAgentName *string
userAgentVersion *string
retryReadWriteMessageInterval time.Duration
}

// NewPeer returns a new bitcoin peer for the provided address and configuration.
@@ -90,15 +91,16 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
)

p := &Peer{
network: network,
address: address,
writeChan: writeChan,
pingPongAlive: make(chan struct{}, 1),
peerHandler: peerHandler,
logger: peerLogger,
dial: net.Dial,
maximumMessageSize: defaultMaximumMessageSize,
batchDelay: defaultBatchDelayMilliseconds * time.Millisecond,
network: network,
address: address,
writeChan: writeChan,
pingPongAlive: make(chan struct{}, 1),
peerHandler: peerHandler,
logger: peerLogger,
dial: net.Dial,
maximumMessageSize: defaultMaximumMessageSize,
batchDelay: defaultBatchDelayMilliseconds * time.Millisecond,
retryReadWriteMessageInterval: retryReadWriteMessageIntervalDefault,
}

var err error
@@ -273,7 +275,7 @@ func (p *Peer) String() string {
}

func (p *Peer) readRetry(ctx context.Context, r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire.Message, error) {
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts)
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(p.retryReadWriteMessageInterval), retryReadWriteMessageAttempts)

policyContext := backoff.WithContext(policy, ctx)

@@ -572,7 +574,7 @@ func (p *Peer) sendDataBatch(batch []*chainhash.Hash) {
}

func (p *Peer) writeRetry(ctx context.Context, msg wire.Message) error {
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts)
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(p.retryReadWriteMessageInterval), retryReadWriteMessageAttempts)

policyContext := backoff.WithContext(policy, ctx)

7 changes: 7 additions & 0 deletions peer_options.go
Original file line number Diff line number Diff line change
@@ -47,3 +47,10 @@ func WithUserAgent(userAgentName string, userAgentVersion string) PeerOptions {
return nil
}
}

func WithRetryReadWriteMessageInterval(d time.Duration) PeerOptions {
return func(p *Peer) error {
p.retryReadWriteMessageInterval = d
return nil
}
}
1 change: 1 addition & 0 deletions peer_test.go
Original file line number Diff line number Diff line change
@@ -126,6 +126,7 @@ func TestReconnect(t *testing.T) {
WithDialer(func(network, address string) (net.Conn, error) {
return peerConn, nil
}),
WithRetryReadWriteMessageInterval(200*time.Millisecond),
)
require.NoError(t, err)