Skip to content

Commit

Permalink
Merge pull request #6 from libsv/fix/peer-reconnect
Browse files Browse the repository at this point in the history
Fix/peer reconnect
  • Loading branch information
boecklim authored Dec 21, 2023
2 parents db55c37 + 866db61 commit 6f6e8ad
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 37 deletions.
32 changes: 26 additions & 6 deletions Peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/libsv/go-p2p/bsvutil"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
Expand All @@ -36,6 +37,9 @@ const (

sentMsg = "Sent"
receivedMsg = "Recv"

retryWriteMessageInterval = 10 * time.Second
retryWriteMessageAttempts = 5
)

type Block struct {
Expand Down Expand Up @@ -483,6 +487,25 @@ func (p *Peer) sendDataBatch(batch []*chainhash.Hash) {
}
}

func (p *Peer) writeRetry(msg wire.Message) error {
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryWriteMessageInterval), retryWriteMessageAttempts)

operation := func() error {
return wire.WriteMessage(p.writeConn, msg, wire.ProtocolVersion, p.network)
}

notifyAndReconnect := func(err error, nextTry time.Duration) {
p.logger.Error("Failed to write message", slog.Duration("next try", nextTry), slog.String(errKey, err.Error()))

err = p.connect()
if err != nil {
p.logger.Error("Failed to reconnect", slog.Duration("next try", nextTry), slog.String(errKey, err.Error()))
}
}

return backoff.RetryNotify(operation, policy, notifyAndReconnect)
}

func (p *Peer) writeChannelHandler() {
for msg := range p.writeChan {
// wait for the write connection to be ready
Expand All @@ -496,12 +519,9 @@ func (p *Peer) writeChannelHandler() {
}
time.Sleep(100 * time.Millisecond)
}

if err := wire.WriteMessage(p.writeConn, msg, wire.ProtocolVersion, p.network); err != nil {
if errors.Is(err, io.EOF) {
panic("WRITE EOF")
}
p.logger.Error("Failed to write message", slog.String(errKey, err.Error()))
err := p.writeRetry(msg)
if err != nil {
p.logger.Error("Failed retrying to write message", slog.String(errKey, err.Error()))
}

go func(message wire.Message) {
Expand Down
26 changes: 0 additions & 26 deletions PeerManager_Mock.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,9 @@
package p2p

import (
"context"
"log/slog"

"github.com/libsv/go-p2p/chaincfg/chainhash"
)

type TestLogger struct {
}

func (h *TestLogger) Enabled(_ context.Context, _ slog.Level) bool {
return false
}

func (h *TestLogger) Handle(_ context.Context, _ slog.Record) error {
return nil
}

func (h *TestLogger) WithAttrs(_ []slog.Attr) slog.Handler {
return &TestLogger{}
}

func (h *TestLogger) WithGroup(_ string) slog.Handler {
return &TestLogger{}
}

func (h *TestLogger) Handler() slog.Handler {
return &TestLogger{}
}

type PeerManagerMock struct {
Peers []PeerI
AnnouncedTransactions []*chainhash.Hash
Expand Down
7 changes: 4 additions & 3 deletions PeerManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package p2p
import (
"fmt"
"log/slog"
"os"
"testing"
"time"

Expand All @@ -19,7 +20,7 @@ var (

func TestNewPeerManager(t *testing.T) {

logger := slog.New(&TestLogger{})
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))

t.Run("nil peers no error", func(t *testing.T) {
pm := NewPeerManager(logger, wire.TestNet)
Expand Down Expand Up @@ -64,7 +65,7 @@ func TestNewPeerManager(t *testing.T) {

func TestAnnounceNewTransaction(t *testing.T) {
t.Run("announce tx", func(t *testing.T) {
logger := slog.New(&TestLogger{})
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
pm := NewPeerManager(logger, wire.TestNet, WithBatchDuration(1*time.Millisecond))
require.NotNil(t, pm)

Expand All @@ -85,7 +86,7 @@ func TestAnnounceNewTransaction(t *testing.T) {
})

t.Run("announce tx - multiple peers", func(t *testing.T) {
logger := slog.New(&TestLogger{})
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
pm := NewPeerManager(logger, wire.TestNet, WithBatchDuration(1*time.Millisecond))
require.NotNil(t, pm)

Expand Down
5 changes: 3 additions & 2 deletions Peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"log/slog"
"net"
"os"
"testing"
"time"

Expand Down Expand Up @@ -251,7 +252,7 @@ func newTestPeer(t *testing.T) (net.Conn, *Peer, *MockPeerHandler) {
peerConn, myConn := connutil.AsyncPipe()

peerHandler := NewMockPeerHandler()
logger := slog.New(&TestLogger{})
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
p, err := NewPeer(
logger,
"MockPeerHandler:0000",
Expand Down Expand Up @@ -284,7 +285,7 @@ func newTestPeer(t *testing.T) (net.Conn, *Peer, *MockPeerHandler) {
func newIncomingTestPeer(t *testing.T) (net.Conn, *Peer, *MockPeerHandler) {
peerConn, myConn := connutil.AsyncPipe()
peerHandler := NewMockPeerHandler()
logger := slog.New(&TestLogger{})
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
p, err := NewPeer(
logger,
"MockPeerHandler:0000",
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ require (
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/lmittmann/tint v1.0.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
github.com/cbeuw/connutil v0.0.0-20200411215123-966bfaa51ee3 h1:LRxW8pdmWmyhoNh+TxUjxsAinGtCsVGjsl3xg6zoRSs=
github.com/cbeuw/connutil v0.0.0-20200411215123-966bfaa51ee3/go.mod h1:6jR2SzckGv8hIIS9zWJ160mzGVVOYp4AXZMDtacL6LE=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/lmittmann/tint v1.0.3 h1:W5PHeA2D8bBJVvabNfQD/XW9HPLZK1XoPZH0cq8NouQ=
github.com/lmittmann/tint v1.0.3/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/ordishs/go-utils v1.0.24 h1:QIVYaaN4LE5SnCRbLii7OOrkC2Qqvg4FVKe9zzA3Yd8=
github.com/ordishs/go-utils v1.0.24/go.mod h1:k9G7Bbv2GwoOn9fwZx70yM5jwwIymkv+90FUKLudtyc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down

0 comments on commit 6f6e8ad

Please sign in to comment.