Skip to content

Commit

Permalink
Merge pull request #18 from libsv/fix/write-reconnect
Browse files Browse the repository at this point in the history
Fix/write reconnect
  • Loading branch information
boecklim authored Apr 4, 2024
2 parents 1b0b884 + b12441d commit 7e4bd20
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 47 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ deps:

.PHONY: test
test:
go test -race -count=1 ./...
go test -race -v -count=1 ./...

.PHONY: lint
lint:
Expand Down
106 changes: 60 additions & 46 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ const (
sentMsg = "Sent"
receivedMsg = "Recv"

retryReadWriteMessageInterval = 1 * time.Second
retryReadWriteMessageAttempts = 5
reconnectInterval = 10 * time.Second
retryReadWriteMessageIntervalDefault = 1 * time.Second
retryReadWriteMessageAttempts = 5
reconnectInterval = 10 * time.Second

pingInterval = 2 * time.Minute
connectionHealthTickerDuration = 3 * time.Minute
Expand All @@ -53,29 +53,30 @@ type Block struct {
}

type Peer struct {
address string
network wire.BitcoinNet
mu sync.RWMutex
readConn net.Conn
writeConn net.Conn
incomingConn net.Conn
dial func(network, address string) (net.Conn, error)
peerHandler PeerHandlerI
writeChan chan wire.Message
quit chan struct{}
pingPongAlive chan struct{}
logger *slog.Logger
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
cancelReadHandler context.CancelFunc
cancelWriteHandler context.CancelFunc
userAgentName *string
userAgentVersion *string
address string
network wire.BitcoinNet
mu sync.RWMutex
readConn net.Conn
writeConn net.Conn
incomingConn net.Conn
dial func(network, address string) (net.Conn, error)
peerHandler PeerHandlerI
writeChan chan wire.Message
quit chan struct{}
pingPongAlive chan struct{}
logger *slog.Logger
sentVerAck atomic.Bool
receivedVerAck atomic.Bool
batchDelay time.Duration
invBatcher *batcher.Batcher[chainhash.Hash]
dataBatcher *batcher.Batcher[chainhash.Hash]
maximumMessageSize int64
isHealthy bool
cancelReadHandler context.CancelFunc
cancelWriteHandler context.CancelFunc
userAgentName *string
userAgentVersion *string
retryReadWriteMessageInterval time.Duration
}

// NewPeer returns a new bitcoin peer for the provided address and configuration.
Expand All @@ -90,15 +91,16 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw
)

p := &Peer{
network: network,
address: address,
writeChan: writeChan,
pingPongAlive: make(chan struct{}, 1),
peerHandler: peerHandler,
logger: peerLogger,
dial: net.Dial,
maximumMessageSize: defaultMaximumMessageSize,
batchDelay: defaultBatchDelayMilliseconds * time.Millisecond,
network: network,
address: address,
writeChan: writeChan,
pingPongAlive: make(chan struct{}, 1),
peerHandler: peerHandler,
logger: peerLogger,
dial: net.Dial,
maximumMessageSize: defaultMaximumMessageSize,
batchDelay: defaultBatchDelayMilliseconds * time.Millisecond,
retryReadWriteMessageInterval: retryReadWriteMessageIntervalDefault,
}

var err error
Expand All @@ -116,17 +118,8 @@ func NewPeer(logger *slog.Logger, address string, peerHandler PeerHandlerI, netw

func (p *Peer) initialize() {

ctx, cancel := context.WithCancel(context.Background())
p.cancelWriteHandler = cancel

go p.monitorConnectionHealth()
go p.pingHandler()
for i := 0; i < 10; i++ {
// start 10 workers that will write to the peer
// locking is done in the net.write in the wire/message handler
// this reduces the wait on the writer when processing writes (for example HandleTransactionSent)
p.startWriteChannelHandler(ctx)
}

go func() {
err := p.connect()
Expand Down Expand Up @@ -163,6 +156,14 @@ func (p *Peer) disconnect() {
p.mu.Lock()
defer p.mu.Unlock()

if p.cancelReadHandler != nil {
p.cancelReadHandler()
}

if p.cancelWriteHandler != nil {
p.cancelWriteHandler()
}

p._disconnect()
}

Expand Down Expand Up @@ -204,6 +205,15 @@ func (p *Peer) connect() error {
p.readConn = conn
}

ctx, cancel := context.WithCancel(context.Background())
p.cancelWriteHandler = cancel
for i := 0; i < 10; i++ {
// start 10 workers that will write to the peer
// locking is done in the net.write in the wire/message handler
// this reduces the wait on the writer when processing writes (for example HandleTransactionSent)
p.startWriteChannelHandler(ctx)
}

p.startReadHandler()

// write version message to our peer directly and not through the write channel,
Expand Down Expand Up @@ -265,7 +275,7 @@ func (p *Peer) String() string {
}

func (p *Peer) readRetry(ctx context.Context, r io.Reader, pver uint32, bsvnet wire.BitcoinNet) (wire.Message, error) {
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts)
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(p.retryReadWriteMessageInterval), retryReadWriteMessageAttempts)

policyContext := backoff.WithContext(policy, ctx)

Expand Down Expand Up @@ -564,7 +574,7 @@ func (p *Peer) sendDataBatch(batch []*chainhash.Hash) {
}

func (p *Peer) writeRetry(ctx context.Context, msg wire.Message) error {
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryReadWriteMessageInterval), retryReadWriteMessageAttempts)
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(p.retryReadWriteMessageInterval), retryReadWriteMessageAttempts)

policyContext := backoff.WithContext(policy, ctx)

Expand Down Expand Up @@ -611,6 +621,10 @@ func (p *Peer) startWriteChannelHandler(ctx context.Context) {
}

p.logger.Error("Failed retrying to write message", slog.String(errKey, err.Error()))

p.disconnect()

return
}

go func(message wire.Message) {
Expand Down
7 changes: 7 additions & 0 deletions peer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,10 @@ func WithUserAgent(userAgentName string, userAgentVersion string) PeerOptions {
return nil
}
}

func WithRetryReadWriteMessageInterval(d time.Duration) PeerOptions {
return func(p *Peer) error {
p.retryReadWriteMessageInterval = d
return nil
}
}
107 changes: 107 additions & 0 deletions peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,113 @@ func TestWriteMsg(t *testing.T) {
})
}

func TestReconnect(t *testing.T) {
tt := []struct {
name string
cancelRead bool
}{
{
name: "writer connection breaks - reconnect",
cancelRead: true,
},
{
name: "reader connection breaks - reconnect",
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
peerConn, myConn := connutil.AsyncPipe()

peerHandler := NewMockPeerHandler()
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
p, err := NewPeer(
logger,
"MockPeerHandler:0000",
peerHandler,
wire.MainNet,
WithDialer(func(network, address string) (net.Conn, error) {
return peerConn, nil
}),
WithRetryReadWriteMessageInterval(200*time.Millisecond),
)
require.NoError(t, err)

doHandshake(t, p, myConn)

// wait for the peer to be connected
count := 0
for {
if p.Connected() {
break
}
count++
if count >= 3 {
t.Error("peer not connected")
}
time.Sleep(10 * time.Millisecond)
}

invMsg := wire.NewMsgInv()
hash, err := chainhash.NewHashFromStr(tx1)
require.NoError(t, err)
err = invMsg.AddInvVect(wire.NewInvVect(wire.InvTypeTx, hash))
require.NoError(t, err)

if tc.cancelRead {
// cancel reader and ensure writer will disconnect
p.cancelReadHandler()
} else {
// cancel writer and ensure reader will disconnect
p.cancelWriteHandler()
}

// break connection
err = myConn.Close()
require.NoError(t, err)

err = p.WriteMsg(invMsg)
require.NoError(t, err)

// wait until peer is disconnected
for {
if !p.Connected() {
t.Log("disconnected")
break
}
count++
if count >= 50 {
t.Fatal("peer connection not broken")
}
time.Sleep(200 * time.Millisecond)
}

// recreate connection
peerConn, myConn = connutil.AsyncPipe()
t.Log("new connection created")
time.Sleep(5 * time.Second)

t.Log("handshake")
doHandshake(t, p, myConn)
for {
if p.Connected() {
break
}
count++
if count >= 20 {
t.Fatal("peer connection not established")
}
time.Sleep(100 * time.Millisecond)
}

t.Log("shutdown")
p.Shutdown()

time.Sleep(5 * time.Second)
})
}
}

func Test_readHandler(t *testing.T) {
t.Run("read message - inv tx", func(t *testing.T) {
myConn, _, peerHandler := newTestPeer(t)
Expand Down

0 comments on commit 7e4bd20

Please sign in to comment.