From 866db613e29b22714f2d79125525d579749cd266 Mon Sep 17 00:00:00 2001 From: Michael Boeckli <michael.boeckli@taal.com> Date: Wed, 20 Dec 2023 15:38:38 +0100 Subject: [PATCH] BPAAS-1159: Retry writing message with reconnect --- Peer.go | 32 ++++++++++++++++++++++++++------ go.mod | 2 ++ go.sum | 4 ++++ 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/Peer.go b/Peer.go index 827e4d4..e084486 100644 --- a/Peer.go +++ b/Peer.go @@ -14,6 +14,7 @@ import ( "sync/atomic" "time" + "github.com/cenkalti/backoff/v4" "github.com/libsv/go-p2p/bsvutil" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" @@ -36,6 +37,9 @@ const ( sentMsg = "Sent" receivedMsg = "Recv" + + retryWriteMessageInterval = 10 * time.Second + retryWriteMessageAttempts = 5 ) type Block struct { @@ -483,6 +487,25 @@ func (p *Peer) sendDataBatch(batch []*chainhash.Hash) { } } +func (p *Peer) writeRetry(msg wire.Message) error { + policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryWriteMessageInterval), retryWriteMessageAttempts) + + operation := func() error { + return wire.WriteMessage(p.writeConn, msg, wire.ProtocolVersion, p.network) + } + + notifyAndReconnect := func(err error, nextTry time.Duration) { + p.logger.Error("Failed to write message", slog.Duration("next try", nextTry), slog.String(errKey, err.Error())) + + err = p.connect() + if err != nil { + p.logger.Error("Failed to reconnect", slog.Duration("next try", nextTry), slog.String(errKey, err.Error())) + } + } + + return backoff.RetryNotify(operation, policy, notifyAndReconnect) +} + func (p *Peer) writeChannelHandler() { for msg := range p.writeChan { // wait for the write connection to be ready @@ -496,12 +519,9 @@ func (p *Peer) writeChannelHandler() { } time.Sleep(100 * time.Millisecond) } - - if err := wire.WriteMessage(p.writeConn, msg, wire.ProtocolVersion, p.network); err != nil { - if errors.Is(err, io.EOF) { - panic("WRITE EOF") - } - p.logger.Error("Failed to write message", slog.String(errKey, err.Error())) + err := p.writeRetry(msg) + if err != nil { + p.logger.Error("Failed retrying to write message", slog.String(errKey, err.Error())) } go func(message wire.Message) { diff --git a/go.mod b/go.mod index 7d55d5d..ea330ed 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,8 @@ require ( ) require ( + github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/lmittmann/tint v1.0.3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 9d01b36..927f7c6 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,12 @@ github.com/cbeuw/connutil v0.0.0-20200411215123-966bfaa51ee3 h1:LRxW8pdmWmyhoNh+TxUjxsAinGtCsVGjsl3xg6zoRSs= github.com/cbeuw/connutil v0.0.0-20200411215123-966bfaa51ee3/go.mod h1:6jR2SzckGv8hIIS9zWJ160mzGVVOYp4AXZMDtacL6LE= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/lmittmann/tint v1.0.3 h1:W5PHeA2D8bBJVvabNfQD/XW9HPLZK1XoPZH0cq8NouQ= +github.com/lmittmann/tint v1.0.3/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/ordishs/go-utils v1.0.24 h1:QIVYaaN4LE5SnCRbLii7OOrkC2Qqvg4FVKe9zzA3Yd8= github.com/ordishs/go-utils v1.0.24/go.mod h1:k9G7Bbv2GwoOn9fwZx70yM5jwwIymkv+90FUKLudtyc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=