Skip to content

Commit

Permalink
perf: use msgio pooled buffers for received msgs
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Sep 22, 2022
1 parent 01ab84a commit ca3e349
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"io"
"time"

"github.com/gogo/protobuf/proto"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"

pb "github.com/libp2p/go-libp2p-pubsub/pb"

"github.com/libp2p/go-msgio"
"github.com/libp2p/go-msgio/protoio"

"github.com/gogo/protobuf/proto"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

// get the initial RPC containing all of our subscriptions to send to new peers
Expand Down Expand Up @@ -60,11 +60,11 @@ func (p *PubSub) handleNewStream(s network.Stream) {
p.inboundStreamsMx.Unlock()
}()

r := protoio.NewDelimitedReader(s, p.maxMessageSize)
r := msgio.NewVarintReaderSize(s, p.maxMessageSize)
for {
rpc := new(RPC)
err := r.ReadMsg(&rpc.RPC)
msgbytes, err := r.ReadMsg()
if err != nil {
r.ReleaseMsg(msgbytes)
if err != io.EOF {
s.Reset()
log.Debugf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
Expand All @@ -77,6 +77,15 @@ func (p *PubSub) handleNewStream(s network.Stream) {
return
}

rpc := new(RPC)
err = rpc.Unmarshal(msgbytes)
r.ReleaseMsg(msgbytes)
if err != nil {
s.Reset()
log.Warnf("bogus rpc from %s: %s", s.Conn().RemotePeer(), err)
return
}

rpc.from = peer
select {
case p.incoming <- rpc:
Expand Down

0 comments on commit ca3e349

Please sign in to comment.