diff --git a/Peer.go b/Peer.go index 827e4d4..e084486 100644 --- a/Peer.go +++ b/Peer.go @@ -14,6 +14,7 @@ import ( "sync/atomic" "time" + "github.com/cenkalti/backoff/v4" "github.com/libsv/go-p2p/bsvutil" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" @@ -36,6 +37,9 @@ const ( sentMsg = "Sent" receivedMsg = "Recv" + + retryWriteMessageInterval = 10 * time.Second + retryWriteMessageAttempts = 5 ) type Block struct { @@ -483,6 +487,25 @@ func (p *Peer) sendDataBatch(batch []*chainhash.Hash) { } } +func (p *Peer) writeRetry(msg wire.Message) error { + policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryWriteMessageInterval), retryWriteMessageAttempts) + + operation := func() error { + return wire.WriteMessage(p.writeConn, msg, wire.ProtocolVersion, p.network) + } + + notifyAndReconnect := func(err error, nextTry time.Duration) { + p.logger.Error("Failed to write message", slog.Duration("next try", nextTry), slog.String(errKey, err.Error())) + + err = p.connect() + if err != nil { + p.logger.Error("Failed to reconnect", slog.Duration("next try", nextTry), slog.String(errKey, err.Error())) + } + } + + return backoff.RetryNotify(operation, policy, notifyAndReconnect) +} + func (p *Peer) writeChannelHandler() { for msg := range p.writeChan { // wait for the write connection to be ready @@ -496,12 +519,9 @@ func (p *Peer) writeChannelHandler() { } time.Sleep(100 * time.Millisecond) } - - if err := wire.WriteMessage(p.writeConn, msg, wire.ProtocolVersion, p.network); err != nil { - if errors.Is(err, io.EOF) { - panic("WRITE EOF") - } - p.logger.Error("Failed to write message", slog.String(errKey, err.Error())) + err := p.writeRetry(msg) + if err != nil { + p.logger.Error("Failed retrying to write message", slog.String(errKey, err.Error())) } go func(message wire.Message) { diff --git a/PeerManager_Mock.go b/PeerManager_Mock.go index 18bc16b..2c8196c 100644 --- a/PeerManager_Mock.go +++ b/PeerManager_Mock.go @@ -1,35 +1,9 @@ package p2p import ( - "context" - "log/slog" - "github.com/libsv/go-p2p/chaincfg/chainhash" ) -type TestLogger struct { -} - -func (h *TestLogger) Enabled(_ context.Context, _ slog.Level) bool { - return false -} - -func (h *TestLogger) Handle(_ context.Context, _ slog.Record) error { - return nil -} - -func (h *TestLogger) WithAttrs(_ []slog.Attr) slog.Handler { - return &TestLogger{} -} - -func (h *TestLogger) WithGroup(_ string) slog.Handler { - return &TestLogger{} -} - -func (h *TestLogger) Handler() slog.Handler { - return &TestLogger{} -} - type PeerManagerMock struct { Peers []PeerI AnnouncedTransactions []*chainhash.Hash diff --git a/PeerManager_test.go b/PeerManager_test.go index d935a38..4e599a5 100644 --- a/PeerManager_test.go +++ b/PeerManager_test.go @@ -3,6 +3,7 @@ package p2p import ( "fmt" "log/slog" + "os" "testing" "time" @@ -19,7 +20,7 @@ var ( func TestNewPeerManager(t *testing.T) { - logger := slog.New(&TestLogger{}) + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) t.Run("nil peers no error", func(t *testing.T) { pm := NewPeerManager(logger, wire.TestNet) @@ -64,7 +65,7 @@ func TestNewPeerManager(t *testing.T) { func TestAnnounceNewTransaction(t *testing.T) { t.Run("announce tx", func(t *testing.T) { - logger := slog.New(&TestLogger{}) + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) pm := NewPeerManager(logger, wire.TestNet, WithBatchDuration(1*time.Millisecond)) require.NotNil(t, pm) @@ -85,7 +86,7 @@ func TestAnnounceNewTransaction(t *testing.T) { }) t.Run("announce tx - multiple peers", func(t *testing.T) { - logger := slog.New(&TestLogger{}) + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) pm := NewPeerManager(logger, wire.TestNet, WithBatchDuration(1*time.Millisecond)) require.NotNil(t, pm) diff --git a/Peer_test.go b/Peer_test.go index 51e0d54..bd69216 100644 --- a/Peer_test.go +++ b/Peer_test.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "log/slog" "net" + "os" "testing" "time" @@ -251,7 +252,7 @@ func newTestPeer(t *testing.T) (net.Conn, *Peer, *MockPeerHandler) { peerConn, myConn := connutil.AsyncPipe() peerHandler := NewMockPeerHandler() - logger := slog.New(&TestLogger{}) + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) p, err := NewPeer( logger, "MockPeerHandler:0000", @@ -284,7 +285,7 @@ func newTestPeer(t *testing.T) (net.Conn, *Peer, *MockPeerHandler) { func newIncomingTestPeer(t *testing.T) (net.Conn, *Peer, *MockPeerHandler) { peerConn, myConn := connutil.AsyncPipe() peerHandler := NewMockPeerHandler() - logger := slog.New(&TestLogger{}) + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) p, err := NewPeer( logger, "MockPeerHandler:0000", diff --git a/go.mod b/go.mod index 7d55d5d..ea330ed 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,8 @@ require ( ) require ( + github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/lmittmann/tint v1.0.3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 9d01b36..927f7c6 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,12 @@ github.com/cbeuw/connutil v0.0.0-20200411215123-966bfaa51ee3 h1:LRxW8pdmWmyhoNh+TxUjxsAinGtCsVGjsl3xg6zoRSs= github.com/cbeuw/connutil v0.0.0-20200411215123-966bfaa51ee3/go.mod h1:6jR2SzckGv8hIIS9zWJ160mzGVVOYp4AXZMDtacL6LE= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/lmittmann/tint v1.0.3 h1:W5PHeA2D8bBJVvabNfQD/XW9HPLZK1XoPZH0cq8NouQ= +github.com/lmittmann/tint v1.0.3/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/ordishs/go-utils v1.0.24 h1:QIVYaaN4LE5SnCRbLii7OOrkC2Qqvg4FVKe9zzA3Yd8= github.com/ordishs/go-utils v1.0.24/go.mod h1:k9G7Bbv2GwoOn9fwZx70yM5jwwIymkv+90FUKLudtyc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=