Skip to content

Commit

Permalink
BPAAS-1159: Retry writing message with reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Dec 20, 2023
1 parent 1dac581 commit 866db61
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
32 changes: 26 additions & 6 deletions Peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/libsv/go-p2p/bsvutil"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
Expand All @@ -36,6 +37,9 @@ const (

sentMsg = "Sent"
receivedMsg = "Recv"

retryWriteMessageInterval = 10 * time.Second
retryWriteMessageAttempts = 5
)

type Block struct {
Expand Down Expand Up @@ -483,6 +487,25 @@ func (p *Peer) sendDataBatch(batch []*chainhash.Hash) {
}
}

func (p *Peer) writeRetry(msg wire.Message) error {
policy := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryWriteMessageInterval), retryWriteMessageAttempts)

operation := func() error {
return wire.WriteMessage(p.writeConn, msg, wire.ProtocolVersion, p.network)
}

notifyAndReconnect := func(err error, nextTry time.Duration) {
p.logger.Error("Failed to write message", slog.Duration("next try", nextTry), slog.String(errKey, err.Error()))

err = p.connect()
if err != nil {
p.logger.Error("Failed to reconnect", slog.Duration("next try", nextTry), slog.String(errKey, err.Error()))
}
}

return backoff.RetryNotify(operation, policy, notifyAndReconnect)
}

func (p *Peer) writeChannelHandler() {
for msg := range p.writeChan {
// wait for the write connection to be ready
Expand All @@ -496,12 +519,9 @@ func (p *Peer) writeChannelHandler() {
}
time.Sleep(100 * time.Millisecond)
}

if err := wire.WriteMessage(p.writeConn, msg, wire.ProtocolVersion, p.network); err != nil {
if errors.Is(err, io.EOF) {
panic("WRITE EOF")
}
p.logger.Error("Failed to write message", slog.String(errKey, err.Error()))
err := p.writeRetry(msg)
if err != nil {
p.logger.Error("Failed retrying to write message", slog.String(errKey, err.Error()))
}

go func(message wire.Message) {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ require (
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/lmittmann/tint v1.0.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
github.com/cbeuw/connutil v0.0.0-20200411215123-966bfaa51ee3 h1:LRxW8pdmWmyhoNh+TxUjxsAinGtCsVGjsl3xg6zoRSs=
github.com/cbeuw/connutil v0.0.0-20200411215123-966bfaa51ee3/go.mod h1:6jR2SzckGv8hIIS9zWJ160mzGVVOYp4AXZMDtacL6LE=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/lmittmann/tint v1.0.3 h1:W5PHeA2D8bBJVvabNfQD/XW9HPLZK1XoPZH0cq8NouQ=
github.com/lmittmann/tint v1.0.3/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/ordishs/go-utils v1.0.24 h1:QIVYaaN4LE5SnCRbLii7OOrkC2Qqvg4FVKe9zzA3Yd8=
github.com/ordishs/go-utils v1.0.24/go.mod h1:k9G7Bbv2GwoOn9fwZx70yM5jwwIymkv+90FUKLudtyc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down

0 comments on commit 866db61

Please sign in to comment.