Skip to content

Commit

Permalink
perf: add get bytes and skip api
Browse files Browse the repository at this point in the history
  • Loading branch information
jayantxie committed Jul 3, 2024
1 parent 2c16b97 commit 63b903c
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 7 deletions.
6 changes: 1 addition & 5 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,17 +478,13 @@ func (c *connection) flush() error {
return nil
}
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs)
var bs = c.outputBuffer.GetBytesAndSkip(c.outputBarrier.bs)
var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
if err != nil && err != syscall.EAGAIN {
return Exception(err, "when flush")
}
if n > 0 {
err = c.outputBuffer.Skip(n)
c.outputBuffer.Release()
if err != nil {
return Exception(err, "when flush")
}
}
// return if write all buffer.
if c.outputBuffer.IsEmpty() {
Expand Down
3 changes: 1 addition & 2 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,13 @@ func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
c.rw2r()
return rs, c.supportZeroCopy
}
rs = c.outputBuffer.GetBytes(vs)
rs = c.outputBuffer.GetBytesAndSkip(vs)
return rs, c.supportZeroCopy
}

// outputAck implements FDOperator.
func (c *connection) outputAck(n int) (err error) {
if n > 0 {
c.outputBuffer.Skip(n)
c.outputBuffer.Release()
}
if c.outputBuffer.IsEmpty() {
Expand Down
33 changes: 33 additions & 0 deletions nocopy_linkbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,39 @@ func (b *UnsafeLinkBuffer) GetBytes(p [][]byte) (vs [][]byte) {
return p[:i]
}

// GetBytesAndSkip will read and fill the slice p as much as possible.
// If p is not passed, return all readable bytes.
// It will also skip n bytes which has read to the p.
func (b *UnsafeLinkBuffer) GetBytesAndSkip(p [][]byte) (vs [][]byte) {
node, flush := b.read, b.flush
if len(p) == 0 {
n := 0
for ; node != flush; node = node.next {
n++
}
node = b.read
p = make([][]byte, n)
}
var i, n int
for i = 0; node != flush && i < len(p); node = node.next {
if node.Len() > 0 {
p[i] = node.buf[node.off:]
i++
n += node.Len()
}
}
// read the flush node
if i < len(p) && node.Len() > 0 {
p[i] = node.buf[node.off:]
i++
n += node.Len()
node.off = len(node.buf)
}
b.read = node
b.recalLen(-n) // re-cal length
return p[:i]
}

// book will grow and malloc buffer to hold data.
//
// bookSize: The size of data that can be read at once.
Expand Down
55 changes: 55 additions & 0 deletions nocopy_linkbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"math"
"runtime"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -111,6 +112,60 @@ func TestLinkBufferGetBytes(t *testing.T) {

}

func TestLinkBufferGetBytesAndSkip(t *testing.T) {
buf := NewLinkBuffer()
t.Run("without bs", func(t *testing.T) {
var (
num = 10
expectedLen = 0
)
for i := num - 1; i >= 0; i-- {
b := int(math.Pow10(i))
expectedLen += b
n, err := buf.WriteBinary(make([]byte, b))
MustNil(t, err)
Equal(t, n, b)
}
buf.Flush()
Equal(t, int(buf.length), expectedLen)

bs := buf.GetBytesAndSkip(nil)
actualLen := 0
for i := 0; i < len(bs); i++ {
actualLen += len(bs[i])
}
Equal(t, actualLen, expectedLen)
Equal(t, buf.Len(), 0)
Equal(t, buf.read, buf.flush)
Equal(t, buf.read.off, len(buf.read.buf))
})
t.Run("with bs", func(t *testing.T) {
var (
num = 10
expectedLen = 0
)
for i := num - 1; i >= 0; i-- {
b := int(math.Pow10(i))
expectedLen += b
n, err := buf.WriteBinary(make([]byte, b))
MustNil(t, err)
Equal(t, n, b)
}
buf.Flush()
Equal(t, int(buf.length), expectedLen)

bs := buf.GetBytesAndSkip(make([][]byte, 5))
actualLen := 0
for i := 0; i < len(bs); i++ {
actualLen += len(bs[i])
}
Equal(t, actualLen, 1111100000)
Equal(t, buf.Len(), 11111)
Assert(t, buf.read != buf.flush)
Assert(t, buf.read.off+10000 == len(buf.read.buf))
})
}

// TestLinkBufferWithZero test more case with n is invalid.
func TestLinkBufferWithInvalid(t *testing.T) {
// clean & new
Expand Down

0 comments on commit 63b903c

Please sign in to comment.